148 lines
4.8 KiB
Python
148 lines
4.8 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import date, datetime
|
|
|
|
from garmin_coach_clone.config import load_settings
|
|
from garmin_coach_clone.db import Database
|
|
from garmin_coach_clone.repository import Repository, ScheduleConfig
|
|
from garmin_coach_clone.sync_service import SyncService, should_run_now
|
|
from garmin_coach_clone.workouts import build_dummy_cycling_workout, workout_source_hash
|
|
|
|
|
|
class FakeClient:
    """In-memory stand-in for the Garmin API client that records every call.

    Nothing leaves the process: each mutating method appends its arguments
    to a list attribute so a test can assert on what was requested.
    """

    def __init__(self) -> None:
        # Call logs, kept in call order.
        self.uploads: list[dict] = []  # payloads given to upload_workout
        self.scheduled: list[tuple[str, str]] = []  # (workout_id, date_s) pairs
        self.unscheduled: list[str] = []  # ids given to unschedule_workout
        self.deleted: list[str] = []  # ids given to delete_workout

    def get_workouts(self, limit: int = 100) -> list[dict]:
        """Return an empty list; ``limit`` is accepted but ignored."""
        return []

    def upload_workout(self, payload: dict) -> dict:
        """Record ``payload`` and return a sequential id (``w1``, ``w2``, ...)."""
        self.uploads.append(payload)
        # The id is derived from the number of uploads recorded so far.
        return {"workoutId": f"w{len(self.uploads)}"}

    def schedule_workout(self, workout_id: str, date_s: str) -> dict:
        """Record the pair and return a sequential id (``s1``, ``s2``, ...)."""
        self.scheduled.append((workout_id, date_s))
        return {"scheduledWorkoutId": f"s{len(self.scheduled)}"}

    def unschedule_workout(self, scheduled_id: str) -> None:
        """Record that ``scheduled_id`` was asked to be unscheduled."""
        self.unscheduled.append(scheduled_id)

    def delete_workout(self, workout_id: str) -> None:
        """Record that ``workout_id`` was asked to be deleted."""
        self.deleted.append(workout_id)
|
|
|
|
|
|
class FakeGarmin:
    """Test double for the Garmin gateway object passed to ``SyncService``.

    ``source`` is the workout the coach "prescribes" for every date; pass
    ``None`` to make every date look like a rest day.
    """

    def __init__(self, source: dict | None) -> None:
        self.client = FakeClient()
        self.source = source

    def authenticated_client(self) -> FakeClient:
        """Return the single recording client owned by this fake."""
        return self.client

    def active_adaptive_plan(self, client: FakeClient) -> dict:
        """Return a fixed plan stub; ``client`` is ignored."""
        _ = client
        return {"trainingPlanId": 1}

    def adaptive_plan_detail(self, client: FakeClient, plan: dict) -> dict:
        """Return an empty plan detail; both arguments are ignored."""
        _ = client, plan
        return {}

    def coach_workout_for_date(
        self, client: FakeClient, plan_detail: dict, target_date: date
    ) -> tuple[dict | None, dict | None]:
        """Return ``(plan_entry, source_workout)`` regardless of the date.

        With no configured source the entry is flagged as a rest day and the
        workout is ``None``; otherwise the entry carries the source's
        ``workoutUuid`` and the source dict itself is returned (not a copy).
        """
        _ = client, plan_detail, target_date
        if self.source is None:
            return {"restDay": True, "workoutUuid": "rest"}, None
        return {"restDay": False, "workoutUuid": self.source["workoutUuid"]}, self.source
|
|
|
|
|
|
def test_should_run_now_for_interval_and_fixed_times() -> None:
    """``should_run_now`` accepts 06:00 and 06:15 but rejects 04:30.

    The config uses a 30-minute interval, a 05:00-22:00 active window and a
    fixed time of 06:15. Presumably 06:00 matches via the interval and 06:15
    via the fixed time, while 04:30 falls outside the window (per the test
    name; the matching rules live in ``sync_service``).
    """
    config = ScheduleConfig(
        enabled=True,
        interval_minutes=30,
        active_window="05:00-22:00",
        fixed_times=["06:15"],
        days_ahead=1,
    )

    assert should_run_now(datetime(2026, 6, 16, 6, 0), config)
    assert should_run_now(datetime(2026, 6, 16, 6, 15), config)
    assert not should_run_now(datetime(2026, 6, 16, 4, 30), config)
|
|
|
|
|
|
def test_sync_creates_missing_clone(tmp_path, monkeypatch) -> None:
    """A date with a coach workout and no stored mapping gets a new clone.

    Expects one ``created`` result, at least one upload recorded on the fake
    client, and a stored mapping for the date with status ``scheduled``.
    """
    service, repo, garmin = _service(tmp_path, monkeypatch, _source("Sprint"))

    result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)

    assert result["created"] == 1
    assert garmin.client.uploads
    mapping = repo.get_clone_mapping("2026-06-16")
    assert mapping is not None
    assert mapping["status"] == "scheduled"
|
|
|
|
|
|
def test_sync_skips_unchanged_clone(tmp_path, monkeypatch) -> None:
    """A mapping whose stored hash matches the current source is left alone.

    The mapping is seeded with ``workout_source_hash(source)`` for the same
    source the fake returns, so the sync should report one ``skipped`` entry
    and upload nothing.
    """
    source = _source("Sprint")
    service, repo, garmin = _service(tmp_path, monkeypatch, source)
    repo.upsert_clone_mapping(
        {
            "scheduled_date": "2026-06-16",
            "source_uuid": source["workoutUuid"],
            "source_hash": workout_source_hash(source),
            "source_name": "Sprint",
            "clone_workout_id": "w1",
            "clone_workout_name": "GCClone 2026-06-16 Sprint",
            "scheduled_workout_id": "s1",
            "status": "scheduled",
            "message": "current",
        }
    )

    result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)

    assert result["skipped"] == 1
    assert garmin.client.uploads == []
|
|
|
|
|
|
def test_rest_day_with_existing_clone_warns_without_delete(tmp_path, monkeypatch) -> None:
    """A rest day with a leftover clone mapping warns but removes nothing.

    The fake is built with no source, so it reports a rest day for the date,
    while a ``scheduled`` mapping is already stored for it. The sync should
    count one warning and make no unschedule or delete calls on the client.
    """
    service, repo, garmin = _service(tmp_path, monkeypatch, None)
    repo.upsert_clone_mapping(
        {
            "scheduled_date": "2026-06-16",
            "source_uuid": "old",
            "source_hash": "old",
            "source_name": "Old",
            "clone_workout_id": "w1",
            "clone_workout_name": "GCClone 2026-06-16 Old",
            "scheduled_workout_id": "s1",
            "status": "scheduled",
            "message": "old",
        }
    )

    result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)

    assert result["warnings"] == 1
    assert garmin.client.unscheduled == []
    assert garmin.client.deleted == []
|
|
|
|
|
|
def _service(
    tmp_path, monkeypatch, source: dict | None
) -> tuple[SyncService, Repository, FakeGarmin]:
    """Build a ``SyncService`` wired to a fresh database and a fake Garmin.

    ``DATA_DIR`` is set to ``tmp_path`` before ``load_settings()`` is called
    (presumably so the settings resolve their paths under the temp dir), and
    the database file is created at ``tmp_path / "app.db"``.

    Args:
        tmp_path: pytest temporary directory for this test.
        monkeypatch: pytest fixture used to set the environment variable.
        source: workout the fake coach returns, or ``None`` for a rest day.

    Returns:
        ``(service, repository, fake_garmin)`` so tests can drive the service
        and inspect both the stored state and the recorded client calls.
    """
    # The environment must be patched before settings are loaded.
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    settings = load_settings()
    db = Database(tmp_path / "app.db")
    db.initialize()
    repo = Repository(db, settings)
    garmin = FakeGarmin(source)
    return SyncService(settings, repo, garmin), repo, garmin
|
|
|
|
|
|
def _source(name: str) -> dict:
    """Return a dummy cycling workout named ``name`` with a predictable UUID.

    The workout comes from ``build_dummy_cycling_workout`` and gets its
    ``workoutUuid`` set to ``"uuid-<name>"`` so tests can refer to it.
    """
    workout = build_dummy_cycling_workout(name)
    workout["workoutUuid"] = f"uuid-{name}"
    return workout
|