DatabaseCreation: seeding_script.py

File seeding_script.py, 55.5 KB (added by 231091, 5 hours ago)
Line 
1"""
21. StatsBomb — single source of truth for matches, lineups, events, coaches, stadiums, and players.
3
4 - Players carry a randomly generated date_of_birth
5 - Stadiums get a randomly generated capacity and year_built
6 - Coach appearances are *collected* during the import
7 (one tuple per match per coach) and then collapsed
8 into proper stints with realistic start/end dates
9 and a generated salary in a single finalisation
10 step.
11
122. Football-data — bookmaker odds and referee assignments for the StatsBomb matches that are already in the DB.
13
14 - A 'Generated' time-series of odds is produced
15 for every match at minutes 0, 10, 20, ..., 90 using
16 the StatsBomb final score.
17
183. Transfermarkt — player valuations and transfers (unchanged).
19
20"""
21
22import csv
23import json
24import math
25import os
26import random
27import re
28from datetime import date, datetime, time, timedelta
29
30import pandas as pd
31import psycopg2
32from psycopg2.extras import execute_values
33from rapidfuzz import process as fuzz_process, fuzz
34
# PostgreSQL connection parameters (placeholders — filled in per deployment).
DB_CONFIG = {
    "host": "<host>",
    "port": "<port>",
    "dbname": "<dbname>",
    "user": "<user>",
    "password": "<password>",
}

# football-data.co.uk CSV folder (one CSV per division/season)
FD_FOLDER = "<fd_root>"

# Transfermarkt Kaggle dataset folder
TM_ROOT = "<transfermarkt_root>"
TM_PLAYERS = f"{TM_ROOT}/players.csv"
TM_VALUATIONS = f"{TM_ROOT}/player_valuations.csv"
TM_TRANSFERS = f"{TM_ROOT}/transfers.csv"

# StatsBomb open-data repo folder
SB_ROOT = "<statsbomb_root>"
SB_COMPETITIONS = os.path.join(SB_ROOT, "data", "competitions.json")
SB_MATCHES_DIR = os.path.join(SB_ROOT, "data", "matches")
SB_LINEUPS_DIR = os.path.join(SB_ROOT, "data", "lineups")
SB_EVENTS_DIR = os.path.join(SB_ROOT, "data", "events")

# football-data team spellings -> canonical team names as stored in the DB.
# Applied by TeamResolver before word/fuzzy matching.
FD_TEAM_ALIASES: dict[str, str] = {
    "Hamburg": "Hamburger SV",
    "M'gladbach": "Borussia Mönchengladbach",
    "West Ham": "West Ham United",
    "Wolves": "Wolverhampton Wanderers",
    "Man City": "Manchester City",
    "West Brom": "West Bromwich Albion",
    "Man United": "Manchester United",
    "Paris SG": "Paris Saint-Germain",
}

# Minimum rapidfuzz scores (0-100) to accept a fuzzy match.
TEAM_FUZZY_THRESHOLD = 80  # team name matching
PLAYER_FUZZY_THRESHOLD = 90  # stricter — player names collide more easily

# StatsBomb matches to process between DB commits.
SB_COMMIT_EVERY = 500
75
76PLAYER_DOB_START = date(1975, 1, 1)
77PLAYER_DOB_END = date(2008, 12, 31)
78
79STADIUM_CAPACITY_MIN = 8_000
80STADIUM_CAPACITY_MAX = 90_000
81
82STADIUM_YEAR_MIN = 1900
83STADIUM_YEAR_MAX = 2015
84
85COACH_SALARY_MIN = 500_000
86COACH_SALARY_MAX = 10_000_000
87
88COACH_STILL_ACTIVE_DAYS = 180 # 6 months
89
90COACH_STINT_GAP_DAYS = 90
91
92COACH_STINT_END_BUFFER_DAYS = 30
93
94REFEREE_DOB_START = date(1955, 1, 1)
95REFEREE_DOB_END = date(1985, 12, 31)
96
97
98# pick a random player date of birth within configured bounds
99def _random_player_dob() -> date:
100 span = (PLAYER_DOB_END - PLAYER_DOB_START).days
101 return PLAYER_DOB_START + timedelta(days=random.randint(0, span))
102
103
104# pick a stadium capacity in the configured range
105def _random_stadium_capacity() -> int:
106 return random.randint(STADIUM_CAPACITY_MIN, STADIUM_CAPACITY_MAX)
107
108
109# pick a plausible stadium build year
110def _random_stadium_year() -> int:
111 return random.randint(STADIUM_YEAR_MIN, STADIUM_YEAR_MAX)
112
113
114# pick a random coach salary and round to nearest 1000
115def _random_coach_salary() -> int:
116 return round(random.randint(COACH_SALARY_MIN, COACH_SALARY_MAX), -3)
117
118
119# pick a random referee date of birth within configured bounds
120def _random_referee_dob() -> date:
121 span = (REFEREE_DOB_END - REFEREE_DOB_START).days
122 return REFEREE_DOB_START + timedelta(days=random.randint(0, span))
123
124
def load_json(path: str):
    """Parse and return the UTF-8 encoded JSON document at *path*."""
    with open(path, encoding="utf-8") as fp:
        text = fp.read()
    return json.loads(text)
128
129
# resolves team name to db team_id
# order: cache -> exact match -> alias -> single-word match -> fuzzy -> new team
class TeamResolver:
    """Resolve free-text team names (from any data source) to DB team_ids.

    Lookup order per call: result cache -> exact name -> alias ->
    significant-word match -> fuzzy match -> optional insert of a new Team
    row.  Word and fuzzy hits are remembered as aliases and batch-flushed
    to Team_alias via flush_aliases().
    """

    def __init__(self, cur, manual_aliases: dict[str, str] | None = None):
        # lower-cased canonical team name -> team_id
        self._by_name: dict[str, int] = {}
        # lower-cased alias -> team_id
        self._by_alias: dict[str, int] = {}
        # (original-case name, team_id) candidates for fuzzy matching
        self._fuzzy_list: list[tuple[str, int]] = []
        # raw input name -> resolved team_id (None = known-unresolvable)
        self._resolve_cache: dict[str, int | None] = {}
        # (team_id, alias) rows waiting to be inserted into Team_alias
        self._pending_aliases: list[tuple[int, str]] = []

        cur.execute("SELECT team_id, name FROM Team")
        for tid, name in cur.fetchall():
            self._by_name[name.lower()] = tid
            self._fuzzy_list.append((name, tid))

        cur.execute("SELECT team_id, alias_name FROM Team_alias")
        for tid, alias in cur.fetchall():
            self._by_alias[alias.lower()] = tid

        # manual_aliases maps a source spelling -> canonical DB name;
        # only registered when the canonical name actually exists.
        if manual_aliases:
            for src, dst in manual_aliases.items():
                tid = self._by_name.get(dst.lower())
                if tid:
                    self._by_alias[src.lower()] = tid
                else:
                    print(f" [alias warning] '{dst}' not found in Team table")

    def resolve(self, name: str, country: str | None = None,
                cur=None, create_if_missing: bool = False) -> int | None:
        """Return the team_id for *name*, or None when unresolvable.

        With create_if_missing=True and a cursor, an unmatched name is
        inserted as a brand-new Team row instead of returning None.
        Results (including failures) are memoised per raw input string.
        """
        if not name:
            return None
        if name in self._resolve_cache:
            return self._resolve_cache[name]

        key = name.lower()

        # 1. exact name
        if key in self._by_name:
            tid = self._by_name[key]
            self._resolve_cache[name] = tid
            return tid

        # 2. alias
        if key in self._by_alias:
            tid = self._by_alias[key]
            self._resolve_cache[name] = tid
            return tid

        # 3. word match (e.g. "FC Barcelona" → "Barcelona");
        # words of <= 2 chars (FC, SC, ...) are skipped as noise.
        for word in name.split():
            if len(word) <= 2:
                continue
            w = word.lower()
            tid = self._by_name.get(w) or self._by_alias.get(w)
            if tid:
                print(f" [word match] '{name}' → '{word}' (team_id={tid})")
                self._pending_aliases.append((tid, name))
                self._by_alias[key] = tid
                self._resolve_cache[name] = tid
                return tid

        # 4. fuzzy match — only against candidates on the same side of the
        # men's/women's split, so e.g. "Arsenal" never matches "Arsenal Women".
        is_womens = "women" in key
        candidates = [
            (n, t) for n, t in self._fuzzy_list
            if ("women" in n.lower()) == is_womens
        ]
        if candidates:
            result = fuzz_process.extractOne(
                name,
                [n for n, _ in candidates],
                scorer=fuzz.token_sort_ratio
            )
            if result and result[1] >= TEAM_FUZZY_THRESHOLD:
                best_name, best_score = result[0], result[1]
                tid = next(t for n, t in candidates if n == best_name)
                print(f" [fuzzy] '{name}' → '{best_name}' (score {best_score})")
                self._pending_aliases.append((tid, name))
                self._by_alias[key] = tid
                self._resolve_cache[name] = tid
                return tid

        # 5. create new
        if create_if_missing and cur is not None:
            print(f" [new team] inserting '{name}' (country={country})")
            cur.execute(
                "INSERT INTO Team (name, country) VALUES (%s, %s) RETURNING team_id",
                (name[:100], country)
            )
            tid = cur.fetchone()[0]
            self._by_name[key] = tid
            self._fuzzy_list.append((name, tid))
            self._resolve_cache[name] = tid
            return tid

        print(f" [no match] '{name}'")
        self._resolve_cache[name] = None
        return None

    def flush_aliases(self, cur):
        """Batch-insert aliases learned since the last flush (idempotent)."""
        if not self._pending_aliases:
            return
        execute_values(
            cur,
            "INSERT INTO Team_alias (team_id, alias_name) VALUES %s ON CONFLICT DO NOTHING",
            self._pending_aliases,
            page_size=2000,  # OPT-3
        )
        self._pending_aliases.clear()
239
240
241# statsbomb
242
class _SBCache:
    """In-memory caches of DB rows used throughout the StatsBomb import.

    All lookups are preloaded from the DB once in __init__; the ensure_*
    methods lazily insert missing rows and keep the caches in sync.
    """

    def __init__(self, cur, team_resolver: TeamResolver):
        self.teams = team_resolver  # shared TeamResolver instance
        self.player: dict[str, int] = {}       # player name -> player_id
        self.event_type: dict[str, int] = {}   # event type name -> event_type_id
        self.referee: dict[str, int] = {}      # referee name -> referee_id
        self.coach: dict[str, int] = {}        # coach name -> coach_id
        self.division: dict[tuple, int] = {}   # (name, country) -> division_id
        self.season: dict[tuple, int] = {}     # (division_id, season_name) -> season_id
        self.stadium: dict[tuple, int] = {}    # (name, country) -> stadium_id
        self.referee_match_pairs: set[tuple[int, int]] = set()

        # per-match coach appearances, finalised into stints at the end.
        # each entry is (coach name, nationality, team_id, season_id,
        # match_date); names are resolved to coach_ids during finalisation.
        self.coach_appearances: list[tuple[str, str | None, int, int, date]] = []

        # final scores keyed by DB match_id, used by the football-data
        # pipeline to drive the generated odds time-series.
        self.match_scores: dict[int, tuple[int, int]] = {}

        cur.execute("SELECT player_id, name FROM Player")
        for pid, name in cur.fetchall():
            self.player[name] = pid

        cur.execute("SELECT event_type_id, type FROM Event_type")
        for etid, t in cur.fetchall():
            self.event_type[t] = etid

        cur.execute("SELECT referee_id, name FROM Referee")
        for rid, name in cur.fetchall():
            self.referee[name] = rid

        cur.execute("SELECT coach_id, name FROM Coach")
        for cid, name in cur.fetchall():
            self.coach[name] = cid

        cur.execute("SELECT referee_id, match_id FROM Referee_match")
        for rid, mid in cur.fetchall():
            self.referee_match_pairs.add((rid, mid))

        cur.execute("SELECT division_id, name, country FROM Division")
        for did, name, country in cur.fetchall():
            self.division[(name, country)] = did

        cur.execute("SELECT season_id, division_id, season_name FROM Season")
        for sid, did, sname in cur.fetchall():
            self.season[(did, sname)] = sid

        cur.execute("SELECT stadium_id, name, country FROM Stadium")
        for stid, name, country in cur.fetchall():
            self.stadium[(name, country)] = stid

    def ensure_division(self, cur, name: str, country: str | None) -> int:
        """Return division_id for (name, country), inserting if missing."""
        key = (name, country)
        if key in self.division:
            return self.division[key]
        cur.execute(
            "INSERT INTO Division (name, country) VALUES (%s,%s) RETURNING division_id",
            (name, country)
        )
        did = cur.fetchone()[0]
        self.division[key] = did
        return did

    def ensure_season(self, cur, division_id: int, season_name: str) -> int:
        """Return season_id for (division_id, season_name), inserting if missing."""
        key = (division_id, season_name)
        if key in self.season:
            return self.season[key]
        # double-check the DB before inserting — the cache may be stale.
        cur.execute(
            "SELECT season_id FROM Season WHERE division_id=%s AND season_name=%s LIMIT 1",
            (division_id, season_name)
        )
        row = cur.fetchone()
        if row is None:
            cur.execute(
                "INSERT INTO Season (division_id, season_name) VALUES (%s,%s) RETURNING season_id",
                (division_id, season_name)
            )
            row = cur.fetchone()
        self.season[key] = row[0]
        return row[0]

    def ensure_stadium(self, cur, name: str, country: str | None) -> int:
        # get existing stadium or insert one with generated capacity/year when missing
        key = (name, country)
        if key in self.stadium:
            return self.stadium[key]
        # NULL-safe country comparison: country=%s is never true for NULL.
        cur.execute(
            "SELECT stadium_id FROM Stadium WHERE name=%s "
            "AND (country=%s OR (country IS NULL AND %s IS NULL)) LIMIT 1",
            (name, country, country)
        )
        row = cur.fetchone()
        if row:
            sid = row[0]
        else:
            # random reasonable values for the columns the source data lacks.
            cur.execute(
                "INSERT INTO Stadium (name, country, capacity, year_built) "
                "VALUES (%s,%s,%s,%s) RETURNING stadium_id",
                (name[:100], country, _random_stadium_capacity(), _random_stadium_year())
            )
            sid = cur.fetchone()[0]
        self.stadium[key] = sid
        return sid

    def ensure_players(self, cur, names: list[str]):
        """Batch-insert unseen players (with generated DOB) and cache their ids."""
        # de-dupe via a set comprehension; empty names are dropped.
        new = [(n, _random_player_dob())
               for n in {n for n in names if n and n not in self.player}]
        if not new:
            return
        rows = execute_values(
            cur,
            "INSERT INTO Player (name, date_of_birth) VALUES %s RETURNING player_id, name",
            new, fetch=True, page_size=500,  # OPT-3
        )
        for pid, name in rows:
            self.player[name] = pid

    def ensure_event_types(self, cur, type_names: list[str]):
        """Batch-insert unseen event types and cache their ids."""
        new = list({t for t in type_names if t and t not in self.event_type})
        if not new:
            return
        # the no-op DO UPDATE makes RETURNING yield ids for pre-existing
        # rows too, not only freshly inserted ones.
        rows = execute_values(
            cur,
            "INSERT INTO Event_type (type) VALUES %s "
            "ON CONFLICT (type) DO UPDATE SET type=EXCLUDED.type "
            "RETURNING event_type_id, type",
            [(t,) for t in new], fetch=True, page_size=2000,  # OPT-3
        )
        for etid, t in rows:
            self.event_type[t] = etid

    def ensure_referee(self, cur, name: str, country: str | None = None) -> int:
        # get existing referee or insert a new one with generated dob
        if name in self.referee:
            return self.referee[name]
        cur.execute("SELECT referee_id FROM Referee WHERE name=%s LIMIT 1", (name,))
        row = cur.fetchone()
        if not row:
            cur.execute(
                "INSERT INTO Referee (name, country, date_of_birth) "
                "VALUES (%s,%s,%s) RETURNING referee_id",
                (name, country, _random_referee_dob())
            )
            row = cur.fetchone()
        self.referee[name] = row[0]
        return row[0]

    def ensure_coach(self, cur, name: str, nationality: str | None) -> int:
        """Return coach_id for *name*, inserting a new Coach row if missing."""
        if name in self.coach:
            return self.coach[name]
        cur.execute("SELECT coach_id FROM Coach WHERE name=%s LIMIT 1", (name,))
        row = cur.fetchone()
        if not row:
            cur.execute(
                "INSERT INTO Coach (name, nationality) VALUES (%s,%s) RETURNING coach_id",
                (name, nationality)
            )
            row = cur.fetchone()
        self.coach[name] = row[0]
        return row[0]
408
409
def _sb_find_or_create_match(cur, cache: _SBCache, sb_match: dict, comp: dict
                             ) -> tuple[int | None, bool, int | None, str | None]:
    """Find the DB match row for one StatsBomb match, creating it if needed.

    Returns (match_id, was_created, season_id, match_date_str); match_id is
    None when the match must be skipped (no date, unresolvable team, or both
    sides resolving to the same team_id).

    Side effect: records the final score in cache.match_scores for the
    football-data odds generator.
    """
    date_str = sb_match.get("match_date")
    if not date_str:
        return None, False, None, None

    home_sb = sb_match["home_team"]["home_team_name"]
    away_sb = sb_match["away_team"]["away_team_name"]
    country = comp.get("country_name") or None

    comp_name = comp.get("competition_name", "Unknown Competition")
    division_id = cache.ensure_division(cur, comp_name[:100], country)
    season_id = cache.ensure_season(cur, division_id, comp.get("season_name", "Unknown")[:20])

    home_id = cache.teams.resolve(home_sb, country=country, cur=cur, create_if_missing=True)
    away_id = cache.teams.resolve(away_sb, country=country, cur=cur, create_if_missing=True)

    # skip when either side is unresolvable, or when fuzzy matching collapsed
    # both sides onto the same team (which would violate match semantics).
    if not home_id or not away_id or home_id == away_id:
        if home_id and away_id and home_id == away_id:
            print(f" [skip] same team resolved for both sides (id={home_id})")
        return None, False, None, None

    # fall back to a per-team placeholder name that _sb_enrich_stadium can
    # later overwrite when StatsBomb supplies the real stadium.
    sb_stadium = sb_match.get("stadium")
    stadium_name = (sb_stadium.get("name", "").strip() if sb_stadium else "") \
        or f"Unknown home stadium - {home_sb}"
    stadium_id = cache.ensure_stadium(cur, stadium_name[:100], country)

    # a match is identified by (home, away, date) — "match" is quoted
    # because it is a reserved-ish identifier.
    cur.execute(
        'SELECT match_id FROM "match" '
        'WHERE home_team_id=%s AND away_team_id=%s AND match_date=%s LIMIT 1',
        (home_id, away_id, date_str)
    )
    row = cur.fetchone()
    if row:
        match_id = row[0]
        was_created = False
    else:
        cur.execute(
            'INSERT INTO "match" (season_id, home_team_id, away_team_id, stadium_id, match_date) '
            'VALUES (%s,%s,%s,%s,%s) RETURNING match_id',
            (season_id, home_id, away_id, stadium_id, date_str)
        )
        match_id = cur.fetchone()[0]
        was_created = True

    # cache the final score for the football-data odds generator.
    home_score = int(sb_match.get("home_score", 0) or 0)
    away_score = int(sb_match.get("away_score", 0) or 0)
    cache.match_scores[match_id] = (home_score, away_score)

    return match_id, was_created, season_id, date_str
462
463
def _sb_enrich_stadium(cur, match_id: int, sb_match: dict):
    """Overwrite a placeholder stadium name once StatsBomb provides the real one.

    Only rows still named 'Unknown home stadium - …' are touched.
    """
    real_name = (sb_match.get("stadium") or {}).get("name", "").strip()
    if not real_name:
        return
    cur.execute(
        """UPDATE Stadium s SET name=%s FROM "match" m
        WHERE m.match_id=%s AND m.stadium_id=s.stadium_id
        AND s.name LIKE 'Unknown home stadium - %%'""",
        (real_name[:100], match_id)
    )
478
479
def _sb_enrich_referee(cur, match_id: int, sb_match: dict, cache: _SBCache,
                       country: str | None = None):
    """Ensure the match referee exists and is linked via Referee_match."""
    referee_info = sb_match.get("referee")
    if not referee_info:
        return
    ref_name = referee_info.get("name", "").strip()
    if not ref_name:
        return

    # "International" is not a usable country — fall back to the first
    # participating team (home, then away) that carries a country name.
    if not country or "international" in country.lower():
        country = None
        for side_key in ("home_team", "away_team"):
            side = sb_match.get(side_key) or {}
            candidate = (side.get("country") or {}).get("name")
            if candidate:
                country = candidate
                break

    ref_id = cache.ensure_referee(cur, ref_name, country)
    pair = (ref_id, match_id)
    if pair in cache.referee_match_pairs:
        return
    cur.execute(
        "INSERT INTO Referee_match (referee_id, match_id, role) "
        "VALUES (%s,%s,'main') ON CONFLICT DO NOTHING",
        pair
    )
    cache.referee_match_pairs.add(pair)
509
510
def _sb_collect_coach_appearances(match_id: int, sb_match: dict, cache: _SBCache,
                                  season_id: int, match_date_str: str):
    """Record one appearance tuple per manager per side for later stint building.

    Appends (name, nationality, team_id, season_id, match_date) entries to
    cache.coach_appearances; nothing is written to the DB here.
    """
    when = date.fromisoformat(match_date_str)

    for side_key in ("home_team", "away_team"):
        side = sb_match.get(side_key, {})
        tid = cache.teams.resolve(side.get(f"{side_key}_name", ""))
        if not tid:
            continue
        for manager in side.get("managers", []):
            coach_name = (manager.get("name", "") or "").strip()
            if not coach_name:
                continue
            nationality = (manager.get("country", {}).get("name", "") or "").strip() or None
            cache.coach_appearances.append((coach_name, nationality, tid, season_id, when))
528
529
# collapse collected coach appearances into stint rows
def _sb_finalize_coach_assignments(cur, cache: _SBCache):
    """Convert per-match coach appearances into Coach_assignment stints.

    Appearances for the same (coach, team) are sorted by date and split
    into stints wherever the gap between consecutive matches exceeds
    COACH_STINT_GAP_DAYS.  Existing assignments for the affected pairs are
    deleted first, so re-running the import is idempotent.
    """
    if not cache.coach_appearances:
        print(" No coach appearances to finalise.")
        return

    # resolve / create Coach rows (cached per name in ensure_coach) before
    # building the stint map.
    # appearances stored as (name, nationality, team_id, season_id, match_date)
    resolved: list[tuple[int, int, int, date]] = []
    for name, nationality, team_id, season_id, match_date in cache.coach_appearances:
        coach_id = cache.ensure_coach(cur, name, nationality)
        resolved.append((coach_id, team_id, season_id, match_date))

    # group by (coach_id, team_id) — one bucket per potential stint sequence.
    groups: dict[tuple[int, int], list[tuple[int, date]]] = {}
    for cid, tid, sid, d in resolved:
        groups.setdefault((cid, tid), []).append((sid, d))

    # wipe prior assignments for exactly the (coach, team) pairs we are
    # about to rebuild; UNNEST pairs the two arrays positionally.
    if groups:
        pair_cids = [cid for cid, _ in groups.keys()]
        pair_tids = [tid for _, tid in groups.keys()]
        cur.execute(
            "DELETE FROM Coach_assignment "
            "WHERE (coach_id, team_id) IN "
            "(SELECT * FROM UNNEST(%s::int[], %s::int[]))",
            (pair_cids, pair_tids)
        )

    rows: list[tuple] = []
    today = date.today()
    for (cid, tid), entries in groups.items():
        # sort by date so the linear walk below detects gaps correctly.
        entries.sort(key=lambda x: x[1])

        # split entries into stints based on gap-from-previous.
        # each stint is itself a list of (season_id, match_date) tuples.
        stints: list[list[tuple[int, date]]] = []
        for entry in entries:
            _, d = entry
            if not stints or (d - stints[-1][-1][1]).days > COACH_STINT_GAP_DAYS:
                stints.append([entry])
            else:
                stints[-1].append(entry)

        # one Coach_assignment row per stint with proper boundaries.
        for i, stint in enumerate(stints):
            start_season = stint[0][0]  # season of the stint's first match
            start_d = stint[0][1]
            last_d = stint[-1][1]

            if i + 1 < len(stints):
                # there's another stint: this one ends the day before it starts.
                end_d = stints[i + 1][0][1] - timedelta(days=1)
            elif (today - last_d).days <= COACH_STILL_ACTIVE_DAYS:
                # most recent stint and last match is recent → still active.
                end_d = None
            else:
                # closed historical stint; pad with a small buffer.
                end_d = last_d + timedelta(days=COACH_STINT_END_BUFFER_DAYS)

            rows.append((cid, start_season, tid, _random_coach_salary(), start_d, end_d))

    if rows:
        execute_values(
            cur,
            "INSERT INTO Coach_assignment "
            "(coach_id, season_id, team_id, assignment_value, start_date, end_date) "
            "VALUES %s",
            rows, page_size=2000,  # OPT-3
        )
    print(f" Coach stints inserted: {len(rows)} "
          f"(from {len(cache.coach_appearances)} per-match appearances)")
603
604
# insert lineup file players into lineup rows
def _sb_import_lineups(cur, match_id: int, lineup_path: str, cache: _SBCache) -> int:
    """Insert Lineup rows for one match from a StatsBomb lineup file.

    Returns the number of rows submitted; ON CONFLICT DO NOTHING means the
    count is an upper bound on rows actually added.
    """
    lineups = load_json(lineup_path)
    inserted = 0
    for team_entry in lineups:
        team_id = cache.teams.resolve(team_entry.get("team_name", ""))
        if not team_id:
            continue
        # make sure every named player has a Player row before building rows.
        cache.ensure_players(
            cur, [p.get("player_name", "").strip() for p in team_entry.get("lineup", [])]
        )
        lineup_rows = []
        for p in team_entry.get("lineup", []):
            player_name = p.get("player_name", "").strip()
            if not player_name or player_name not in cache.player:
                continue
            positions = p.get("positions", [])
            position = positions[0].get("position", "Unknown") if positions else "Unknown"
            shirt_no = p.get("jersey_number")
            # starter when any position spell began as "Starting XI";
            # with no position data at all, default to starter.
            is_starter = any(pos.get("start_reason") == "Starting XI" for pos in positions) \
                if positions else True
            # shirt number is required and must be positive.
            if shirt_no is None:
                continue
            shirt_no = int(shirt_no)
            if shirt_no < 1:
                continue
            lineup_rows.append(
                (match_id, team_id, cache.player[player_name],
                 shirt_no, position[:50], is_starter)
            )
        if lineup_rows:
            execute_values(
                cur,
                "INSERT INTO Lineup (match_id, team_id, player_id, shirt_number, position, is_starter) "
                "VALUES %s ON CONFLICT DO NOTHING",
                lineup_rows, page_size=2000,  # OPT-3
            )
            inserted += len(lineup_rows)
    return inserted
644
645
646# StatsBomb event-attribute keys we deliberately drop when flattening
647_SB_SKIP_ATTR_KEYS = {
648 "id", "index", "period", "timestamp", "minute", "second",
649 "type", "possession", "possession_team", "play_pattern",
650 "team", "player", "position", "location", "match_id",
651 "related_events", "off_camera", "out",
652}
653
654# don't save this attributes to keep event_attribute < 30M
655_SB_SKIP_FULL_KEYS = {"duration", "pass.angle"}
656
657
658# flatten nested even◘t payload into string key/value attributes
659def _sb_flatten_attrs(event: dict, prefix: str = "") -> dict[str, str]:
660 result = {}
661 for k, v in event.items():
662 full_key = f"{prefix}{k}" if prefix else k
663 if not prefix and k in _SB_SKIP_ATTR_KEYS:
664 continue
665 if full_key in _SB_SKIP_FULL_KEYS:
666 continue
667 if isinstance(v, dict):
668 if "name" in v:
669 result[full_key] = str(v["name"])
670 else:
671 result.update(_sb_flatten_attrs(v, prefix=full_key + "."))
672 elif isinstance(v, list):
673 result[full_key] = json.dumps(v)
674 elif v is not None:
675 result[full_key] = str(v)
676 return result
677
678
# build lineup rows from starting xi events when lineup files are missing
def _sb_lineup_from_starting_xi(ev: dict, match_id: int, cache: _SBCache) -> list[tuple]:
    """Synthesise Lineup tuples from a "Starting XI" event's tactics block.

    Returns (match_id, team_id, player_id, shirt_no, position, True) rows;
    any malformed entry is skipped, and any unexpected failure yields [].
    """
    try:
        team_id = cache.teams.resolve(ev.get("team", {}).get("name", ""))
        if not team_id:
            return []
        sb_lineup = ev.get("tactics", {}).get("lineup", [])
        if not sb_lineup:
            return []
        rows = []
        for entry in sb_lineup:
            # per-entry guard: malformed entries are dropped, not fatal.
            try:
                player_name = entry["player"]["name"].strip()
                shirt_no = int(entry["jersey_number"])
                position = entry.get("position", {}).get("name", "Unknown") or "Unknown"
            except (KeyError, TypeError, ValueError):
                continue
            if not player_name or shirt_no < 1:
                continue
            player_id = cache.player.get(player_name)
            if not player_id:
                continue
            # is_starter is always True — these come from the Starting XI.
            rows.append((match_id, team_id, player_id, shirt_no, position[:50], True))
        return rows
    except Exception:
        # NOTE(review): deliberately best-effort — any unexpected payload
        # shape returns no rows rather than aborting the match import;
        # consider narrowing this catch if silent drops become a problem.
        return []
705
706
# insert non-starting-xi events and return counts for events and xi rows
def _sb_import_events(cur, match_id: int, events_path: str, cache: _SBCache
                      ) -> tuple[int, int]:
    """Import one match's events file.

    "Starting XI" events become Lineup rows; every other known event type
    becomes an Event row plus flattened Event_attribute rows.
    Returns (events_inserted, xi_lineup_rows_submitted).
    """
    events = load_json(events_path)

    # pre-create any new players (regular event participants + Starting XI).
    cache.ensure_players(cur, [ev.get("player", {}).get("name", "") for ev in events])
    cache.ensure_players(cur, [
        entry["player"]["name"].strip()
        for ev in events
        if ev.get("type", {}).get("name") == "Starting XI"
        for entry in ev.get("tactics", {}).get("lineup", [])
        if isinstance(entry.get("player"), dict) and entry["player"].get("name", "").strip()
    ])
    cache.ensure_event_types(cur, [ev.get("type", {}).get("name", "") for ev in events])

    event_rows: list[tuple] = []
    attr_sets: list[dict] = []  # kept index-aligned with event_rows
    xi_lineup_rows: list[tuple] = []

    for ev in events:
        ev_type_name = ev.get("type", {}).get("name", "").strip()
        if not ev_type_name:
            continue
        # starting XI events synthesise lineup rows rather than event rows.
        if ev_type_name == "Starting XI":
            xi_lineup_rows.extend(_sb_lineup_from_starting_xi(ev, match_id, cache))
            continue
        if ev_type_name not in cache.event_type:
            continue
        team_id = cache.teams.resolve(ev.get("team", {}).get("name", ""))
        if not team_id:
            continue
        player_name = ev.get("player", {}).get("name", "")
        if not player_name or player_name not in cache.player:
            continue
        # minute clamped to [0, 130], second to [0, 59].
        event_rows.append((
            match_id, team_id, cache.player[player_name], cache.event_type[ev_type_name],
            max(0, min(130, int(ev.get("minute", 0)))),
            max(0, min(59, int(ev.get("second", 0)))),
        ))
        attr_sets.append(_sb_flatten_attrs(ev))

    n_xi = 0
    if xi_lineup_rows:
        execute_values(
            cur,
            "INSERT INTO Lineup (match_id, team_id, player_id, shirt_number, position, is_starter) "
            "VALUES %s ON CONFLICT DO NOTHING",
            xi_lineup_rows, page_size=2000,  # OPT-3
        )
        n_xi = len(xi_lineup_rows)

    if not event_rows:
        return 0, n_xi

    inserted_rows = execute_values(
        cur,
        "INSERT INTO Event (match_id, team_id, player_id, event_type_id, minute, second) "
        "VALUES %s RETURNING event_id",
        event_rows, fetch=True, page_size=2000,  # OPT-3
    )
    inserted_ids = [r[0] for r in inserted_rows]

    # NOTE(review): pairing attrs with ids via zip assumes RETURNING yields
    # ids in input order across execute_values pages — verify this holds
    # for the deployed PostgreSQL/psycopg2 versions.
    attr_rows = [
        (eid, name[:50], val)
        for eid, attrs in zip(inserted_ids, attr_sets)
        for name, val in attrs.items()
    ]
    if attr_rows:
        execute_values(
            cur,
            "INSERT INTO Event_attribute (event_id, attribute_name, attribute_value) "
            "VALUES %s ON CONFLICT DO NOTHING",
            attr_rows, page_size=2000,  # OPT-3
        )

    return len(inserted_ids), n_xi
785
786
# statsbomb - returns {match_id → (home, away) goals} (the football-data odds generator needs it)
def run_statsbomb(conn) -> dict[int, tuple[int, int]]:
    """Pipeline 1: import every StatsBomb competition/season into the DB.

    For each match: find/create the match row, enrich stadium and referee,
    collect coach appearances, then import lineups and events.  Commits
    every SB_COMMIT_EVERY matches.  Returns {db_match_id: (home, away)}
    final scores for the football-data odds pipeline.
    """
    print("\n" + "═" * 70)
    print("PIPELINE 1 — STATSBOMB")
    print("═" * 70)

    competitions = load_json(SB_COMPETITIONS)
    print(f" Found {len(competitions)} competition/season entries")

    cur = conn.cursor()

    # TeamResolver also lives across pipelines: it'll be re-initialised in
    # the football-data pipeline so it picks up everything we insert here
    # (including any aliases we flush at the end).
    team_resolver = TeamResolver(cur)
    cache = _SBCache(cur, team_resolver)

    print(f" Cache loaded: {len(cache.teams._by_name)} teams, "
          f"{len(cache.player)} players, {len(cache.event_type)} event types\n")

    total_linked = total_created = total_skipped = 0
    total_players = total_xi = total_events = 0
    matches_since_commit = 0

    for comp in competitions:
        # StatsBomb's own ids — they only locate the matches file on disk.
        comp_id = comp["competition_id"]
        season_id = comp["season_id"]
        matches_file = os.path.join(SB_MATCHES_DIR, str(comp_id), f"{season_id}.json")
        if not os.path.exists(matches_file):
            continue

        # load matches
        matches = load_json(matches_file)
        print(f"\n -- {comp['competition_name']} / {comp['season_name']} "
              f"({len(matches)} matches) --")

        for i, sb_match in enumerate(matches, 1):
            sb_mid = sb_match["match_id"]
            home_name = sb_match["home_team"]["home_team_name"]
            away_name = sb_match["away_team"]["away_team_name"]
            print(f" [{i}/{len(matches)}] {home_name} vs {away_name} "
                  f"({sb_match.get('match_date', '?')})", end=" ... ", flush=True)

            db_mid, was_created, match_season_id, date_str = _sb_find_or_create_match(cur, cache, sb_match, comp)
            if db_mid is None:
                print("skipped")
                total_skipped += 1
                continue
            if was_created:
                total_created += 1
                print(f"\n [new match] id={db_mid}", end="")

            _sb_enrich_stadium(cur, db_mid, sb_match)
            # pass the competition's country so newly created Referee rows get
            # a non-NULL country; comp is already in scope from the outer loop.
            _sb_enrich_referee(cur, db_mid, sb_match, cache,
                               comp.get("country_name") or None)

            _sb_collect_coach_appearances(db_mid, sb_match, cache,
                                          match_season_id, date_str)

            n_players = 0
            lineup_path = os.path.join(SB_LINEUPS_DIR, f"{sb_mid}.json")
            if os.path.exists(lineup_path):
                n_players = _sb_import_lineups(cur, db_mid, lineup_path, cache)

            n_events = n_xi = 0
            events_path = os.path.join(SB_EVENTS_DIR, f"{sb_mid}.json")
            if os.path.exists(events_path):
                n_events, n_xi = _sb_import_events(cur, db_mid, events_path, cache)

            # persist any aliases learned while resolving this match's teams.
            cache.teams.flush_aliases(cur)

            total_linked += 1
            total_players += n_players
            total_xi += n_xi
            total_events += n_events
            matches_since_commit += 1
            print(f"done ({n_players} lineup, {n_xi} XI, {n_events} events)")

            if matches_since_commit >= SB_COMMIT_EVERY:
                conn.commit()
                matches_since_commit = 0

    # finalize coach assignments
    print("\n Finalising coach assignments (collapsing match appearances "
          "into stints)...")
    _sb_finalize_coach_assignments(cur, cache)

    conn.commit()
    cur.close()

    print("\n -- StatsBomb Summary --")
    print(f" Matches linked (pre-existing) : {total_linked - total_created}")
    print(f" Matches created from SB data : {total_created}")
    print(f" Matches skipped : {total_skipped}")
    print(f" Lineup rows (lineup files) : {total_players}")
    print(f" Lineup rows (Starting XI) : {total_xi}")
    print(f" Events inserted : {total_events}")
    print(f" Final scores cached : {len(cache.match_scores)}")

    return cache.match_scores
889
890
891# football-data
892
893
894# strip BOMs, NBSPs and weirdness and lowercase the header
895def _fd_normalize_header(name: str) -> str:
896 name = name.replace("", "").replace("\xa0", " ")
897 return re.sub(r"\s+", "", name.strip().strip('"')).lower()
898
899
900def _fd_parse_date(value):
901 if value is None or (isinstance(value, float) and pd.isna(value)):
902 return None
903 v = str(value).strip()
904 if not v:
905 return None
906 for fmt in ("%d/%m/%Y", "%d/%m/%y"):
907 try:
908 return pd.to_datetime(v, format=fmt).date()
909 except ValueError:
910 continue
911 return None
912
913
914# extracts bookmaker names that have a full triple
915def _fd_bookmaker_prefixes(columns: list[str]) -> list[str]:
916 col_set = set(columns)
917 prefixes = []
918 for col in columns:
919 if re.match(r"^[a-z0-9]+h$", col) and len(col) > 1:
920 prefix = col[:-1]
921 if prefix and (prefix + "d") in col_set and (prefix + "a") in col_set:
922 prefixes.append(prefix)
923 return sorted(set(prefixes))
924
925
926def _fd_read_csv(path: str) -> pd.DataFrame:
927 raw = None
928 used_encoding = None
929 for enc in ("utf-8-sig", "cp1252"):
930 try:
931 with open(path, encoding=enc, newline="") as f:
932 raw = f.read()
933 used_encoding = enc
934 break
935 except UnicodeDecodeError:
936 continue
937 if raw is None:
938 raise ValueError(f"Could not decode {path} with UTF-8 or WIN1252")
939 print(f" encoding: {used_encoding}")
940 raw = raw.replace("\r", "").replace("\xa0", " ")
941 lines = [l for l in raw.split("\n") if l.strip()]
942 if not lines:
943 return pd.DataFrame()
944 header_fields = next(csv.reader([lines[0]]))
945 num_cols = len(header_fields)
946 rows = []
947 for line in lines[1:]:
948 fields = next(csv.reader([line]))
949 # ad short rows with None / truncate long rows so the resulting
950 if len(fields) < num_cols:
951 fields += [None] * (num_cols - len(fields))
952 elif len(fields) > num_cols:
953 fields = fields[:num_cols]
954 rows.append(fields)
955 return pd.DataFrame(rows, columns=header_fields)
956
957
958def _fd_clean_str(value) -> str | None:
959 if value is None:
960 return None
961 s = str(value).strip()
962 return None if s in ("", "nan", "None") else s
963
964
965# returns dicts for each row
def _fd_process_file(path: str) -> list[dict]:
    """Parse one football-data CSV into per-match dicts.

    Each dict carries the home/away team names, the match date, the referee
    name (may be None) and a list of complete bookmaker H/D/A odds triples.
    Rows with a missing date, a missing team, or identical teams are dropped.
    """
    frame = _fd_read_csv(path)
    if frame.empty:
        print("    Empty file, skipping.")
        return []

    frame.columns = [_fd_normalize_header(c) for c in frame.columns]
    required = {"date", "hometeam", "awayteam"}
    if required - set(frame.columns):
        print(f"    Missing columns {required - set(frame.columns)}, skipping.")
        return []

    prefixes = _fd_bookmaker_prefixes(list(frame.columns))
    print(f"    bookmaker triples found: {len(prefixes)}")

    results: list[dict] = []
    for _, row in frame.iterrows():
        match_date = _fd_parse_date(row.get("date"))
        home = _fd_clean_str(row.get("hometeam"))
        away = _fd_clean_str(row.get("awayteam"))
        ref = _fd_clean_str(row.get("referee"))

        if match_date is None or not home or not away or home == away:
            continue

        # Keep only bookmakers whose H/D/A triple is complete and positive.
        triples = []
        for prefix in prefixes:
            try:
                h = float(row.get(prefix + "h") or 0)
                d = float(row.get(prefix + "d") or 0)
                a = float(row.get(prefix + "a") or 0)
            except (ValueError, TypeError):
                continue
            if h > 0 and d > 0 and a > 0:
                triples.append({
                    "bookmaker": prefix.upper(),
                    "home_win": h,
                    "draw": d,
                    "away_win": a,
                })

        results.append({
            "home": home,
            "away": away,
            "date": match_date,
            "referee": ref,
            "odds": triples,
        })

    return results
1017
1018
1019# generate one full pre-match to full-time series of synthetic odds
1020def _fd_generate_timeseries_odds(match_id: int,
1021 match_date: date,
1022 baseline: tuple[float, float, float],
1023 score: tuple[int, int]) -> list[tuple]:
1024 # build synthetic odds snapshots at 0,10,...,90 minutes from baseline and final score
1025 p_home, p_draw, p_away = baseline
1026 home_goals, away_goals = score
1027 rows = []
1028
1029 base_ts = datetime.combine(match_date, time(0, 0, 0))
1030
1031 for minute in range(0, 91, 10):
1032 t_w = minute / 90 # 0 at kick-off, 1 at full time
1033 h_so_far = math.floor(home_goals * t_w)
1034 a_so_far = math.floor(away_goals * t_w)
1035 diff = h_so_far - a_so_far # +ve when home leads
1036
1037 # Home win prob raises when home is leading; effect grows with time.
1038 adj_h = max(0.02, min(0.97, p_home + diff * 0.08 * t_w))
1039 # Draw prob ↓ as |goal_diff| grows and as time runs out.
1040 adj_d = max(0.02, min(0.97, p_draw - abs(diff) * 0.06 * t_w))
1041 # Away win prob mirrors home in reverse.
1042 adj_a = max(0.02, min(0.97, p_away + (-diff) * 0.08 * t_w))
1043
1044 total = adj_h + adj_d + adj_a
1045 # Renormalise (so the three probs again sum to 1) then convert to decimal odds.
1046 home_odds = max(1.01, round(1.0 / (adj_h / total) * 0.95, 2))
1047 draw_odds = max(1.01, round(1.0 / (adj_d / total) * 0.95, 2))
1048 away_odds = max(1.01, round(1.0 / (adj_a / total) * 0.95, 2))
1049
1050 rt = base_ts + timedelta(minutes=minute)
1051 rows.append((match_id, "Generated", home_odds, draw_odds, away_odds, rt))
1052
1053 return rows
1054
1055
1056# import football-data odds and referees then generate synthetic odds series
def run_football_data_odds(conn, match_scores: dict[int, tuple[int, int]]):
    """Pipeline 2: attach football-data bookmaker odds and referees to matches.

    For every CSV row that maps onto an existing StatsBomb match (team
    resolution + exact date), inserts one Match_odds row per bookmaker and
    ensures/links the named referee.  Then, for each matched match with a
    cached StatsBomb final score in *match_scores*, synthesises a 'Generated'
    0–90 minute odds time-series from the averaged kick-off probabilities.
    Commits twice: after the real rows, and after the generated series.
    """
    print("\n" + "═" * 70)
    print("PIPELINE 2 — FOOTBALL-DATA (odds + referees only)")
    print("═" * 70)

    csv_files = sorted(f for f in os.listdir(FD_FOLDER) if f.lower().endswith(".csv"))
    if not csv_files:
        print(f"  No CSV files found in {FD_FOLDER}, skipping.")
        return

    cur = conn.cursor()

    team_resolver = TeamResolver(cur, manual_aliases=FD_TEAM_ALIASES)

    # build the (home_id, away_id, match_date) to match_id map once.
    cur.execute('SELECT match_id, home_team_id, away_team_id, match_date FROM "match"')
    match_lookup: dict[tuple[int, int, date], int] = {
        (h, a, d): m for m, h, a, d in cur.fetchall()
    }
    print(f"  Loaded {len(match_lookup)} existing matches for lookup.")

    # referee caches so we don't query the DB once per row.
    cur.execute("SELECT referee_id, name FROM Referee")
    ref_cache: dict[str, int] = {n: i for i, n in cur.fetchall()}
    cur.execute("SELECT referee_id, match_id FROM Referee_match")
    ref_match_pairs: set[tuple[int, int]] = {(r, m) for r, m in cur.fetchall()}

    # for referee's country we choose home team's country
    cur.execute("SELECT team_id, country FROM Team")
    team_country_map: dict[int, str | None] = {tid: ctry for tid, ctry in cur.fetchall()}

    real_odds_rows: list[tuple] = []
    referee_rows: list[tuple] = []
    matched_rows = unmatched_rows = 0
    matched_match_ids: set[int] = set()

    print(f"  Processing {len(csv_files)} CSV file(s)...")
    for filename in csv_files:
        path = os.path.join(FD_FOLDER, filename)
        # NOTE(review): this f-string has no placeholder — it presumably was
        # meant to interpolate the file name/path; confirm before changing.
        print(f"  Reading: (unknown)")
        for r in _fd_process_file(path):
            home_id = team_resolver.resolve(r["home"])
            away_id = team_resolver.resolve(r["away"])
            if not home_id or not away_id:
                unmatched_rows += 1
                continue
            mid = match_lookup.get((home_id, away_id, r["date"]))
            if mid is None:
                unmatched_rows += 1
                continue

            matched_rows += 1
            matched_match_ids.add(mid)

            # one Match_odds row per bookmaker.
            # (the CSVs carry no kick-off time, so noon is used as a stand-in)
            kickoff_ts = datetime.combine(r["date"], time(12, 0, 0))
            for o in r["odds"]:
                real_odds_rows.append((
                    mid, o["bookmaker"], o["home_win"], o["draw"], o["away_win"],
                    kickoff_ts,
                ))

            # referee linking: ensure the row exists in Referee then attach
            # it to this match if not already attached.
            if r["referee"]:
                ref_id = ref_cache.get(r["referee"])
                if ref_id is None:
                    # use the home team's country as the referee's country
                    # (fall back to the away team if home has no country set).
                    ref_country = (team_country_map.get(home_id)
                                   or team_country_map.get(away_id))
                    cur.execute(
                        "INSERT INTO Referee (name, country, date_of_birth) "
                        "VALUES (%s,%s,%s) RETURNING referee_id",
                        (r["referee"], ref_country, _random_referee_dob())
                    )
                    ref_id = cur.fetchone()[0]
                    ref_cache[r["referee"]] = ref_id
                if (ref_id, mid) not in ref_match_pairs:
                    referee_rows.append((ref_id, mid, "main"))
                    ref_match_pairs.add((ref_id, mid))

    print(f"\n  Matched FD rows to SB matches : {matched_rows}")
    print(f"  Unmatched (no team or no SB match) : {unmatched_rows}")

    # wipe previous synthetic rows so re-runs regenerate them from scratch.
    cur.execute("DELETE FROM Match_odds WHERE bookmaker = 'Generated'")

    # real bookmaker rows
    if real_odds_rows:
        execute_values(
            cur,
            """INSERT INTO Match_odds (match_id, bookmaker, home_win, draw, away_win, recorded_time)
               VALUES %s
               ON CONFLICT (match_id, bookmaker, recorded_time) DO UPDATE
               SET home_win=EXCLUDED.home_win,
                   draw=EXCLUDED.draw,
                   away_win=EXCLUDED.away_win""",
            real_odds_rows, page_size=2000,
        )

    if referee_rows:
        execute_values(
            cur,
            "INSERT INTO Referee_match (referee_id, match_id, role) VALUES %s "
            "ON CONFLICT DO NOTHING",
            referee_rows, page_size=2000,
        )

    conn.commit()

    # generated time-series
    # for every match that now has at least one real bookmaker row AND a
    # cached final score from StatsBomb, derive the kick-off implied
    # probability (averaged across bookmakers) and synthesise the 0..90 min
    # series.
    if not matched_match_ids:
        print("  No matched matches → no time-series odds to generate.")
        cur.close()
        return

    # pull per-match baseline implied probabilities + match_date in one query.
    cur.execute(
        """
        SELECT mo.match_id,
               AVG(1.0 / mo.home_win) AS p_home,
               AVG(1.0 / mo.draw) AS p_draw,
               AVG(1.0 / mo.away_win) AS p_away,
               m.match_date
        FROM Match_odds mo
        JOIN "match" m ON m.match_id = mo.match_id
        WHERE mo.bookmaker <> 'Generated'
          AND mo.match_id = ANY(%s)
        GROUP BY mo.match_id, m.match_date
        """,
        (list(matched_match_ids),)
    )
    baselines = cur.fetchall()

    generated: list[tuple] = []
    skipped_no_score = 0
    for mid, p_h, p_d, p_a, mdate in baselines:
        score = match_scores.get(mid)
        if score is None:
            skipped_no_score += 1
            continue
        generated.extend(_fd_generate_timeseries_odds(
            mid, mdate, (float(p_h), float(p_d), float(p_a)), score
        ))

    if generated:
        execute_values(
            cur,
            """INSERT INTO Match_odds (match_id, bookmaker, home_win, draw, away_win, recorded_time)
               VALUES %s
               ON CONFLICT (match_id, bookmaker, recorded_time) DO NOTHING""",
            generated, page_size=2000,  # OPT-3
        )

    conn.commit()
    cur.close()

    print(f"  Real bookmaker rows inserted : {len(real_odds_rows)}")
    print(f"  Referee links inserted : {len(referee_rows)}")
    print(f"  Generated time-series rows : {len(generated)} "
          f"({len(generated) // 10 if generated else 0} matches × 10 snapshots)")
    if skipped_no_score:
        print(f"  Skipped (no SB final score) : {skipped_no_score}")
1224
1225
1226# transfermarkt
1227
1228# resolves transfermarkt player name to DB player_id
1229# order: exact name -> word match (split on space/hyphen, skip ≤ 2 chars) -> fuzzy
1230class _PlayerResolver:
1231 def __init__(self, cur):
1232 self._by_name: dict[str, int] = {}
1233 self._cache: dict[str, int | None] = {}
1234 self._fuzzy_list: list[tuple[str, int]] = []
1235
1236 cur.execute("SELECT player_id, name FROM Player")
1237 for pid, name in cur.fetchall():
1238 self._by_name[name.lower()] = pid
1239 self._fuzzy_list.append((name, pid))
1240
1241 print(f" PlayerResolver: {len(self._by_name)} players loaded from DB")
1242
1243 def resolve(self, tm_name: str) -> int | None:
1244 if not tm_name:
1245 return None
1246 if tm_name in self._cache:
1247 return self._cache[tm_name]
1248 key = tm_name.lower().strip()
1249
1250 pid = self._by_name.get(key)
1251 if pid:
1252 self._cache[tm_name] = pid
1253 return pid
1254
1255 for word in re.split(r"[\s\-]+", tm_name):
1256 if len(word) <= 2:
1257 continue
1258 pid = self._by_name.get(word.lower())
1259 if pid:
1260 print(f" [word] '{tm_name}' → '{word}' (id={pid})")
1261 self._cache[tm_name] = pid
1262 return pid
1263
1264 if self._fuzzy_list:
1265 result = fuzz_process.extractOne(
1266 tm_name,
1267 [n for n, _ in self._fuzzy_list],
1268 scorer=fuzz.token_sort_ratio
1269 )
1270 if result and result[1] >= PLAYER_FUZZY_THRESHOLD:
1271 best_name, best_score = result[0], result[1]
1272 pid = next(p for n, p in self._fuzzy_list if n == best_name)
1273 print(f" [fuzzy] '{tm_name}' → '{best_name}' (score={best_score})")
1274 self._cache[tm_name] = pid
1275 return pid
1276
1277 self._cache[tm_name] = None
1278 return None
1279
1280
# map messy transfer_fee text into canonical transfer type labels
1282
1283
1284def _tm_classify_transfer_type(row) -> str:
1285 fee = row.get("transfer_fee", None)
1286 fee_str = str(fee).strip().lower() if pd.notna(fee) else ""
1287 if fee_str in ("", "nan", "none", "-", "?"):
1288 return "Unknown"
1289 if fee_str in ("0", "0.0"):
1290 return "Free Transfer"
1291 if "loan" in fee_str and "end" in fee_str:
1292 return "End of Loan"
1293 if "loan" in fee_str:
1294 return "Loan"
1295 return "Permanent"
1296
1297
1298def _tm_get_or_create_transfer_type(cur, cache: dict[str, int], type_name: str) -> int:
1299 if type_name in cache:
1300 return cache[type_name]
1301 cur.execute("SELECT transfer_type_id FROM Transfer_type WHERE type=%s LIMIT 1", (type_name,))
1302 row = cur.fetchone()
1303 if row:
1304 cache[type_name] = row[0]
1305 else:
1306 cur.execute(
1307 "INSERT INTO Transfer_type (type) VALUES (%s) RETURNING transfer_type_id",
1308 (type_name,)
1309 )
1310 cache[type_name] = cur.fetchone()[0]
1311 return cache[type_name]
1312
1313
1314def _tm_parse_fee(raw) -> float | None:
1315 if raw is None or (isinstance(raw, float) and pd.isna(raw)):
1316 return None
1317 try:
1318 return max(float(raw), 0.0)
1319 except (ValueError, TypeError):
1320 return None
1321
1322
1323_season_resolve_cache: dict[str, int | None] = {}
1324
1325
1326def _tm_resolve_season(cur, season_str: str) -> int | None:
1327 if season_str in _season_resolve_cache:
1328 return _season_resolve_cache[season_str]
1329 candidates = [season_str]
1330 if re.match(r"^\d{4}$", season_str):
1331 y = int(season_str)
1332 candidates += [f"{y - 1}/{str(y)[2:]}", f"{y}/{y + 1}", f"{str(y)[2:]}/{str(y + 1)[2:]}"]
1333 m = re.match(r"^(\d{4})/(\d{4})$", season_str)
1334 if m:
1335 y1, y2 = m.group(1), m.group(2)
1336 candidates += [f"{y1}/{y2[2:]}", f"{y1[2:]}/{y2[2:]}"]
1337 for c in candidates:
1338 cur.execute("SELECT season_id FROM Season WHERE season_name=%s LIMIT 1", (c,))
1339 row = cur.fetchone()
1340 if row:
1341 _season_resolve_cache[season_str] = row[0]
1342 return row[0]
1343 _season_resolve_cache[season_str] = None
1344 return None
1345
1346
def _tm_build_player_map(players_df: pd.DataFrame, resolver: _PlayerResolver
                         ) -> dict[int, int]:
    """Build a Transfermarkt player_id -> DB player_id map via name matching."""
    mapping: dict[int, int] = {}
    matched = 0
    for _, record in players_df.iterrows():
        tm_id = int(record["player_id"])
        tm_name = str(record.get("name", "")).strip()
        db_id = resolver.resolve(tm_name)
        if db_id:
            mapping[tm_id] = db_id
            matched += 1
    print(f"  Player mapping: {matched}/{len(players_df)} TM players matched")
    return mapping
1360
1361
def _tm_import_valuations(cur, df: pd.DataFrame, player_map: dict[int, int]) -> int:
    """Insert Player_valuation rows from the Transfermarkt valuations CSV.

    Rows are skipped (and counted) when the player is unmapped, the date is
    missing/unparseable, the market value is absent, or no Season matches the
    valuation year.  Returns the number of rows handed to INSERT.
    """
    rows = []
    # BUGFIX: bad/missing dates were previously lumped into the "no season"
    # counter; track them separately, consistent with _tm_import_transfers.
    skipped_player = skipped_date = skipped_season = skipped_fee = 0
    for _, row in df.iterrows():
        db_pid = player_map.get(int(row["player_id"]))
        if not db_pid:
            skipped_player += 1
            continue
        raw_date = row.get("date")
        if pd.isna(raw_date) or not raw_date:
            skipped_date += 1
            continue
        val_date = pd.to_datetime(raw_date, errors="coerce")
        if pd.isna(val_date):
            skipped_date += 1
            continue
        mv = _tm_parse_fee(row.get("market_value_in_eur"))
        if mv is None:
            skipped_fee += 1
            continue
        year = val_date.year
        # A valuation belongs to the season spanning its calendar year,
        # e.g. 2019 -> "2018/19", falling back to a plain "2019" label.
        season_id = _tm_resolve_season(cur, f"{year - 1}/{str(year)[2:]}") or \
                    _tm_resolve_season(cur, str(year))
        if not season_id:
            skipped_season += 1
            continue
        rows.append((db_pid, season_id, mv, val_date.date()))

    if rows:
        execute_values(
            cur,
            "INSERT INTO Player_valuation (player_id, season_id, market_value, valuation_date) "
            "VALUES %s ON CONFLICT DO NOTHING",
            rows, page_size=2000,  # OPT-3
        )
    print(f"  Valuations: {len(rows)} inserted | "
          f"{skipped_player} no player | {skipped_date} bad date | "
          f"{skipped_season} no season | {skipped_fee} no value")
    return len(rows)
1400
1401
def _tm_import_transfers(cur, df: pd.DataFrame, player_map: dict[int, int],
                         team_resolver: TeamResolver) -> int:
    """Insert Transfer rows from the Transfermarkt transfers CSV.

    Skips (and counts) rows whose player or clubs cannot be resolved, whose
    date is missing/unparseable, or whose source and destination club are
    identical.  Returns the number of rows handed to INSERT.
    """
    type_cache: dict[str, int] = {}
    rows = []
    skipped_player = skipped_team = skipped_date = skipped_same = 0

    for _, record in df.iterrows():
        db_pid = player_map.get(int(record["player_id"]))
        if not db_pid:
            skipped_player += 1
            continue

        raw_date = record.get("transfer_date")
        if pd.isna(raw_date) or not raw_date:
            skipped_date += 1
            continue
        transfer_date = pd.to_datetime(raw_date, errors="coerce")
        if pd.isna(transfer_date):
            skipped_date += 1
            continue

        src_team = team_resolver.resolve(str(record.get("from_club_name", "") or "").strip())
        dst_team = team_resolver.resolve(str(record.get("to_club_name", "") or "").strip())
        if not src_team or not dst_team:
            skipped_team += 1
            continue
        if src_team == dst_team:
            skipped_same += 1
            continue

        fee = _tm_parse_fee(record.get("transfer_fee")) or 0.0
        type_id = _tm_get_or_create_transfer_type(
            cur, type_cache, _tm_classify_transfer_type(record)
        )
        rows.append((db_pid, src_team, dst_team, transfer_date.date(), fee, type_id))

    if rows:
        execute_values(
            cur,
            "INSERT INTO Transfer "
            "(player_id, from_team_id, to_team_id, transfer_date, fee, transfer_type_id) "
            "VALUES %s ON CONFLICT DO NOTHING",
            rows, page_size=2000,  # OPT-3
        )
    print(f"  Transfers: {len(rows)} inserted | "
          f"{skipped_player} no player | {skipped_team} no team | "
          f"{skipped_date} bad date | {skipped_same} same from/to")
    return len(rows)
1449
1450
def run_transfermarkt(conn):
    """Pipeline 3: import Transfermarkt player valuations and transfers."""
    print("\n" + "═" * 70)
    print("PIPELINE 3 — TRANSFERMARKT")
    print("═" * 70)

    print("  Loading CSVs...")
    tm_players = pd.read_csv(TM_PLAYERS, low_memory=False)
    tm_valuations = pd.read_csv(TM_VALUATIONS, low_memory=False)
    tm_transfers = pd.read_csv(TM_TRANSFERS, low_memory=False)
    print(f"    players: {len(tm_players):,} | "
          f"valuations: {len(tm_valuations):,} | transfers: {len(tm_transfers):,}")

    cur = conn.cursor()
    resolver = _PlayerResolver(cur)
    team_resolver = TeamResolver(cur)
    player_map = _tm_build_player_map(tm_players, resolver)

    print("  Importing valuations...")
    _tm_import_valuations(cur, tm_valuations, player_map)
    conn.commit()

    print("  Importing transfers...")
    _tm_import_transfers(cur, tm_transfers, player_map, team_resolver)
    conn.commit()

    cur.close()
1477
1478
def main():
    """Run the three import pipelines in order on a single DB connection."""
    print("PitchMap Master Importer")
    print("Pipelines: statsbomb → football-data (odds) → transfermarkt\n")

    conn = psycopg2.connect(**DB_CONFIG)
    # Skip the per-commit fsync for this session: bulk seeding makes many
    # small commits and does not need crash durability.
    cur = conn.cursor()
    cur.execute("SET synchronous_commit = off")
    conn.commit()
    cur.close()

    try:
        # Pipeline 1 caches final scores that pipeline 2 needs for the
        # synthetic odds series.
        scores = run_statsbomb(conn)
        run_football_data_odds(conn, scores)
        run_transfermarkt(conn)
    finally:
        conn.close()

    print("\n" + "═" * 70)
    print("All pipelines complete.")
    print("═" * 70)
1501
1502
# Script entry point: run all three pipelines when executed directly.
if __name__ == "__main__":
    main()