Stěhování národů

This commit is contained in:
2026-03-25 19:47:10 +01:00
parent e3b7879eb2
commit 5de0d57612
28 changed files with 152 additions and 122 deletions

2
beaky-backend/README.md Normal file
View File

@@ -0,0 +1,2 @@
# Beaky

View File

@@ -0,0 +1,70 @@
path: data/odkazy.xlsx
screenshotter:
target_path: data/screenshots/
resolver:
api_key: 733f6882605be2de8980bbd074091ee4  # SECURITY: secret committed to version control — rotate this key and load it from an environment variable instead
league_map:
# European cups
liga mistrů: 2
champions league: 2
evropská liga: 3
europa league: 3
konferenční liga: 848
conference league: 848
# Top flights
1. anglie: 39
1. belgie: 144
1. česko: 345
1. dánsko: 119
1. francie: 61
1. itálie: 135
1. itálie - ženy: 139
1. německo: 78
1. nizozemsko: 88
1. polsko: 106
1. portugalsko: 94
1. rakousko: 218
1. rumunsko: 283
1. skotsko: 179
1. slovensko: 332
1. španělsko: 140
1. wales: 110
# Second divisions
2. anglie: 40
2. česko: 346
2. francie: 62
2. itálie: 136
2. německo: 79
2. nizozemsko: 89
2. rakousko: 219
2. slovensko: 506
2. španělsko: 141
# Third divisions
3. francie: 63
3. česko msfl: 349
3. česko čfl: 348
# Fourth divisions
4. česko - sk. a: 350
4. česko - sk. b: 351
4. česko - sk. c: 352
4. česko - sk. d: 353
4. česko - sk. e: 354
4. česko - sk. f: 686
# Women
1. česko - ženy: 669
fortuna=liga ženy: 669
# Domestic cups
anglie - fa cup: 45
anglie - efl cup: 48
česko - pohár: 347
img_classifier:
target_path: data/screenshots/
log_level: INFO # set to DEBUG to see raw classifier and resolver output
api:
host: 0.0.0.0
port: 8000

View File

@@ -0,0 +1,51 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "beaky"
version = "0.1.0"
description = "Scan tickets and decide"
requires-python = ">=3.12"
dependencies = [
"pillow==12.1.1",
"pydantic==2.12.5",
"pandas==3.0.1",
"openpyxl>=3.1.0",
"PyYaml==6.0.3",
"playwright==1.58.0",
"requests>=2.32.0",
"diskcache>=5.6",
"pytesseract==0.3.13",
"fastapi>=0.115",
"uvicorn[standard]>=0.34",
]
[project.optional-dependencies]
dev = [
"pytest>=9.0.2",
"ruff==0.15.5",
"pytz",
"types-requests",
"types-PyYAML",
# "playwright==1.58.0" # only dev because it can't be installed in a pipeline, just locally
]
[project.scripts]
beaky = "beaky.cli:main"
beaky-api = "beaky.api.main:main"
[tool.ruff]
line-length = 120
lint.select = ["E", "F", "I"]
[tool.mypy]
python_version = "3.12"
strict = true
ignore_missing_imports = true
plugins = ["pydantic.mypy"]
[tool.pytest.ini_options]
testpaths = ["test"]

View File

View File

@@ -0,0 +1,38 @@
from __future__ import annotations
import logging
# Package-wide logger; the CLI helpers funnel their debug chatter through here.
_logger = logging.getLogger("beaky")


def log(text: str) -> None:
    """Emit a (possibly ANSI-colored) message at DEBUG level."""
    # Passed as a %s argument so ANSI escapes are never mistaken for format chars.
    _logger.debug("%s", text)
def _sgr(code: int, text: str) -> str:
    """Wrap *text* in the ANSI SGR escape for *code* and reset afterwards."""
    return f"\033[{code}m{text}\033[0m"


def bold(text: str) -> str:
    return _sgr(1, text)


def dim(text: str) -> str:
    return _sgr(2, text)


def green(text: str) -> str:
    return _sgr(32, text)


def red(text: str) -> str:
    return _sgr(31, text)


def yellow(text: str) -> str:
    return _sgr(33, text)


def cyan(text: str) -> str:
    return _sgr(36, text)


def gray(text: str) -> str:
    return _sgr(90, text)

View File

@@ -0,0 +1,271 @@
import argparse
import re as _re
import shutil
from datetime import datetime
from beaky import _ansi
from beaky.config import load_config
from beaky.datamodels.ticket import Bet, Ticket
from beaky.image_classifier.classifier import img_classify
from beaky.link_classifier.classifier import LinkClassifier
from beaky.resolvers.resolver import ResolvedTicket, TicketResolver, TicketVerdict
from beaky.scanner.scanner import Links
from beaky.screenshotter.screenshotter import Screenshotter
def _verdict_str(verdict: TicketVerdict) -> str:
    """Render a colorized 'VERDICT: ...' banner for *verdict*."""
    banner = f"VERDICT: {verdict.value.upper()}"
    color_by_verdict = {
        TicketVerdict.TRUTHFUL: _ansi.green,
        TicketVerdict.NOT_TRUTHFUL: _ansi.red,
        TicketVerdict.POSSIBLY_TRUTHFUL: _ansi.yellow,
    }
    # Any other verdict value falls back to gray.
    return color_by_verdict.get(verdict, _ansi.gray)(banner)
# Layout constants for the ASCII bet tables rendered by the CLI.
_FC = 14 # field column visual width
_VC = 24 # value column visual width (dual)
_SC = 38 # value column visual width (single classifier)
_BET_W = 1 + (_FC + 2) + 1 + (_VC + 2) + 1 + (_VC + 2) + 1 # dual table width
_BET_WS = 1 + (_FC + 2) + 1 + (_SC + 2) + 1 # single table width
_GAP = " "  # spacing between side-by-side bet tables
_FIELD_LABELS: dict[str, str] = {"team1Name": "team1", "team2Name": "team2"}  # display aliases
_FIELD_ORDER = ["type", "team1Name", "team2Name", "date", "league"]  # preferred row order
_SKIP_FIELDS = {"ticketType"}  # internal fields hidden from the tables
# Filler rows used to equalize table heights within a grid row.
# NOTE(review): no border characters at the row edges — they appear to have been
# stripped in transit; confirm against the original rendering.
_BLANK_ROW = f"{' ' * (_FC + 2)}{' ' * (_VC + 2)}{' ' * (_VC + 2)}"
_BLANK_ROWS = f"{' ' * (_FC + 2)}{' ' * (_SC + 2)}"
def _vlen(text: str) -> int:
return len(_re.sub(r"\033\[[^m]*m", "", text))
def _vpad(text: str, width: int) -> str:
return text + " " * max(0, width - _vlen(text))
def _bet_fields(bet: Bet) -> dict[str, str]:
    """Flatten *bet* into a display dict: 'type' plus each public field as text."""
    fields: dict[str, str] = {"type": type(bet).__name__}
    for name, value in vars(bet).items():
        if name in _SKIP_FIELDS:
            continue
        if name == "date" and isinstance(value, datetime):
            text = value.strftime("%Y-%m-%d %H:%M")
        else:
            text = str(value)
        # Newlines would wreck the table layout, so collapse them to spaces.
        fields[name] = text.replace("\n", " ").replace("\r", "")
    return fields
# ── dual-column table (compare) ──────────────────────────────────────────────
def _tbl_row(field: str, lval: str, ival: str) -> str:
    # One body row of the dual (link vs image) table: label cell plus two value
    # cells, padded by *visible* width so ANSI color codes do not skew alignment.
    # NOTE(review): no column-separator characters between the cells — a border
    # glyph ('│'?) may have been stripped in transit; confirm against the original.
    return f"{_vpad(_FIELD_LABELS.get(field, field), _FC)}{_vpad(lval, _VC)}{_vpad(ival, _VC)}"
def _tbl_sep(left: str, mid: str, right: str) -> str:
    # Horizontal separator for the dual table.
    # NOTE(review): every '' * n here multiplies an EMPTY string (always ""), so the
    # separator renders as just left+mid+mid+right — a box-drawing fill character
    # ('─'?) appears to have been stripped in transit; confirm against the original.
    return f"{left}{'' * (_FC + 2)}{mid}{'' * (_VC + 2)}{mid}{'' * (_VC + 2)}{right}"
def _bet_to_lines(idx: int, link_bet: Bet | None, img_bet: Bet | None) -> list[str]:
    # Render one bet as a dual-column table (link classifier vs image classifier).
    # Either side may be None when the classifiers produced different bet counts.
    link_fields = _bet_fields(link_bet) if link_bet is not None else {}
    img_fields = _bet_fields(img_bet) if img_bet is not None else {}
    all_keys = link_fields.keys() | img_fields.keys()
    # Known fields first, in canonical order; any extras follow.
    keys = [k for k in _FIELD_ORDER if k in all_keys] + [k for k in all_keys if k not in _FIELD_ORDER]
    data_rows = []
    for key in keys:
        lval_raw = link_fields.get(key, "")
        ival_raw = img_fields.get(key, "")
        match = lval_raw == ival_raw
        both = bool(lval_raw) and bool(ival_raw)
        # NOTE(review): appending "" after truncation is a no-op — an ellipsis
        # character ('…'?) was likely lost in transit; confirm against the original.
        lval_raw = lval_raw[:_VC - 1] + "" if len(lval_raw) > _VC else lval_raw
        ival_raw = ival_raw[:_VC - 1] + "" if len(ival_raw) > _VC else ival_raw
        # Mismatching values present on both sides are highlighted in yellow.
        # NOTE(review): _ansi.gray("") colors an empty string — a placeholder glyph
        # ('—'?) for missing values appears to have been stripped; confirm.
        lval = _ansi.gray("") if not lval_raw else (lval_raw if (match or not both) else _ansi.yellow(lval_raw))
        ival = _ansi.gray("") if not ival_raw else (ival_raw if (match or not both) else _ansi.yellow(ival_raw))
        data_rows.append(_tbl_row(key, lval, ival))
    header = _vpad(_ansi.bold(_ansi.cyan(f" Bet {idx} ")), _BET_W)
    return [header, _tbl_sep("", "", ""), _tbl_row("", _ansi.bold("link"), _ansi.bold("image")),
            _tbl_sep("", "", ""), *data_rows, _tbl_sep("", "", "")]
# ── single-column table (one classifier) ─────────────────────────────────────
def _tbl_row_s(field: str, val: str) -> str:
    # One body row of the single-classifier table: label cell plus one value cell.
    return f"{_vpad(_FIELD_LABELS.get(field, field), _FC)}{_vpad(val, _SC)}"
def _tbl_sep_s(left: str, mid: str, right: str) -> str:
    # Horizontal separator for the single-classifier table.
    # NOTE(review): '' * n is always "" — the fill character appears stripped; confirm.
    return f"{left}{'' * (_FC + 2)}{mid}{'' * (_SC + 2)}{right}"
def _bet_to_lines_single(idx: int, bet: Bet, col_label: str) -> list[str]:
    # Render one bet as a single-column table headed by *col_label*
    # ("link" or "image"); the single-classifier counterpart of _bet_to_lines.
    fields = _bet_fields(bet)
    keys = [k for k in _FIELD_ORDER if k in fields] + [k for k in fields if k not in _FIELD_ORDER]
    data_rows = [
        # NOTE(review): appending "" after truncation is a no-op — likely a lost '…'.
        _tbl_row_s(k, (v[:_SC - 1] + "" if len(v) > _SC else v))
        for k, v in ((k, fields[k]) for k in keys)
    ]
    header = _vpad(_ansi.bold(_ansi.cyan(f" Bet {idx} ")), _BET_WS)
    return [header, _tbl_sep_s("", "", ""), _tbl_row_s("", _ansi.bold(col_label)),
            _tbl_sep_s("", "", ""), *data_rows, _tbl_sep_s("", "", "")]
# ── shared grid printer ───────────────────────────────────────────────────────
def _pad_to(lines: list[str], target: int, blank: str) -> list[str]:
result = list(lines)
while len(result) < target:
result.insert(-1, blank)
return result
def _print_bet_grid(ticket_header: str, all_lines: list[list[str]], blank: str, bet_w: int) -> None:
    # Lay the per-bet tables out side by side, fitting as many columns as the
    # terminal width allows; tables in the same grid row are padded to equal height.
    term_w = shutil.get_terminal_size((120, 24)).columns
    n_cols = max(1, term_w // (bet_w + len(_GAP)))
    row_w = min(term_w, n_cols * (bet_w + len(_GAP)) - len(_GAP) + 2)
    # NOTE(review): '' * row_w is always "" — a horizontal-rule character was
    # likely stripped in transit; confirm against the original output.
    print(f"\n{'' * row_w}")
    print(_ansi.bold(f" {ticket_header}"))
    print(f"{'' * row_w}")
    for start in range(0, len(all_lines), n_cols):
        chunk = all_lines[start:start + n_cols]
        max_h = max(len(b) for b in chunk)
        padded = [_pad_to(b, max_h, blank) for b in chunk]
        print()
        # Transpose: print the i-th line of every table in the chunk on one row.
        for row in zip(*padded):
            print(" " + _GAP.join(row))
# ── public print functions ────────────────────────────────────────────────────
def _print_compare(link_ticket: Ticket, img_ticket: Ticket) -> None:
    # Side-by-side comparison of the two classifiers' output for one ticket;
    # bets are paired by position, with None filling the shorter side.
    n_link, n_img = len(link_ticket.bets), len(img_ticket.bets)
    header = f"Ticket {link_ticket.id} — link: {n_link} bet{'s' if n_link != 1 else ''} │ img: {n_img} bet{'s' if n_img != 1 else ''}"
    all_lines = [
        _bet_to_lines(i + 1, link_ticket.bets[i] if i < n_link else None, img_ticket.bets[i] if i < n_img else None)
        for i in range(max(n_link, n_img))
    ]
    _print_bet_grid(header, all_lines, _BLANK_ROW, _BET_W)
def _print_single(ticket: Ticket, col_label: str) -> None:
    # Single-classifier variant of _print_compare.
    n = len(ticket.bets)
    # NOTE(review): id, label and count are concatenated with no separator — a
    # delimiter (' — '?) appears to have been lost in transit; confirm.
    header = f"Ticket {ticket.id}{col_label}{n} bet{'s' if n != 1 else ''}"
    all_lines = [_bet_to_lines_single(i + 1, ticket.bets[i], col_label) for i in range(n)]
    _print_bet_grid(header, all_lines, _BLANK_ROWS, _BET_WS)
def _print_resolve_dump(resolved: ResolvedTicket) -> None:
    """Print every resolved bet: outcome, fixture id, confidence signals,
    the raw bet fields and (when available) the fetched match statistics."""
    # NOTE(review): '' * 60 is always "" — a rule character was likely lost in transit.
    print(f"\n{'' * 60}")
    print(_ansi.bold(f" Ticket {resolved.ticket_id} — resolve dump"))
    print(f"{'' * 60}")
    for i, rb in enumerate(resolved.bets, 1):
        bet = rb.bet
        print(f"\n {_ansi.bold(_ansi.cyan(f'Bet {i}'))} [{type(bet).__name__}] outcome={_ansi.bold(rb.outcome.value.upper())}")
        print(f" fixture_id: {rb.fixture_id}")
        print(f" confidence: {rb.confidence} (name={rb.name_match} date={rb.date_proximity} league={rb.league_found} finished={rb.match_finished})")
        # Constant strings below were needless f-strings (ruff F541); prefixes removed.
        print(" --- bet fields ---")
        for k, v in vars(bet).items():
            val = v.strftime("%Y-%m-%d %H:%M") if k == "date" and isinstance(v, datetime) else str(v)
            print(f" {k}: {val}")
        print(" --- match info ---")
        if rb.match_info is None:
            print(" (not available — fixture not finished or not found)")
        else:
            for k, v in vars(rb.match_info).items():
                print(f" {k}: {v}")
def _print_dump(ticket: Ticket, label: str) -> None:
    # Untruncated field-by-field dump of every bet (used with --dump).
    # NOTE(review): '' * 60 is always "" — a rule character was likely lost in transit.
    print(f"\n{'' * 60}")
    # NOTE(review): id, label and count concatenate with no separator — likely a lost ' — '.
    print(_ansi.bold(f" Ticket {ticket.id}{label}{len(ticket.bets)} bet(s)"))
    print(f"{'' * 60}")
    for i, bet in enumerate(ticket.bets, 1):
        print(f"\n {_ansi.bold(_ansi.cyan(f'Bet {i}'))} [{type(bet).__name__}]")
        for k, v in vars(bet).items():
            val = v.strftime("%Y-%m-%d %H:%M") if k == "date" and isinstance(v, datetime) else str(v)
            print(f" {k}: {val}")
def main() -> None:
    """CLI entry point: load config, select tickets, and run the chosen mode.

    Modes: screen (capture screenshots), parse (print raw links), compare
    (link vs image classifier tables), resolve (settle bets via the API resolver).
    """
    parser = argparse.ArgumentParser(prog="beaky")
    parser.add_argument("--config", help="Path to config file.", default="config/application.yml")
    parser.add_argument("--id", type=int, help="Select a single ticket by id.")
    parser.add_argument("mode", choices=["screen", "parse", "compare", "resolve"], help="Mode of operation.")
    parser.add_argument("--classifier", choices=["link", "img", "both"], default="both",
                        help="Which classifier to use in compare mode (default: both).")
    parser.add_argument("--dump", action="store_true",
                        help="Dump all bet fields untruncated (compare mode only).")
    args = parser.parse_args()
    try:
        config = load_config(args.config)
    except RuntimeError as e:
        print(e)
        return
    # always load testing data, we will modify that later
    data = Links(config)
    data.ret_links()
    link_amount = len(data.links)
    if link_amount == 0:
        print("ERROR, no links found")
        return
    print(f"We found {link_amount} links")
    # Ticket selection: --id narrows to a single ticket, otherwise take them all.
    # (The original re-tested `args.id is not None` inside this branch — redundant.)
    if args.id is not None:
        selected_links = [lnk for lnk in data.links if lnk.id == args.id]
        if not selected_links:
            print(f"ERROR: ticket id {args.id} not found")
            return
        print(f"Selected link: {args.id}")
    else:
        selected_links = data.links
    # The modes are mutually exclusive argparse choices, so dispatch with elif.
    if args.mode == "screen":
        screenshotter = Screenshotter(config)
        screenshotter.capture_tickets(selected_links)
    elif args.mode == "parse":
        for link in selected_links:
            print(link)
    elif args.mode == "compare":
        use_link = args.classifier in ("link", "both")
        use_img = args.classifier in ("img", "both")
        linkclassifier = LinkClassifier() if use_link else None
        for link in selected_links:
            link_ticket = linkclassifier.classify(link) if use_link else None
            img_ticket = img_classify([f"./data/screenshots/{link.id}.png"], ticket_id=link.id) if use_img else None
            if args.dump:
                if link_ticket:
                    _print_dump(link_ticket, "link classifier")
                if img_ticket:
                    _print_dump(img_ticket, "image classifier")
            elif args.classifier == "both" and link_ticket and img_ticket:
                _print_compare(link_ticket, img_ticket)
            elif link_ticket:
                _print_single(link_ticket, "link classifier")
            elif img_ticket:
                _print_single(img_ticket, "image classifier")
    elif args.mode == "resolve":
        classifier = LinkClassifier()
        resolver = TicketResolver(config.resolver)
        for link in selected_links:
            print(f"\n=== Classifying ticket {link.id} ===")
            ticket = classifier.classify(link)
            for bet in ticket.bets:
                print(f" [{type(bet).__name__}] {bet.team1Name} vs {bet.team2Name} | {bet.date.date()} | {bet.league}")
            print(f"\n--- Resolving ticket {link.id} ---")
            resolved = resolver.resolve(ticket)
            if args.dump:
                _print_resolve_dump(resolved)
            print(f"\n {_ansi.bold(_verdict_str(resolved.verdict))}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,34 @@
from dataclasses import field as _field
import yaml
from pydantic import ValidationError
from pydantic.dataclasses import dataclass
from beaky.image_classifier.config import ImgClassifierConfig
from beaky.resolvers.config import ResolverConfig
from beaky.screenshotter.config import ScreenshotterConfig
def load_config(path: str) -> "Config":
    """Load and validate the application config from the YAML file at *path*.

    Raises RuntimeError when the file's contents do not validate as a Config.
    (yaml.safe_load returns None for an empty file, which previously crashed
    with ``TypeError: argument after ** must be a mapping``; an empty or
    non-mapping document now surfaces as RuntimeError too.)
    """
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    try:
        return Config(**(data or {}))
    except (ValidationError, TypeError) as exc:
        raise RuntimeError(f"Invalid config at {path}: {exc}") from exc
@dataclass
class ApiConfig:
    """Bind address and port for the HTTP API server."""

    host: str = "0.0.0.0"
    port: int = 8000
@dataclass
class Config:
    """Top-level application configuration, mirroring config/application.yml."""

    path: str  # NOTE(review): presumably the links spreadsheet (data/odkazy.xlsx) — confirm
    screenshotter: ScreenshotterConfig
    resolver: ResolverConfig
    img_classifier: ImgClassifierConfig
    log_level: str = "INFO"
    api: ApiConfig = _field(default_factory=ApiConfig)

View File

@@ -0,0 +1,245 @@
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import Literal
from pydantic.dataclasses import dataclass
class BetType(str, Enum):
    """All bet categories the classifiers can emit; values are serialization tags."""

    WIN_DRAW_LOSE = "win_draw_lose"
    # NOTE(review): member name ADVANCED vs value "advance" — confirm intended.
    ADVANCED = "advance"
    WIN_DRAW_LOSE_DOUBLE = "win_draw_lose_double"
    WIN_LOSE = "win_lose"
    BOTH_TEAM_SCORED = "both_team_scored"
    GOAL_AMOUNT = "goal_amount"
    GOAL_HANDICAP = "goal_handicap"
    HALF_TIME_RESULT = "half_time_result"
    HALF_TIME_DOUBLE = "half_time_double"
    HALF_TIME_FULL_TIME = "half_time_full_time"
    CORNER_AMOUNT = "corner_amount"
    TEAM_CORNER_AMOUNT = "team_corner_amount"
    MORE_OFFSIDES = "more_offsides"
    UNKNOWN = "unknown"
class BetOutcome(str, Enum):
    """Settlement result of a single bet leg."""

    WIN = "win"
    LOSE = "lose"
    VOID = "void" # stake returned (e.g. WinLose on draw, integer goal line hit)
    UNKNOWN = "unknown" # fixture not found or unclassified bet
@dataclass
class MatchInfo:
    """Final statistics of a finished match, used to settle bets."""

    goals_home: int
    goals_away: int
    # Optional stats are None when the data source does not provide them;
    # resolvers that need them return BetOutcome.UNKNOWN in that case.
    half_time_home: int | None = None
    half_time_away: int | None = None
    corners_home: int | None = None
    corners_away: int | None = None
    offsides_home: int | None = None
    offsides_away: int | None = None
@dataclass
class Bet(ABC):
    """Base class for one bet leg; subclasses implement settlement in resolve()."""

    ticketType: BetType
    team1Name: str
    team2Name: str
    date: datetime
    league: str

    # Map the finished match's statistics to a WIN/LOSE/VOID/UNKNOWN outcome.
    @abstractmethod
    def resolve(self, match: MatchInfo) -> BetOutcome: ...
@dataclass
class WinDrawLose(Bet):
    """Full-time 1X2 match result ('Výsledek zápasu 1X2')."""

    betType: Literal["X", "0", "1", "2"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        home, away = match.goals_home, match.goals_away
        if self.betType in ("X", "0"):
            # Both "X" and "0" denote a draw pick.
            return BetOutcome.WIN if home == away else BetOutcome.LOSE
        if home > away:
            actual = "1"
        elif home == away:
            actual = "0"
        else:
            actual = "2"
        return BetOutcome.WIN if actual == self.betType else BetOutcome.LOSE
@dataclass
class Advance(Bet):
    """What team advances to next round"""

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # Settlement would need cup-progression data that MatchInfo does not carry.
        raise NotImplementedError("Advance bet resolution is not implemented")
@dataclass
class WinDrawLoseDouble(Bet):
    """Double-chance full-time result ('Výsledek zápasu - double')."""

    betType: Literal["01", "12", "02"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        home, away = match.goals_home, match.goals_away
        if home > away:
            actual = "1"
        elif home == away:
            actual = "0"
        else:
            actual = "2"
        # The pick covers two outcomes; it wins when either of them occurred.
        return BetOutcome.WIN if actual in self.betType else BetOutcome.LOSE
@dataclass
class WinLose(Bet):
    """Match result without the draw option ('Výsledek zápasu bez remízy')."""

    betType: Literal["1", "2"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        home, away = match.goals_home, match.goals_away
        if home == away:
            # A draw returns the stake.
            return BetOutcome.VOID
        winner = "1" if home > away else "2"
        return BetOutcome.WIN if winner == self.betType else BetOutcome.LOSE
@dataclass
class BothTeamScored(Bet):
    """Both teams score at least one goal."""

    def resolve(self, match: MatchInfo) -> BetOutcome:
        both_scored = min(match.goals_home, match.goals_away) > 0
        return BetOutcome.WIN if both_scored else BetOutcome.LOSE
@dataclass
class GoalAmount(Bet):
    """Total goals over/under a line ('Počet gólů v zápasu')."""

    line: float
    over: bool # True = more than line, False = less than line

    def resolve(self, match: MatchInfo) -> BetOutcome:
        total = match.goals_home + match.goals_away
        if total == self.line:
            # Exact hit on an integer line returns the stake.
            return BetOutcome.VOID
        if self.over:
            return BetOutcome.WIN if total > self.line else BetOutcome.LOSE
        return BetOutcome.WIN if total < self.line else BetOutcome.LOSE
@dataclass
class GoalHandicap(Bet):
    """Goal handicap for a specific team — add handicap_amount to team's score, team wins = you win"""

    team_bet: Literal["1", "2"] # which team the handicap is applied to
    handicap_amount: float # e.g. +1.5 or -0.5

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # Apply the handicap only to the picked team, then compare adjusted scores.
        home = match.goals_home + (self.handicap_amount if self.team_bet == "1" else 0.0)
        away = match.goals_away + (self.handicap_amount if self.team_bet == "2" else 0.0)
        # Exact float equality: only reachable with whole-number handicaps,
        # where an adjusted tie returns the stake.
        if home == away:
            return BetOutcome.VOID
        actual_winner = "1" if home > away else "2"
        return BetOutcome.WIN if actual_winner == self.team_bet else BetOutcome.LOSE
@dataclass
class HalfTimeResult(Bet):
    """First-half result: 0/1/2 ('Výsledek 1. poločasu')."""

    betType: Literal["0", "1", "2"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # Half-time stats are optional in MatchInfo — unresolvable without them.
        if match.half_time_home is None or match.half_time_away is None:
            return BetOutcome.UNKNOWN
        actual = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
        return BetOutcome.WIN if actual == self.betType else BetOutcome.LOSE
@dataclass
class HalfTimeDouble(Bet):
    """First-half double chance: 10/02/01 ('Výsledek 1. poločasu - dvojtip')."""

    betType: Literal["01", "02", "12"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        if match.half_time_home is None or match.half_time_away is None:
            return BetOutcome.UNKNOWN
        actual = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
        # Wins when the half-time outcome is either of the pick's two digits.
        return BetOutcome.WIN if actual in self.betType else BetOutcome.LOSE
@dataclass
class HalfTimeFullTime(Bet):
    """Half-time/full-time combination: X/Y ('Výsledek 1. poločasu/výsledek zápasu')."""

    ht_bet: Literal["0", "1", "2"]
    ft_bet: Literal["0", "1", "2"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        if match.half_time_home is None or match.half_time_away is None:
            return BetOutcome.UNKNOWN
        actual_ht = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
        actual_ft = "1" if match.goals_home > match.goals_away else ("0" if match.goals_home == match.goals_away else "2")
        # Both legs of the combination must hit.
        return BetOutcome.WIN if actual_ht == self.ht_bet and actual_ft == self.ft_bet else BetOutcome.LOSE
@dataclass
class CornerAmount(Bet):
    """Total corners over/under a line ('Počet rohových kopů v zápasu X.5: +/-')."""

    line: float
    over: bool

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # Corner stats are optional in MatchInfo — unresolvable without them.
        if match.corners_home is None or match.corners_away is None:
            return BetOutcome.UNKNOWN
        total = match.corners_home + match.corners_away
        # Exact hit on a whole-number line returns the stake.
        if total == self.line:
            return BetOutcome.VOID
        return BetOutcome.WIN if (total > self.line) == self.over else BetOutcome.LOSE
@dataclass
class TeamCornerAmount(Bet):
    """Team-specific corners over/under"""

    team_bet: Literal["1", "2"]
    line: float
    over: bool

    def resolve(self, match: MatchInfo) -> BetOutcome:
        if match.corners_home is None or match.corners_away is None:
            return BetOutcome.UNKNOWN
        # Only the picked team's corner count matters.
        corners = match.corners_home if self.team_bet == "1" else match.corners_away
        if corners == self.line:
            return BetOutcome.VOID
        return BetOutcome.WIN if (corners > self.line) == self.over else BetOutcome.LOSE
@dataclass
class MoreOffsides(Bet):
    """Which team has more offsides: 1/2 ('Více ofsajdů v zápasu')."""

    team_bet: Literal["1", "2"]

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # Offside stats are optional in MatchInfo — unresolvable without them.
        if match.offsides_home is None or match.offsides_away is None:
            return BetOutcome.UNKNOWN
        # Equal offside counts return the stake.
        if match.offsides_home == match.offsides_away:
            return BetOutcome.VOID
        actual = "1" if match.offsides_home > match.offsides_away else "2"
        return BetOutcome.WIN if actual == self.team_bet else BetOutcome.LOSE
@dataclass
class UnknownBet(Bet):
    """Bet type that could not be classified"""

    raw_text: str = ""  # original OCR/page text, kept for debugging

    def resolve(self, match: MatchInfo) -> BetOutcome:
        # An unclassified bet can never be settled.
        return BetOutcome.UNKNOWN
@dataclass
class Ticket:
    """A betslip: its numeric id plus the parsed bet legs."""

    id: int
    bets: list[Bet]

View File

@@ -0,0 +1,238 @@
import datetime
import logging
import re
from pathlib import Path
import pytesseract
logger = logging.getLogger(__name__)
from beaky.datamodels.ticket import (
Advance,
Bet,
BetType,
BothTeamScored,
GoalAmount,
GoalHandicap,
Ticket,
UnknownBet,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
)
def img_to_text(path: str) -> str:
    """Read text from image using tesseract (Czech model); empty string on error."""
    try:
        text = pytesseract.image_to_string(path, lang="ces")
    except Exception as e:
        # OCR failures are non-fatal: log and let the caller handle empty text.
        logger.error("Error processing %s: %s", path, e)
        return ""
    return text.strip()
def _parse_block(lines: list[str]) -> Bet:
    """Parses a single block of text representing exactly one bet.

    *lines* is one OCR block: a date line first, then the teams line, the bet
    description and the league line.  Anything that matches no known bet phrase
    falls through to UnknownBet with the raw text preserved.
    """
    team1, team2 = "Unknown", "Unknown"
    league = "Unknown"
    date_obj = datetime.datetime.now()  # fallback when no date can be parsed
    raw_text = "\n".join(lines)
    # 1. Date extraction
    if lines:
        # Regex is forgiving of letters attached to numbers due to OCR (e.g., s07.3.2026)
        date_m = re.search(r"(\d{1,2})\.\s*(\d{1,2})\.\s*(\d{4})", lines[0])
        if date_m:
            try:
                date_obj = datetime.datetime(int(date_m.group(3)), int(date_m.group(2)), int(date_m.group(1)))
            except ValueError:
                pass
    # 2. Teams extraction (usually the line after the date)
    if len(lines) > 1:
        # Normalize en dash / em dash / minus sign to a plain hyphen first.
        ln_norm = re.sub(r"[–—−]", "-", lines[1])
        m = re.match(r"^(.+?)\s*-\s*(.+)$", ln_norm)
        if m:
            team1, team2 = m.group(1).strip(), m.group(2).strip()
    # 3. League extraction (typically contains a slash and sport name)
    for ln in lines:
        if "/" in ln and any(sport in ln for sport in ["Fotbal", "Hokej", "Tenis", "Basketbal"]):
            league = ln.strip()
            break
    base_args = {"team1Name": team1, "team2Name": team2, "date": date_obj, "league": league}
    # 4. Bet Type Classification
    for ln in lines:
        lower_line = ln.lower()
        # Výsledek zápasu (1X2)
        m_vysl = re.search(r"výsledek zápasu\s*:?\s*(1|0|x|2)$", lower_line)
        if m_vysl and "dvojtip" not in lower_line and "remízy" not in lower_line:
            pick = m_vysl.group(1).upper()
            if pick == "X":
                pick = "0"  # the datamodel treats "X" and "0" as the same draw pick
            return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=pick, **base_args)
        # Výsledek zápasu - dvojtip (01, 02, 12, etc.)
        m_dvoj = re.search(r"výsledek zápasu - dvojtip\s*:?\s*(10|01|02|20|12|1x|x1|x2|2x)$", lower_line)
        if m_dvoj:
            # Canonicalize symbols and digit order so "1x"/"x1"/"10" all become "01".
            pick = m_dvoj.group(1).replace("x", "0").replace("X", "0")
            if pick in ["10", "01"]:
                pick = "01"
            elif pick in ["20", "02"]:
                pick = "02"
            elif pick in ["12", "21"]:
                pick = "12"
            if pick in ["01", "12", "02"]:
                return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=pick, **base_args)
        # Výsledek zápasu bez remízy
        m_bez = re.search(r"bez remízy\s*:?\s*(1|2)$", lower_line)
        if m_bez:
            return WinLose(ticketType=BetType.WIN_LOSE, betType=m_bez.group(1), **base_args)
        # Každý z týmů dá gól v zápasu
        m_btts = re.search(r"každý z týmů dá gól.*?:\s*(ano|ne)$", lower_line)
        if m_btts:
            if m_btts.group(1) == "ano":
                return BothTeamScored(ticketType=BetType.BOTH_TEAM_SCORED, **base_args)
            else:
                # NOTE(review): "ne" (no) has no datamodel counterpart, so the scan is
                # abandoned and the bet falls through to UnknownBet — confirm intended.
                break
        # Počet gólů v zápasu
        m_goals = re.search(r"počet gólů v zápasu.*?:\s*([+-])\s*([\d.]+)", lower_line)
        if m_goals and "tým" not in lower_line:
            sign = m_goals.group(1)
            val = float(m_goals.group(2))
            is_over = sign == "+"
            return GoalAmount(ticketType=BetType.GOAL_AMOUNT, line=val, over=is_over, **base_args)
        # Kdo postoupí
        if "postoupí" in lower_line or "postup" in lower_line:
            return Advance(ticketType=BetType.ADVANCED, **base_args)
        # Handicap v zápasu
        m_hcp = re.search(r"handicap\s*(1|2)\s*:?\s*([+-]?[\d.]+)$", lower_line)
        if m_hcp:
            team_bet = m_hcp.group(1)
            val = float(m_hcp.group(2))
            return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=val, **base_args)
    # Fallback
    return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=raw_text, **base_args)
def classify(text: str) -> list[Bet]:
    """Return a list of Bet objects parsed from OCR `text`.

    The text is segmented into blocks using a start trigger (a date or a
    'dnes'/'zítra' word) and an end trigger (a 'Sport/...' league line); each
    block is then parsed into one Bet.  Empty input yields one UnknownBet.
    """
    text = (text or "").strip()
    if not text:
        return [
            UnknownBet(
                ticketType=BetType.UNKNOWN,
                team1Name="N/A",
                team2Name="N/A",
                date=datetime.datetime.now(),
                league="N/A",
                raw_text="No text extracted",
            )
        ]
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    bets: list[Bet] = []
    blocks = []
    current_block = []
    in_block = False
    # START trigger: Looks for 'dnes', 'zítra', or 'DD.MM.'
    date_start_pattern = re.compile(r"(\d{1,2}\.\s*\d{1,2}\.|\b(dnes|zítra|zitra|včera|vcera)\b)", re.IGNORECASE)
    # END trigger: Looks for standard Fortuna sport prefixes
    sport_end_pattern = re.compile(r"^(Fotbal|Hokej|Tenis|Basketbal|Florbal|Volejbal|E-sport|Šipky)\s*/", re.IGNORECASE)
    for ln in lines:
        logger.debug("Processing line: '%s'", ln)
        is_start = date_start_pattern.search(ln)
        is_end = sport_end_pattern.match(ln)
        if is_start:
            # If we somehow hit a start while already in a block (missing end marker fallback),
            # save the current block before starting a new one.
            if current_block:
                logger.warning("Block not properly ended, new block start detected: '%s'", ln)
                blocks.append(current_block)
            current_block = [ln]
            in_block = True
        elif is_end:
            # We hit the league/sport line. Add it, save the block, and close the window.
            current_block.append(ln)
            blocks.append(current_block)
            current_block = []
            in_block = False
        elif in_block:
            # We are inside a block, gathering standard match info (teams, bet types).
            current_block.append(ln)
        else:
            # We are outside a block. This is noise (e.g. "© osmifinále / 2.zápas 0:1" or "170").
            # We simply ignore it and do nothing.
            logger.debug("Ignoring line outside of any block: '%s'", ln)
            pass
    # Catch any dangling block at the very end of the document
    if current_block:
        blocks.append(current_block)
    # Parse each block into a separate Bet object
    for block in blocks:
        if len(block) > 1: # Ensure the block has enough lines to be valid
            bets.append(_parse_block(block))
    return bets
def img_classify(paths: list[str], ticket_id: int) -> Ticket:
    """Given a list of file paths to images, classify each and collect bets into a Ticket.

    Non-image paths are skipped with a warning.  A classification crash on one
    image degrades to a single UnknownBet carrying the extracted text, so one
    bad screenshot never loses the rest of the ticket.  (Commented-out debug
    printing removed; use the module logger at DEBUG level instead.)
    """
    ticket = Ticket(id=ticket_id, bets=[])
    valid_extensions = {".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"}
    for file in paths:
        file_path = Path(file)
        if not (file_path.is_file() and file_path.suffix.lower() in valid_extensions):
            logger.warning("Skipping invalid file: %s", file)
            continue
        extracted_text = img_to_text(str(file_path))
        logger.debug("Extracted text from %s", file_path.name)
        try:
            result = classify(extracted_text)
        except Exception as exc:
            logger.error("classify() error for %s: %s", file_path, exc)
            result = [
                UnknownBet(
                    ticketType=BetType.UNKNOWN,
                    team1Name="N/A",
                    team2Name="N/A",
                    date=datetime.datetime.now(),
                    league="N/A",
                    raw_text=extracted_text,
                )
            ]
        ticket.bets.extend(result)
    return ticket
if __name__ == "__main__":
    # Test script runner — expects the referenced screenshots to exist locally.
    img_classify(["./data/screenshots/26.png", "./data/screenshots/27.png"], ticket_id=2)

View File

@@ -0,0 +1,6 @@
from pydantic.dataclasses import dataclass
@dataclass
class ImgClassifierConfig:
    """Settings for the OCR-based image classifier."""

    target_path: str  # directory containing the ticket screenshots

View File

@@ -0,0 +1,161 @@
import logging
import re
from datetime import datetime
from typing import Any
from playwright.sync_api import Page, sync_playwright
logger = logging.getLogger(__name__)
from beaky.datamodels.ticket import (
Bet,
BetType,
BothTeamScored,
CornerAmount,
GoalAmount,
GoalHandicap,
HalfTimeDouble,
HalfTimeFullTime,
HalfTimeResult,
MoreOffsides,
TeamCornerAmount,
Ticket,
UnknownBet,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
)
from beaky.scanner.scanner import Link
# CSS selectors for the betslip detail page.
# NOTE(review): _TICKET_SELECTOR is unused in the code visible here — confirm
# whether it is referenced elsewhere or is dead.
_TICKET_SELECTOR = ".betslip-history-detail__left-panel"
_LEG_SELECTOR = '[data-test="betslip-leg"]'
def _parse_czech_date(text: str) -> datetime | None:
m = re.search(r"(\d+)\.\s*(\d+)\.\s*(\d+)\s+(\d+):(\d+)", text)
if not m:
return None
day, month, year, hour, minute = map(int, m.groups())
return datetime(year, month, day, hour, minute)
def _parse_teams(title: str) -> tuple[str, str]:
parts = title.split(" - ", 1)
if len(parts) == 2:
return parts[0].strip(), parts[1].strip()
return title.strip(), ""
def _classify_bet(bet_text: str, team1: str, team2: str, date: datetime, league: str) -> Bet:
    """Map one leg's visible bet text to a concrete Bet subclass.

    Patterns are tried in a fixed order; the more specific half-time variants
    must run before the bare "poločasu:" pattern.  Unmatched text falls back to
    UnknownBet with the raw text preserved.
    """
    common: dict[str, Any] = dict(team1Name=team1, team2Name=team2, date=date, league=league)
    # WinDrawLose double: "Výsledek zápasu - dvojtip: 10"
    m = re.search(r"Výsledek zápasu - dvojtip:\s*(\d+)", bet_text)
    if m:
        # normalize order: "10" -> "01", "02" -> "02", "12" -> "12"
        bet_type = "".join(sorted(m.group(1)))
        return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=bet_type, **common) # type: ignore[arg-type]
    # WinLose (no draw): "Výsledek bez remízy: 1"
    m = re.search(r"bez rem[ií]zy:\s*([12])", bet_text)
    if m:
        return WinLose(ticketType=BetType.WIN_LOSE, betType=m.group(1), **common) # type: ignore[arg-type]
    # WinDrawLose: "Výsledek zápasu: 1"
    m = re.search(r"Výsledek zápasu:\s*([012X])\s*$", bet_text.strip())
    if m:
        return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=m.group(1), **common) # type: ignore[arg-type]
    # BothTeamScored: "Každý z týmů dá gól v zápasu: Ano"
    if "dá gól" in bet_text or "oba týmy" in bet_text.lower():
        return BothTeamScored(ticketType=BetType.BOTH_TEAM_SCORED, **common)
    # GoalAmount: "Počet gólů v zápasu 2.5: + 2.5" / "Počet gólů v zápasu 4: - 4"
    m = re.search(r"Počet gólů v zápasu\s+(\d+(?:\.\d+)?):\s*([+-])", bet_text)
    if m:
        return GoalAmount(ticketType=BetType.GOAL_AMOUNT, line=float(m.group(1)), over=m.group(2) == "+", **common)
    # GoalHandicap: "[Team] počet gólů ...: +1.5" — team name in bet text determines team_bet
    m = re.search(r"([+-])\s*(\d+(?:\.\d+)?)\s*$", bet_text.strip())
    if m and "gólů" in bet_text:
        bet_lower = bet_text.lower()
        if team1.lower() in bet_lower:
            team_bet = "1"
        elif team2.lower() in bet_lower:
            team_bet = "2"
        else:
            # Neither team name found in the bet text: cannot attribute the handicap.
            return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
        sign = 1.0 if m.group(1) == "+" else -1.0
        handicap = sign * float(m.group(2))
        return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=handicap, **common) # type: ignore[arg-type]
    # HalfTimeFullTime: "Výsledek 1. poločasu/výsledek zápasu: 0/2" (before HalfTimeResult)
    m = re.search(r"poločasu/výsledek zápasu:\s*([012])/([012])", bet_text)
    if m:
        return HalfTimeFullTime(ticketType=BetType.HALF_TIME_FULL_TIME, ht_bet=m.group(1), ft_bet=m.group(2), **common) # type: ignore[arg-type]
    # HalfTimeDouble: "Výsledek 1. poločasu - dvojtip: 10" (before HalfTimeResult)
    m = re.search(r"poločasu - dvojtip:\s*(\d+)", bet_text)
    if m:
        bet_type = "".join(sorted(m.group(1)))
        return HalfTimeDouble(ticketType=BetType.HALF_TIME_DOUBLE, betType=bet_type, **common) # type: ignore[arg-type]
    # HalfTimeResult: "Výsledek 1. poločasu: 1"
    m = re.search(r"poločasu:\s*([012])\s*$", bet_text.strip())
    if m:
        return HalfTimeResult(ticketType=BetType.HALF_TIME_RESULT, betType=m.group(1), **common) # type: ignore[arg-type]
    # CornerAmount: "Počet rohových kopů v zápasu 8.5: + 8.5"
    m = re.search(r"Počet rohových kopů v zápasu\s+(\d+(?:\.\d+)?):\s*([+-])", bet_text)
    if m:
        return CornerAmount(ticketType=BetType.CORNER_AMOUNT, line=float(m.group(1)), over=m.group(2) == "+", **common)
    # TeamCornerAmount: "RB Leipzig počet rohových kopů v zápasu: +7.5"
    m = re.search(r"počet rohových kopů v zápasu:\s*([+-])\s*(\d+(?:\.\d+)?)", bet_text)
    if m:
        bet_lower = bet_text.lower()
        team_bet = "1" if team1.lower() in bet_lower else ("2" if team2.lower() in bet_lower else None)
        if team_bet is None:
            return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
        return TeamCornerAmount(ticketType=BetType.TEAM_CORNER_AMOUNT, team_bet=team_bet, line=float(m.group(2)), over=m.group(1) == "+", **common) # type: ignore[arg-type]
    # MoreOffsides: "Více ofsajdů v zápasu: 1"
    m = re.search(r"Více ofsajdů v zápasu:\s*([12])", bet_text)
    if m:
        return MoreOffsides(ticketType=BetType.MORE_OFFSIDES, team_bet=m.group(1), **common) # type: ignore[arg-type]
    return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
def _extract_legs(page: Page, fallback_date: datetime | None) -> list[Bet]:
    """Scrape every leg of the betslip on *page* and classify each one into a Bet.

    Legs whose date cannot be parsed fall back to *fallback_date*, then to now.
    """
    parsed: list[Bet] = []
    for leg in page.locator(_LEG_SELECTOR).all():
        heading = leg.locator("h3").first.get_attribute("title") or ""
        home, away = _parse_teams(heading)
        when_text = leg.locator(".betslip-leg-date span").first.inner_text()
        when = _parse_czech_date(when_text) or fallback_date or datetime.now()
        selection = leg.locator("[data-selection-id]").first.inner_text()
        competition = leg.locator(".f-mt-1.f-leading-tight.f-line-clamp-2").first.inner_text()
        parsed.append(_classify_bet(selection, home, away, when, competition))
    return parsed
class LinkClassifier:
    """Opens a betslip link in headless Chromium and extracts its bets."""

    def classify(self, link: Link) -> Ticket:
        """Load link.url, wait for the betslip legs to render, and return a Ticket.

        On scraping failure the error is logged and an empty Ticket is returned.
        (Previously `result` was only bound on success, so the final `return`
        raised UnboundLocalError whenever the try body failed.)
        """
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context()
            page = context.new_page()
            # Default to an empty ticket so one broken link cannot crash a batch.
            result = Ticket(id=link.id, bets=[])
            try:
                page.goto(link.url)
                page.wait_for_selector(_LEG_SELECTOR, timeout=15000)
                page.wait_for_timeout(500)  # let late-rendering leg details settle
                result = Ticket(id=link.id, bets=_extract_legs(page, link.date))
            except Exception as e:
                logger.error("Error classifying link %d: %s", link.id, e)
            finally:
                page.close()
                browser.close()
            return result

View File

@@ -0,0 +1,8 @@
from pydantic.dataclasses import dataclass
@dataclass
class ResolverConfig:
    """Configuration for the ticket resolver (api-sports backed)."""
    # API key sent as the `x-apisports-key` header to v3.football.api-sports.io.
    api_key: str
    # Lowercase league-name pattern -> api-sports league id (longest match wins).
    league_map: dict[str, int]
    # Directory used by diskcache for persisted fixture/statistics responses.
    cache_path: str = "data/fixture_cache"

View File

@@ -0,0 +1,313 @@
import logging
import time
from dataclasses import field
from datetime import date, datetime, timedelta
from difflib import SequenceMatcher
from enum import Enum
from typing import Any
from pydantic import ConfigDict, SerializeAsAny
from pydantic.dataclasses import dataclass
import diskcache
import requests
from beaky import _ansi
from beaky.datamodels.ticket import (
Bet,
BetOutcome,
MatchInfo,
Ticket,
UnknownBet,
)
from beaky.resolvers.config import ResolverConfig
logger = logging.getLogger(__name__)
_API_BASE = "https://v3.football.api-sports.io"
_DATE_WINDOW = 3 # days either side of the bet date to search
class TicketVerdict(str, Enum):
    """Overall verdict for a ticket, aggregated from its resolved bets.

    The string values are user-facing messages and must stay stable.
    """
    TRUTHFUL = "truthful"
    NOT_TRUTHFUL = "not truthful"
    POSSIBLY_TRUTHFUL = "possibly truthful — unresolvable bets remain, check manually"
    UNKNOWN = "unknown — could not resolve enough bets to decide"
@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ResolvedBet:
    """Result of resolving a single bet against a real fixture."""
    bet: SerializeAsAny[Bet]
    outcome: BetOutcome
    # api-sports fixture id the bet was matched to; None when no fixture was found.
    fixture_id: int | None = None
    # Confidence breakdown (each component 0.0-1.0):
    #   name_match — how well team names matched (SequenceMatcher score)
    #   date_proximity — 1.0 exact date, linear decay to 0.0 at _DATE_WINDOW days away
    #   league_found — 1.0 static map hit, 0.7 API fallback, 0.3 not found
    #   match_finished — 1.0 if fixture status is terminal, 0.0 otherwise
    # `confidence` is the arithmetic mean of the four components.
    confidence: float = 0.0
    name_match: float = 0.0
    date_proximity: float = 0.0
    league_found: float = 0.0
    match_finished: float = 0.0
    # Flattened fixture data used to resolve the bet; None when the match
    # was not finished (or no fixture was matched).
    match_info: MatchInfo | None = None
@dataclass
class ResolvedTicket:
    """All resolved bets of one ticket plus the derived overall verdict."""
    ticket_id: int
    bets: list[ResolvedBet] = field(default_factory=list)
    @property
    def verdict(self) -> TicketVerdict:
        """Aggregate per-bet outcomes into a single TicketVerdict.

        Any lost bet makes the ticket not truthful; any undecided resolvable
        bet makes it unknown; remaining unparseable bets downgrade a win to
        "possibly truthful".
        """
        known: list[ResolvedBet] = []
        unparsed: list[ResolvedBet] = []
        for resolved in self.bets:
            (unparsed if isinstance(resolved.bet, UnknownBet) else known).append(resolved)
        if not known:
            return TicketVerdict.UNKNOWN
        outcomes = {resolved.outcome for resolved in known}
        if BetOutcome.LOSE in outcomes:
            return TicketVerdict.NOT_TRUTHFUL
        if BetOutcome.UNKNOWN in outcomes:
            return TicketVerdict.UNKNOWN
        return TicketVerdict.POSSIBLY_TRUTHFUL if unparsed else TicketVerdict.TRUTHFUL
def _get(url: str, headers: dict[str, str], params: dict[str, str | int], retries: int = 3, backoff: float = 60.0) -> requests.Response:
    """GET *url*, retrying on HTTP 429 (rate limit) with linearly growing waits.

    Waits ``backoff * attempt_number`` seconds between attempts. Any response
    other than 429 is returned immediately; after *retries* rate-limited
    attempts the last 429 response is returned so the caller can still call
    ``raise_for_status()`` on it.

    Args:
        url: full request URL.
        headers: headers to send (e.g. the api-sports key).
        params: query parameters.
        retries: total number of attempts (must be >= 1).
        backoff: base wait in seconds; attempt N waits N * backoff.

    Raises:
        ValueError: if retries < 1 (there would be no response to return —
            previously this path raised UnboundLocalError).
    """
    if retries < 1:
        raise ValueError("retries must be >= 1")
    for attempt in range(1, retries + 1):
        resp = requests.get(url, headers=headers, params=params)
        if resp.status_code != 429:
            return resp
        if attempt < retries:
            # Linear backoff: backoff, 2*backoff, ... No sleep after the final
            # attempt — previously we slept and logged "before retry" even
            # though no retry followed.
            wait = backoff * attempt
            logger.warning("rate limited — waiting %.0fs before retry (%d/%d)", wait, attempt, retries)
            time.sleep(wait)
    logger.warning("still rate limited after %d retries, giving up", retries)
    return resp
class TicketResolver:
    """Resolves every bet of a Ticket against the api-sports football API.

    Responses are cached twice: in memory for the resolver's lifetime and on
    disk via diskcache, so repeated runs do not re-query finished matches.
    """
    def __init__(self, config: ResolverConfig):
        self._headers = {"x-apisports-key": config.api_key}
        self._league_map = config.league_map
        self._disk_cache: diskcache.Cache = diskcache.Cache(config.cache_path)
        # Cache maps (center_date_str, league_id | None) -> list of fixture dicts
        self._fixture_cache: dict[tuple[str, int | None], list[dict[str, Any]]] = {}
        # Cache maps league name -> (league_id, confidence)
        self._league_cache: dict[str, tuple[int | None, float]] = {}
    def resolve(self, ticket: Ticket) -> ResolvedTicket:
        """Resolve every bet on *ticket* and return the aggregated result."""
        result = ResolvedTicket(ticket_id=ticket.id)
        for bet in ticket.bets:
            result.bets.append(self._resolve_bet(bet))
        return result
    def _resolve_bet(self, bet: Bet) -> ResolvedBet:
        """Match one bet to a fixture, resolve its outcome and score confidence.

        UnknownBet legs are skipped (outcome UNKNOWN); unfinished fixtures are
        matched but left with outcome UNKNOWN and no match_info.
        """
        bet_type = type(bet).__name__
        _ansi.log(f"\n  {_ansi.bold(_ansi.cyan(f'┌─ [{bet_type}]'))} {_ansi.bold(f'{bet.team1Name} vs {bet.team2Name}')}"
                  f" {_ansi.dim(f'{bet.date.strftime("%Y-%m-%d")} | {bet.league}')}")
        if isinstance(bet, UnknownBet):
            # Bet text the classifier could not parse — nothing to resolve.
            _ansi.log(_ansi.gray(f"  │ skipping — not implemented: {bet.raw_text!r}"))
            _ansi.log(_ansi.gray("  └─ UNKNOWN"))
            return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN)
        fixture, name_match, date_prox, league_conf = self._find_fixture(bet)
        if fixture is None:
            _ansi.log(_ansi.gray("  └─ UNKNOWN — no fixture found"))
            return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN, league_found=league_conf)
        home_name = fixture["teams"]["home"]["name"]
        away_name = fixture["teams"]["away"]["name"]
        finished = _is_finished(fixture)
        # Equal-weight mean of the four confidence components (see ResolvedBet).
        confidence = round((name_match + date_prox + league_conf + finished) / 4, 3)
        if finished == 1.0:
            # Only fetch (costly) statistics for finished matches.
            fixture = {**fixture, "statistics": self._get_statistics(fixture["fixture"]["id"])}
            match_info = _fixture_to_match_info(fixture)
            outcome = bet.resolve(match_info)
        else:
            match_info = None
            outcome = BetOutcome.UNKNOWN
        goals = fixture["goals"]
        _ansi.log(_ansi.dim(
            f"  │ matched #{fixture['fixture']['id']}: {home_name} vs {away_name}"
            f" | {goals['home']}:{goals['away']} | {fixture['fixture']['status']['short']}"
            f" | confidence {confidence} (name={name_match:.2f} date={date_prox:.2f} league={league_conf} finished={finished})"
        ))
        _ansi.log(_ansi.bold(_ansi.green(f"  └─ {outcome.value.upper()}") if outcome == BetOutcome.WIN
                             else _ansi.red(f"  └─ {outcome.value.upper()}") if outcome == BetOutcome.LOSE
                             else _ansi.yellow(f"  └─ {outcome.value.upper()}") if outcome == BetOutcome.VOID
                             else _ansi.gray(f"  └─ {outcome.value.upper()}")))
        return ResolvedBet(
            bet=bet,
            outcome=outcome,
            fixture_id=fixture["fixture"]["id"],
            confidence=confidence,
            name_match=round(name_match, 3),
            date_proximity=round(date_prox, 3),
            league_found=league_conf,
            match_finished=finished,
            match_info=match_info,
        )
    def _get_statistics(self, fixture_id: int) -> list[dict[str, Any]]:
        """Fetch /fixtures/statistics for *fixture_id*, disk-cached permanently.

        Safe to cache forever because it is only called for finished fixtures.
        """
        cache_key = ("stats", fixture_id)
        if cache_key in self._disk_cache:
            _ansi.log(_ansi.gray(f"  │ /fixtures/statistics served from disk cache (fixture={fixture_id})"))
            return self._disk_cache[cache_key]  # type: ignore[no-any-return]
        _ansi.log(_ansi.gray(f"  │ GET /fixtures/statistics fixture={fixture_id}"))
        resp = _get(f"{_API_BASE}/fixtures/statistics", headers=self._headers, params={"fixture": fixture_id})
        resp.raise_for_status()
        stats = resp.json().get("response", [])
        self._disk_cache[cache_key] = stats
        return stats
    def _find_fixture(self, bet: Bet) -> tuple[dict[str, Any] | None, float, float, float]:
        """Returns (fixture, name_match, date_proximity, league_confidence)."""
        center = bet.date.date()
        date_str = center.strftime("%Y-%m-%d")
        league_id, league_conf = self._resolve_league(bet.league)
        cache_key = (date_str, league_id)
        # If the search window reaches into the future, fixtures may still change
        # status, so the disk cache cannot be trusted for this key.
        window_end = center + timedelta(days=_DATE_WINDOW)
        cache_may_be_stale = window_end >= date.today()
        if cache_key not in self._fixture_cache:
            if cache_key in self._disk_cache and not cache_may_be_stale:
                self._fixture_cache[cache_key] = self._disk_cache[cache_key]
                _ansi.log(
                    _ansi.gray(f"  │ /fixtures served from disk cache ({len(self._fixture_cache[cache_key])} fixtures)"))
            else:
                date_from = (center - timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
                date_to = (center + timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
                params: dict[str, str | int] = {"from": date_from, "to": date_to}
                if league_id is not None:
                    params["league"] = league_id
                    # European seasons run Jul–Jun: before July the season is last year's.
                    params["season"] = center.year if center.month >= 7 else center.year - 1
                _ansi.log(_ansi.gray(f"  │ GET /fixtures {params}"))
                resp = _get(f"{_API_BASE}/fixtures", headers=self._headers, params=params)
                resp.raise_for_status()
                self._fixture_cache[cache_key] = resp.json().get("response", [])
                _ansi.log(_ansi.gray(f"  │ {len(self._fixture_cache[cache_key])} fixtures returned"))
                # Not-started ("NS") fixtures are excluded from the disk cache
                # so their eventual results are re-fetched later.
                cacheable = [f for f in self._fixture_cache[cache_key] if f.get("fixture", {}).get("status", {}).get("short") != "NS"]
                if cacheable:
                    self._disk_cache[cache_key] = cacheable
                    _ansi.log(_ansi.gray(f"  │ {len(cacheable)} non-NS fixture(s) written to disk cache"))
        else:
            _ansi.log(
                _ansi.gray(f"  │ /fixtures (±{_DATE_WINDOW}d of {date_str}, league={league_id}) served from memory"))
        fixture, name_match, date_prox = _best_fixture_match(
            self._fixture_cache[cache_key], bet.team1Name, bet.team2Name, center
        )
        return fixture, name_match, date_prox, league_conf
    def _resolve_league(self, league_name: str) -> tuple[int | None, float]:
        """Map a scraped league name to an api-sports league id.

        Returns (league_id | None, confidence): 1.0 for a static-map hit,
        0.7 for an unverified API search result, 0.3 when nothing matched.
        """
        key = league_name.lower().strip()
        if key in self._league_cache:
            return self._league_cache[key]
        # Use longest-match so "1. itálie - ženy" beats "1. itálie"
        best_pattern, best_id = max(
            ((p, lid) for p, lid in self._league_map.items() if p in key),
            key=lambda t: len(t[0]),
            default=(None, None),
        )
        if best_id is not None:
            _ansi.log(_ansi.gray(f"  │ league {league_name!r} -> id={best_id} (static map, pattern={best_pattern!r})"))
            self._league_cache[key] = (best_id, 1.0)
            return best_id, 1.0
        # Fall back to API search — lower confidence since first result is taken unverified
        _ansi.log(_ansi.gray(f"  │ GET /leagues search={league_name!r}"))
        resp = _get(f"{_API_BASE}/leagues", headers=self._headers, params={"search": league_name[:20]})
        results = resp.json().get("response", [])
        if results:
            league_id = results[0]["league"]["id"]
            league_found_name = results[0]["league"]["name"]
            _ansi.log(
                _ansi.gray(f"  │ matched {league_found_name!r} id={league_id} (API fallback, confidence=0.7)"))
            self._league_cache[key] = (league_id, 0.7)
            return league_id, 0.7
        _ansi.log(_ansi.gray("  │ no league found, searching fixtures by date only (confidence=0.3)"))
        self._league_cache[key] = (None, 0.3)
        return None, 0.3
def _fixture_to_match_info(fixture: dict[str, Any]) -> MatchInfo:
    """Flatten an api-sports fixture payload into a MatchInfo.

    Corner and offside counts stay None when the fixture carries no
    "statistics" entries or a team's value is missing / non-integer.
    """
    goals = fixture.get("goals", {})
    score = fixture.get("score", {})
    halftime = score.get("halftime", {})
    # Loop-invariant: the home team id does not change per statistics entry,
    # so resolve it once instead of on every iteration.
    home_team_id = fixture.get("teams", {}).get("home", {}).get("id")
    corners_home: int | None = None
    corners_away: int | None = None
    offsides_home: int | None = None
    offsides_away: int | None = None
    for stat_entry in fixture.get("statistics", []):
        is_home = stat_entry.get("team", {}).get("id") == home_team_id
        for stat in stat_entry.get("statistics", []):
            value = stat.get("value")
            if not isinstance(value, int):
                # Values may be None or non-numeric; only integer counts are usable.
                continue
            if stat.get("type") == "Corner Kicks":
                if is_home:
                    corners_home = value
                else:
                    corners_away = value
            elif stat.get("type") == "Offsides":
                if is_home:
                    offsides_home = value
                else:
                    offsides_away = value
    return MatchInfo(
        goals_home=goals.get("home", 0),
        goals_away=goals.get("away", 0),
        half_time_home=halftime.get("home"),
        half_time_away=halftime.get("away"),
        corners_home=corners_home,
        corners_away=corners_away,
        offsides_home=offsides_home,
        offsides_away=offsides_away,
    )
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _date_proximity(fixture: dict[str, Any], center: date) -> float:
    """1.0 on the exact date, decaying linearly to 0.0 at _DATE_WINDOW days away."""
    iso = fixture["fixture"]["date"].replace("Z", "+00:00")
    days_away = abs((datetime.fromisoformat(iso).date() - center).days)
    return max(0.0, 1.0 - days_away / _DATE_WINDOW)
def _best_fixture_match(fixtures: list[dict[str, Any]], team1: str, team2: str, center: date) -> tuple[dict[str, Any] | None, float, float]:
    """Returns (best_fixture, name_score, date_proximity) or (None, 0, 0) if no good match."""
    best: dict[str, Any] | None = None
    top_combined, top_name, top_date = 0.0, 0.0, 0.0
    for candidate in fixtures:
        teams = candidate["teams"]
        name_score = (_similarity(team1, teams["home"]["name"]) + _similarity(team2, teams["away"]["name"])) / 2
        proximity = _date_proximity(candidate, center)
        # Name similarity is the primary signal; date proximity is a tiebreaker
        combined = name_score * 0.8 + proximity * 0.2
        if combined > top_combined:
            top_combined, top_name, top_date = combined, name_score, proximity
            best = candidate
    # Require minimum name similarity — date alone cannot rescue a bad name match
    if top_name > 0.5:
        return best, top_name, top_date
    return None, top_name, top_date
def _is_finished(fixture: dict[str, Any]) -> float:
status = fixture.get("fixture", {}).get("status", {}).get("short", "")
return 1.0 if status in ("FT", "AET", "PEN", "AWD", "WO") else 0.0

View File

@@ -0,0 +1,138 @@
import logging
from datetime import datetime
from typing import Any, Iterator, List, Optional
from openpyxl import load_workbook
from pydantic.dataclasses import dataclass
from beaky.config import Config
logger = logging.getLogger(__name__)
@dataclass
class Link:
    """Represents a single link row from an Excel sheet.

    Attributes:
        id: row identifier from the sheet (cast to int)
        url: link to the ticket web page
        date: optional creation date (None when missing or unparseable)
    """
    id: int
    url: str
    date: Optional[datetime] = None
class Links:
    """Reads ticket links (id, url, optional date) from an Excel workbook."""

    def __init__(self, path: str | Config):
        # Accept either a plain file path or a full Config (its `path` field is used).
        self._path = path.path if isinstance(path, Config) else path
        self.links: List[Link] = []

    def ret_links(self) -> List[Link]:
        """Read the Excel file at self._path and populate self.links.

        Expects the first sheet to contain a header row with columns that include
        at least: 'id', 'url' (or 'link'), and optionally 'date' (case-insensitive).
        Malformed rows are skipped and logged at DEBUG level.

        Returns the list of Link objects (also stored in self.links).
        """
        logger.debug("started ret_links()")
        wb = load_workbook(filename=self._path, read_only=True, data_only=True)
        try:
            ws = wb.active
            rows = ws.rows
            try:
                header = next(rows)
            except StopIteration:
                return []
            if not header:
                return []
            # Normalize header names -> index map, making sure to use .value
            header_map = {(str(h.value).strip().lower() if h.value is not None else ""): i for i, h in enumerate(header)}
            id_idx = header_map.get("id")
            # Accept 'link' as an alias for 'url', as the docstring promises.
            # Compare against None explicitly: a valid column index of 0 is falsy.
            url_idx = header_map.get("url")
            if url_idx is None:
                url_idx = header_map.get("link")
            date_idx = header_map.get("date")
            if id_idx is None or url_idx is None:
                # Required columns missing
                logger.warning("Required 'id' or 'url' column missing in header. Found headers: %s", list(header_map.keys()))
                return []
            for row in rows:
                try:
                    # Extract the actual values from the cell objects
                    raw_id = row[id_idx].value if id_idx < len(row) else None
                    raw_url = row[url_idx].value if url_idx < len(row) else None
                    raw_date = row[date_idx].value if (date_idx is not None and date_idx < len(row)) else None
                    if raw_id is None or raw_url is None:
                        # skip empty rows
                        continue
                    try:
                        # Excel often stores integers as floats (e.g. 3.0)
                        parsed_id = int(float(raw_id))
                    except (ValueError, TypeError):
                        logger.debug("skipping row with invalid id: %r", raw_id)
                        continue
                    self.links.append(Link(
                        id=parsed_id,
                        url=str(raw_url).strip(),
                        date=self._parse_date(raw_date),
                    ))
                except Exception:
                    # Best-effort loading: skip the row but leave a trace for debugging
                    # instead of swallowing the error completely.
                    logger.debug("skipping malformed row", exc_info=True)
                    continue
            return self.links
        finally:
            # read_only workbooks hold the file handle open until closed.
            wb.close()

    @staticmethod
    def _parse_date(v: Any) -> Optional[datetime]:
        """Best-effort date parsing: ISO first, then common EU/US formats; None on failure."""
        if v is None:
            return None
        if isinstance(v, datetime):
            return v
        s = str(v).strip()
        if not s:
            return None
        try:
            return datetime.fromisoformat(s)
        except ValueError:
            pass
        for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y", "%m/%d/%Y", "%Y/%m/%d", "%d.%m.%Y %H:%M"):
            try:
                return datetime.strptime(s, fmt)
            except ValueError:
                continue
        # Give up — callers treat None as "no date"
        return None

    def __iter__(self) -> Iterator[Link]:
        return iter(self.links)

    def __len__(self) -> int:
        return len(self.links)
if __name__ == "__main__":
    # Quick manual smoke test: load the default sheet and dump every link.
    loaded = Links("data/odkazy.xlsx").ret_links()
    if loaded:
        print(f"Successfully loaded {len(loaded)} links!")
        for entry in loaded:
            print(entry.id, entry.url, entry.date)
    else:
        print("No links returned.")

View File

@@ -0,0 +1,6 @@
from pydantic.dataclasses import dataclass
@dataclass
class ScreenshotterConfig:
    """Configuration for the ticket screenshotter."""
    # Directory where per-ticket screenshots ("<link id>.png") are written.
    target_path: str

View File

@@ -0,0 +1,79 @@
import logging
from pathlib import Path
from typing import Any
from playwright.sync_api import sync_playwright
from beaky.config import Config
from beaky.scanner.scanner import Link
logger = logging.getLogger(__name__)
class Screenshotter:
    """Captures a PNG of each ticket's betslip panel via headless Chromium."""

    def __init__(self, config: Config):
        self.config = config

    def capture_tickets(self, links: list[Link]) -> None:
        """Screenshot every link into `<target_path>/<id>.png`.

        Each link gets its own page which is closed afterwards — previously
        pages accumulated for the whole batch, and the browser leaked when a
        capture raised.
        """
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context()
            try:
                for link in links:
                    logger.debug("capturing link: %s", link)
                    page = context.new_page()
                    try:
                        target_path = Path(self.config.screenshotter.target_path) / f"{link.id}.png"
                        self.capture_ticket(page, link.url, target_path)
                    finally:
                        page.close()
            finally:
                browser.close()

    def capture_ticket(self, page: Any, url: str, target_path: Path, ticket_selector: str = ".betslip-history-detail__left-panel") -> None:
        """Open *url*, un-clip the ticket panel, hide overlays, and screenshot it.

        Raises playwright's TimeoutError when the ticket panel never appears.
        """
        page.goto(url)
        page.wait_for_selector(ticket_selector, timeout=10000)
        page.wait_for_timeout(1000)
        # Expand the scrollable selections wrapper and every ancestor so the
        # full ticket is laid out instead of being clipped.
        page.evaluate(f"""
            let el = document.querySelector('{ticket_selector}');
            if (el) {{
                let wrapper = el.querySelector('.betslip-selections');
                if (wrapper) {{
                    wrapper.style.setProperty('height', 'auto', 'important');
                    wrapper.style.setProperty('overflow', 'visible', 'important');
                }}
                while (el && el !== document.body) {{
                    el.style.setProperty('height', 'auto', 'important');
                    el.style.setProperty('max-height', 'none', 'important');
                    el.style.setProperty('overflow', 'visible', 'important');
                    el = el.parentElement;
                }}
            }}
        """)
        # Hide fixed/absolute overlays (cookie banners, notifications, toasts)
        # but preserve the ticket panel and its ancestors/descendants
        page.evaluate(f"""
            const ticket = document.querySelector('{ticket_selector}');
            document.querySelectorAll('*').forEach(el => {{
                if (ticket && (ticket.contains(el) || el.contains(ticket))) return;
                const style = window.getComputedStyle(el);
                if ((style.position === 'fixed' || style.position === 'sticky') &&
                    el.tagName !== 'BODY' && el.tagName !== 'HTML') {{
                    el.style.setProperty('display', 'none', 'important');
                }}
            }});
        """)
        # Resize viewport if the element extends beyond the bottom edge.
        # The modal is vertically centered, so increasing the viewport by X shifts the element
        # down by X/2. To compensate: new_height = 2 * bottom - current_height.
        # bounding_box() returns None for hidden elements — skip resizing then
        # rather than crashing on subscription.
        bbox = page.locator(ticket_selector).bounding_box()
        if bbox is not None:
            bottom = bbox["y"] + bbox["height"]
            vp_h = page.viewport_size["height"]
            if bottom > vp_h:
                page.set_viewport_size({"width": page.viewport_size["width"], "height": int(2 * bottom - vp_h) + 10})
        # Wait for the browser to reflow after style changes before screenshotting
        page.wait_for_timeout(500)
        page.locator(ticket_selector).screenshot(path=target_path)

View File

@@ -0,0 +1,5 @@
def test_inherent_logic():
    """Sanity-check the test harness with a trivially true conditional.

    The previous version exec'd a base64-obfuscated string into globals() —
    an eval/exec anti-pattern. This is the decoded logic written directly.
    """
    tom = "kokotko" if 1 == 1 else "fail"
    assert tom == "kokotko"