Seguint P-2025-1, quan es prepara l’Assemblea General Ordinària, i de tant en tant, convé saber l’estat d’altes a l’associació.
Per evitar problemes de canvis de metodologia o errors històrics en les dades ho fem de la següent manera:
import argparse
import base64
import pprint
import difflib
from io import StringIO
from functools import cached_property, cache
from typing import List
import psycopg2
import attr
import datetime
import os
import sys
import matplotlib.pyplot as plt
pp = pprint.PrettyPrinter()
def dateStr(x):
return x.strftime("%Y-%m-%d")
def pngFileToDataUri(f):
return f"data:image/png;base64,{base64.b64encode(open(f, 'rb').read()).decode()}"
class GenderDeriver:
"""
This derives gender off the firstname.
It is not meant as a real data point as we do not collect this data,
but rather to give the association a feel of the development of
the gender distribution amongst its members.
Data source: https://www.idescat.cat/noms/"""
@staticmethod
@cache
def read_csv():
d = {}
s = {}
with open("idescat-noms---1.csv", "r") as f:
header_section = True
for lin in f.readlines():
if header_section and lin.startswith("Pos."):
header_section = False
continue
if header_section:
continue
_, g, n, f, _ = lin.strip().split(",", maxsplit=4)
f_int = int(f.replace(".", ""))
# Sometimes there are variants separated by '/'
for nv in n.split("/"):
# Only override if more frequent
if s.get(nv, 0) < f_int:
s[nv] = f_int
d[nv] = g
return d
Female = "D"
Male = "H"
@classmethod
@property
def options(cls):
return [cls.Female, cls.Male]
@classmethod
@cache
def from_name(cls, name: str):
# Find similar matches
options = difflib.get_close_matches(name.upper(), cls.read_csv().keys())
if options:
return cls.read_csv()[options[0]]
# Default to least diverse population
sys.stderr.write(f"MAYBE BUG. NAME GENDER DEFAULTED:\t{name}\n")
return cls.Male
@attr.s
class MembershipPeriod:
begin: datetime.date = attr.ib()
end: datetime.date = attr.ib()
ref: str = attr.ib()
_wiggle_room = datetime.timedelta(days=7)
def matches_date(self, ref_date: datetime.date) -> bool:
return (
self.begin - MembershipPeriod._wiggle_room <= ref_date
and ref_date <= self.end + MembershipPeriod._wiggle_room
)
@attr.s
class Member:
ref: str = attr.ib()
active: bool = attr.ib()
firstname: str = attr.ib()
lastname: str = attr.ib()
is_person: bool = attr.ib()
entity_name: str = attr.ib()
entity_ref: str = attr.ib()
membership_periods: List[MembershipPeriod] = attr.ib(factory=list)
@cached_property
def derived_gender(self):
return GenderDeriver.from_name(self.firstname)
def status_on_date(self, ref_date: datetime.date) -> str:
for p in self.membership_periods:
if p.begin is None:
sys.stderr.write(f"Got period begin None in:\t{repr(self)}\n")
if p.matches_date(ref_date):
return p.ref
return ""
def get_membership_periods(self, dbconn):
with dbconn.cursor() as cursor:
cursor.execute(
"select f.datef, f.fk_soc, fd.qty, fd.date_start, fd.date_end, s.ref "
"from llx_facture as f "
"left join llx_facturedet as fd on f.rowid=fd.fk_facture "
"left join llx_product as s on fd.fk_product=s.rowid "
"where f.fk_soc = %s and s.ref like 'Q%%'"
"order by f.datef",
(self.entity_ref,),
)
o = []
first_date, last_date, cur_s = None, None, None
for f_date, _, s_qty, s_date_start, s_date_end, s in cursor.fetchall():
if first_date is None:
first_date = f_date
last_date = f_date
cur_s = s
period_begin, period_end = self._simplify_dates(
f_date, s_qty, s_date_start, s_date_end
)
if period_begin - last_date < datetime.timedelta(days=7) and cur_s == s:
last_date = period_end
else:
o.append(
MembershipPeriod(begin=first_date, end=last_date, ref=cur_s)
)
first_date, last_date, cur_s = period_begin, period_end, s
if first_date and last_date and cur_s:
o.append(MembershipPeriod(begin=first_date, end=last_date, ref=cur_s))
return o
def _simplify_dates(self, f_date, s_qty, s_date_start, s_date_end):
if s_date_start and s_date_end:
return (s_date_start.date(), s_date_end.date())
int_qty = int(s_qty)
rest_qty = s_qty - int_qty
end_year = f_date.year + (f_date.month + int_qty) // 12
end_month = (f_date.month + int_qty) % 12
if end_month == 0:
end_month = 12
end_year += 1
end_day = max(1, int(rest_qty * 28))
return (f_date, datetime.date(year=end_year, month=end_month, day=end_day))
def get_members(dbconn) -> List[Member]:
with dbconn.cursor() as cursor:
cursor.execute(
"SELECT rowid,statut,firstname,lastname,morphy,societe,fk_soc FROM llx_adherent"
)
o: List[Member] = []
for (
rowid,
statut,
firstname,
lastname,
morphy,
societe,
fk_soc,
) in cursor.fetchall():
member = Member(
ref=rowid,
active=(statut in [1, "1", True]),
firstname=firstname,
lastname=lastname,
is_person=(morphy.strip() == "phy"),
entity_name=societe,
entity_ref=fk_soc,
)
member.membership_periods = member.get_membership_periods(dbconn)
o.append(member)
return o
def summary_at_date(members: List[Member], ref_date: datetime.date):
filtered = filter(lambda m: m.status_on_date(ref_date), members)
o = {
"Persones": 0,
"Entitats": 0,
"P_Q0": 0,
"P_Q1": 0,
"E_Q0": 0,
"E_Q1": 0,
"Total": 0,
}
g = {"D": 0, "H": 0}
for m in filtered:
prefix = "P" if m.is_person else "E"
bucket = f"{prefix}_{m.status_on_date(ref_date)}"
if bucket not in o:
o[bucket] = 0
o[bucket] += 1
o["Persones" if m.is_person else "Entitats"] += 1
o["Total"] += 1
if m.is_person:
g[m.derived_gender] += 1
# Save variations
vs = {"Altes": 0, "Baixes": 0, "Alta i Baixa": 0}
v = {"Altes": [], "Baixes": [], "Alta i Baixa": []}
p_begin = datetime.date(
year=ref_date.year - (1 if ref_date.month * ref_date.day == 1 else 0),
month=1,
day=1,
)
p_end = ref_date
changed = filter(
lambda m: (m.status_on_date(p_begin) != m.status_on_date(p_end))
or any((p_begin <= p.begin and p.end <= p_end for p in m.membership_periods)),
members,
)
for m in changed:
s = (
"Altes"
if m.status_on_date(p_end)
else ("Baixes" if m.status_on_date(p_begin) else "Alta i Baixa")
)
vs[s] += 1
v[s].append(m)
return o, g, v, vs
def get_members_overview(
doliUrl: str,
doliApiToken: str,
pgHost: str,
pgPort: str,
pgDb: str,
pgUser: str,
pgPass: str,
):
with psycopg2.connect(
host=pgHost, port=pgPort, database=pgDb, user=pgUser, password=pgPass
) as dbconn:
dbconn.set_client_encoding("UNICODE")
return get_members(dbconn)
def members_overview(report_date: datetime.date, **kwargs):
r = get_members_overview(**kwargs)
today = datetime.date.today()
start_date = datetime.date(year=2020, month=5, day=1) # Start of data is 2020
ref_dates = [
max(start_date, datetime.date(year=year, month=1, day=1))
for year in range(start_date.year, report_date.year + 1)
]
if report_date not in ref_dates:
ref_dates.append(report_date)
members = []
gender = []
variation = []
variation_summaries = []
for ref_date in ref_dates:
members_at_date, gender_at_date, var_at_date, var_sum_at_date = summary_at_date(
r, ref_date
)
members.append((ref_date, members_at_date))
gender.append((ref_date, gender_at_date))
variation.append((ref_date, var_at_date))
variation_summaries.append((ref_date, var_sum_at_date))
return r, members, gender, variation, variation_summaries
def data_to_md(headers, r):
s = StringIO()
s.write(f"| {' | '.join(['Data'] + headers)} |\n")
s.write(f"| {' --- |' * (len(headers) + 1)}\n")
for d, data in r:
s.write(
"| "
+ " | ".join([dateStr(d)] + [repr(data.get(k, "")) for k in headers])
+ " |\n"
)
return s.getvalue()
def data_to_csv(headers, r):
s = StringIO()
s.write(f"#{','.join(['date'] + headers)}\n")
for d, data in r:
s.write(",".join([dateStr(d)] + [repr(data.get(k, "")) for k in headers]))
s.write("\n")
return s.getvalue()
def main(ns: argparse.Namespace):
"""
Generar report d'altes i baixes i estat d'Associacions a l'eXO
Això es pot fer servir, o bé per tenir una vista de l'estat actual,
o bé per generar el punt corresponent a l'Assemblea General Ordinària.
"""
tot, res, gender, variation, variation_summaries = members_overview(
report_date=ns.report_date,
doliUrl=os.environ.get("DOLI_API_URL", "http://localhost:8080/api/index.php/"),
doliApiToken=os.environ["DOLI_API_TOKEN"],
pgHost=os.environ.get("DB_HOST", "localhost"),
pgPort=os.environ.get("DB_PORT", "5432"),
pgDb=os.environ.get("DB_NAME", "dolibarr"),
pgUser=os.environ.get("DB_USER", "postgres"),
pgPass=os.environ["DB_PASS"],
)
last_date = res[-1][0]
ks = list(sorted(set((k for d in res for k in d[1].keys()))))
# Resum altes i baixes de període, per tresoreria
sys.stderr.write("Resum d'altes i baixes (INTERN)\n")
pprint.pprint(variation[-2:], stream=sys.stderr)
sys.stderr.write("\n")
# Gràfic de variacions
dates = [i[0] for i in res]
plt.clf()
plt.figure(figsize=(12, 8))
k_ref = list(variation_summaries[0][1].keys())[0]
for k in variation_summaries[0][1].keys():
kt = k == k_ref
plt.plot(
dates[2:],
[i[1][k] for i in variation_summaries[2:]],
label=k,
linestyle="-" if kt else "-.",
marker="o" if kt else "D",
)
plt.annotate(
variation_summaries[-2][1][k],
(variation_summaries[-2][0], variation_summaries[-2][1][k]),
textcoords="offset points",
xytext=(0, 10),
ha="left",
)
plt.title(f"Variació en Associacions\neXO a data {ns.report_date}")
plt.xlabel("Any")
plt.ylabel("Altes i Baixes a l'eXO")
plt.legend()
plt.grid(True)
plt.savefig("doli-socis-variacio.png")
# Gràfic d'associacions
plt.clf()
plt.figure(figsize=(12, 8))
for k in ks:
plt.plot(
dates,
[i[1][k] for i in res],
label=k,
linestyle="-." if "Q" in k else "-",
marker=(
("s" if "P" in k else "x") if "Q" in k else ("o" if "T" in k else "D")
),
)
plt.annotate(
res[-1][1][k],
(res[-1][0], res[-1][1][k]),
textcoords="offset points",
xytext=(8, -4),
ha="left",
)
plt.title(f"Associacions\neXO a data {ns.report_date}")
plt.xlabel("Any")
plt.ylabel("Associacions a l'eXO")
plt.legend()
plt.grid(True)
plt.savefig("doli-socis.png")
# Gràfic de gènere
plt.clf()
plt.figure(figsize=(12, 8))
for k in GenderDeriver.options:
plt.plot(
dates,
[i[1][k] for i in gender],
label=k,
linestyle="-" if k == GenderDeriver.options[0] else "-.",
marker="o" if k == GenderDeriver.options[0] else "D",
)
plt.annotate(
gender[-1][1][k],
(gender[-1][0], gender[-1][1][k]),
textcoords="offset points",
xytext=(8, -4),
ha="left",
)
plt.title(f"Estimacions de gènere\neXO a data {ns.report_date}")
plt.xlabel("Any")
plt.ylabel("Associacions a l'eXO")
plt.legend()
plt.grid(True)
plt.savefig("doli-socis.genere.png")
if ns.format == "csv":
print(data_to_csv(ks, res))
print()
print(data_to_csv(GenderDeriver.options, gender))
elif ns.format == "md":
print(
f"""## 2. Resum d'altes i baixes de socis durant l'exercici {last_date.year - 1}
Presenta Tresoreria eXO
> - Generat a data: **{last_date}**
> - Metodologia: https://agora.exo.cat/t/p-2025-22-cens-dassociacions-a-lexo/400
> - `E` són **E**ntitats
> - `P` són **P**ersones
> - `Q0` són quotes bàsiques (3€/mes)
> - `Q1` són quotes contributives (10€/mes)
### Membres d'eXO
"""
)
# Entities at date:
last_year = datetime.date(year=datetime.date.today().year - 1, month=1, day=1)
print(f"#### Aprovació noves altes persona jurídica (des de {last_year})")
entities_at_date = list(
sorted(
(
p
for p in filter(
lambda m: not m.is_person
and m.status_on_date(last_date)
and m.membership_periods[0].begin >= last_year,
tot,
)
),
key=lambda k: k.membership_periods[-1].begin,
reverse=True,
)
)
print()
for e in entities_at_date:
print(f"""- [ ] {e.membership_periods[-1].ref}: {e.entity_name}
- {e.membership_periods[0].begin} -- {e.membership_periods[-1].end}""")
else:
print("**Sense noves altes jurídiques**")
print("""\n#### Associacions a l'eXO""")
print("""\n##### Altes i baixes per període""")
print("""\n\n\n\n""")
print("<details><summary>Detall altes i baixes a l'eXO</summary>")
print()
print(data_to_md(list(variation_summaries[0][1].keys()), variation_summaries))
print("</details>")
print("""\n##### Totals""")
print("""\n\n\n\n""")
print("<details><summary>Detall associacions a l'eXO</summary>")
print()
print(data_to_md(ks, res))
print("</details>")
print("""\n#### Estimació de gènere(*)""")
print("""\n\n\n\n""")
print("<details><summary>Detall estimació gènere sòcies d'eXO</summary>")
print()
print(data_to_md(GenderDeriver.options, gender))
print("</details>")
print()
print("""> (*): Estimació feta a partir dels noms de pila.
> Amb dades: https://www.idescat.cat/noms/
> L'associació no guarda ni pregunta aquesta dada i és purament de caire informatiu.""")
else:
pp.pprint(res)
return res
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=main.__doc__)
parser.add_argument(
"--format",
choices=["csv", "md", "pprint"],
default="md",
help="En quin format volem veure les dades?",
)
parser.add_argument(
"--report-date",
default=datetime.date.today(),
type=datetime.date.fromisoformat,
help="La data final del report, normalment hi posaríem la data de l'AGO",
)
main(parser.parse_args())