from __future__ import annotations import hashlib import json from copy import deepcopy from datetime import date, datetime from typing import Any from garminconnect.workout import ( CyclingWorkout, WorkoutSegment, create_cooldown_step, create_interval_step, create_recovery_step, create_repeat_group, create_warmup_step, ) CYCLING_SPORT = {"sportTypeId": 2, "sportTypeKey": "cycling", "displayOrder": 2} CLONE_ID_FIELDS = { "author", "createdDate", "ownerId", "shared", "stepId", "updatedDate", "workoutId", } VALID_STEP_TYPES = { "warmup", "cooldown", "interval", "recovery", "rest", "repeat", "other", "main", } def build_dummy_cycling_workout(name: str | None = None) -> dict[str, Any]: workout = CyclingWorkout( workoutName=name or f"GCClone Probe Dummy {datetime.now():%Y-%m-%d %H:%M}", description="Minimal cycling workout uploaded by the Garmin Coach clone probe.", estimatedDurationInSecs=14 * 60, workoutSegments=[ WorkoutSegment( segmentOrder=1, sportType=CYCLING_SPORT, workoutSteps=[ create_warmup_step(5 * 60, step_order=1), create_repeat_group( iterations=1, step_order=2, workout_steps=[ create_interval_step(2 * 60, step_order=3), create_recovery_step(2 * 60, step_order=4), ], ), create_cooldown_step(5 * 60, step_order=5), ], ) ], ) return workout.to_dict() def clone_workout_payload( source: dict[str, Any], scheduled_date: date, prefix: str ) -> dict[str, Any]: if "workoutSegments" not in source: raise ValueError("source object does not contain workoutSegments") payload = deepcopy(source) _strip_ids(payload) original_name = str( source.get("workoutName") or source.get("name") or source.get("title") or "Garmin Coach Workout" ) payload["workoutName"] = f"{prefix} {scheduled_date.isoformat()} {original_name}"[:120] payload["description"] = ( "Generated by garmin-coach-to-cal-sync probe from a Garmin Coach/adaptive " "workout-like object. Verify targets on the Edge before relying on it." ) payload["sportType"] = CYCLING_SPORT for idx, segment in enumerate(payload.get("workoutSegments", []), start=1): if isinstance(segment, dict): segment["segmentOrder"] = segment.get("segmentOrder") or idx segment["sportType"] = CYCLING_SPORT payload.setdefault("estimatedDurationInSecs", estimate_duration(payload)) return payload def validate_workout_payload(workout: dict[str, Any]) -> list[str]: errors: list[str] = [] name = workout.get("workoutName") if not isinstance(name, str) or not name.strip(): errors.append("workoutName must be a non-empty string") sport_key = _key(workout.get("sportType")) if sport_key != "cycling": errors.append(f"sportType must be cycling, got {sport_key or ''}") duration = workout.get("estimatedDurationInSecs") if duration is not None and (not isinstance(duration, int) or duration < 0): errors.append("estimatedDurationInSecs must be a non-negative integer when present") segments = workout.get("workoutSegments") if not isinstance(segments, list) or not segments: errors.append("workoutSegments must be a non-empty list") return errors for segment_idx, segment in enumerate(segments, start=1): path = f"workoutSegments[{segment_idx}]" if not isinstance(segment, dict): errors.append(f"{path} must be an object") continue if _key(segment.get("sportType")) != "cycling": errors.append(f"{path}.sportType must be cycling") steps = segment.get("workoutSteps") if not isinstance(steps, list) or not steps: errors.append(f"{path}.workoutSteps must be a non-empty list") continue _validate_steps(steps, f"{path}.workoutSteps", errors) return errors def estimate_duration(workout: dict[str, Any]) -> int: total = 0 for segment in workout.get("workoutSegments", []): if isinstance(segment, dict): total += _steps_duration(segment.get("workoutSteps", []), multiplier=1) return total def workout_source_hash(workout: dict[str, Any]) -> str: value = deepcopy(workout) _strip_ids(value) for key in ( "uploadTimestamp", "workoutThumbnailUrl", "sharedWithUsers", "consumer", "consumerImageURL", "consumerName", "consumerWebsiteURL", ): value.pop(key, None) encoded = json.dumps(value, separators=(",", ":"), sort_keys=True) return hashlib.sha256(encoded.encode("utf-8")).hexdigest() def existing_clone_names( workouts: list[dict[str, Any]], scheduled_date: date, prefix: str ) -> list[str]: marker = f"{prefix} {scheduled_date.isoformat()}" return [ str(workout.get("workoutName")) for workout in workouts if str(workout.get("workoutName", "")).startswith(marker) ] def generated_workouts(workouts: list[dict[str, Any]], prefix: str) -> list[dict[str, Any]]: return [ workout for workout in workouts if str(workout.get("workoutName", "")).startswith(prefix) ] def find_generated_workout( workouts: list[dict[str, Any]], workout_id: str, prefix: str ) -> dict[str, Any] | None: for workout in generated_workouts(workouts, prefix): if str(workout.get("workoutId") or workout.get("id")) == str(workout_id): return workout return None def generated_calendar_entries(calendar_data: Any, prefix: str) -> list[dict[str, Any]]: entries: list[dict[str, Any]] = [] def walk(node: Any) -> None: if isinstance(node, dict): name = calendar_entry_name(node) if name is not None and name.startswith(prefix) and calendar_entry_id(node) is not None: entries.append(node) for child in node.values(): walk(child) elif isinstance(node, list): for child in node: walk(child) walk(calendar_data) return _dedupe_calendar_entries(entries) def find_generated_calendar_entry( calendar_data: Any, scheduled_id: str, prefix: str ) -> dict[str, Any] | None: for entry in generated_calendar_entries(calendar_data, prefix): if calendar_entry_id(entry) == str(scheduled_id): return entry return None def calendar_entry_id(entry: dict[str, Any]) -> str | None: for key in ("scheduledWorkoutId", "calendarItemId"): value = entry.get(key) if value is not None: return str(value) return None def calendar_entry_name(entry: dict[str, Any]) -> str | None: for key in ("workoutName", "name", "title"): value = entry.get(key) if isinstance(value, str) and value: return value for key in ("workout", "workoutDTO", "workoutSummary"): nested = entry.get(key) if isinstance(nested, dict): name = calendar_entry_name(nested) if name is not None: return name return None def calendar_entry_date(entry: dict[str, Any]) -> str | None: for key in ("date", "startDate", "scheduledDate", "calendarDate"): value = entry.get(key) if isinstance(value, str) and value: return value[:10] return None def summarize_workout(workout: dict[str, Any]) -> str: lines: list[str] = [] name = workout.get("workoutName") or workout.get("name") or workout.get("title") or "" lines.append(f"Workout: {name}") for segment in workout.get("workoutSegments", []): if not isinstance(segment, dict): continue segment_order = segment.get("segmentOrder", "?") lines.append(f" Segment {segment_order}:") _summarize_steps(lines, segment.get("workoutSteps", []), indent=" ") if len(lines) == 1: lines.append(" No workoutSegments/workoutSteps found.") return "\n".join(lines) def _summarize_steps(lines: list[str], steps: Any, indent: str) -> None: if not isinstance(steps, list): return for step in steps: if not isinstance(step, dict): continue step_type = _key(step.get("stepType")) or step.get("type") or "step" if str(step.get("type")) == "RepeatGroupDTO" or step_type == "repeat": iterations = step.get("numberOfIterations") or step.get("endConditionValue") or "?" lines.append(f"{indent}repeat x{iterations}") _summarize_steps(lines, step.get("workoutSteps", []), indent=f"{indent} ") continue end = _key(step.get("endCondition")) or "unknown-end" end_value = step.get("endConditionValue") target = _key(step.get("targetType")) or "no target" lines.append(f"{indent}{step_type}: {end} {end_value}; target {target}") def _key(value: Any) -> str | None: if isinstance(value, dict): for key in ("stepTypeKey", "conditionTypeKey", "workoutTargetTypeKey", "sportTypeKey"): if value.get(key): return str(value[key]) return None def _steps_duration(steps: Any, multiplier: int) -> int: if not isinstance(steps, list): return 0 total = 0 for step in steps: if not isinstance(step, dict): continue step_type = _key(step.get("stepType")) if str(step.get("type")) == "RepeatGroupDTO" or step_type == "repeat": iterations = int(step.get("numberOfIterations") or step.get("endConditionValue") or 1) total += _steps_duration(step.get("workoutSteps", []), multiplier=iterations) continue end = _key(step.get("endCondition")) if end == "time" and isinstance(step.get("endConditionValue"), int | float): total += int(step["endConditionValue"]) * multiplier return total def _validate_steps(steps: list[Any], path: str, errors: list[str]) -> None: for step_idx, step in enumerate(steps, start=1): step_path = f"{path}[{step_idx}]" if not isinstance(step, dict): errors.append(f"{step_path} must be an object") continue step_type = _key(step.get("stepType")) if step_type not in VALID_STEP_TYPES: errors.append(f"{step_path}.stepType is invalid or missing: {step_type}") continue if step_type == "repeat" or str(step.get("type")) == "RepeatGroupDTO": iterations = step.get("numberOfIterations") or step.get("endConditionValue") if not isinstance(iterations, int | float) or iterations <= 0: errors.append(f"{step_path}.numberOfIterations must be positive") nested = step.get("workoutSteps") if not isinstance(nested, list) or not nested: errors.append(f"{step_path}.workoutSteps must be a non-empty list") else: _validate_steps(nested, f"{step_path}.workoutSteps", errors) continue end_condition = _key(step.get("endCondition")) if end_condition is None: errors.append(f"{step_path}.endCondition is missing") elif end_condition != "lap.button": end_value = step.get("endConditionValue") if not isinstance(end_value, int | float) or end_value <= 0: errors.append(f"{step_path}.endConditionValue must be positive") if _key(step.get("targetType")) is None: errors.append(f"{step_path}.targetType is missing") def _strip_ids(value: Any) -> None: if isinstance(value, dict): for key in list(value.keys()): if key in CLONE_ID_FIELDS: value.pop(key, None) else: _strip_ids(value[key]) elif isinstance(value, list): for item in value: _strip_ids(item) def _dedupe_calendar_entries(items: list[dict[str, Any]]) -> list[dict[str, Any]]: seen: set[str | int] = set() deduped: list[dict[str, Any]] = [] for item in items: marker: str | int = calendar_entry_id(item) or id(item) if marker not in seen: seen.add(marker) deduped.append(item) return deduped