Files
garmin-coach-to-cal-sync/tests/test_sync_service.py
2026-06-16 19:32:37 +02:00

498 lines
17 KiB
Python

from __future__ import annotations
from datetime import date, datetime
import garmin_coach_clone.sync_service as sync_service
from garmin_coach_clone.config import load_settings
from garmin_coach_clone.db import Database
from garmin_coach_clone.repository import Repository, ScheduleConfig
from garmin_coach_clone.sync_service import SyncService, should_run_now
from garmin_coach_clone.workouts import (
build_dummy_cycling_workout,
clone_workout_payload,
workout_source_hash,
)
class FakeClient:
def __init__(self) -> None:
self.uploads: list[dict] = []
self.scheduled: list[tuple[str, str]] = []
self.unscheduled: list[str] = []
self.deleted: list[str] = []
self.workouts: list[dict] = []
self.workout_details: dict[str, dict] = {}
self.calendar_items: list[dict] = []
def get_workouts(self, limit: int = 100) -> list[dict]:
_ = limit
return self.workouts
def get_workout_by_id(self, workout_id: str) -> dict:
if str(workout_id) in self.workout_details:
return self.workout_details[str(workout_id)]
for workout in self.workouts:
if str(workout.get("workoutId") or workout.get("id")) == str(workout_id):
return workout
raise ValueError(f"unknown workout: {workout_id}")
def upload_workout(self, payload: dict) -> dict:
self.uploads.append(payload)
return {"workoutId": f"w{len(self.uploads)}"}
def schedule_workout(self, workout_id: str, date_s: str) -> dict:
self.scheduled.append((workout_id, date_s))
return {"scheduledWorkoutId": f"s{len(self.scheduled)}"}
def unschedule_workout(self, scheduled_id: str) -> None:
self.unscheduled.append(scheduled_id)
def delete_workout(self, workout_id: str) -> None:
self.deleted.append(workout_id)
def get_scheduled_workouts(self, year: int, month: int) -> dict:
_ = year, month
return {"calendarItems": self.calendar_items}
class FakeGarmin:
def __init__(self, source: dict | None) -> None:
self.client = FakeClient()
self.source = source
def authenticated_client(self) -> FakeClient:
return self.client
def active_adaptive_plan(self, client: FakeClient) -> dict:
_ = client
return {"trainingPlanId": 1}
def adaptive_plan_detail(self, client: FakeClient, plan: dict) -> dict:
_ = client, plan
return {}
def coach_workout_for_date(
self, client: FakeClient, plan_detail: dict, target_date: date
) -> tuple[dict | None, dict | None]:
_ = client, plan_detail, target_date
if self.source is None:
return {"restDay": True, "workoutUuid": "rest"}, None
return {"restDay": False, "workoutUuid": self.source["workoutUuid"]}, self.source
def test_should_run_now_for_interval_and_fixed_times() -> None:
config = ScheduleConfig(
enabled=True,
interval_minutes=30,
active_window="05:00-22:00",
fixed_times=["06:15"],
days_ahead=1,
)
assert should_run_now(datetime(2026, 6, 16, 6, 0), config)
assert should_run_now(datetime(2026, 6, 16, 6, 15), config)
assert not should_run_now(datetime(2026, 6, 16, 4, 30), config)
def test_sync_creates_missing_clone(tmp_path, monkeypatch) -> None:
service, repo, garmin = _service(tmp_path, monkeypatch, _source("Sprint"))
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["created"] == 1
assert garmin.client.uploads
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["status"] == "scheduled"
def test_sync_skips_unchanged_clone(tmp_path, monkeypatch) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = "GCClone 2026-06-16 Sprint"
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": source["workoutUuid"],
"source_hash": workout_source_hash(source),
"source_name": "Sprint",
"clone_workout_id": "w1",
"clone_workout_name": clone_name,
"scheduled_workout_id": "s1",
"status": "scheduled",
"message": "current",
}
)
_install_garmin_clone(garmin, source, workout_id="w1", clone_name=clone_name)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s1",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["skipped"] == 1
assert garmin.client.uploads == []
def test_sync_skips_unchanged_clone_with_calendar_id_key(tmp_path, monkeypatch) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = "GCClone 2026-06-16 Sprint"
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": source["workoutUuid"],
"source_hash": workout_source_hash(source),
"source_name": "Sprint",
"clone_workout_id": "w1",
"clone_workout_name": clone_name,
"scheduled_workout_id": "99",
"status": "scheduled",
"message": "current",
}
)
_install_garmin_clone(garmin, source, workout_id="w1", clone_name=clone_name)
garmin.client.calendar_items = [
{
"id": 99,
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["skipped"] == 1
assert garmin.client.uploads == []
def test_sync_matches_current_clone_by_date_name_marker_and_equal_steps(
tmp_path, monkeypatch
) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
mapped_name = _store_current_mapping(repo, source, scheduled_id="s1")
garmin_name = "GCClone 2026-06-16 Renamed"
_install_garmin_clone(garmin, source, workout_id="w-new", clone_name=garmin_name)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s1",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": garmin_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert mapped_name != garmin_name
assert result["skipped"] == 1
assert garmin.client.uploads == []
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["clone_workout_id"] == "w-new"
assert mapping["clone_workout_name"] == garmin_name
def test_sync_recreates_date_name_match_with_different_steps(tmp_path, monkeypatch) -> None:
source = _source("Sprint")
changed_source = _source("Different")
changed_source["workoutSegments"][0]["workoutSteps"][0]["endConditionValue"] = 123
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = _store_current_mapping(repo, source, scheduled_id="s1")
_install_garmin_clone(garmin, changed_source, workout_id="w-old", clone_name=clone_name)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s1",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["created"] == 1
assert garmin.client.unscheduled == ["s1"]
assert garmin.client.scheduled == [("w1", "2026-06-16")]
def test_skip_unchanged_removes_duplicate_scheduled_entries(tmp_path, monkeypatch) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = "GCClone 2026-06-16 Sprint"
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": source["workoutUuid"],
"source_hash": workout_source_hash(source),
"source_name": "Sprint",
"clone_workout_id": "w1",
"clone_workout_name": clone_name,
"scheduled_workout_id": "s2",
"status": "scheduled",
"message": "current",
}
)
_install_garmin_clone(garmin, source, workout_id="w1", clone_name=clone_name)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s1",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
},
{
"scheduledWorkoutId": "s2",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
},
{
"scheduledWorkoutId": "s3",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
},
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["skipped"] == 1
assert garmin.client.uploads == []
assert garmin.client.unscheduled == ["s1", "s3"]
def test_current_mapping_recreates_when_garmin_clone_is_missing(
tmp_path, monkeypatch
) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
_store_current_mapping(repo, source, scheduled_id="s-old")
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["created"] == 1
assert garmin.client.uploads
assert garmin.client.scheduled == [("w1", "2026-06-16")]
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["clone_workout_id"] == "w1"
assert mapping["scheduled_workout_id"] == "s1"
assert repo.list_traces(limit=1)[0]["action"] == "recreate_missing"
def test_current_mapping_recreates_when_calendar_entry_is_missing(
tmp_path, monkeypatch
) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = _store_current_mapping(repo, source, scheduled_id="s-old")
_install_garmin_clone(garmin, source, workout_id="w-old", clone_name=clone_name)
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["created"] == 1
assert garmin.client.uploads
assert garmin.client.scheduled == [("w1", "2026-06-16")]
assert garmin.client.unscheduled == []
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["clone_workout_id"] == "w1"
assert mapping["scheduled_workout_id"] == "s1"
def test_current_mapping_recreates_when_workout_template_is_missing(
tmp_path, monkeypatch
) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = _store_current_mapping(repo, source, scheduled_id="s-old")
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s-old",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["created"] == 1
assert garmin.client.unscheduled == ["s-old"]
assert garmin.client.scheduled == [("w1", "2026-06-16")]
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["clone_workout_id"] == "w1"
assert mapping["scheduled_workout_id"] == "s1"
def test_sync_adopts_external_existing_clone_ids(tmp_path, monkeypatch) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = "GCClone 2026-06-16 Sprint"
_install_garmin_clone(garmin, source, workout_id="w-existing", clone_name=clone_name)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s-existing",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
}
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["warnings"] == 1
assert garmin.client.uploads == []
mapping = repo.get_clone_mapping("2026-06-16")
assert mapping is not None
assert mapping["clone_workout_id"] == "w-existing"
assert mapping["scheduled_workout_id"] == "s-existing"
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["skipped"] == 1
assert garmin.client.uploads == []
def test_replace_changed_unschedules_matching_calendar_entries(
tmp_path, monkeypatch
) -> None:
source = _source("Sprint")
service, repo, garmin = _service(tmp_path, monkeypatch, source)
clone_name = "GCClone 2026-06-16 Sprint"
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": source["workoutUuid"],
"source_hash": "old-hash",
"source_name": "Sprint",
"clone_workout_id": "w-old",
"clone_workout_name": clone_name,
"scheduled_workout_id": None,
"status": "scheduled",
"message": "old",
}
)
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s-old-1",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
},
{
"scheduledWorkoutId": "s-old-2",
"scheduledDate": "2026-06-16",
"workout": {"workoutName": clone_name},
},
]
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["replaced"] == 1
assert garmin.client.unscheduled == ["s-old-1", "s-old-2"]
assert len(garmin.client.uploads) == 1
def test_sync_deletes_generated_clones_older_than_retention(tmp_path, monkeypatch) -> None:
monkeypatch.setattr(sync_service, "_today", lambda settings: date(2026, 6, 16))
source = _source("Sprint")
service, _repo, garmin = _service(tmp_path, monkeypatch, source)
old_name = "GCClone 2026-06-10 Sprint"
recent_name = "GCClone 2026-06-11 Threshold"
garmin.client.workouts = [
{"workoutId": "w-old", "workoutName": old_name},
{"workoutId": "w-recent", "workoutName": recent_name},
{"workoutId": "w-probe", "workoutName": "GCClone Probe Dummy 2026-06-01"},
{"workoutId": "w-other", "workoutName": "Other 2026-06-01 Ride"},
]
garmin.client.calendar_items = [
{
"scheduledWorkoutId": "s-old",
"scheduledDate": "2026-06-10",
"workout": {"workoutName": old_name},
},
{
"scheduledWorkoutId": "s-recent",
"scheduledDate": "2026-06-11",
"workout": {"workoutName": recent_name},
},
]
service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert garmin.client.unscheduled == ["s-old"]
assert garmin.client.deleted == ["w-old"]
def test_rest_day_with_existing_clone_warns_without_delete(tmp_path, monkeypatch) -> None:
service, repo, garmin = _service(tmp_path, monkeypatch, None)
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": "old",
"source_hash": "old",
"source_name": "Old",
"clone_workout_id": "w1",
"clone_workout_name": "GCClone 2026-06-16 Old",
"scheduled_workout_id": "s1",
"status": "scheduled",
"message": "old",
}
)
result = service._run_sync("test", [date(2026, 6, 16)], dry_run=False)
assert result["warnings"] == 1
assert garmin.client.unscheduled == []
assert garmin.client.deleted == []
def _service(tmp_path, monkeypatch, source: dict | None):
monkeypatch.setenv("DATA_DIR", str(tmp_path))
settings = load_settings()
db = Database(tmp_path / "app.db")
db.initialize()
repo = Repository(db, settings)
garmin = FakeGarmin(source)
return SyncService(settings, repo, garmin), repo, garmin
def _source(name: str) -> dict:
workout = build_dummy_cycling_workout(name)
workout["workoutUuid"] = f"uuid-{name}"
return workout
def _store_current_mapping(repo: Repository, source: dict, scheduled_id: str | None) -> str:
clone_name = "GCClone 2026-06-16 Sprint"
repo.upsert_clone_mapping(
{
"scheduled_date": "2026-06-16",
"source_uuid": source["workoutUuid"],
"source_hash": workout_source_hash(source),
"source_name": "Sprint",
"clone_workout_id": "w-old",
"clone_workout_name": clone_name,
"scheduled_workout_id": scheduled_id,
"status": "scheduled",
"message": "current",
}
)
return clone_name
def _install_garmin_clone(
garmin: FakeGarmin,
source: dict,
workout_id: str,
clone_name: str,
) -> None:
payload = clone_workout_payload(source, date(2026, 6, 16), "GCClone")
payload["workoutId"] = workout_id
payload["workoutName"] = clone_name
garmin.client.workouts = [
*garmin.client.workouts,
{"workoutId": workout_id, "workoutName": clone_name},
]
garmin.client.workout_details[workout_id] = payload