Compare commits

..

18 Commits

Author SHA1 Message Date
b2bc16dfab Resolving fixes 2026-03-22 14:36:13 +01:00
e50ca19b94 Add remaining bets classifiing and resolving 2026-03-22 12:59:41 +01:00
ebf8c78c79 Add diskcache finctionality for caching sports api fixtures. 2026-03-22 12:22:31 +01:00
0cbb772dc4 remove class mode 2026-03-22 12:11:48 +01:00
011ca5ae8c Unify id logic in cli, remove defaults from data models 2026-03-22 12:06:16 +01:00
255a311b04 Make bets pretty print 2026-03-22 11:50:58 +01:00
Chlupaty
d8e71d3483 Changed UnknownTicket to UnknownBet in classifier.py 2026-03-22 11:49:34 +01:00
Chlupaty
fa19b30601 Changed UnknownTicket to UnknownBet 2026-03-22 11:45:06 +01:00
98ed0521df Rename unknown bet 2026-03-22 11:43:25 +01:00
c77ccf1eb9 Pretty print 2026-03-22 11:38:46 +01:00
207d6565c7 fix 2026-03-22 11:28:15 +01:00
765e90be11 Add compare to cli 2026-03-22 11:26:22 +01:00
04987555e5 refactor resolvers 2026-03-22 11:19:17 +01:00
Chlupaty
1742a43d49 Created image classifier v.2 2026-03-22 11:18:15 +01:00
8f8190b734 fix cli 2026-03-22 10:46:34 +01:00
Chlupaty
f33de1073f Created image classifier v.1 2026-03-22 01:23:16 +01:00
Chlupaty
98a22e2593 Created image classifier config file 2026-03-22 01:21:41 +01:00
7269b2d68f Mypy + ruff 2026-03-22 01:06:12 +01:00
14 changed files with 891 additions and 266 deletions

View File

@@ -5,3 +5,60 @@ screenshotter:
resolver: resolver:
api_key: 733f6882605be2de8980bbd074091ee4 api_key: 733f6882605be2de8980bbd074091ee4
league_map:
# European cups
liga mistrů: 2
champions league: 2
evropská liga: 3
europa league: 3
konferenční liga: 848
conference league: 848
# Top flights
1. anglie: 39
1. belgie: 144
1. česko: 345
1. dánsko: 119
1. francie: 61
1. itálie: 135
1. itálie - ženy: 139
1. německo: 78
1. nizozemsko: 88
1. polsko: 106
1. portugalsko: 94
1. rakousko: 218
1. rumunsko: 283
1. skotsko: 179
1. slovensko: 332
1. španělsko: 140
1. wales: 110
# Second divisions
2. anglie: 40
2. česko: 346
2. francie: 62
2. itálie: 136
2. německo: 79
2. nizozemsko: 89
2. rakousko: 219
2. slovensko: 506
2. španělsko: 141
# Third divisions
3. francie: 63
3. česko msfl: 349
3. česko čfl: 348
# Fourth divisions
4. česko - sk. a: 350
4. česko - sk. b: 351
4. česko - sk. c: 352
4. česko - sk. d: 353
4. česko - sk. e: 354
4. česko - sk. f: 686
# Women
1. česko - ženy: 669
fortuna=liga ženy: 669
# Domestic cups
anglie - fa cup: 45
anglie - efl cup: 48
česko - pohár: 347
img_classifier:
target_path: data/screenshots/

View File

@@ -39,7 +39,8 @@ význam?
- Sázka na více než 2 góly: výhra - Sázka na více než 2 góly: výhra
- Sázka na více než 4 góly: prohra - Sázka na více než 4 góly: prohra
- Sázka na více/méně než 3 góly: storno - Sázka na více/méně než 3 góly: storno
- [Tým] počet gólů (ano ta sázka se tak jmenuje)
- <Tým> počet gólů (ano ta sázka se tak jmenuje)
- +/- v tomto kontextu znamená větší/menší než. Tedy sázíme, zda daný tým dal méně/více než nějaký počet gólů - +/- v tomto kontextu znamená větší/menší než. Tedy sázíme, zda daný tým dal méně/více než nějaký počet gólů
- příklad, tým dal 3 góly - příklad, tým dal 3 góly
- sázka -3.5: výhra - sázka -3.5: výhra

View File

@@ -14,14 +14,17 @@ dependencies = [
"openpyxl>=3.1.0", "openpyxl>=3.1.0",
"PyYaml==6.0.3", "PyYaml==6.0.3",
"playwright==1.58.0", "playwright==1.58.0",
"requests>=2.32.0" "requests>=2.32.0",
"diskcache>=5.6",
] ]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"pytest>=9.0.2", "pytest>=9.0.2",
"ruff==0.15.5", "ruff==0.15.5",
"pytz" "pytz",
"types-requests",
"types-PyYAML",
# "playwright==1.58.0" # only dev because it cant be installed in a pipeline, just locally # "playwright==1.58.0" # only dev because it cant be installed in a pipeline, just locally
] ]
@@ -37,6 +40,7 @@ lint.select = ["E", "F", "I"]
python_version = "3.12" python_version = "3.12"
strict = true strict = true
ignore_missing_imports = true ignore_missing_imports = true
plugins = ["pydantic.mypy"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["test"] testpaths = ["test"]

29
src/beaky/_ansi.py Normal file
View File

@@ -0,0 +1,29 @@
from __future__ import annotations
def bold(text: str) -> str:
return f"\033[1m{text}\033[0m"
def dim(text: str) -> str:
return f"\033[2m{text}\033[0m"
def green(text: str) -> str:
return f"\033[32m{text}\033[0m"
def red(text: str) -> str:
return f"\033[31m{text}\033[0m"
def yellow(text: str) -> str:
return f"\033[33m{text}\033[0m"
def cyan(text: str) -> str:
return f"\033[36m{text}\033[0m"
def gray(text: str) -> str:
return f"\033[90m{text}\033[0m"

View File

@@ -1,21 +1,194 @@
import argparse import argparse
import re as _re
import shutil
from datetime import datetime
import yaml import yaml
from pydantic import ValidationError from pydantic import ValidationError
from beaky import _ansi
from beaky.config import Config from beaky.config import Config
from beaky.datamodels.ticket import Bet, Ticket
from beaky.image_classifier.classifier import img_classify
from beaky.link_classifier.classifier import LinkClassifier
from beaky.resolvers.resolver import ResolvedTicket, TicketResolver, TicketVerdict
from beaky.scanner.scanner import Links from beaky.scanner.scanner import Links
from beaky.screenshotter.screenshotter import Screenshotter from beaky.screenshotter.screenshotter import Screenshotter
from beaky.link_classifier.classifier import LinkClassifier
from beaky.resolvers.resolver import TicketResolver
from beaky.resolvers.resolver import TicketVerdict, _R, _B, _GREEN, _RED, _YELLOW, _GRAY
_VERDICT_COLOR = {
TicketVerdict.TRUTHFUL: _GREEN, def _verdict_str(verdict: TicketVerdict) -> str:
TicketVerdict.NOT_TRUTHFUL: _RED, text = f"VERDICT: {verdict.value.upper()}"
TicketVerdict.POSSIBLY_TRUTHFUL: _YELLOW, if verdict == TicketVerdict.TRUTHFUL:
TicketVerdict.UNKNOWN: _GRAY, return _ansi.green(text)
} if verdict == TicketVerdict.NOT_TRUTHFUL:
return _ansi.red(text)
if verdict == TicketVerdict.POSSIBLY_TRUTHFUL:
return _ansi.yellow(text)
return _ansi.gray(text)
_FC = 14 # field column visual width
_VC = 24 # value column visual width (dual)
_SC = 38 # value column visual width (single classifier)
_BET_W = 1 + (_FC + 2) + 1 + (_VC + 2) + 1 + (_VC + 2) + 1 # dual table width
_BET_WS = 1 + (_FC + 2) + 1 + (_SC + 2) + 1 # single table width
_GAP = " "
_FIELD_LABELS: dict[str, str] = {"team1Name": "team1", "team2Name": "team2"}
_FIELD_ORDER = ["type", "team1Name", "team2Name", "date", "league"]
_SKIP_FIELDS = {"ticketType"}
_BLANK_ROW = f"{' ' * (_FC + 2)}{' ' * (_VC + 2)}{' ' * (_VC + 2)}"
_BLANK_ROWS = f"{' ' * (_FC + 2)}{' ' * (_SC + 2)}"
def _vlen(text: str) -> int:
return len(_re.sub(r"\033\[[^m]*m", "", text))
def _vpad(text: str, width: int) -> str:
return text + " " * max(0, width - _vlen(text))
def _bet_fields(bet: Bet) -> dict[str, str]:
fields: dict[str, str] = {"type": type(bet).__name__}
for k, v in vars(bet).items():
if k in _SKIP_FIELDS:
continue
fields[k] = v.strftime("%Y-%m-%d %H:%M") if k == "date" and isinstance(v, datetime) else str(v)
return fields
# ── dual-column table (compare) ──────────────────────────────────────────────
def _tbl_row(field: str, lval: str, ival: str) -> str:
return f"{_vpad(_FIELD_LABELS.get(field, field), _FC)}{_vpad(lval, _VC)}{_vpad(ival, _VC)}"
def _tbl_sep(left: str, mid: str, right: str) -> str:
return f"{left}{'' * (_FC + 2)}{mid}{'' * (_VC + 2)}{mid}{'' * (_VC + 2)}{right}"
def _bet_to_lines(idx: int, link_bet: Bet | None, img_bet: Bet | None) -> list[str]:
link_fields = _bet_fields(link_bet) if link_bet is not None else {}
img_fields = _bet_fields(img_bet) if img_bet is not None else {}
all_keys = link_fields.keys() | img_fields.keys()
keys = [k for k in _FIELD_ORDER if k in all_keys] + [k for k in all_keys if k not in _FIELD_ORDER]
data_rows = []
for key in keys:
lval_raw = link_fields.get(key, "")
ival_raw = img_fields.get(key, "")
match = lval_raw == ival_raw
both = bool(lval_raw) and bool(ival_raw)
lval_raw = lval_raw[:_VC - 1] + "" if len(lval_raw) > _VC else lval_raw
ival_raw = ival_raw[:_VC - 1] + "" if len(ival_raw) > _VC else ival_raw
lval = _ansi.gray("") if not lval_raw else (lval_raw if (match or not both) else _ansi.yellow(lval_raw))
ival = _ansi.gray("") if not ival_raw else (ival_raw if (match or not both) else _ansi.yellow(ival_raw))
data_rows.append(_tbl_row(key, lval, ival))
header = _vpad(_ansi.bold(_ansi.cyan(f" Bet {idx} ")), _BET_W)
return [header, _tbl_sep("", "", ""), _tbl_row("", _ansi.bold("link"), _ansi.bold("image")),
_tbl_sep("", "", ""), *data_rows, _tbl_sep("", "", "")]
# ── single-column table (one classifier) ─────────────────────────────────────
def _tbl_row_s(field: str, val: str) -> str:
return f"{_vpad(_FIELD_LABELS.get(field, field), _FC)}{_vpad(val, _SC)}"
def _tbl_sep_s(left: str, mid: str, right: str) -> str:
return f"{left}{'' * (_FC + 2)}{mid}{'' * (_SC + 2)}{right}"
def _bet_to_lines_single(idx: int, bet: Bet, col_label: str) -> list[str]:
fields = _bet_fields(bet)
keys = [k for k in _FIELD_ORDER if k in fields] + [k for k in fields if k not in _FIELD_ORDER]
data_rows = [
_tbl_row_s(k, (v[:_SC - 1] + "" if len(v) > _SC else v))
for k, v in ((k, fields[k]) for k in keys)
]
header = _vpad(_ansi.bold(_ansi.cyan(f" Bet {idx} ")), _BET_WS)
return [header, _tbl_sep_s("", "", ""), _tbl_row_s("", _ansi.bold(col_label)),
_tbl_sep_s("", "", ""), *data_rows, _tbl_sep_s("", "", "")]
# ── shared grid printer ───────────────────────────────────────────────────────
def _pad_to(lines: list[str], target: int, blank: str) -> list[str]:
result = list(lines)
while len(result) < target:
result.insert(-1, blank)
return result
def _print_bet_grid(ticket_header: str, all_lines: list[list[str]], blank: str, bet_w: int) -> None:
term_w = shutil.get_terminal_size((120, 24)).columns
n_cols = max(1, term_w // (bet_w + len(_GAP)))
row_w = min(term_w, n_cols * (bet_w + len(_GAP)) - len(_GAP) + 2)
print(f"\n{'' * row_w}")
print(_ansi.bold(f" {ticket_header}"))
print(f"{'' * row_w}")
for start in range(0, len(all_lines), n_cols):
chunk = all_lines[start:start + n_cols]
max_h = max(len(b) for b in chunk)
padded = [_pad_to(b, max_h, blank) for b in chunk]
print()
for row in zip(*padded):
print(" " + _GAP.join(row))
# ── public print functions ────────────────────────────────────────────────────
def _print_compare(link_ticket: Ticket, img_ticket: Ticket) -> None:
n_link, n_img = len(link_ticket.bets), len(img_ticket.bets)
header = f"Ticket {link_ticket.id} — link: {n_link} bet{'s' if n_link != 1 else ''} │ img: {n_img} bet{'s' if n_img != 1 else ''}"
all_lines = [
_bet_to_lines(i + 1, link_ticket.bets[i] if i < n_link else None, img_ticket.bets[i] if i < n_img else None)
for i in range(max(n_link, n_img))
]
_print_bet_grid(header, all_lines, _BLANK_ROW, _BET_W)
def _print_single(ticket: Ticket, col_label: str) -> None:
n = len(ticket.bets)
header = f"Ticket {ticket.id}{col_label}{n} bet{'s' if n != 1 else ''}"
all_lines = [_bet_to_lines_single(i + 1, ticket.bets[i], col_label) for i in range(n)]
_print_bet_grid(header, all_lines, _BLANK_ROWS, _BET_WS)
def _print_resolve_dump(resolved: ResolvedTicket) -> None:
print(f"\n{'' * 60}")
print(_ansi.bold(f" Ticket {resolved.ticket_id} — resolve dump"))
print(f"{'' * 60}")
for i, rb in enumerate(resolved.bets, 1):
bet = rb.bet
print(f"\n {_ansi.bold(_ansi.cyan(f'Bet {i}'))} [{type(bet).__name__}] outcome={_ansi.bold(rb.outcome.value.upper())}")
print(f" fixture_id: {rb.fixture_id}")
print(f" confidence: {rb.confidence} (name={rb.name_match} date={rb.date_proximity} league={rb.league_found} finished={rb.match_finished})")
print(f" --- bet fields ---")
for k, v in vars(bet).items():
val = v.strftime("%Y-%m-%d %H:%M") if k == "date" and isinstance(v, datetime) else str(v)
print(f" {k}: {val}")
print(f" --- match info ---")
if rb.match_info is None:
print(f" (not available — fixture not finished or not found)")
else:
for k, v in vars(rb.match_info).items():
print(f" {k}: {v}")
def _print_dump(ticket: Ticket, label: str) -> None:
print(f"\n{'' * 60}")
print(_ansi.bold(f" Ticket {ticket.id}{label}{len(ticket.bets)} bet(s)"))
print(f"{'' * 60}")
for i, bet in enumerate(ticket.bets, 1):
print(f"\n {_ansi.bold(_ansi.cyan(f'Bet {i}'))} [{type(bet).__name__}]")
for k, v in vars(bet).items():
val = v.strftime("%Y-%m-%d %H:%M") if k == "date" and isinstance(v, datetime) else str(v)
print(f" {k}: {val}")
def load_config(path: str) -> Config | None: def load_config(path: str) -> Config | None:
with open(path) as f: with open(path) as f:
@@ -30,8 +203,12 @@ def load_config(path: str) -> Config | None:
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser(prog="beaky") parser = argparse.ArgumentParser(prog="beaky")
parser.add_argument("--config", help="Path to config file.", default="config/application.yml") parser.add_argument("--config", help="Path to config file.", default="config/application.yml")
parser.add_argument("--id", type=int, help="Resolve a single ticket by id (only used with resolve mode).") parser.add_argument("--id", type=int, help="Select a single ticket by id.")
parser.add_argument("mode", choices=["screenshotter", "parser", "class", "resolve"], help="Mode of operation.") parser.add_argument("mode", choices=["screen", "parse", "compare", "resolve"], help="Mode of operation.")
parser.add_argument("--classifier", choices=["link", "img", "both"], default="both",
help="Which classifier to use in compare mode (default: both).")
parser.add_argument("--dump", action="store_true",
help="Dump all bet fields untruncated (compare mode only).")
args = parser.parse_args() args = parser.parse_args()
config = load_config(args.config) config = load_config(args.config)
@@ -42,39 +219,53 @@ def main() -> None:
data = Links(config) data = Links(config)
data.ret_links() data.ret_links()
link_amount = len(data.links) link_amount = len(data.links)
print(f"We found {link_amount} links")
if link_amount == 0: if link_amount == 0:
print("ERROR, no links found") print("ERROR, no links found")
return return
print(f"We found {link_amount} links")
if args.mode == "screenshotter": # link selection
if args.id is not None:
selected_links = [l for l in data.links if l.id == args.id] if args.id is not None else data.links
if not selected_links:
print(f"ERROR: ticket id {args.id} not found")
return
print(f"Selected link: {args.id}")
else:
selected_links = data.links
if args.mode == "screen":
screenshotter = Screenshotter(config) screenshotter = Screenshotter(config)
screenshotter.capture_tickets(data.links) screenshotter.capture_tickets(selected_links)
if args.mode == "parser": if args.mode == "parse":
for link in data.links: for link in selected_links:
print(link) print(link)
if args.mode == "class": if args.mode == "compare":
classifier = LinkClassifier() use_link = args.classifier in ("link", "both")
results = [] use_img = args.classifier in ("img", "both")
for link in data.links: linkclassifier = LinkClassifier() if use_link else None
results.append(classifier.classify(link)) for link in selected_links:
ticket = results[-1] link_ticket = linkclassifier.classify(link) if use_link else None
print(f"\n=== Link {ticket.id} ({len(ticket.bets)} bets) ===") img_ticket = img_classify([f"./data/screenshots/{link.id}.png"], ticket_id=link.id) if use_img else None
for bet in ticket.bets: if args.dump:
print(f" [{type(bet).__name__}]") if link_ticket:
for k, v in vars(bet).items(): _print_dump(link_ticket, "link classifier")
print(f" {k}: {v}") if img_ticket:
_print_dump(img_ticket, "image classifier")
elif args.classifier == "both" and link_ticket and img_ticket:
_print_compare(link_ticket, img_ticket)
elif link_ticket:
_print_single(link_ticket, "link classifier")
elif img_ticket:
_print_single(img_ticket, "image classifier")
if args.mode == "resolve": if args.mode == "resolve":
classifier = LinkClassifier() classifier = LinkClassifier()
resolver = TicketResolver(config.resolver) resolver = TicketResolver(config.resolver)
links = [l for l in data.links if l.id == args.id] if args.id is not None else data.links
if args.id is not None and not links: for link in selected_links:
print(f"ERROR: ticket id {args.id} not found")
return
for link in links:
print(f"\n=== Classifying ticket {link.id} ===") print(f"\n=== Classifying ticket {link.id} ===")
ticket = classifier.classify(link) ticket = classifier.classify(link)
for bet in ticket.bets: for bet in ticket.bets:
@@ -82,8 +273,9 @@ def main() -> None:
print(f"\n--- Resolving ticket {link.id} ---") print(f"\n--- Resolving ticket {link.id} ---")
resolved = resolver.resolve(ticket) resolved = resolver.resolve(ticket)
color = _VERDICT_COLOR.get(resolved.verdict, "") if args.dump:
print(f"\n {color}{_B}VERDICT: {resolved.verdict.value.upper()}{_R}") _print_resolve_dump(resolved)
print(f"\n {_ansi.bold(_verdict_str(resolved.verdict))}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,5 +1,6 @@
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from beaky.image_classifier.config import ImgClassifierConfig
from beaky.resolvers.config import ResolverConfig from beaky.resolvers.config import ResolverConfig
from beaky.screenshotter.config import ScreenshotterConfig from beaky.screenshotter.config import ScreenshotterConfig
@@ -9,3 +10,4 @@ class Config:
path: str path: str
screenshotter: ScreenshotterConfig screenshotter: ScreenshotterConfig
resolver: ResolverConfig resolver: ResolverConfig
img_classifier: ImgClassifierConfig

View File

@@ -14,8 +14,32 @@ class BetType(str, Enum):
BOTH_TEAM_SCORED = "both_team_scored" BOTH_TEAM_SCORED = "both_team_scored"
GOAL_AMOUNT = "goal_amount" GOAL_AMOUNT = "goal_amount"
GOAL_HANDICAP = "goal_handicap" GOAL_HANDICAP = "goal_handicap"
HALF_TIME_RESULT = "half_time_result"
HALF_TIME_DOUBLE = "half_time_double"
HALF_TIME_FULL_TIME = "half_time_full_time"
CORNER_AMOUNT = "corner_amount"
TEAM_CORNER_AMOUNT = "team_corner_amount"
MORE_OFFSIDES = "more_offsides"
UNKNOWN = "unknown" UNKNOWN = "unknown"
...
class BetOutcome(str, Enum):
WIN = "win"
LOSE = "lose"
VOID = "void" # stake returned (e.g. WinLose on draw, integer goal line hit)
UNKNOWN = "unknown" # fixture not found or unclassified bet
@dataclass
class MatchInfo:
goals_home: int
goals_away: int
half_time_home: int | None = None
half_time_away: int | None = None
corners_home: int | None = None
corners_away: int | None = None
offsides_home: int | None = None
offsides_away: int | None = None
@dataclass @dataclass
@@ -25,64 +49,194 @@ class Bet(ABC):
team2Name: str team2Name: str
date: datetime date: datetime
league: str league: str
@abstractmethod @abstractmethod
def resolve(self): pass def resolve(self, match: MatchInfo) -> BetOutcome: ...
@dataclass @dataclass
class WinDrawLose(Bet): class WinDrawLose(Bet):
"""Výsledek zápasu 1X2""" """Výsledek zápasu 1X2"""
betType: Literal["X", "0", "1", "2"] = "0"
def resolve(self): betType: Literal["X", "0", "1", "2"]
...
def resolve(self, match: MatchInfo) -> BetOutcome:
home, away = match.goals_home, match.goals_away
bet_draw = self.betType in ("X", "0")
if bet_draw:
return BetOutcome.WIN if home == away else BetOutcome.LOSE
actual = "1" if home > away else ("0" if home == away else "2")
return BetOutcome.WIN if actual == self.betType else BetOutcome.LOSE
@dataclass @dataclass
class Advance(Bet): class Advance(Bet):
"""What team advances to next round""" """What team advances to next round"""
def resolve(self):
raise NotImplementedError("Vyser si voko vine") def resolve(self, match: MatchInfo) -> BetOutcome:
raise NotImplementedError("Advance bet resolution is not implemented")
@dataclass @dataclass
class WinDrawLoseDouble(Bet): class WinDrawLoseDouble(Bet):
"""Výsledek zápasu - double""" """Výsledek zápasu - double"""
betType: Literal["01", "12", "02"] = "01"
def resolve(self): betType: Literal["01", "12", "02"]
...
def resolve(self, match: MatchInfo) -> BetOutcome:
home, away = match.goals_home, match.goals_away
actual = "1" if home > away else ("0" if home == away else "2")
return BetOutcome.WIN if actual in self.betType else BetOutcome.LOSE
@dataclass @dataclass
class WinLose(Bet): class WinLose(Bet):
"""Výsledek zápasu bez remízy""" """Výsledek zápasu bez remízy"""
betType: Literal["1", "2"] = "1"
def resolve(self): betType: Literal["1", "2"]
...
def resolve(self, match: MatchInfo) -> BetOutcome:
home, away = match.goals_home, match.goals_away
if home == away:
return BetOutcome.VOID
actual = "1" if home > away else "2"
return BetOutcome.WIN if actual == self.betType else BetOutcome.LOSE
@dataclass @dataclass
class BothTeamScored(Bet): class BothTeamScored(Bet):
def resolve(self): def resolve(self, match: MatchInfo) -> BetOutcome:
... return BetOutcome.WIN if match.goals_home > 0 and match.goals_away > 0 else BetOutcome.LOSE
@dataclass @dataclass
class GoalAmount(Bet): class GoalAmount(Bet):
"""Počet gólů v zápasu — over/under total goals""" """Počet gólů v zápasu — over/under total goals"""
line: float = 0.0 # goal line, e.g. 2.5
over: bool = True # True = more than line, False = less than line line: float
def resolve(self): over: bool # True = more than line, False = less than line
...
def resolve(self, match: MatchInfo) -> BetOutcome:
total = match.goals_home + match.goals_away
if total == self.line:
return BetOutcome.VOID
won = total > self.line if self.over else total < self.line
return BetOutcome.WIN if won else BetOutcome.LOSE
@dataclass @dataclass
class GoalHandicap(Bet): class GoalHandicap(Bet):
"""Goal handicap for a specific team — add handicap_amount to team's score, team wins = you win""" """Goal handicap for a specific team — add handicap_amount to team's score, team wins = you win"""
team_bet: Literal["1", "2"] = "1" # which team the handicap is applied to
handicap_amount: float = 0.0 # e.g. +1.5 or -0.5 team_bet: Literal["1", "2"] # which team the handicap is applied to
def resolve(self): handicap_amount: float # e.g. +1.5 or -0.5
...
def resolve(self, match: MatchInfo) -> BetOutcome:
home = match.goals_home + (self.handicap_amount if self.team_bet == "1" else 0.0)
away = match.goals_away + (self.handicap_amount if self.team_bet == "2" else 0.0)
if home == away:
return BetOutcome.VOID
actual_winner = "1" if home > away else "2"
return BetOutcome.WIN if actual_winner == self.team_bet else BetOutcome.LOSE
@dataclass @dataclass
class UnknownTicket(Bet): class HalfTimeResult(Bet):
"""Bet type that could not be classified""" """Výsledek 1. poločasu: 0/1/2"""
raw_text: str = ""
def resolve(self):
...
betType: Literal["0", "1", "2"]
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.half_time_home is None or match.half_time_away is None:
return BetOutcome.UNKNOWN
actual = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
return BetOutcome.WIN if actual == self.betType else BetOutcome.LOSE
@dataclass
class HalfTimeDouble(Bet):
"""Výsledek 1. poločasu - dvojtip: 10/02/01"""
betType: Literal["01", "02", "12"]
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.half_time_home is None or match.half_time_away is None:
return BetOutcome.UNKNOWN
actual = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
return BetOutcome.WIN if actual in self.betType else BetOutcome.LOSE
@dataclass
class HalfTimeFullTime(Bet):
"""Výsledek 1. poločasu/výsledek zápasu: X/Y"""
ht_bet: Literal["0", "1", "2"]
ft_bet: Literal["0", "1", "2"]
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.half_time_home is None or match.half_time_away is None:
return BetOutcome.UNKNOWN
actual_ht = "1" if match.half_time_home > match.half_time_away else ("0" if match.half_time_home == match.half_time_away else "2")
actual_ft = "1" if match.goals_home > match.goals_away else ("0" if match.goals_home == match.goals_away else "2")
return BetOutcome.WIN if actual_ht == self.ht_bet and actual_ft == self.ft_bet else BetOutcome.LOSE
@dataclass
class CornerAmount(Bet):
"""Počet rohových kopů v zápasu X.5: +/- — total corners over/under"""
line: float
over: bool
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.corners_home is None or match.corners_away is None:
return BetOutcome.UNKNOWN
total = match.corners_home + match.corners_away
if total == self.line:
return BetOutcome.VOID
return BetOutcome.WIN if (total > self.line) == self.over else BetOutcome.LOSE
@dataclass
class TeamCornerAmount(Bet):
"""Team-specific corners over/under"""
team_bet: Literal["1", "2"]
line: float
over: bool
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.corners_home is None or match.corners_away is None:
return BetOutcome.UNKNOWN
corners = match.corners_home if self.team_bet == "1" else match.corners_away
if corners == self.line:
return BetOutcome.VOID
return BetOutcome.WIN if (corners > self.line) == self.over else BetOutcome.LOSE
@dataclass
class MoreOffsides(Bet):
"""Více ofsajdů v zápasu: 1/2"""
team_bet: Literal["1", "2"]
def resolve(self, match: MatchInfo) -> BetOutcome:
if match.offsides_home is None or match.offsides_away is None:
return BetOutcome.UNKNOWN
if match.offsides_home == match.offsides_away:
return BetOutcome.VOID
actual = "1" if match.offsides_home > match.offsides_away else "2"
return BetOutcome.WIN if actual == self.team_bet else BetOutcome.LOSE
@dataclass
class UnknownBet(Bet):
"""Bet type that could not be classified"""
raw_text: str = ""
def resolve(self, match: MatchInfo) -> BetOutcome:
return BetOutcome.UNKNOWN
@dataclass @dataclass

View File

@@ -1,7 +1,194 @@
from datetime import datetime import datetime
import logging
import re
from pathlib import Path
from beaky.datamodels.ticket import Ticket from pytesseract import pytesseract
from beaky.datamodels.ticket import (
Advance,
Bet,
BetType,
BothTeamScored,
GoalAmount,
GoalHandicap,
Ticket,
UnknownBet,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
)
def f(path:str, date:datetime) -> list[Ticket]: def img_to_text(path: str) -> str:
... """Given a path to an image, return the text contained in that image.
Bypasses PIL and lets Tesseract read the file directly.
"""
try:
text = pytesseract.image_to_string(path, lang="ces")
return text.strip()
except pytesseract.TesseractNotFoundError:
print("Error: Tesseract executable not found on your system.")
return ""
except Exception as e:
print(f"Error processing {path}: {e}")
return ""
def classify(text: str) -> Bet:
"""Given text extracted from an image and a date, return a Bet object that is
relevant to that text."""
logger = logging.getLogger(__name__)
if not text:
return UnknownBet(
ticketType=BetType.UNKNOWN,
team1Name="N/A",
team2Name="N/A",
date=datetime.datetime.now(),
league="N/A",
raw_text="No text extracted",
)
# 1. Defaults & Normalization
text_lower = text.lower()
date_obj = datetime.datetime.now()
team1, team2 = "Unknown", "Unknown"
league = "Unknown"
# 2. Heuristic extraction of Teams (Looking for "Team A - Team B" patterns)
lines = [line.strip() for line in text.split("\n") if line.strip()]
for line in lines:
if " - " in line or " vs " in line or " v " in line:
# Avoid splitting on hyphens in dates or numbers
if not re.search(r"\d\s*-\s*\d", line):
parts = re.split(r" - | vs | v ", line)
if len(parts) >= 2:
team1, team2 = parts[0].strip(), parts[1].strip()
break
# 3. Heuristic extraction of Date (Looking for DD.MM. YYYY HH:MM)
date_match = re.search(r"(\d{1,2}\.\s*\d{1,2}\.?\s*(?:\d{2,4})?)\s*(\d{1,2}:\d{2})?", text)
if date_match:
try:
# Fallback to current year if missing, basic parse attempt
date_str = f"{date_match.group(1).replace(' ', '')} {date_match.group(2) or '00:00'}"
if len(date_str.split(".")[2]) <= 5: # Missing year
date_str = date_str.replace(" ", f"{datetime.datetime.now().year} ")
date_obj = datetime.datetime.strptime(date_str, "%d.%m.%Y %H:%M")
except Exception:
pass # Keep default if parsing fails
# 4. Classification Logic based on keywords
base_args = {"team1Name": team1, "team2Name": team2, "date": date_obj, "league": league}
# Advance / Postup
if any(kw in text_lower for kw in ["postup", "postoupí", "advance"]):
return Advance(ticketType=BetType.ADVANCED, **base_args)
# Both Teams to Score / Oba dají gól
if any(kw in text_lower for kw in ["oba dají gól", "btts", "oba týmy dají gól"]):
return BothTeamScored(ticketType=BetType.BOTH_TEAM_SCORED, **base_args)
# Goal Amount (Over/Under)
if any(kw in text_lower for kw in ["počet gólů", "více než", "méně než", "over", "under"]):
# Attempt to find the goal line (e.g., 2.5, 3.5)
line_match = re.search(r"(\d+\.\d+)", text)
line_val = float(line_match.group(1)) if line_match else 2.5
is_over = any(kw in text_lower for kw in ["více", "over", "+"])
return GoalAmount(ticketType=BetType.GOAL_AMOUNT, line=line_val, over=is_over, **base_args)
# Goal Handicap
if any(kw in text_lower for kw in ["handicap", "hcp"]):
hcp_match = re.search(r"([+-]?\d+\.\d+)", text)
hcp_val = float(hcp_match.group(1)) if hcp_match else 0.0
# Simplistic logic: guess team 1 if not explicitly stated
team_bet = "2" if " 2 " in text else "1"
return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=hcp_val, **base_args)
# Win Draw Lose Double (1X, X2, 12)
if any(kw in text_lower for kw in ["1x", "x2", "12", "dvojitá šance", "neprohra"]):
bet_type = "01" if "1x" in text_lower else "02" if "x2" in text_lower else "12"
return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=bet_type, **base_args)
# Win Lose (Draw no bet / Vítěz do rozhodnutí)
if any(kw in text_lower for kw in ["bez remízy", "vítěz do rozhodnutí", "konečný vítěz"]):
bet_type = "2" if re.search(r"\b2\b", text) else "1"
return WinLose(ticketType=BetType.WIN_LOSE, betType=bet_type, **base_args)
# Win Draw Lose (Standard Match Odds)
if any(kw in text_lower for kw in ["zápas", "výsledek zápasu", "1x2"]):
# Look for isolated 1, X (or 0), or 2
match_pick = re.search(r"\b(1|x|0|2)\b", text_lower)
bet_type = match_pick.group(1).upper() if match_pick else "1"
if bet_type == "X":
bet_type = "0"
return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=bet_type, **base_args)
# Fallback Unknown
return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=text, **base_args)
def img_classify(paths: list[str], ticket_id: int) -> Ticket:
"""Given a path to an image and a date, return a list of Tickets that are
relevant to that image and date."""
# Define valid image extensions to ignore system files or text documents
ticket = Ticket(id=ticket_id, bets=[])
valid_extensions = {".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"}
# Iterate through all files in the folder
for file in paths:
file_path = Path(file)
if file_path.is_file() and file_path.suffix.lower() in valid_extensions:
# 1. Extract the text (called separately)
extracted_text = img_to_text(str(file_path))
print(extracted_text)
# 2. Classify based on the extracted text (called separately)
try:
result = classify(extracted_text)
except Exception as exc: # pragma: no cover - defensive fallback
# Ensure result is always defined so downstream code cannot reference an unbound name
print(f"classify() raised an exception: {exc}")
result = UnknownBet(
ticketType=BetType.UNKNOWN,
team1Name="N/A",
team2Name="N/A",
league="N/A",
raw_text=extracted_text,
date=datetime.datetime.now(),
)
# 3. Add the resulting tickets to our main list
# Support classifier returning either a single Bet or a list of Bet
if result is None:
continue
if isinstance(result, list):
for r in result:
print(
r.date,
getattr(r, "ticketType", None),
r.team1Name,
r.team2Name,
r.league,
)
ticket.bets.extend(result)
else:
print(
result.date,
getattr(result, "ticketType", None),
result.team1Name,
result.team2Name,
result.league,
)
ticket.bets.append(result)
return ticket
if __name__ == "__main__":
img_classify(["./data/screenshots/2.png"], ticket_id=1)

View File

@@ -0,0 +1,6 @@
from pydantic.dataclasses import dataclass
@dataclass
class ImgClassifierConfig:
target_path: str

View File

@@ -1,19 +1,26 @@
import re import re
from datetime import datetime from datetime import datetime
from typing import Any
from playwright.sync_api import Page, sync_playwright from playwright.sync_api import Page, sync_playwright
from beaky.datamodels.ticket import ( from beaky.datamodels.ticket import (
Bet,
BetType,
BothTeamScored, BothTeamScored,
CornerAmount,
GoalAmount, GoalAmount,
GoalHandicap, GoalHandicap,
HalfTimeDouble,
HalfTimeFullTime,
HalfTimeResult,
MoreOffsides,
TeamCornerAmount,
Ticket, Ticket,
BetType, UnknownBet,
UnknownTicket,
WinDrawLose, WinDrawLose,
WinDrawLoseDouble, WinDrawLoseDouble,
WinLose, WinLose,
Bet
) )
from beaky.scanner.scanner import Link from beaky.scanner.scanner import Link
@@ -37,24 +44,24 @@ def _parse_teams(title: str) -> tuple[str, str]:
def _classify_bet(bet_text: str, team1: str, team2: str, date: datetime, league: str) -> Bet: def _classify_bet(bet_text: str, team1: str, team2: str, date: datetime, league: str) -> Bet:
common = dict(team1Name=team1, team2Name=team2, date=date, league=league) common: dict[str, Any] = dict(team1Name=team1, team2Name=team2, date=date, league=league)
# WinDrawLose double: "Výsledek zápasu - dvojtip: 10" # WinDrawLose double: "Výsledek zápasu - dvojtip: 10"
m = re.search(r"Výsledek zápasu - dvojtip:\s*(\d+)", bet_text) m = re.search(r"Výsledek zápasu - dvojtip:\s*(\d+)", bet_text)
if m: if m:
# normalize order: "10" -> "01", "02" -> "02", "12" -> "12" # normalize order: "10" -> "01", "02" -> "02", "12" -> "12"
bet_type = "".join(sorted(m.group(1))) bet_type = "".join(sorted(m.group(1)))
return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=bet_type, **common) return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=bet_type, **common) # type: ignore[arg-type]
# WinLose (no draw): "Výsledek bez remízy: 1" # WinLose (no draw): "Výsledek bez remízy: 1"
m = re.search(r"bez rem[ií]zy:\s*([12])", bet_text) m = re.search(r"bez rem[ií]zy:\s*([12])", bet_text)
if m: if m:
return WinLose(ticketType=BetType.WIN_LOSE, betType=m.group(1), **common) return WinLose(ticketType=BetType.WIN_LOSE, betType=m.group(1), **common) # type: ignore[arg-type]
# WinDrawLose: "Výsledek zápasu: 1" # WinDrawLose: "Výsledek zápasu: 1"
m = re.search(r"Výsledek zápasu:\s*([012X])\s*$", bet_text.strip()) m = re.search(r"Výsledek zápasu:\s*([012X])\s*$", bet_text.strip())
if m: if m:
return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=m.group(1), **common) return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=m.group(1), **common) # type: ignore[arg-type]
# BothTeamScored: "Každý z týmů dá gól v zápasu: Ano" # BothTeamScored: "Každý z týmů dá gól v zápasu: Ano"
if "dá gól" in bet_text or "oba týmy" in bet_text.lower(): if "dá gól" in bet_text or "oba týmy" in bet_text.lower():
@@ -74,12 +81,47 @@ def _classify_bet(bet_text: str, team1: str, team2: str, date: datetime, league:
elif team2.lower() in bet_lower: elif team2.lower() in bet_lower:
team_bet = "2" team_bet = "2"
else: else:
return UnknownTicket(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common) return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
sign = 1.0 if m.group(1) == "+" else -1.0 sign = 1.0 if m.group(1) == "+" else -1.0
handicap = sign * float(m.group(2)) handicap = sign * float(m.group(2))
return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=handicap, **common) return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=handicap, **common) # type: ignore[arg-type]
return UnknownTicket(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common) # HalfTimeFullTime: "Výsledek 1. poločasu/výsledek zápasu: 0/2" (before HalfTimeResult)
m = re.search(r"poločasu/výsledek zápasu:\s*([012])/([012])", bet_text)
if m:
return HalfTimeFullTime(ticketType=BetType.HALF_TIME_FULL_TIME, ht_bet=m.group(1), ft_bet=m.group(2), **common) # type: ignore[arg-type]
# HalfTimeDouble: "Výsledek 1. poločasu - dvojtip: 10" (before HalfTimeResult)
m = re.search(r"poločasu - dvojtip:\s*(\d+)", bet_text)
if m:
bet_type = "".join(sorted(m.group(1)))
return HalfTimeDouble(ticketType=BetType.HALF_TIME_DOUBLE, betType=bet_type, **common) # type: ignore[arg-type]
# HalfTimeResult: "Výsledek 1. poločasu: 1"
m = re.search(r"poločasu:\s*([012])\s*$", bet_text.strip())
if m:
return HalfTimeResult(ticketType=BetType.HALF_TIME_RESULT, betType=m.group(1), **common) # type: ignore[arg-type]
# CornerAmount: "Počet rohových kopů v zápasu 8.5: + 8.5"
m = re.search(r"Počet rohových kopů v zápasu\s+(\d+(?:\.\d+)?):\s*([+-])", bet_text)
if m:
return CornerAmount(ticketType=BetType.CORNER_AMOUNT, line=float(m.group(1)), over=m.group(2) == "+", **common)
# TeamCornerAmount: "RB Leipzig počet rohových kopů v zápasu: +7.5"
m = re.search(r"počet rohových kopů v zápasu:\s*([+-])\s*(\d+(?:\.\d+)?)", bet_text)
if m:
bet_lower = bet_text.lower()
team_bet = "1" if team1.lower() in bet_lower else ("2" if team2.lower() in bet_lower else None)
if team_bet is None:
return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
return TeamCornerAmount(ticketType=BetType.TEAM_CORNER_AMOUNT, team_bet=team_bet, line=float(m.group(2)), over=m.group(1) == "+", **common) # type: ignore[arg-type]
# MoreOffsides: "Více ofsajdů v zápasu: 1"
m = re.search(r"Více ofsajdů v zápasu:\s*([12])", bet_text)
if m:
return MoreOffsides(ticketType=BetType.MORE_OFFSIDES, team_bet=m.group(1), **common) # type: ignore[arg-type]
return UnknownBet(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
def _extract_legs(page: Page, fallback_date: datetime | None) -> list[Bet]: def _extract_legs(page: Page, fallback_date: datetime | None) -> list[Bet]:

View File

@@ -4,3 +4,5 @@ from pydantic.dataclasses import dataclass
@dataclass @dataclass
class ResolverConfig: class ResolverConfig:
api_key: str api_key: str
league_map: dict[str, int]
cache_path: str = "data/fixture_cache"

View File

@@ -1,102 +1,27 @@
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timedelta from datetime import date, datetime, timedelta
from difflib import SequenceMatcher from difflib import SequenceMatcher
from enum import Enum from enum import Enum
from typing import Any
import diskcache
import requests import requests
from beaky import _ansi
from beaky.datamodels.ticket import ( from beaky.datamodels.ticket import (
Bet, Bet,
BothTeamScored, BetOutcome,
GoalAmount, MatchInfo,
GoalHandicap,
Ticket, Ticket,
UnknownTicket, UnknownBet,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
) )
from beaky.resolvers.config import ResolverConfig from beaky.resolvers.config import ResolverConfig
_API_BASE = "https://v3.football.api-sports.io" _API_BASE = "https://v3.football.api-sports.io"
# Fortuna league strings (lowercased substring match) -> api-football league ID
_LEAGUE_MAP: dict[str, int] = {
# European cups
"liga mistrů": 2,
"champions league": 2,
"evropská liga": 3,
"europa league": 3,
"konferenční liga": 848,
"conference league": 848,
# Top flights
"1. anglie": 39,
"1. belgie": 144,
"1. česko": 345,
"1. dánsko": 119,
"1. francie": 61,
"1. itálie": 135,
"1. itálie - ženy": 794,
"1. německo": 78,
"1. nizozemsko": 88,
"1. polsko": 106,
"1. portugalsko": 94,
"1. rakousko": 218,
"1. rumunsko": 283,
"1. skotsko": 179,
"1. slovensko": 332,
"1. španělsko": 140,
"1. wales": 771,
# Second divisions
"2. anglie": 40,
"2. česko": 346,
"2. francie": 62,
"2. itálie": 136,
"2. německo": 79,
"2. nizozemsko": 89,
"2. rakousko": 219,
"2. slovensko": 333,
"2. španělsko": 141,
# Third divisions
"3. francie": 63,
"3. česko msfl": 349,
"3. česko čfl": 348,
# Fourth divisions
"4. česko - sk. a": 350,
"4. česko - sk. b": 351,
"4. česko - sk. c": 352,
"4. česko - sk. d": 353,
"4. česko - sk. e": 354,
"4. česko - sk. f": 686,
# Women
"1. česko - ženy": 669,
"fortuna=liga ženy": 669,
# Domestic cups
"anglie - fa cup": 45,
"anglie - efl cup": 48,
"česko - pohár": 347,
}
_DATE_WINDOW = 3 # days either side of the bet date to search _DATE_WINDOW = 3 # days either side of the bet date to search
# ANSI color helpers
_R = "\033[0m"
_B = "\033[1m"
_DIM= "\033[2m"
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_GRAY = "\033[90m"
_OUTCOME_COLOR = {
"win": _GREEN,
"lose": _RED,
"void": _YELLOW,
"unknown": _GRAY,
}
class TicketVerdict(str, Enum): class TicketVerdict(str, Enum):
TRUTHFUL = "truthful" TRUTHFUL = "truthful"
@@ -105,13 +30,6 @@ class TicketVerdict(str, Enum):
UNKNOWN = "unknown — could not resolve enough bets to decide" UNKNOWN = "unknown — could not resolve enough bets to decide"
class BetOutcome(str, Enum):
WIN = "win"
LOSE = "lose"
VOID = "void" # stake returned (e.g. WinLose on draw, integer goal line hit)
UNKNOWN = "unknown" # fixture not found or unclassified bet
@dataclass @dataclass
class ResolvedBet: class ResolvedBet:
bet: Bet bet: Bet
@@ -127,6 +45,7 @@ class ResolvedBet:
date_proximity: float = 0.0 date_proximity: float = 0.0
league_found: float = 0.0 league_found: float = 0.0
match_finished: float = 0.0 match_finished: float = 0.0
match_info: MatchInfo | None = None
@dataclass @dataclass
@@ -136,8 +55,8 @@ class ResolvedTicket:
@property @property
def verdict(self) -> TicketVerdict: def verdict(self) -> TicketVerdict:
resolvable = [b for b in self.bets if not isinstance(b.bet, UnknownTicket)] resolvable = [b for b in self.bets if not isinstance(b.bet, UnknownBet)]
unresolvable = [b for b in self.bets if isinstance(b.bet, UnknownTicket)] unresolvable = [b for b in self.bets if isinstance(b.bet, UnknownBet)]
if not resolvable: if not resolvable:
return TicketVerdict.UNKNOWN return TicketVerdict.UNKNOWN
if any(b.outcome == BetOutcome.LOSE for b in resolvable): if any(b.outcome == BetOutcome.LOSE for b in resolvable):
@@ -149,7 +68,7 @@ class ResolvedTicket:
return TicketVerdict.TRUTHFUL return TicketVerdict.TRUTHFUL
def _get(url: str, headers: dict, params: dict, retries: int = 3, backoff: float = 60.0) -> requests.Response: def _get(url: str, headers: dict[str, str], params: dict[str, str | int], retries: int = 3, backoff: float = 60.0) -> requests.Response:
for attempt in range(retries): for attempt in range(retries):
resp = requests.get(url, headers=headers, params=params) resp = requests.get(url, headers=headers, params=params)
if resp.status_code == 429: if resp.status_code == 429:
@@ -165,8 +84,10 @@ def _get(url: str, headers: dict, params: dict, retries: int = 3, backoff: float
class TicketResolver: class TicketResolver:
def __init__(self, config: ResolverConfig): def __init__(self, config: ResolverConfig):
self._headers = {"x-apisports-key": config.api_key} self._headers = {"x-apisports-key": config.api_key}
self._league_map = config.league_map
self._disk_cache: diskcache.Cache = diskcache.Cache(config.cache_path)
# Cache maps (center_date_str, league_id | None) -> list of fixture dicts # Cache maps (center_date_str, league_id | None) -> list of fixture dicts
self._fixture_cache: dict[tuple[str, int | None], list[dict]] = {} self._fixture_cache: dict[tuple[str, int | None], list[dict[str, Any]]] = {}
# Cache maps league name -> (league_id, confidence) # Cache maps league name -> (league_id, confidence)
self._league_cache: dict[str, tuple[int | None, float]] = {} self._league_cache: dict[str, tuple[int | None, float]] = {}
@@ -178,31 +99,42 @@ class TicketResolver:
def _resolve_bet(self, bet: Bet) -> ResolvedBet: def _resolve_bet(self, bet: Bet) -> ResolvedBet:
bet_type = type(bet).__name__ bet_type = type(bet).__name__
print(f"\n {_B}{_CYAN}┌─ [{bet_type}]{_R} {_B}{bet.team1Name} vs {bet.team2Name}{_R}" print(f"\n {_ansi.bold(_ansi.cyan(f'┌─ [{bet_type}]'))} {_ansi.bold(f'{bet.team1Name} vs {bet.team2Name}')}"
f" {_DIM}{bet.date.strftime('%Y-%m-%d')} | {bet.league}{_R}") f" {_ansi.dim(f'{bet.date.strftime('%Y-%m-%d')} | {bet.league}')}")
if isinstance(bet, UnknownTicket): if isinstance(bet, UnknownBet):
print(f" {_GRAY}│ skipping — not implemented: {bet.raw_text!r}{_R}") print(_ansi.gray(f" │ skipping — not implemented: {bet.raw_text!r}"))
print(f" {_GRAY}└─ UNKNOWN{_R}") print(_ansi.gray(" └─ UNKNOWN"))
return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN) return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN)
fixture, name_match, date_prox, league_conf = self._find_fixture(bet) fixture, name_match, date_prox, league_conf = self._find_fixture(bet)
if fixture is None: if fixture is None:
print(f" {_GRAY}└─ UNKNOWN — no fixture found{_R}") print(_ansi.gray(" └─ UNKNOWN — no fixture found"))
return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN, league_found=league_conf) return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN, league_found=league_conf)
home_name = fixture["teams"]["home"]["name"] home_name = fixture["teams"]["home"]["name"]
away_name = fixture["teams"]["away"]["name"] away_name = fixture["teams"]["away"]["name"]
finished = _is_finished(fixture) finished = _is_finished(fixture)
confidence = round((name_match + date_prox + league_conf + finished) / 4, 3) confidence = round((name_match + date_prox + league_conf + finished) / 4, 3)
outcome = _evaluate_bet(bet, fixture) if finished == 1.0 else BetOutcome.UNKNOWN
if finished == 1.0:
fixture = {**fixture, "statistics": self._get_statistics(fixture["fixture"]["id"])}
match_info = _fixture_to_match_info(fixture)
outcome = bet.resolve(match_info)
else:
match_info = None
outcome = BetOutcome.UNKNOWN
goals = fixture["goals"] goals = fixture["goals"]
color = _OUTCOME_COLOR.get(outcome.value, _GRAY) print(_ansi.dim(
print(f" {_DIM}│ matched #{fixture['fixture']['id']}: {home_name} vs {away_name}" f" │ matched #{fixture['fixture']['id']}: {home_name} vs {away_name}"
f" | {goals['home']}:{goals['away']} | {fixture['fixture']['status']['short']}" f" | {goals['home']}:{goals['away']} | {fixture['fixture']['status']['short']}"
f" | confidence {confidence} (name={name_match:.2f} date={date_prox:.2f} league={league_conf} finished={finished}){_R}") f" | confidence {confidence} (name={name_match:.2f} date={date_prox:.2f} league={league_conf} finished={finished})"
print(f" {color}{_B}└─ {outcome.value.upper()}{_R}") ))
print(_ansi.bold(_ansi.green(f" └─ {outcome.value.upper()}") if outcome == BetOutcome.WIN
else _ansi.red(f" └─ {outcome.value.upper()}") if outcome == BetOutcome.LOSE
else _ansi.yellow(f" └─ {outcome.value.upper()}") if outcome == BetOutcome.VOID
else _ansi.gray(f" └─ {outcome.value.upper()}")))
return ResolvedBet( return ResolvedBet(
bet=bet, bet=bet,
@@ -213,29 +145,53 @@ class TicketResolver:
date_proximity=round(date_prox, 3), date_proximity=round(date_prox, 3),
league_found=league_conf, league_found=league_conf,
match_finished=finished, match_finished=finished,
match_info=match_info,
) )
def _find_fixture(self, bet: Bet) -> tuple[dict | None, float, float, float]: def _get_statistics(self, fixture_id: int) -> list[dict[str, Any]]:
cache_key = ("stats", fixture_id)
if cache_key in self._disk_cache:
print(_ansi.gray(f" │ /fixtures/statistics served from disk cache (fixture={fixture_id})"))
return self._disk_cache[cache_key] # type: ignore[no-any-return]
print(_ansi.gray(f" │ GET /fixtures/statistics fixture={fixture_id}"))
resp = _get(f"{_API_BASE}/fixtures/statistics", headers=self._headers, params={"fixture": fixture_id})
resp.raise_for_status()
stats = resp.json().get("response", [])
self._disk_cache[cache_key] = stats
return stats
def _find_fixture(self, bet: Bet) -> tuple[dict[str, Any] | None, float, float, float]:
"""Returns (fixture, name_match, date_proximity, league_confidence).""" """Returns (fixture, name_match, date_proximity, league_confidence)."""
center = bet.date.date() center = bet.date.date()
date_str = center.strftime("%Y-%m-%d") date_str = center.strftime("%Y-%m-%d")
league_id, league_conf = self._resolve_league(bet.league) league_id, league_conf = self._resolve_league(bet.league)
cache_key = (date_str, league_id) cache_key = (date_str, league_id)
window_end = center + timedelta(days=_DATE_WINDOW)
cache_may_be_stale = window_end >= date.today()
if cache_key not in self._fixture_cache: if cache_key not in self._fixture_cache:
if cache_key in self._disk_cache and not cache_may_be_stale:
self._fixture_cache[cache_key] = self._disk_cache[cache_key]
print(_ansi.gray(f" │ /fixtures served from disk cache ({len(self._fixture_cache[cache_key])} fixtures)"))
else:
date_from = (center - timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d") date_from = (center - timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
date_to = (center + timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d") date_to = (center + timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
params: dict = {"from": date_from, "to": date_to} params: dict[str, str | int] = {"from": date_from, "to": date_to}
if league_id is not None: if league_id is not None:
params["league"] = league_id params["league"] = league_id
params["season"] = center.year if center.month >= 7 else center.year - 1 params["season"] = center.year if center.month >= 7 else center.year - 1
print(f" {_GRAY}│ GET /fixtures {params}{_R}") print(_ansi.gray(f" │ GET /fixtures {params}"))
resp = _get(f"{_API_BASE}/fixtures", headers=self._headers, params=params) resp = _get(f"{_API_BASE}/fixtures", headers=self._headers, params=params)
resp.raise_for_status() resp.raise_for_status()
self._fixture_cache[cache_key] = resp.json().get("response", []) self._fixture_cache[cache_key] = resp.json().get("response", [])
print(f" {_GRAY}{len(self._fixture_cache[cache_key])} fixtures returned (cached){_R}") print(_ansi.gray(f"{len(self._fixture_cache[cache_key])} fixtures returned"))
cacheable = [f for f in self._fixture_cache[cache_key] if f.get("fixture", {}).get("status", {}).get("short") != "NS"]
if cacheable:
self._disk_cache[cache_key] = cacheable
print(_ansi.gray(f"{len(cacheable)} non-NS fixture(s) written to disk cache"))
else: else:
print(f" {_GRAY}│ /fixtures (±{_DATE_WINDOW}d of {date_str}, league={league_id}) served from cache{_R}") print(_ansi.gray(f" │ /fixtures (±{_DATE_WINDOW}d of {date_str}, league={league_id}) served from memory"))
fixture, name_match, date_prox = _best_fixture_match( fixture, name_match, date_prox = _best_fixture_match(
self._fixture_cache[cache_key], bet.team1Name, bet.team2Name, center self._fixture_cache[cache_key], bet.team1Name, bet.team2Name, center
@@ -247,49 +203,90 @@ class TicketResolver:
if key in self._league_cache: if key in self._league_cache:
return self._league_cache[key] return self._league_cache[key]
for pattern, league_id in _LEAGUE_MAP.items(): # Use longest-match so "1. itálie - ženy" beats "1. itálie"
if pattern in key: best_pattern, best_id = max(
print(f" {_GRAY}│ league {league_name!r} -> id={league_id} (static map){_R}") ((p, lid) for p, lid in self._league_map.items() if p in key),
self._league_cache[key] = (league_id, 1.0) key=lambda t: len(t[0]),
return league_id, 1.0 default=(None, None),
)
if best_id is not None:
print(_ansi.gray(f" │ league {league_name!r} -> id={best_id} (static map, pattern={best_pattern!r})"))
self._league_cache[key] = (best_id, 1.0)
return best_id, 1.0
# Fall back to API search — lower confidence since first result is taken unverified # Fall back to API search — lower confidence since first result is taken unverified
print(f" {_GRAY}│ GET /leagues search={league_name!r}{_R}") print(_ansi.gray(f" │ GET /leagues search={league_name!r}"))
resp = _get(f"{_API_BASE}/leagues", headers=self._headers, params={"search": league_name[:20]}) resp = _get(f"{_API_BASE}/leagues", headers=self._headers, params={"search": league_name[:20]})
results = resp.json().get("response", []) results = resp.json().get("response", [])
if results: if results:
league_id = results[0]["league"]["id"] league_id = results[0]["league"]["id"]
league_found_name = results[0]["league"]["name"] league_found_name = results[0]["league"]["name"]
print(f" {_GRAY}│ matched {league_found_name!r} id={league_id} (API fallback, confidence=0.7){_R}") print(_ansi.gray(f" │ matched {league_found_name!r} id={league_id} (API fallback, confidence=0.7)"))
self._league_cache[key] = (league_id, 0.7) self._league_cache[key] = (league_id, 0.7)
return league_id, 0.7 return league_id, 0.7
print(f" {_GRAY}│ no league found, searching fixtures by date only (confidence=0.3){_R}") print(_ansi.gray(" │ no league found, searching fixtures by date only (confidence=0.3)"))
self._league_cache[key] = (None, 0.3) self._league_cache[key] = (None, 0.3)
return None, 0.3 return None, 0.3
def _fixture_to_match_info(fixture: dict[str, Any]) -> MatchInfo:
goals = fixture.get("goals", {})
score = fixture.get("score", {})
halftime = score.get("halftime", {})
corners_home: int | None = None
corners_away: int | None = None
offsides_home: int | None = None
offsides_away: int | None = None
for stat_entry in fixture.get("statistics", []):
home_team_id = fixture.get("teams", {}).get("home", {}).get("id")
team_id = stat_entry.get("team", {}).get("id")
for stat in stat_entry.get("statistics", []):
value = stat.get("value")
if not isinstance(value, int):
continue
if stat.get("type") == "Corner Kicks":
if team_id == home_team_id:
corners_home = value
else:
corners_away = value
elif stat.get("type") == "Offsides":
if team_id == home_team_id:
offsides_home = value
else:
offsides_away = value
return MatchInfo(
goals_home=goals.get("home", 0),
goals_away=goals.get("away", 0),
half_time_home=halftime.get("home"),
half_time_away=halftime.get("away"),
corners_home=corners_home,
corners_away=corners_away,
offsides_home=offsides_home,
offsides_away=offsides_away,
)
def _similarity(a: str, b: str) -> float: def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio() return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _date_proximity(fixture: dict, center) -> float: def _date_proximity(fixture: dict[str, Any], center: date) -> float:
"""1.0 on exact date, linear decay to 0.0 at _DATE_WINDOW days away.""" """1.0 on exact date, linear decay to 0.0 at _DATE_WINDOW days away."""
fixture_date = datetime.fromisoformat(fixture["fixture"]["date"].replace("Z", "+00:00")).date() fixture_date = datetime.fromisoformat(fixture["fixture"]["date"].replace("Z", "+00:00")).date()
days_off = abs((fixture_date - center).days) days_off = abs((fixture_date - center).days)
return max(0.0, 1.0 - days_off / _DATE_WINDOW) return max(0.0, 1.0 - days_off / _DATE_WINDOW)
def _best_fixture_match(fixtures: list[dict], team1: str, team2: str, center) -> tuple[dict | None, float, float]: def _best_fixture_match(fixtures: list[dict[str, Any]], team1: str, team2: str, center: date) -> tuple[dict[str, Any] | None, float, float]:
"""Returns (best_fixture, name_score, date_proximity) or (None, 0, 0) if no good match.""" """Returns (best_fixture, name_score, date_proximity) or (None, 0, 0) if no good match."""
best, best_combined, best_name, best_date = None, 0.0, 0.0, 0.0 best, best_combined, best_name, best_date = None, 0.0, 0.0, 0.0
for f in fixtures: for f in fixtures:
home = f["teams"]["home"]["name"] home = f["teams"]["home"]["name"]
away = f["teams"]["away"]["name"] away = f["teams"]["away"]["name"]
name_score = max( name_score = (_similarity(team1, home) + _similarity(team2, away)) / 2
_similarity(team1, home) + _similarity(team2, away),
_similarity(team1, away) + _similarity(team2, home),
) / 2
date_prox = _date_proximity(f, center) date_prox = _date_proximity(f, center)
# Name similarity is the primary signal; date proximity is a tiebreaker # Name similarity is the primary signal; date proximity is a tiebreaker
combined = name_score * 0.8 + date_prox * 0.2 combined = name_score * 0.8 + date_prox * 0.2
@@ -302,52 +299,6 @@ def _best_fixture_match(fixtures: list[dict], team1: str, team2: str, center) ->
return (best, best_name, best_date) if best_name > 0.5 else (None, best_name, best_date) return (best, best_name, best_date) if best_name > 0.5 else (None, best_name, best_date)
def _is_finished(fixture: dict) -> float: def _is_finished(fixture: dict[str, Any]) -> float:
status = fixture.get("fixture", {}).get("status", {}).get("short", "") status = fixture.get("fixture", {}).get("status", {}).get("short", "")
return 1.0 if status in ("FT", "AET", "PEN", "AWD", "WO") else 0.0 return 1.0 if status in ("FT", "AET", "PEN", "AWD", "WO") else 0.0
def _evaluate_bet(bet: Bet, fixture: dict) -> BetOutcome:
goals = fixture.get("goals", {})
home = goals.get("home")
away = goals.get("away")
if home is None or away is None:
return BetOutcome.UNKNOWN
if isinstance(bet, WinDrawLose):
bet_draw = bet.betType in ("X", "0")
if bet_draw:
return BetOutcome.WIN if home == away else BetOutcome.LOSE
actual = "1" if home > away else ("0" if home == away else "2")
return BetOutcome.WIN if actual == bet.betType else BetOutcome.LOSE
if isinstance(bet, WinDrawLoseDouble):
actual = "1" if home > away else ("0" if home == away else "2")
return BetOutcome.WIN if actual in bet.betType else BetOutcome.LOSE
if isinstance(bet, WinLose):
if home == away:
return BetOutcome.VOID
actual = "1" if home > away else "2"
return BetOutcome.WIN if actual == bet.betType else BetOutcome.LOSE
if isinstance(bet, BothTeamScored):
return BetOutcome.WIN if home > 0 and away > 0 else BetOutcome.LOSE
if isinstance(bet, GoalAmount):
total = home + away
if total == bet.line:
return BetOutcome.VOID
won = total > bet.line if bet.over else total < bet.line
return BetOutcome.WIN if won else BetOutcome.LOSE
if isinstance(bet, GoalHandicap):
h_home = home + (bet.handicap_amount if bet.team_bet == "1" else 0.0)
h_away = away + (bet.handicap_amount if bet.team_bet == "2" else 0.0)
if h_home == h_away:
return BetOutcome.VOID
actual_winner = "1" if h_home > h_away else "2"
return BetOutcome.WIN if actual_winner == bet.team_bet else BetOutcome.LOSE
return BetOutcome.UNKNOWN

View File

@@ -21,6 +21,7 @@ class Link:
url: str url: str
date: Optional[datetime] = None date: Optional[datetime] = None
class Links: class Links:
def __init__(self, path: str | Config): def __init__(self, path: str | Config):
if isinstance(path, Config): if isinstance(path, Config):
@@ -123,10 +124,6 @@ class Links:
return len(self.links) return len(self.links)
# Backwards-compatible alias in case other modules referenced Linker
Linker = Links
if __name__ == "__main__": if __name__ == "__main__":
links_obj = Links("data/odkazy.xlsx") links_obj = Links("data/odkazy.xlsx")
links = links_obj.ret_links() links = links_obj.ret_links()

View File

@@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import Any
from playwright.sync_api import sync_playwright from playwright.sync_api import sync_playwright
@@ -11,7 +12,7 @@ class Screenshotter:
self.config = config self.config = config
def capture_tickets(self, links: list[Link]): def capture_tickets(self, links: list[Link]) -> None:
with sync_playwright() as p: with sync_playwright() as p:
browser = p.chromium.launch(headless=True) browser = p.chromium.launch(headless=True)
context = browser.new_context() context = browser.new_context()
@@ -24,7 +25,7 @@ class Screenshotter:
browser.close() browser.close()
def capture_ticket(self,page, url, target_path, ticket_selector=".betslip-history-detail__left-panel"): def capture_ticket(self, page: Any, url: str, target_path: Path, ticket_selector: str = ".betslip-history-detail__left-panel") -> None:
page.goto(url) page.goto(url)
page.wait_for_selector(ticket_selector, timeout=10000) page.wait_for_selector(ticket_selector, timeout=10000)
page.wait_for_timeout(1000) page.wait_for_timeout(1000)