Compare commits

..

23 Commits

Author SHA1 Message Date
78d9fab189 RELEASE 0.1.0
Added resolver running on pure vibes
2026-03-22 00:56:05 +01:00
a6deeeaebf Fix screenshotter again 2026-03-21 23:16:43 +01:00
5fc7bfafad fix cli 2026-03-21 22:22:47 +01:00
7cd45f497a Rename ticket -> bet create ticket class 2026-03-21 22:20:09 +01:00
f40a7911ca Add one more classified class 2026-03-21 21:12:03 +01:00
57ad6c71f8 Ruffing 2026-03-21 20:43:34 +01:00
f4475ef1d4 Modify cli 2026-03-21 20:42:49 +01:00
5add445949 Add classifier contracts 2026-03-21 20:16:01 +01:00
987bdb2b63 Implement data types contract: FIXUP 2026-03-21 20:10:05 +01:00
2b29a1c662 Implement data types contract 2026-03-21 19:39:39 +01:00
697fe2548c Add txt file extraction 2026-03-21 18:54:28 +01:00
770966e21f Update dev dependency 2026-03-21 18:53:28 +01:00
Chlupaty
abb59aabe3 Scanner debug part 2 (done) 2026-03-21 18:49:01 +01:00
8adc374408 Add parsing to cli 2026-03-21 18:42:09 +01:00
1b2fee9b8d Fix int 2026-03-21 18:30:04 +01:00
Chlupaty
1f9997b430 Scanner debug part 1 2026-03-21 18:28:51 +01:00
6c096e4300 fix Screenshotter 2026-03-21 18:25:15 +01:00
e94d96f153 fix Screenshotter 2026-03-21 18:21:46 +01:00
86e0bc8e51 Screenshotter 2026-03-21 17:58:06 +01:00
5126a985bf Fix cli 2026-03-21 17:22:23 +01:00
f7369e29f2 Fix cli 2026-03-21 17:21:26 +01:00
Chlupaty
b6fc78e038 xddd 2026-03-21 16:51:49 +01:00
96c75ea0cc test 2026-03-21 16:45:51 +01:00
18 changed files with 871 additions and 51 deletions

7
config/application.yml Normal file
View File

@@ -0,0 +1,7 @@
path: data/odkazy.xlsx
screenshotter:
target_path: data/screenshots/
resolver:
# SECURITY: this is a real-looking API key committed to version control — rotate it
# and load it from an environment variable or an untracked secrets file instead.
api_key: 733f6882605be2de8980bbd074091ee4

88
data/extract_to_excel.py Normal file
View File

@@ -0,0 +1,88 @@
import os
import re
import sys
import argparse
from datetime import datetime
import pytz
from openpyxl import Workbook
def process_files(starting_id, output_filename="output.xlsx"):
    """Extract URLs and dates from all .txt files in the CWD into an Excel file.

    Each extracted row gets a sequential ID starting at ``starting_id``.
    Dates found in the text are interpreted as Europe/Prague local time and
    written in ISO 8601 UTC ("...T...Z"). Source .txt files are deleted only
    after the workbook has been saved successfully.
    """
    # Find all txt files in the current directory
    txt_files = [f for f in os.listdir('.') if f.endswith('.txt')]
    if not txt_files:
        print("No .txt files found in the current directory.")
        return
    # Regex patterns for input data.
    # Groups are unpacked below as (month, day, year, time) — TODO confirm
    # against a sample export that month really precedes day.
    date_pattern = re.compile(r'\[.*?(\d{1,2})\s+(\d{1,2}),\s+(\d{4})\s+at\s+(\d{1,2}:\d{2})\]')
    url_pattern = re.compile(r'(https?://[^\s]+)')
    # Timezone setup (CET to UTC)
    local_tz = pytz.timezone("Europe/Prague")
    # Set up the Excel Workbook
    wb = Workbook()
    ws = wb.active
    ws.title = "Fortuna Data"
    ws.append(["ID", "URL", "Date_UTC"])  # Add headers
    current_id = starting_id
    success_files = []
    for filename in txt_files:
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
            dates = date_pattern.findall(content)
            urls = url_pattern.findall(content)
            # Pair dates with URLs positionally; surplus of either is ignored.
            for i in range(min(len(dates), len(urls))):
                month, day, year, time_str = dates[i]
                # Parse the datetime from the text file
                dt_str = f"{year}-{month}-{day} {time_str}"
                local_dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M")
                # Convert CET to UTC
                localized_dt = local_tz.localize(local_dt)
                utc_dt = localized_dt.astimezone(pytz.utc)
                # Format to ISO 8601 with T and Z
                formatted_date = utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
                # Add a new row to the Excel sheet
                ws.append([current_id, urls[i], formatted_date])
                current_id += 1
            # Queue file for deletion once the workbook is safely saved
            success_files.append(filename)
        except Exception as e:
            # BUGFIX: the message previously printed the literal "(unknown)"
            # instead of identifying the failing file.
            print(f"Error processing {filename}: {e}", file=sys.stderr)
    # Save the Excel file
    try:
        wb.save(output_filename)
        print(f"Successfully saved data to {output_filename}")
        # Clean up only if save was successful
        for filename in success_files:
            os.remove(filename)
            # BUGFIX: report which file was actually deleted.
            print(f"Deleted: {filename}")
    except Exception as e:
        print(f"Failed to save {output_filename}. No text files were deleted. Error: {e}", file=sys.stderr)
if __name__ == "__main__":
    # CLI entry point: parse arguments, then delegate to process_files().
    cli = argparse.ArgumentParser(description="Extract URLs to an Excel file with ISO UTC dates.")
    cli.add_argument("start_id", type=int, help="Starting ID for the output")
    cli.add_argument(
        "--output",
        type=str,
        default="extracted_data.xlsx",
        help="Output Excel filename (default: extracted_data.xlsx)",
    )
    parsed = cli.parse_args()
    process_files(parsed.start_id, parsed.output)

View File

@@ -4,19 +4,24 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "beaky" name = "beaky"
version = "0.0.1" version = "0.1.0"
description = "Scan tickets and decide" description = "Scan tickets and decide"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"pillow==12.1.1", "pillow==12.1.1",
"pydantic==2.12.5", "pydantic==2.12.5",
"pandas==3.0.1",
"openpyxl>=3.1.0", "openpyxl>=3.1.0",
"PyYaml==6.0.3",
"playwright==1.58.0",
"requests>=2.32.0"
] ]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
"pytest>=9.0.2", "pytest>=9.0.2",
"ruff==0.15.5", "ruff==0.15.5",
"pytz"
# "playwright==1.58.0" # only dev because it cant be installed in a pipeline, just locally # "playwright==1.58.0" # only dev because it cant be installed in a pipeline, just locally
] ]

View File

@@ -1,28 +1,89 @@
import argparse import argparse
import yaml
from pydantic import ValidationError from pydantic import ValidationError
from beaky.config import Config from beaky.config import Config
from beaky.scanner.scanner import Links from beaky.scanner.scanner import Links
from beaky.screenshotter.screenshotter import Screenshotter
from beaky.link_classifier.classifier import LinkClassifier
from beaky.resolvers.resolver import TicketResolver
from beaky.resolvers.resolver import TicketVerdict, _R, _B, _GREEN, _RED, _YELLOW, _GRAY
_VERDICT_COLOR = {
TicketVerdict.TRUTHFUL: _GREEN,
TicketVerdict.NOT_TRUTHFUL: _RED,
TicketVerdict.POSSIBLY_TRUTHFUL: _YELLOW,
TicketVerdict.UNKNOWN: _GRAY,
}
def load_config(path: str) -> Config | None:
    """Load and validate a YAML config file.

    Returns the parsed Config, or None (after printing the validation
    error) when the file content does not match the Config schema.
    Raises OSError if the file cannot be opened.
    """
    with open(path) as f:
        config_dict = yaml.safe_load(f)
    try:
        return Config(**config_dict)
    except ValidationError as e:
        print("Bad config")
        print(e)
        return None
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(prog="beaky")
prog="beaky" parser.add_argument("--config", help="Path to config file.", default="config/application.yml")
) parser.add_argument("--id", type=int, help="Resolve a single ticket by id (only used with resolve mode).")
parser.add_argument("path", help="Path to config file.") parser.add_argument("mode", choices=["screenshotter", "parser", "class", "resolve"], help="Mode of operation.")
args = parser.parse_args()
try: args = parser.parse_args()
config = Config(**vars(args)) config = load_config(args.config)
except ValidationError as e: if config is None:
print("Bad arguments")
print(e)
return return
data = Links(config.path) # always load testing data, we will modify that later
data = Links(config)
data.ret_links() data.ret_links()
for link in data: link_amount = len(data.links)
print(link) print(f"We found {link_amount} links")
if link_amount == 0:
print("ERROR, no links found")
return
if args.mode == "screenshotter":
screenshotter = Screenshotter(config)
screenshotter.capture_tickets(data.links)
if args.mode == "parser":
for link in data.links:
print(link)
if args.mode == "class":
classifier = LinkClassifier()
results = []
for link in data.links:
results.append(classifier.classify(link))
ticket = results[-1]
print(f"\n=== Link {ticket.id} ({len(ticket.bets)} bets) ===")
for bet in ticket.bets:
print(f" [{type(bet).__name__}]")
for k, v in vars(bet).items():
print(f" {k}: {v}")
if args.mode == "resolve":
classifier = LinkClassifier()
resolver = TicketResolver(config.resolver)
links = [l for l in data.links if l.id == args.id] if args.id is not None else data.links
if args.id is not None and not links:
print(f"ERROR: ticket id {args.id} not found")
return
for link in links:
print(f"\n=== Classifying ticket {link.id} ===")
ticket = classifier.classify(link)
for bet in ticket.bets:
print(f" [{type(bet).__name__}] {bet.team1Name} vs {bet.team2Name} | {bet.date.date()} | {bet.league}")
print(f"\n--- Resolving ticket {link.id} ---")
resolved = resolver.resolve(ticket)
color = _VERDICT_COLOR.get(resolved.verdict, "")
print(f"\n {color}{_B}VERDICT: {resolved.verdict.value.upper()}{_R}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,6 +1,11 @@
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from beaky.resolvers.config import ResolverConfig
from beaky.screenshotter.config import ScreenshotterConfig
@dataclass @dataclass
class Config: class Config:
path: str path: str
screenshotter: ScreenshotterConfig
resolver: ResolverConfig

View File

@@ -1,9 +0,0 @@
from pydantic.dataclasses import dataclass
from datetime import datetime
@dataclass
class Scan:
id: int
date: datetime
event_name: str

View File

@@ -1,21 +1,91 @@
from abc import ABC from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum from enum import Enum
from typing import Literal
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from typing import Callable
class TicketType(str, Enum): class BetType(str, Enum):
WIN_DRAW_LOSE = "win_draw_lose" WIN_DRAW_LOSE = "win_draw_lose"
# postup? ADVANCED = "advance"
WIN_DRAW_LOSE_DOUBLE = "win_draw_lose_double" WIN_DRAW_LOSE_DOUBLE = "win_draw_lose_double"
WIN_LOSE = "win_lose" WIN_LOSE = "win_lose"
BOTH_TEAM_SCORED = "both_team_scored" BOTH_TEAM_SCORED = "both_team_scored"
GOAL_AMOUNT = "goal_amount" GOAL_AMOUNT = "goal_amount"
GOAL_HANDICAP = "goal_handicap"
UNKNOWN = "unknown"
... ...
# Classes that inherit from this are defined in resolution file, so the deciding function can be used
@dataclass @dataclass
class Ticket(ABC): class Bet(ABC):
ticketType: TicketType ticketType: BetType
decidingFunction: Callable team1Name: str
team2Name: str
date: datetime
league: str
@abstractmethod
def resolve(self): pass
@dataclass
class WinDrawLose(Bet):
    """Match result 1X2 (home win / draw / away win)."""
    # "1"=home win, "2"=away win, "X" or "0"=draw
    betType: Literal["X", "0", "1", "2"] = "0"

    def resolve(self):
        ...
@dataclass
class Advance(Bet):
    """Bet on which team advances to the next round."""

    def resolve(self):
        # BUGFIX: replaced a profane placeholder message with a clear,
        # professional error for this unimplemented bet type.
        raise NotImplementedError("Advance bets are not supported yet")
@dataclass
class WinDrawLoseDouble(Bet):
    """Match result — double chance (two of the three 1X2 outcomes)."""
    # Normalized two-character combination, e.g. "01" = draw or home win
    betType: Literal["01", "12", "02"] = "01"

    def resolve(self):
        ...
@dataclass
class WinLose(Bet):
    """Match result without a draw option."""
    # "1"=home team, "2"=away team
    betType: Literal["1", "2"] = "1"

    def resolve(self):
        ...
@dataclass
class BothTeamScored(Bet):
    """Bet that both teams score at least one goal."""

    def resolve(self):
        ...
@dataclass
class GoalAmount(Bet):
    """Total goals in the match — over/under a goal line."""
    line: float = 0.0  # goal line, e.g. 2.5
    over: bool = True  # True = more than line, False = less than line

    def resolve(self):
        ...
@dataclass
class GoalHandicap(Bet):
    """Goal handicap for a specific team — add handicap_amount to team's score, team wins = you win"""
    team_bet: Literal["1", "2"] = "1"  # which team the handicap is applied to
    handicap_amount: float = 0.0       # e.g. +1.5 or -0.5

    def resolve(self):
        ...
@dataclass
class UnknownTicket(Bet):
    """Bet type that could not be classified."""
    # Original leg text, kept verbatim for manual inspection
    raw_text: str = ""

    def resolve(self):
        ...
@dataclass
class Ticket:
    """A classified betting ticket: the source link id plus its parsed bets."""
    id: int
    bets: list[Bet]

View File

View File

@@ -0,0 +1,7 @@
from datetime import datetime
from beaky.datamodels.ticket import Ticket
def f(path:str, date:datetime) -> list[Ticket]:
    """Stub: extract tickets from *path* for the given *date* — not implemented yet.

    NOTE(review): the name `f` is a placeholder; rename once the contract is settled.
    """
    ...

View File

View File

@@ -0,0 +1,116 @@
import re
from datetime import datetime
from playwright.sync_api import Page, sync_playwright
from beaky.datamodels.ticket import (
BothTeamScored,
GoalAmount,
GoalHandicap,
Ticket,
BetType,
UnknownTicket,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
Bet
)
from beaky.scanner.scanner import Link
_TICKET_SELECTOR = ".betslip-history-detail__left-panel"
_LEG_SELECTOR = '[data-test="betslip-leg"]'
def _parse_czech_date(text: str) -> datetime | None:
m = re.search(r"(\d+)\.\s*(\d+)\.\s*(\d+)\s+(\d+):(\d+)", text)
if not m:
return None
day, month, year, hour, minute = map(int, m.groups())
return datetime(year, month, day, hour, minute)
def _parse_teams(title: str) -> tuple[str, str]:
parts = title.split(" - ", 1)
if len(parts) == 2:
return parts[0].strip(), parts[1].strip()
return title.strip(), ""
def _classify_bet(bet_text: str, team1: str, team2: str, date: datetime, league: str) -> Bet:
    """Map one raw Fortuna bet-leg text to a typed Bet via regex heuristics.

    Patterns are tried in order of specificity; any unmatched text falls
    through to an UnknownTicket carrying the raw text for later inspection.
    """
    common = dict(team1Name=team1, team2Name=team2, date=date, league=league)
    # WinDrawLose double: "Výsledek zápasu - dvojtip: 10"
    m = re.search(r"Výsledek zápasu - dvojtip:\s*(\d+)", bet_text)
    if m:
        # normalize order: "10" -> "01", "02" -> "02", "12" -> "12"
        bet_type = "".join(sorted(m.group(1)))
        return WinDrawLoseDouble(ticketType=BetType.WIN_DRAW_LOSE_DOUBLE, betType=bet_type, **common)
    # WinLose (no draw): "Výsledek bez remízy: 1"
    m = re.search(r"bez rem[ií]zy:\s*([12])", bet_text)
    if m:
        return WinLose(ticketType=BetType.WIN_LOSE, betType=m.group(1), **common)
    # WinDrawLose: "Výsledek zápasu: 1" — anchored at end so the double-tip
    # pattern above cannot be shadowed.
    m = re.search(r"Výsledek zápasu:\s*([012X])\s*$", bet_text.strip())
    if m:
        return WinDrawLose(ticketType=BetType.WIN_DRAW_LOSE, betType=m.group(1), **common)
    # BothTeamScored: "Každý z týmů dá gól v zápasu: Ano"
    if "dá gól" in bet_text or "oba týmy" in bet_text.lower():
        return BothTeamScored(ticketType=BetType.BOTH_TEAM_SCORED, **common)
    # GoalAmount: "Počet gólů v zápasu 2.5: + 2.5" / "Počet gólů v zápasu 4: - 4"
    m = re.search(r"Počet gólů v zápasu\s+(\d+(?:\.\d+)?):\s*([+-])", bet_text)
    if m:
        return GoalAmount(ticketType=BetType.GOAL_AMOUNT, line=float(m.group(1)), over=m.group(2) == "+", **common)
    # GoalHandicap: "[Team] počet gólů ...: +1.5" — team name in bet text determines team_bet
    m = re.search(r"([+-])\s*(\d+(?:\.\d+)?)\s*$", bet_text.strip())
    if m and "gólů" in bet_text:
        bet_lower = bet_text.lower()
        if team1.lower() in bet_lower:
            team_bet = "1"
        elif team2.lower() in bet_lower:
            team_bet = "2"
        else:
            # Neither team name appears in the leg text — too ambiguous to resolve.
            return UnknownTicket(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
        sign = 1.0 if m.group(1) == "+" else -1.0
        handicap = sign * float(m.group(2))
        return GoalHandicap(ticketType=BetType.GOAL_HANDICAP, team_bet=team_bet, handicap_amount=handicap, **common)
    return UnknownTicket(ticketType=BetType.UNKNOWN, raw_text=bet_text, **common)
def _extract_legs(page: Page, fallback_date: datetime | None) -> list[Bet]:
    """Scrape every bet leg on the currently open ticket page and classify it.

    Per-leg dates that fail to parse fall back to the link's date, then to
    "now". The CSS selectors target Fortuna's betslip markup — they will
    silently match nothing if the site layout changes.
    """
    bets: list[Bet] = []
    for leg in page.locator(_LEG_SELECTOR).all():
        title = leg.locator("h3").first.get_attribute("title") or ""
        date_text = leg.locator(".betslip-leg-date span").first.inner_text()
        bet_text = leg.locator("[data-selection-id]").first.inner_text()
        league = leg.locator(".f-mt-1.f-leading-tight.f-line-clamp-2").first.inner_text()
        team1, team2 = _parse_teams(title)
        date = _parse_czech_date(date_text) or fallback_date or datetime.now()
        bets.append(_classify_bet(bet_text, team1, team2, date, league))
    return bets
class LinkClassifier:
    """Classifies a ticket link into a Ticket of typed bets by scraping its page."""

    def classify(self, link: Link) -> Ticket:
        """Open the ticket URL in headless Chromium and parse its bet legs.

        Returns a Ticket with the parsed bets. On any scraping error, an
        empty Ticket (no bets) is returned after the error is printed.
        """
        # BUGFIX: on a scraping failure `result` was never assigned, so the
        # final `return result` raised UnboundLocalError. Pre-seed a safe
        # fallback so callers always receive a Ticket.
        result = Ticket(id=link.id, bets=[])
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context()
            page = context.new_page()
            try:
                page.goto(link.url)
                page.wait_for_selector(_LEG_SELECTOR, timeout=15000)
                # Brief grace period for late-rendering leg content.
                page.wait_for_timeout(500)
                result = Ticket(id=link.id, bets=_extract_legs(page, link.date))
            except Exception as e:
                print(f"Error classifying link {link.id}: {e}")
            finally:
                page.close()
                browser.close()
        return result

View File

View File

@@ -0,0 +1,6 @@
from pydantic.dataclasses import dataclass
@dataclass
class ResolverConfig:
    """Configuration for the ticket resolver."""
    # API key for api-football (v3.football.api-sports.io)
    api_key: str

View File

@@ -0,0 +1,353 @@
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from difflib import SequenceMatcher
from enum import Enum
import requests
from beaky.datamodels.ticket import (
Bet,
BothTeamScored,
GoalAmount,
GoalHandicap,
Ticket,
UnknownTicket,
WinDrawLose,
WinDrawLoseDouble,
WinLose,
)
from beaky.resolvers.config import ResolverConfig
_API_BASE = "https://v3.football.api-sports.io"
# Fortuna league strings (lowercased substring match) -> api-football league ID
_LEAGUE_MAP: dict[str, int] = {
# European cups
"liga mistrů": 2,
"champions league": 2,
"evropská liga": 3,
"europa league": 3,
"konferenční liga": 848,
"conference league": 848,
# Top flights
"1. anglie": 39,
"1. belgie": 144,
"1. česko": 345,
"1. dánsko": 119,
"1. francie": 61,
"1. itálie": 135,
"1. itálie - ženy": 794,
"1. německo": 78,
"1. nizozemsko": 88,
"1. polsko": 106,
"1. portugalsko": 94,
"1. rakousko": 218,
"1. rumunsko": 283,
"1. skotsko": 179,
"1. slovensko": 332,
"1. španělsko": 140,
"1. wales": 771,
# Second divisions
"2. anglie": 40,
"2. česko": 346,
"2. francie": 62,
"2. itálie": 136,
"2. německo": 79,
"2. nizozemsko": 89,
"2. rakousko": 219,
"2. slovensko": 333,
"2. španělsko": 141,
# Third divisions
"3. francie": 63,
"3. česko msfl": 349,
"3. česko čfl": 348,
# Fourth divisions
"4. česko - sk. a": 350,
"4. česko - sk. b": 351,
"4. česko - sk. c": 352,
"4. česko - sk. d": 353,
"4. česko - sk. e": 354,
"4. česko - sk. f": 686,
# Women
"1. česko - ženy": 669,
"fortuna=liga ženy": 669,
# Domestic cups
"anglie - fa cup": 45,
"anglie - efl cup": 48,
"česko - pohár": 347,
}
_DATE_WINDOW = 3 # days either side of the bet date to search
# ANSI color helpers
_R = "\033[0m"
_B = "\033[1m"
_DIM= "\033[2m"
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_GRAY = "\033[90m"
_OUTCOME_COLOR = {
"win": _GREEN,
"lose": _RED,
"void": _YELLOW,
"unknown": _GRAY,
}
class TicketVerdict(str, Enum):
    """Overall verdict for a ticket, aggregated from its individual bet outcomes."""
    TRUTHFUL = "truthful"
    NOT_TRUTHFUL = "not truthful"
    POSSIBLY_TRUTHFUL = "possibly truthful — unresolvable bets remain, check manually"
    UNKNOWN = "unknown — could not resolve enough bets to decide"
class BetOutcome(str, Enum):
    """Settlement result for a single bet."""
    WIN = "win"
    LOSE = "lose"
    VOID = "void"        # stake returned (e.g. WinLose on draw, integer goal line hit)
    UNKNOWN = "unknown"  # fixture not found or unclassified bet
@dataclass
class ResolvedBet:
    """A single bet paired with its settled outcome and fixture-match confidence."""
    bet: Bet
    outcome: BetOutcome
    fixture_id: int | None = None
    # Confidence breakdown (each component 0.0–1.0):
    #   name_match     — how well team names matched (SequenceMatcher score)
    #   date_proximity — 1.0 exact date, linear decay to 0.0 at _DATE_WINDOW days away
    #   league_found   — 1.0 static map hit, 0.7 API fallback, 0.3 not found
    #   match_finished — 1.0 if fixture status is terminal, 0.0 otherwise
    confidence: float = 0.0
    name_match: float = 0.0
    date_proximity: float = 0.0
    league_found: float = 0.0
    match_finished: float = 0.0
@dataclass
class ResolvedTicket:
    """All resolved bets of one ticket, plus the aggregated verdict."""
    ticket_id: int
    bets: list[ResolvedBet] = field(default_factory=list)

    @property
    def verdict(self) -> TicketVerdict:
        """Aggregate the bet outcomes into a single ticket verdict.

        A lost bet sinks the ticket; an unresolved one makes it unknown;
        unclassifiable bets downgrade a clean ticket to "possibly truthful".
        """
        known: list[ResolvedBet] = []
        unclassified: list[ResolvedBet] = []
        for resolved in self.bets:
            bucket = unclassified if isinstance(resolved.bet, UnknownTicket) else known
            bucket.append(resolved)
        if not known:
            return TicketVerdict.UNKNOWN
        outcomes = {resolved.outcome for resolved in known}
        if BetOutcome.LOSE in outcomes:
            return TicketVerdict.NOT_TRUTHFUL
        if BetOutcome.UNKNOWN in outcomes:
            return TicketVerdict.UNKNOWN
        return TicketVerdict.POSSIBLY_TRUTHFUL if unclassified else TicketVerdict.TRUTHFUL
def _get(url: str, headers: dict, params: dict, retries: int = 3, backoff: float = 60.0,
         timeout: float | None = None) -> requests.Response:
    """GET with linear-backoff retries on HTTP 429 (rate limiting).

    Waits ``backoff * attempt_number`` seconds between attempts. Any non-429
    response is returned immediately; after ``retries`` rate-limited attempts
    the last 429 response is returned so callers can raise_for_status on it.

    timeout is forwarded to requests.get; the default None keeps the previous
    (unbounded) behavior. Returns None only when retries <= 0.
    """
    resp = None  # BUGFIX: previously unbound (NameError) when retries <= 0
    for attempt in range(retries):
        resp = requests.get(url, headers=headers, params=params, timeout=timeout)
        if resp.status_code != 429:
            return resp
        if attempt < retries - 1:
            # BUGFIX: the old loop also slept after the final attempt,
            # delaying the give-up path by a full backoff for nothing.
            wait = backoff * (attempt + 1)
            print(f" !! rate limited — waiting {wait:.0f}s before retry ({attempt + 1}/{retries})")
            time.sleep(wait)
    print(f" !! still rate limited after {retries} retries, giving up")
    return resp
class TicketResolver:
    """Resolves the bets of a classified Ticket against the api-football v3 API.

    Fixture and league lookups are cached per instance, so bets sharing the
    same date window and league reuse the first HTTP response.
    """

    def __init__(self, config: ResolverConfig):
        self._headers = {"x-apisports-key": config.api_key}
        # Cache maps (center_date_str, league_id | None) -> list of fixture dicts
        self._fixture_cache: dict[tuple[str, int | None], list[dict]] = {}
        # Cache maps league name -> (league_id, confidence)
        self._league_cache: dict[str, tuple[int | None, float]] = {}

    def resolve(self, ticket: Ticket) -> ResolvedTicket:
        """Resolve every bet independently; the verdict is derived by ResolvedTicket."""
        result = ResolvedTicket(ticket_id=ticket.id)
        for bet in ticket.bets:
            result.bets.append(self._resolve_bet(bet))
        return result

    def _resolve_bet(self, bet: Bet) -> ResolvedBet:
        """Match one bet to a fixture, evaluate it, and log confidence components."""
        bet_type = type(bet).__name__
        print(f"\n {_B}{_CYAN}┌─ [{bet_type}]{_R} {_B}{bet.team1Name} vs {bet.team2Name}{_R}"
              f" {_DIM}{bet.date.strftime('%Y-%m-%d')} | {bet.league}{_R}")
        if isinstance(bet, UnknownTicket):
            # Unclassified legs cannot be resolved — skip them outright.
            print(f" {_GRAY}│ skipping — not implemented: {bet.raw_text!r}{_R}")
            print(f" {_GRAY}└─ UNKNOWN{_R}")
            return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN)
        fixture, name_match, date_prox, league_conf = self._find_fixture(bet)
        if fixture is None:
            print(f" {_GRAY}└─ UNKNOWN — no fixture found{_R}")
            return ResolvedBet(bet=bet, outcome=BetOutcome.UNKNOWN, league_found=league_conf)
        home_name = fixture["teams"]["home"]["name"]
        away_name = fixture["teams"]["away"]["name"]
        finished = _is_finished(fixture)
        # Overall confidence is the unweighted mean of the four components.
        confidence = round((name_match + date_prox + league_conf + finished) / 4, 3)
        # Only finished matches are evaluated; in-progress ones stay UNKNOWN.
        outcome = _evaluate_bet(bet, fixture) if finished == 1.0 else BetOutcome.UNKNOWN
        goals = fixture["goals"]
        color = _OUTCOME_COLOR.get(outcome.value, _GRAY)
        print(f" {_DIM}│ matched #{fixture['fixture']['id']}: {home_name} vs {away_name}"
              f" | {goals['home']}:{goals['away']} | {fixture['fixture']['status']['short']}"
              f" | confidence {confidence} (name={name_match:.2f} date={date_prox:.2f} league={league_conf} finished={finished}){_R}")
        print(f" {color}{_B}└─ {outcome.value.upper()}{_R}")
        return ResolvedBet(
            bet=bet,
            outcome=outcome,
            fixture_id=fixture["fixture"]["id"],
            confidence=confidence,
            name_match=round(name_match, 3),
            date_proximity=round(date_prox, 3),
            league_found=league_conf,
            match_finished=finished,
        )

    def _find_fixture(self, bet: Bet) -> tuple[dict | None, float, float, float]:
        """Returns (fixture, name_match, date_proximity, league_confidence)."""
        center = bet.date.date()
        date_str = center.strftime("%Y-%m-%d")
        league_id, league_conf = self._resolve_league(bet.league)
        cache_key = (date_str, league_id)
        if cache_key not in self._fixture_cache:
            # Search a ±_DATE_WINDOW-day window around the bet date.
            date_from = (center - timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
            date_to = (center + timedelta(days=_DATE_WINDOW)).strftime("%Y-%m-%d")
            params: dict = {"from": date_from, "to": date_to}
            if league_id is not None:
                params["league"] = league_id
                # European season naming: July onward counts as that year's season.
                # NOTE(review): season is assumed to be sent only alongside a
                # league id — confirm against the original formatting.
                params["season"] = center.year if center.month >= 7 else center.year - 1
            print(f" {_GRAY}│ GET /fixtures {params}{_R}")
            resp = _get(f"{_API_BASE}/fixtures", headers=self._headers, params=params)
            resp.raise_for_status()
            self._fixture_cache[cache_key] = resp.json().get("response", [])
            print(f" {_GRAY}{len(self._fixture_cache[cache_key])} fixtures returned (cached){_R}")
        else:
            print(f" {_GRAY}│ /fixtures (±{_DATE_WINDOW}d of {date_str}, league={league_id}) served from cache{_R}")
        fixture, name_match, date_prox = _best_fixture_match(
            self._fixture_cache[cache_key], bet.team1Name, bet.team2Name, center
        )
        return fixture, name_match, date_prox, league_conf

    def _resolve_league(self, league_name: str) -> tuple[int | None, float]:
        """Map a Fortuna league string to an api-football league id + confidence."""
        key = league_name.lower().strip()
        if key in self._league_cache:
            return self._league_cache[key]
        for pattern, league_id in _LEAGUE_MAP.items():
            if pattern in key:
                print(f" {_GRAY}│ league {league_name!r} -> id={league_id} (static map){_R}")
                self._league_cache[key] = (league_id, 1.0)
                return league_id, 1.0
        # Fall back to API search — lower confidence since first result is taken unverified
        print(f" {_GRAY}│ GET /leagues search={league_name!r}{_R}")
        resp = _get(f"{_API_BASE}/leagues", headers=self._headers, params={"search": league_name[:20]})
        results = resp.json().get("response", [])
        if results:
            league_id = results[0]["league"]["id"]
            league_found_name = results[0]["league"]["name"]
            print(f" {_GRAY}│ matched {league_found_name!r} id={league_id} (API fallback, confidence=0.7){_R}")
            self._league_cache[key] = (league_id, 0.7)
            return league_id, 0.7
        print(f" {_GRAY}│ no league found, searching fixtures by date only (confidence=0.3){_R}")
        self._league_cache[key] = (None, 0.3)
        return None, 0.3
def _similarity(a: str, b: str) -> float:
return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def _date_proximity(fixture: dict, center) -> float:
    """1.0 on the exact date, decaying linearly to 0.0 at _DATE_WINDOW days away."""
    iso = fixture["fixture"]["date"].replace("Z", "+00:00")
    offset_days = abs((datetime.fromisoformat(iso).date() - center).days)
    return max(0.0, 1.0 - offset_days / _DATE_WINDOW)
def _best_fixture_match(fixtures: list[dict], team1: str, team2: str, center) -> tuple[dict | None, float, float]:
    """Pick the fixture whose team names (primary) and date (tiebreaker) fit best.

    Returns (fixture, name_score, date_proximity); the fixture is None when
    the best candidate's name similarity does not exceed 0.5.
    """
    best_fixture = None
    top_combined = top_name = top_date = 0.0
    for candidate in fixtures:
        home = candidate["teams"]["home"]["name"]
        away = candidate["teams"]["away"]["name"]
        # Score both orientations — teams may be listed in either order.
        straight = _similarity(team1, home) + _similarity(team2, away)
        swapped = _similarity(team1, away) + _similarity(team2, home)
        name_score = max(straight, swapped) / 2
        date_score = _date_proximity(candidate, center)
        # Name similarity dominates; date proximity only breaks ties.
        combined = name_score * 0.8 + date_score * 0.2
        if combined > top_combined:
            best_fixture = candidate
            top_combined, top_name, top_date = combined, name_score, date_score
    # A strong date alone must not rescue a bad name match.
    if top_name > 0.5:
        return best_fixture, top_name, top_date
    return None, top_name, top_date
def _is_finished(fixture: dict) -> float:
status = fixture.get("fixture", {}).get("status", {}).get("short", "")
return 1.0 if status in ("FT", "AET", "PEN", "AWD", "WO") else 0.0
def _evaluate_bet(bet: Bet, fixture: dict) -> BetOutcome:
    """Decide win/lose/void for *bet* from the fixture's final goal counts."""
    goals = fixture.get("goals", {})
    home = goals.get("home")
    away = goals.get("away")
    if home is None or away is None:
        return BetOutcome.UNKNOWN
    if isinstance(bet, WinDrawLose):
        # Both "X" and "0" denote a draw in the source data.
        bet_draw = bet.betType in ("X", "0")
        if bet_draw:
            return BetOutcome.WIN if home == away else BetOutcome.LOSE
        actual = "1" if home > away else ("0" if home == away else "2")
        return BetOutcome.WIN if actual == bet.betType else BetOutcome.LOSE
    if isinstance(bet, WinDrawLoseDouble):
        # betType is a normalized two-char string (e.g. "01"); membership suffices.
        actual = "1" if home > away else ("0" if home == away else "2")
        return BetOutcome.WIN if actual in bet.betType else BetOutcome.LOSE
    if isinstance(bet, WinLose):
        # A draw voids a no-draw bet (stake returned).
        if home == away:
            return BetOutcome.VOID
        actual = "1" if home > away else "2"
        return BetOutcome.WIN if actual == bet.betType else BetOutcome.LOSE
    if isinstance(bet, BothTeamScored):
        return BetOutcome.WIN if home > 0 and away > 0 else BetOutcome.LOSE
    if isinstance(bet, GoalAmount):
        # An integer line exactly hit is a push -> void.
        total = home + away
        if total == bet.line:
            return BetOutcome.VOID
        won = total > bet.line if bet.over else total < bet.line
        return BetOutcome.WIN if won else BetOutcome.LOSE
    if isinstance(bet, GoalHandicap):
        # Apply the handicap to the chosen team's score, then compare.
        h_home = home + (bet.handicap_amount if bet.team_bet == "1" else 0.0)
        h_away = away + (bet.handicap_amount if bet.team_bet == "2" else 0.0)
        if h_home == h_away:
            return BetOutcome.VOID
        actual_winner = "1" if h_home > h_away else "2"
        return BetOutcome.WIN if actual_winner == bet.team_bet else BetOutcome.LOSE
    return BetOutcome.UNKNOWN

View File

@@ -1,5 +1,5 @@
from datetime import datetime from datetime import datetime
from typing import Iterator, List, Optional from typing import Any, Iterator, List, Optional
from openpyxl import load_workbook from openpyxl import load_workbook
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
@@ -9,11 +9,18 @@ from beaky.config import Config
@dataclass @dataclass
class Link: class Link:
id: str """Represents a single link row from an Excel sheet.
Attributes:
id: identifier from the sheet (cast to int)
url: link to the web page
date: optional creation date (datetime or None)
"""
id: int
url: str url: str
date: Optional[datetime] = None date: Optional[datetime] = None
class Links: class Links:
def __init__(self, path: str | Config): def __init__(self, path: str | Config):
if isinstance(path, Config): if isinstance(path, Config):
@@ -26,15 +33,16 @@ class Links:
"""Read the Excel file at self._path and populate self.links. """Read the Excel file at self._path and populate self.links.
Expects the first sheet to contain a header row with columns that include Expects the first sheet to contain a header row with columns that include
at least: 'id', 'link' (or 'url'), and 'date' (case-insensitive). The at least: 'id', 'link' (or 'url'), and optionally 'date' (case-insensitive).
method will attempt to parse dates and will store them as datetime when Returns the list of Link objects (also stored in self.links).
possible; missing or unparsable dates become None.
""" """
print("started ret_links()")
wb = load_workbook(filename=self._path, read_only=True, data_only=True) wb = load_workbook(filename=self._path, read_only=True, data_only=True)
ws = wb.active ws = wb.active
# Read header row # Read header row
rows = ws.iter_rows(values_only=True) rows = ws.rows
try: try:
header = next(rows) header = next(rows)
except StopIteration: except StopIteration:
@@ -43,11 +51,10 @@ class Links:
if not header: if not header:
return [] return []
# Normalize header names -> index map # Normalize header names -> index map, making sure to use .value
header_map = {(str(h).strip().lower() if h is not None else ""): i for i, h in enumerate(header) } header_map = {(str(h.value).strip().lower() if h.value is not None else ""): i for i, h in enumerate(header)}
# Helper to parse date-like values def parse_date(v: Any) -> Optional[datetime]:
def parse_date(v: None | datetime) -> Optional[datetime]:
if v is None: if v is None:
return None return None
if isinstance(v, datetime): if isinstance(v, datetime):
@@ -71,28 +78,40 @@ class Links:
# Find the column indices we care about # Find the column indices we care about
id_idx = header_map.get("id") id_idx = header_map.get("id")
url_idx = header_map.get("link") url_idx = header_map.get("url")
date_idx = header_map.get("date") date_idx = header_map.get("date")
if id_idx is None or url_idx is None: if id_idx is None or url_idx is None:
# Required columns missing # Required columns missing
print(f"Required 'id' or 'url' column missing in header. Found headers: {list(header_map.keys())}")
return [] return []
for row in rows: for row in rows:
try: try:
raw_id = row[id_idx] if id_idx < len(row) else None # Extract the actual values from the cell objects
raw_url = row[url_idx] if url_idx < len(row) else None raw_id = row[id_idx].value if id_idx < len(row) else None
raw_date = row[date_idx] if (date_idx is not None and date_idx < len(row)) else None raw_url = row[url_idx].value if url_idx < len(row) else None
raw_date = row[date_idx].value if (date_idx is not None and date_idx < len(row)) else None
if raw_id is None and raw_url is None: if raw_id is None or raw_url is None:
# skip empty rows # skip empty rows
continue continue
link = Link(id=str(raw_id).strip() if raw_id is not None else "", # Safely parse the ID to an integer, handling Excel float quirks
url=str(raw_url).strip() if raw_url is not None else "", date=parse_date(raw_date)) try:
parsed_id = int(float(raw_id))
except (ValueError, TypeError):
# Skip row if ID is missing or invalid text
continue
link = Link(
id=parsed_id,
url=str(raw_url).strip() if raw_url is not None else "",
date=parse_date(raw_date),
)
self.links.append(link) self.links.append(link)
except Exception: except Exception:
# Skip problematic rows silently # Skip problematic rows silently (or print(e) for debugging)
continue continue
return self.links return self.links
@@ -105,4 +124,15 @@ class Links:
# Backwards-compatible alias in case other modules referenced Linker # Backwards-compatible alias in case other modules referenced Linker
Linker = Links Linker = Links
if __name__ == "__main__":
    # Manual smoke test: load the default workbook and dump its rows.
    loader = Links("data/odkazy.xlsx")
    loaded = loader.ret_links()
    if loaded:
        print(f"Successfully loaded {len(loaded)} links!")
        for entry in loaded:
            print(entry.id, entry.url, entry.date)
    else:
        print("No links returned.")

View File

View File

@@ -0,0 +1,6 @@
from pydantic.dataclasses import dataclass
@dataclass
class ScreenshotterConfig:
    """Configuration for the ticket screenshotter."""
    # Directory where captured "<id>.png" screenshots are written
    target_path: str

View File

@@ -0,0 +1,75 @@
from pathlib import Path
from playwright.sync_api import sync_playwright
from beaky.config import Config
from beaky.scanner.scanner import Link
class Screenshotter:
    """Captures screenshots of ticket panels for a list of links via Playwright."""

    def __init__(self, config: Config):
        self.config = config

    def capture_tickets(self, links: list[Link]):
        """Open each link in headless Chromium and save "<id>.png" under the configured target path."""
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            context = browser.new_context()
            for link in links:
                print("capturing link:", link)
                page = context.new_page()
                target_path = Path(self.config.screenshotter.target_path) / f"{link.id}.png"
                self.capture_ticket(page, link.url, target_path)
            browser.close()

    def capture_ticket(self,page, url, target_path, ticket_selector=".betslip-history-detail__left-panel"):
        """Screenshot the ticket panel at *url*, un-clipping its content first.

        Steps: wait for the panel, force its scroll containers to full
        height, hide overlay elements, enlarge the viewport so the panel
        fits, then screenshot the panel element only.

        NOTE(review): the evaluate() payloads are multi-statement strings;
        confirm Playwright accepts them as-is rather than requiring a
        function wrapper.
        """
        page.goto(url)
        page.wait_for_selector(ticket_selector, timeout=10000)
        page.wait_for_timeout(1000)
        # Expand the panel and every ancestor so no content is clipped.
        page.evaluate(f"""
            let el = document.querySelector('{ticket_selector}');
            if (el) {{
                let wrapper = el.querySelector('.betslip-selections');
                if (wrapper) {{
                    wrapper.style.setProperty('height', 'auto', 'important');
                    wrapper.style.setProperty('overflow', 'visible', 'important');
                }}
                while (el && el !== document.body) {{
                    el.style.setProperty('height', 'auto', 'important');
                    el.style.setProperty('max-height', 'none', 'important');
                    el.style.setProperty('overflow', 'visible', 'important');
                    el = el.parentElement;
                }}
            }}
        """)
        # Hide fixed/absolute overlays (cookie banners, notifications, toasts)
        # but preserve the ticket panel and its ancestors/descendants
        page.evaluate(f"""
            const ticket = document.querySelector('{ticket_selector}');
            document.querySelectorAll('*').forEach(el => {{
                if (ticket && (ticket.contains(el) || el.contains(ticket))) return;
                const style = window.getComputedStyle(el);
                if ((style.position === 'fixed' || style.position === 'sticky') &&
                    el.tagName !== 'BODY' && el.tagName !== 'HTML') {{
                    el.style.setProperty('display', 'none', 'important');
                }}
            }});
        """)
        # Resize viewport if the element extends beyond the bottom edge.
        # The modal is vertically centered, so increasing the viewport by X shifts the element
        # down by X/2. To compensate: new_height = 2 * bottom - current_height.
        bbox = page.locator(ticket_selector).bounding_box()
        bottom = bbox["y"] + bbox["height"]
        vp_h = page.viewport_size["height"]
        if bottom > vp_h:
            page.set_viewport_size({"width": page.viewport_size["width"], "height": int(2 * bottom - vp_h) + 10})
        # Wait for the browser to reflow after style changes before screenshotting
        page.wait_for_timeout(500)
        page.locator(ticket_selector).screenshot(path=target_path)