Com fer el cens de l'associació eXO durant l'Assemblea General amb Dolibarr

Seguint P-2025-1, quan es prepara l’Assemblea General Ordinària, i de tant en tant, convé saber l’estat d’altes a l’associació.

Per evitar problemes de canvis de metodologia o errors històrics en les dades ho fem de la següent manera:

import argparse
import base64
import pprint

import difflib
from io import StringIO
from functools import cached_property, cache

from typing import List
import psycopg2
import attr
import datetime
import os
import sys

import matplotlib.pyplot as plt

# Shared pretty-printer; main() uses it for the plain "pprint" output format.
pp = pprint.PrettyPrinter()


def dateStr(x):
    """Render a date-like object (anything with ``strftime``) as ``YYYY-MM-DD``."""
    iso_day_pattern = "%Y-%m-%d"
    return x.strftime(iso_day_pattern)


def pngFileToDataUri(f):
    """Return the content of the PNG file at path ``f`` as a ``data:`` URI.

    The file is read in binary mode and embedded base64-encoded. It is opened
    in a ``with`` block so the handle is closed deterministically instead of
    being left for the garbage collector.
    """
    with open(f, "rb") as png_file:
        payload = base64.b64encode(png_file.read()).decode()
    return f"data:image/png;base64,{payload}"


class GenderDeriver:
    """
    This derives gender off the firstname.
    It is not meant as a real data point as we do not collect this data,
    but rather to give the association a feel of the development of
    the gender distribution amongst its members.
    Data source: https://www.idescat.cat/noms/"""

    Female = "D"
    Male = "H"

    # Plain class attribute. The previous ``@classmethod`` + ``@property``
    # chain was deprecated in Python 3.11 and removed in 3.13, where
    # ``GenderDeriver.options`` would no longer evaluate to a list.
    options = [Female, Male]

    @staticmethod
    @cache
    def read_csv():
        """Parse ``idescat-noms---1.csv`` into a ``{NAME: gender}`` mapping.

        Everything up to and including the line starting with ``Pos.`` is
        treated as header. Each data line is split into five comma-separated
        fields of which the 2nd (gender), 3rd (name) and 4th (frequency, with
        ``.`` as thousands separator) are used. When a name appears more than
        once, the gender of the most frequent entry wins. The result is cached
        for the lifetime of the process.
        """
        genders = {}
        best_frequency = {}
        with open("idescat-noms---1.csv", "r") as csv_file:
            header_section = True
            for lin in csv_file:
                if header_section:
                    # The column-title line closes the header section.
                    if lin.startswith("Pos."):
                        header_section = False
                    continue
                if not lin.strip():
                    # Blank lines would break the 5-way unpacking below.
                    continue
                _, gender, names, frequency, _ = lin.strip().split(",", maxsplit=4)
                frequency_int = int(frequency.replace(".", ""))
                # Sometimes there are variants separated by '/'
                for variant in names.split("/"):
                    # Only override if more frequent
                    if best_frequency.get(variant, 0) < frequency_int:
                        best_frequency[variant] = frequency_int
                        genders[variant] = gender
        return genders

    @classmethod
    @cache
    def from_name(cls, name: str):
        """Return the gender code of the known name closest to ``name``.

        The upper-cased name is fuzzy-matched against the CSV names with
        ``difflib.get_close_matches``. When nothing matches, or ``name`` is
        empty or ``None``, a note is written to stderr and ``cls.Male`` is
        returned.
        """
        known = cls.read_csv()
        # Find similar matches
        candidates = (
            difflib.get_close_matches(name.upper(), known.keys()) if name else []
        )
        if candidates:
            return known[candidates[0]]
        # Default to least diverse population
        sys.stderr.write(f"MAYBE BUG. NAME GENDER DEFAULTED:\t{name}\n")
        return cls.Male


@attr.s
class MembershipPeriod:
    """A continuous span of membership under one quota reference."""

    begin: datetime.date = attr.ib()
    end: datetime.date = attr.ib()
    ref: str = attr.ib()

    # Tolerance applied on both sides of the period when matching a date.
    _wiggle_room = datetime.timedelta(days=7)

    def matches_date(self, ref_date: datetime.date) -> bool:
        """Tell whether ``ref_date`` lies within the period, 7 days of slack included."""
        slack = MembershipPeriod._wiggle_room
        if not (self.begin - slack <= ref_date):
            return False
        return ref_date <= self.end + slack


@attr.s
class Member:
    """One association member together with its invoiced membership periods."""

    ref: str = attr.ib()
    active: bool = attr.ib()
    firstname: str = attr.ib()
    lastname: str = attr.ib()
    is_person: bool = attr.ib()
    entity_name: str = attr.ib()
    # Used as ``llx_facture.fk_soc`` when looking up this member's invoices.
    entity_ref: str = attr.ib()
    membership_periods: List[MembershipPeriod] = attr.ib(factory=list)

    @cached_property
    def derived_gender(self):
        """Gender code estimated from ``firstname`` (computed once per member)."""
        return GenderDeriver.from_name(self.firstname)

    def status_on_date(self, ref_date: datetime.date) -> str:
        """Return the quota reference covering ``ref_date``, or ``""`` if none.

        Periods are checked in list order and the first match wins.
        """
        for p in self.membership_periods:
            if p.begin is None:
                # Diagnostic only: identifies the offending member on stderr
                # before the period is matched below.
                sys.stderr.write(f"Got period begin None in:\t{repr(self)}\n")
            if p.matches_date(ref_date):
                return p.ref
        return ""

    def get_membership_periods(self, dbconn):
        """Build this member's membership periods from its invoiced quotas.

        Invoice lines of the third party ``entity_ref`` whose product
        reference starts with ``Q`` are read in invoice-date order. A line is
        folded into the running period when it carries the same product
        reference and starts less than 7 days after the running period's end;
        otherwise the running period is closed and a new one is started.

        Returns a list of ``MembershipPeriod`` (empty when nothing matched).
        """
        with dbconn.cursor() as cursor:
            cursor.execute(
                "select f.datef, f.fk_soc, fd.qty, fd.date_start, fd.date_end, s.ref "
                "from llx_facture as f "
                "left join llx_facturedet as fd on f.rowid=fd.fk_facture "
                "left join llx_product as s on fd.fk_product=s.rowid "
                # The trailing space keeps the string literal and the
                # following "order by" clause apart.
                "where f.fk_soc = %s and s.ref like 'Q%%' "
                "order by f.datef",
                (self.entity_ref,),
            )
            o = []
            first_date, last_date, cur_s = None, None, None
            for f_date, _, s_qty, s_date_start, s_date_end, s in cursor.fetchall():
                if first_date is None:
                    # First line: the running period is seeded with the
                    # invoice date.
                    # NOTE(review): if this line's explicit start date lies 7+
                    # days after the invoice date, a zero-length period
                    # (invoice date to invoice date) is emitted below —
                    # confirm that this is intended.
                    first_date = f_date
                    last_date = f_date
                    cur_s = s
                period_begin, period_end = self._simplify_dates(
                    f_date, s_qty, s_date_start, s_date_end
                )
                if period_begin - last_date < datetime.timedelta(days=7) and cur_s == s:
                    last_date = period_end
                else:
                    o.append(
                        MembershipPeriod(begin=first_date, end=last_date, ref=cur_s)
                    )
                    first_date, last_date, cur_s = period_begin, period_end, s
            if first_date and last_date and cur_s:
                o.append(MembershipPeriod(begin=first_date, end=last_date, ref=cur_s))
            return o

    def _simplify_dates(self, f_date, s_qty, s_date_start, s_date_end):
        """Return the ``(begin, end)`` dates covered by one invoice line.

        When the line carries both explicit dates they are used (converted
        with ``.date()``). Otherwise the period starts on the invoice date and
        ends ``int(s_qty)`` months later; the fractional part of ``s_qty``
        sets the day of the month as ``int(fraction * 28)``, minimum 1.
        (``s_qty`` is treated as a number of months — presumably monthly
        quotas; verify against the product definitions.)
        """
        if s_date_start and s_date_end:
            return (s_date_start.date(), s_date_end.date())
        int_qty = int(s_qty)
        rest_qty = s_qty - int_qty
        # Work with a zero-based month index so that landing on December does
        # not roll the year over (the previous formula added the year twice
        # whenever month + qty was a multiple of 12).
        month_index = f_date.month - 1 + int_qty
        end_year = f_date.year + month_index // 12
        end_month = month_index % 12 + 1
        end_day = max(1, int(rest_qty * 28))
        return (f_date, datetime.date(year=end_year, month=end_month, day=end_day))


def get_members(dbconn) -> List[Member]:
    """Read every row of ``llx_adherent`` and return it as a ``Member``.

    Each member's ``membership_periods`` is filled in with a follow-up query
    on the same connection.
    """
    query = (
        "SELECT rowid,statut,firstname,lastname,morphy,societe,fk_soc FROM llx_adherent"
    )
    with dbconn.cursor() as cur:
        cur.execute(query)
        found: List[Member] = []
        for row in cur.fetchall():
            rowid, statut, firstname, lastname, morphy, societe, fk_soc = row
            # The status column may come back as int, str or bool.
            enabled = statut in [1, "1", True]
            # "phy" marks a natural person.
            natural_person = morphy.strip() == "phy"
            entry = Member(
                ref=rowid,
                active=enabled,
                firstname=firstname,
                lastname=lastname,
                is_person=natural_person,
                entity_name=societe,
                entity_ref=fk_soc,
            )
            entry.membership_periods = entry.get_membership_periods(dbconn)
            found.append(entry)
        return found


def summary_at_date(members: List[Member], ref_date: datetime.date):
    """Summarise the membership on ``ref_date`` and the changes leading up to it.

    Returns a 4-tuple:
      * counts of members with a status on ``ref_date`` (totals plus one
        ``P_<quota>`` / ``E_<quota>`` bucket per person/entity quota),
      * estimated gender counts of the persons among them,
      * the members that joined ("Altes"), left ("Baixes") or both
        ("Alta i Baixa") between 1 January and ``ref_date``,
      * the sizes of those three groups.

    When ``ref_date`` itself is 1 January, the window starts on 1 January of
    the previous year.
    """
    counts = {
        "Persones": 0,
        "Entitats": 0,
        "P_Q0": 0,
        "P_Q1": 0,
        "E_Q0": 0,
        "E_Q1": 0,
        "Total": 0,
    }
    genders = {"D": 0, "H": 0}
    for member in members:
        quota = member.status_on_date(ref_date)
        if not quota:
            continue
        if member.is_person:
            kind, kind_total = "P", "Persones"
        else:
            kind, kind_total = "E", "Entitats"
        bucket = f"{kind}_{quota}"
        # Quotas other than Q0/Q1 get their bucket created on first sight.
        counts[bucket] = counts.get(bucket, 0) + 1
        counts[kind_total] += 1
        counts["Total"] += 1
        if member.is_person:
            genders[member.derived_gender] += 1

    # Save variations
    labels = ("Altes", "Baixes", "Alta i Baixa")
    variation_counts = {label: 0 for label in labels}
    variation_members = {label: [] for label in labels}
    on_new_years_day = ref_date.month * ref_date.day == 1
    window_start = datetime.date(
        year=ref_date.year - 1 if on_new_years_day else ref_date.year,
        month=1,
        day=1,
    )
    window_end = ref_date
    for member in members:
        status_before = member.status_on_date(window_start)
        status_after = member.status_on_date(window_end)
        if status_before == status_after:
            # Unchanged at both ends: only relevant if a whole period started
            # and finished inside the window.
            came_and_went = any(
                window_start <= period.begin and period.end <= window_end
                for period in member.membership_periods
            )
            if not came_and_went:
                continue
        if status_after:
            label = "Altes"
        elif status_before:
            label = "Baixes"
        else:
            label = "Alta i Baixa"
        variation_counts[label] += 1
        variation_members[label].append(member)
    return counts, genders, variation_members, variation_counts


def get_members_overview(
    doliUrl: str,
    doliApiToken: str,
    pgHost: str,
    pgPort: str,
    pgDb: str,
    pgUser: str,
    pgPass: str,
):
    """Connect to the PostgreSQL database and return all members.

    ``doliUrl`` and ``doliApiToken`` are accepted but not used here; the data
    is read straight from the database described by the ``pg*`` arguments.

    Returns the list produced by ``get_members``.
    """
    dbconn = psycopg2.connect(
        host=pgHost, port=pgPort, database=pgDb, user=pgUser, password=pgPass
    )
    try:
        # psycopg2's connection context manager only commits or rolls back the
        # transaction; it does not close the connection, hence the explicit
        # close() in the ``finally`` clause.
        with dbconn:
            dbconn.set_client_encoding("UNICODE")
            return get_members(dbconn)
    finally:
        dbconn.close()


def members_overview(report_date: datetime.date, **kwargs):
    """Load all members and summarise them at yearly reference dates.

    The reference dates are 1 May 2020 (start of the data), then every
    1 January up to the year of ``report_date``, plus ``report_date`` itself
    when it is not already one of them. ``kwargs`` are forwarded unchanged to
    ``get_members_overview``.

    Returns ``(all_members, members, gender, variation, variation_summaries)``
    where each of the last four is a list of ``(ref_date, data)`` pairs taken
    from ``summary_at_date``.
    """
    r = get_members_overview(**kwargs)
    start_date = datetime.date(year=2020, month=5, day=1)  # Start of data is 2020
    ref_dates = [
        max(start_date, datetime.date(year=year, month=1, day=1))
        for year in range(start_date.year, report_date.year + 1)
    ]
    if report_date not in ref_dates:
        ref_dates.append(report_date)
    members = []
    gender = []
    variation = []
    variation_summaries = []
    for ref_date in ref_dates:
        members_at_date, gender_at_date, var_at_date, var_sum_at_date = summary_at_date(
            r, ref_date
        )
        members.append((ref_date, members_at_date))
        gender.append((ref_date, gender_at_date))
        variation.append((ref_date, var_at_date))
        variation_summaries.append((ref_date, var_sum_at_date))
    return r, members, gender, variation, variation_summaries


def data_to_md(headers, r):
    """Render ``(date, dict)`` pairs as a Markdown table.

    The first column is the date; the others are the ``repr`` of the dict
    value under each name in ``headers`` (``""`` when the key is missing).
    """
    columns = ["Data"] + headers
    rows = [
        f"|  {' | '.join(columns)} |\n",
        f"| {' --- |' * len(columns)}\n",
    ]
    for day, values in r:
        cells = [dateStr(day)]
        cells.extend(repr(values.get(name, "")) for name in headers)
        rows.append("| " + " | ".join(cells) + " |\n")
    return "".join(rows)


def data_to_csv(headers, r):
    """Render ``(date, dict)`` pairs as CSV text with a ``#``-prefixed header line.

    The first column is the date; the others are the ``repr`` of the dict
    value under each name in ``headers`` (``""`` when the key is missing).
    """
    rows = [f"#{','.join(['date'] + headers)}\n"]
    for day, values in r:
        cells = [dateStr(day)]
        cells.extend(repr(values.get(name, "")) for name in headers)
        rows.append(",".join(cells) + "\n")
    return "".join(rows)


def main(ns: argparse.Namespace):
    """
    Generar report d'altes i baixes i estat d'Associacions a l'eXO

    Això es pot fer servir, o bé per tenir una vista de l'estat actual,
    o bé per generar el punt corresponent a l'Assemblea General Ordinària.
    """
    # NOTE: the docstring above is passed to argparse as the command-line
    # description, so it is user-facing text and stays in Catalan.
    #
    # Reads the connection settings from the environment, computes the
    # membership summaries up to ``ns.report_date``, writes three PNG charts
    # to the working directory and prints the report on stdout in the format
    # selected by ``ns.format`` ("csv", "md" or anything else for pprint).
    # Returns the list of (date, counts) pairs.
    tot, res, gender, variation, variation_summaries = members_overview(
        report_date=ns.report_date,
        doliUrl=os.environ.get("DOLI_API_URL", "http://localhost:8080/api/index.php/"),
        doliApiToken=os.environ["DOLI_API_TOKEN"],
        pgHost=os.environ.get("DB_HOST", "localhost"),
        pgPort=os.environ.get("DB_PORT", "5432"),
        pgDb=os.environ.get("DB_NAME", "dolibarr"),
        pgUser=os.environ.get("DB_USER", "postgres"),
        pgPass=os.environ["DB_PASS"],
    )
    last_date = res[-1][0]
    # Union of every bucket name seen on any date, in a stable order.
    ks = sorted({k for _, counts in res for k in counts})

    # Period joins/leaves summary, for the treasury (internal, goes to stderr)
    sys.stderr.write("Resum d'altes i baixes (INTERN)\n")
    pprint.pprint(variation[-2:], stream=sys.stderr)
    sys.stderr.write("\n")
    # Chart of variations
    dates = [i[0] for i in res]
    plt.clf()
    plt.figure(figsize=(12, 8))
    variation_keys = list(variation_summaries[0][1].keys())
    k_ref = variation_keys[0]
    for k in variation_keys:
        kt = k == k_ref
        plt.plot(
            dates[2:],
            [i[1][k] for i in variation_summaries[2:]],
            label=k,
            linestyle="-" if kt else "-.",
            marker="o" if kt else "D",
        )
        plt.annotate(
            variation_summaries[-2][1][k],
            (variation_summaries[-2][0], variation_summaries[-2][1][k]),
            textcoords="offset points",
            xytext=(0, 10),
            ha="left",
        )
    plt.title(f"Variació en Associacions\neXO a data {ns.report_date}")
    plt.xlabel("Any")
    plt.ylabel("Altes i Baixes a l'eXO")
    plt.legend()
    plt.grid(True)
    plt.savefig("doli-socis-variacio.png")
    # Chart of memberships
    plt.clf()
    plt.figure(figsize=(12, 8))
    for k in ks:
        plt.plot(
            dates,
            # A bucket may exist on some dates only; count it as 0 elsewhere
            # instead of failing with KeyError.
            [i[1].get(k, 0) for i in res],
            label=k,
            linestyle="-." if "Q" in k else "-",
            marker=(
                ("s" if "P" in k else "x") if "Q" in k else ("o" if "T" in k else "D")
            ),
        )
        plt.annotate(
            res[-1][1].get(k, 0),
            (res[-1][0], res[-1][1].get(k, 0)),
            textcoords="offset points",
            xytext=(8, -4),
            ha="left",
        )
    plt.title(f"Associacions\neXO a data {ns.report_date}")
    plt.xlabel("Any")
    plt.ylabel("Associacions a l'eXO")
    plt.legend()
    plt.grid(True)
    plt.savefig("doli-socis.png")

    # Chart of estimated gender
    plt.clf()
    plt.figure(figsize=(12, 8))
    for k in GenderDeriver.options:
        plt.plot(
            dates,
            [i[1][k] for i in gender],
            label=k,
            linestyle="-" if k == GenderDeriver.options[0] else "-.",
            marker="o" if k == GenderDeriver.options[0] else "D",
        )
        plt.annotate(
            gender[-1][1][k],
            (gender[-1][0], gender[-1][1][k]),
            textcoords="offset points",
            xytext=(8, -4),
            ha="left",
        )
    plt.title(f"Estimacions de gènere\neXO a data {ns.report_date}")
    plt.xlabel("Any")
    plt.ylabel("Associacions a l'eXO")
    plt.legend()
    plt.grid(True)
    plt.savefig("doli-socis.genere.png")

    if ns.format == "csv":
        print(data_to_csv(ks, res))
        print()
        print(data_to_csv(GenderDeriver.options, gender))
    elif ns.format == "md":
        print(
            f"""## 2. Resum d'altes i baixes de socis durant l'exercici {last_date.year - 1}

Presenta Tresoreria eXO

> - Generat a data: **{last_date}**
> - Metodologia: https://agora.exo.cat/t/p-2025-22-cens-dassociacions-a-lexo/400
> - `E` són **E**ntitats
> - `P` són **P**ersones
> - `Q0` són quotes bàsiques (3€/mes)
> - `Q1` són quotes contributives (10€/mes)

### Membres d'eXO

"""
        )
        # Entities at date: the cutoff follows the report date (the exercise
        # named in the heading above) rather than the day the script is run,
        # so a report regenerated later for a past date stays consistent.
        last_year = datetime.date(year=last_date.year - 1, month=1, day=1)
        print(f"#### Aprovació noves altes persona jurídica (des de {last_year})")
        entities_at_date = sorted(
            (
                m
                for m in tot
                if not m.is_person
                and m.status_on_date(last_date)
                and m.membership_periods[0].begin >= last_year
            ),
            key=lambda k: k.membership_periods[-1].begin,
            reverse=True,
        )
        print()
        if entities_at_date:
            for e in entities_at_date:
                print(f"""- [ ] {e.membership_periods[-1].ref}: {e.entity_name}
  - {e.membership_periods[0].begin} -- {e.membership_periods[-1].end}""")
        else:
            # Only announce "no new entities" when the list really is empty
            # (a for/else would print it after every completed loop).
            print("**Sense noves altes jurídiques**")
        print("""\n#### Associacions a l'eXO""")
        print("""\n##### Altes i baixes per període""")

        print("""\n\n![Altes i baixes a l'eXO](./doli-socis-variacio.png)\n\n""")
        print("<details><summary>Detall altes i baixes a l'eXO</summary>")
        print()
        print(data_to_md(list(variation_summaries[0][1].keys()), variation_summaries))
        print("</details>")
        print("""\n##### Totals""")
        print("""\n\n![Associacions a l'eXO](./doli-socis.png)\n\n""")
        print("<details><summary>Detall associacions a l'eXO</summary>")
        print()
        print(data_to_md(ks, res))
        print("</details>")
        print("""\n#### Estimació de gènere(*)""")
        print("""\n\n![Estimació gènere sòcies d'eXO](./doli-socis.genere.png)\n\n""")
        print("<details><summary>Detall estimació gènere sòcies d'eXO</summary>")
        print()
        print(data_to_md(GenderDeriver.options, gender))
        print("</details>")
        print()
        print("""> (*): Estimació feta a partir dels noms de pila.
> Amb dades: https://www.idescat.cat/noms/
> L'associació no guarda ni pregunta aquesta dada i és purament de caire informatiu.""")
    else:
        pp.pprint(res)
    return res


if __name__ == "__main__":
    # Command-line entry point: parse the two options and run the report.
    cli = argparse.ArgumentParser(description=main.__doc__)
    cli.add_argument(
        "--format",
        default="md",
        choices=["csv", "md", "pprint"],
        help="En quin format volem veure les dades?",
    )
    cli.add_argument(
        "--report-date",
        type=datetime.date.fromisoformat,
        default=datetime.date.today(),
        help="La data final del report, normalment hi posaríem la data de l'AGO",
    )
    cli_args = cli.parse_args()
    main(cli_args)
1 'M'agrada'