diff --git a/lib/service/firmware_update_service.dart b/lib/service/firmware_update_service.dart new file mode 100644 index 0000000..ff6e1d3 --- /dev/null +++ b/lib/service/firmware_update_service.dart @@ -0,0 +1,551 @@ +import 'dart:async'; + +import 'package:abawo_bt_app/controller/bluetooth.dart'; +import 'package:abawo_bt_app/model/shifter_types.dart'; +import 'package:abawo_bt_app/service/dfu_protocol.dart'; +import 'package:abawo_bt_app/service/shifter_service.dart'; +import 'package:anyhow/anyhow.dart'; + +const int _initialAckSequence = 0xFF; + +class FirmwareUpdateService { + FirmwareUpdateService({ + required FirmwareUpdateTransport transport, + this.defaultWindowSize = 8, + this.maxNoProgressRetries = 5, + this.defaultAckTimeout = const Duration(milliseconds: 800), + }) : _transport = transport; + + final FirmwareUpdateTransport _transport; + final int defaultWindowSize; + final int maxNoProgressRetries; + final Duration defaultAckTimeout; + + final StreamController _progressController = + StreamController.broadcast(); + + DfuUpdateProgress _currentProgress = const DfuUpdateProgress( + state: DfuUpdateState.idle, + totalBytes: 0, + sentBytes: 0, + lastAckedSequence: _initialAckSequence, + sessionId: 0, + flags: DfuUpdateFlags(), + ); + + StreamSubscription>? _ackSubscription; + Completer? _ackSignal; + Completer? _cancelSignal; + int _ackEventCount = 0; + String? _ackStreamError; + bool _isRunning = false; + bool _cancelRequested = false; + int _latestAckSequence = _initialAckSequence; + int _ackedFrames = 0; + int _totalFrames = 0; + int _totalBytes = 0; + + Stream get progressStream => _progressController.stream; + + DfuUpdateProgress get currentProgress => _currentProgress; + + bool get isUpdating => _isRunning; + + Future> startUpdate({ + required List imageBytes, + required int sessionId, + DfuUpdateFlags flags = const DfuUpdateFlags(), + int requestedMtu = universalShifterDfuPreferredMtu, + int? windowSize, + Duration? ackTimeout, + int? noProgressRetries, + }) async { + if (_isRunning) { + return bail( + 'Firmware update is already running. Cancel or wait for completion before starting a new upload.'); + } + if (imageBytes.isEmpty) { + return bail( + 'Firmware image is empty. Select a valid .bin file and retry.'); + } + + final effectiveWindowSize = windowSize ?? defaultWindowSize; + final effectiveAckTimeout = ackTimeout ?? defaultAckTimeout; + final effectiveNoProgressRetries = + noProgressRetries ?? maxNoProgressRetries; + + if (effectiveWindowSize <= 0) { + return bail( + 'DFU window size must be at least 1 frame. Got $effectiveWindowSize.'); + } + if (effectiveNoProgressRetries < 0) { + return bail( + 'No-progress retry limit cannot be negative. Got $effectiveNoProgressRetries.'); + } + + _isRunning = true; + _cancelRequested = false; + _cancelSignal = Completer(); + _ackSignal = null; + _ackEventCount = 0; + _ackStreamError = null; + _latestAckSequence = _initialAckSequence; + _ackedFrames = 0; + _totalFrames = + (imageBytes.length + universalShifterDfuFramePayloadSizeBytes - 1) ~/ + universalShifterDfuFramePayloadSizeBytes; + _totalBytes = imageBytes.length; + + final normalizedSessionId = sessionId & 0xFF; + final crc32 = DfuProtocol.crc32(imageBytes); + final frames = DfuProtocol.buildDataFrames(imageBytes); + var shouldAbortForCleanup = false; + + _emitProgress( + state: DfuUpdateState.starting, + totalBytes: imageBytes.length, + sentBytes: 0, + lastAckedSequence: _initialAckSequence, + sessionId: normalizedSessionId, + flags: flags, + ); + + try { + final preflightResult = await _transport.runPreflight( + requestedMtu: requestedMtu, + ); + if (preflightResult.isErr()) { + throw _DfuFailure( + 'DFU preflight check failed due to transport error: ${preflightResult.unwrapErr()}', + ); + } + final preflight = preflightResult.unwrap(); + if (!preflight.canStart) { + throw _DfuFailure( + preflight.message ?? + 'DFU preflight failed. Ensure button connection and MTU are ready, then retry.', + ); + } + + await _ackSubscription?.cancel(); + _ackSubscription = _transport.subscribeToAck().listen( + _handleAckPayload, + onError: (Object error) { + _ackStreamError = + 'ACK indication stream failed: $error. Reconnect and retry the update.'; + _signalAckWaiters(); + }, + ); + + _emitProgress(state: DfuUpdateState.waitingForAck); + final startEventCount = _ackEventCount; + final startWriteResult = await _transport.writeControl( + DfuProtocol.encodeStartPayload( + DfuStartPayload( + totalLength: imageBytes.length, + imageCrc32: crc32, + sessionId: normalizedSessionId, + flags: flags.rawValue, + ), + ), + ); + if (startWriteResult.isErr()) { + throw _DfuFailure( + 'Failed to send DFU START command: ${startWriteResult.unwrapErr()}', + ); + } + shouldAbortForCleanup = true; + + final initialAck = await _waitForInitialAck( + afterEventCount: startEventCount, + timeout: effectiveAckTimeout, + ); + if (initialAck != _initialAckSequence) { + throw _DfuFailure( + 'Device did not acknowledge START correctly (expected ACK 0xFF, got 0x${initialAck.toRadixString(16).padLeft(2, '0').toUpperCase()}). Send ABORT, reconnect if needed, and retry.', + ); + } + + _emitProgress(state: DfuUpdateState.transferring); + + var nextFrameIndex = 0; + var retriesWithoutProgress = 0; + + while (_ackedFrames < _totalFrames) { + _throwIfCancelled(); + _throwIfAckStreamErrored(); + + final ackedBeforeWindow = _ackedFrames; + final endExclusive = + (nextFrameIndex + effectiveWindowSize).clamp(0, frames.length); + + for (var frameIndex = nextFrameIndex; + frameIndex < endExclusive; + frameIndex++) { + _throwIfCancelled(); + final writeResult = + await _transport.writeDataFrame(frames[frameIndex].bytes); + if (writeResult.isErr()) { + throw _DfuFailure( + 'Failed sending DFU data frame #$frameIndex (seq 0x${frames[frameIndex].sequence.toRadixString(16).padLeft(2, '0').toUpperCase()}): ${writeResult.unwrapErr()}', + ); + } + } + + nextFrameIndex = endExclusive; + + if (_ackedFrames > ackedBeforeWindow) { + retriesWithoutProgress = 0; + nextFrameIndex = _ackedFrames; + continue; + } + + final gotProgress = await _waitForAckProgress( + ackedFramesBeforeWait: ackedBeforeWindow, + timeout: effectiveAckTimeout, + ); + + if (gotProgress) { + retriesWithoutProgress = 0; + nextFrameIndex = _ackedFrames; + continue; + } + + retriesWithoutProgress += 1; + if (retriesWithoutProgress > effectiveNoProgressRetries) { + throw _DfuFailure( + 'Upload stalled: no ACK progress after $retriesWithoutProgress retries (last ACK 0x${_latestAckSequence.toRadixString(16).padLeft(2, '0').toUpperCase()}). Check BLE signal quality and retry.', + ); + } + + nextFrameIndex = _ackedFrames; + } + + _emitProgress( + state: DfuUpdateState.finishing, sentBytes: imageBytes.length); + final finishResult = + await _transport.writeControl(DfuProtocol.encodeFinishPayload()); + if (finishResult.isErr()) { + throw _DfuFailure( + 'Failed to send DFU FINISH command: ${finishResult.unwrapErr()}', + ); + } + + shouldAbortForCleanup = false; + _emitProgress( + state: DfuUpdateState.completed, sentBytes: imageBytes.length); + return Ok(null); + } on _DfuCancelled { + if (shouldAbortForCleanup) { + await _sendAbortForCleanup(); + } + _emitProgress(state: DfuUpdateState.aborted); + return bail('Firmware update canceled by user.'); + } on _DfuFailure catch (failure) { + if (shouldAbortForCleanup) { + await _sendAbortForCleanup(); + } + _emitProgress( + state: DfuUpdateState.failed, errorMessage: failure.message); + return bail(failure.message); + } catch (error) { + if (shouldAbortForCleanup) { + await _sendAbortForCleanup(); + } + final message = + 'Firmware update failed unexpectedly: $error. Reconnect to the button and retry.'; + _emitProgress(state: DfuUpdateState.failed, errorMessage: message); + return bail(message); + } finally { + await _ackSubscription?.cancel(); + _ackSubscription = null; + _isRunning = false; + _cancelRequested = false; + _cancelSignal = null; + _ackSignal = null; + _ackEventCount = 0; + _ackStreamError = null; + _latestAckSequence = _currentProgress.lastAckedSequence; + _ackedFrames = 0; + _totalFrames = 0; + _totalBytes = 0; + } + } + + Future cancelUpdate() async { + if (!_isRunning || _cancelRequested) { + return; + } + _cancelRequested = true; + _cancelSignal?.complete(); + _signalAckWaiters(); + } + + Future dispose() async { + await cancelUpdate(); + await _ackSubscription?.cancel(); + _ackSubscription = null; + await _progressController.close(); + } + + void _handleAckPayload(List payload) { + try { + final sequence = DfuProtocol.parseAckPayload(payload); + final previousAck = _latestAckSequence; + _latestAckSequence = sequence; + + if (_totalFrames > 0 && + _currentProgress.state == DfuUpdateState.transferring) { + final delta = DfuProtocol.sequenceDistance(previousAck, sequence); + if (delta > 0) { + _ackedFrames = (_ackedFrames + delta).clamp(0, _totalFrames); + } + + _emitProgress( + lastAckedSequence: sequence, + sentBytes: + _ackedBytesFromFrames(_ackedFrames, _totalFrames, _totalBytes), + ); + } else { + _emitProgress(lastAckedSequence: sequence); + } + } on FormatException catch (error) { + _ackStreamError = + 'Received malformed ACK indication: $error. Reconnect and retry.'; + } finally { + _ackEventCount += 1; + _signalAckWaiters(); + } + } + + void _emitProgress({ + DfuUpdateState? state, + int? totalBytes, + int? sentBytes, + int? lastAckedSequence, + int? sessionId, + DfuUpdateFlags? flags, + String? errorMessage, + }) { + final next = DfuUpdateProgress( + state: state ?? _currentProgress.state, + totalBytes: totalBytes ?? _currentProgress.totalBytes, + sentBytes: sentBytes ?? _currentProgress.sentBytes, + lastAckedSequence: + lastAckedSequence ?? _currentProgress.lastAckedSequence, + sessionId: sessionId ?? _currentProgress.sessionId, + flags: flags ?? _currentProgress.flags, + errorMessage: errorMessage, + ); + _currentProgress = next; + _progressController.add(next); + } + + Future _waitForInitialAck({ + required int afterEventCount, + required Duration timeout, + }) async { + final deadline = DateTime.now().add(timeout); + var observedEvents = afterEventCount; + + while (true) { + _throwIfCancelled(); + _throwIfAckStreamErrored(); + final remaining = deadline.difference(DateTime.now()); + if (remaining <= Duration.zero) { + throw _DfuFailure( + 'Timed out waiting for initial DFU ACK after START. Ensure indications are enabled and retry.', + ); + } + + final gotEvent = await _waitForNextAckEvent( + afterEventCount: observedEvents, + timeout: remaining, + ); + if (!gotEvent) { + continue; + } + observedEvents = _ackEventCount; + return _latestAckSequence; + } + } + + Future _waitForAckProgress({ + required int ackedFramesBeforeWait, + required Duration timeout, + }) async { + final deadline = DateTime.now().add(timeout); + var observedEvents = _ackEventCount; + + while (true) { + _throwIfCancelled(); + _throwIfAckStreamErrored(); + + if (_ackedFrames > ackedFramesBeforeWait) { + return true; + } + + final remaining = deadline.difference(DateTime.now()); + if (remaining <= Duration.zero) { + return false; + } + + final gotEvent = await _waitForNextAckEvent( + afterEventCount: observedEvents, + timeout: remaining, + ); + + if (!gotEvent) { + continue; + } + + observedEvents = _ackEventCount; + } + } + + Future _waitForNextAckEvent({ + required int afterEventCount, + required Duration timeout, + }) async { + if (_ackEventCount > afterEventCount) { + return true; + } + + _ackSignal ??= Completer(); + final signal = _ackSignal!; + + try { + await Future.any([ + signal.future, + _cancelSignal?.future ?? Future.value(), + ]).timeout(timeout); + } on TimeoutException { + return false; + } + + if (identical(_ackSignal, signal)) { + _ackSignal = null; + } + + _throwIfCancelled(); + _throwIfAckStreamErrored(); + return _ackEventCount > afterEventCount; + } + + void _throwIfCancelled() { + if (_cancelRequested) { + throw const _DfuCancelled(); + } + } + + void _throwIfAckStreamErrored() { + final error = _ackStreamError; + if (error != null) { + throw _DfuFailure(error); + } + } + + Future _sendAbortForCleanup() async { + final result = + await _transport.writeControl(DfuProtocol.encodeAbortPayload()); + if (result.isErr()) { + final cleanupMessage = + 'Could not send DFU ABORT during cleanup: ${result.unwrapErr()}'; + if (_currentProgress.state == DfuUpdateState.failed && + _currentProgress.errorMessage != null) { + _emitProgress( + errorMessage: '${_currentProgress.errorMessage} $cleanupMessage', + ); + } + } + } + + void _signalAckWaiters() { + final signal = _ackSignal; + if (signal != null && !signal.isCompleted) { + signal.complete(); + } + } + + int _ackedBytesFromFrames(int ackedFrames, int totalFrames, int totalBytes) { + if (totalFrames == 0 || ackedFrames <= 0) { + return 0; + } + if (ackedFrames >= totalFrames) { + return totalBytes; + } + return ackedFrames * universalShifterDfuFramePayloadSizeBytes; + } +} + +abstract interface class FirmwareUpdateTransport { + Future> runPreflight({required int requestedMtu}); + + Stream> subscribeToAck(); + + Future> writeControl(List payload); + + Future> writeDataFrame(List frame); +} + +class ShifterFirmwareUpdateTransport implements FirmwareUpdateTransport { + ShifterFirmwareUpdateTransport({ + required this.shifterService, + required this.bluetoothController, + required this.buttonDeviceId, + }); + + final ShifterService shifterService; + final BluetoothController bluetoothController; + final String buttonDeviceId; + + @override + Future> runPreflight({ + required int requestedMtu, + }) { + return shifterService.runDfuPreflight(requestedMtu: requestedMtu); + } + + @override + Stream> subscribeToAck() { + return bluetoothController.subscribeToCharacteristic( + buttonDeviceId, + universalShifterControlServiceUuid, + universalShifterDfuAckCharacteristicUuid, + ); + } + + @override + Future> writeControl(List payload) { + return bluetoothController.writeCharacteristic( + buttonDeviceId, + universalShifterControlServiceUuid, + universalShifterDfuControlCharacteristicUuid, + payload, + ); + } + + @override + Future> writeDataFrame(List frame) { + return bluetoothController.writeCharacteristic( + buttonDeviceId, + universalShifterControlServiceUuid, + universalShifterDfuDataCharacteristicUuid, + frame, + withResponse: false, + ); + } +} + +class _DfuFailure implements Exception { + const _DfuFailure(this.message); + + final String message; + + @override + String toString() => message; +} + +class _DfuCancelled implements Exception { + const _DfuCancelled(); +} diff --git a/test/service/firmware_update_service_test.dart b/test/service/firmware_update_service_test.dart new file mode 100644 index 0000000..8cb795c --- /dev/null +++ b/test/service/firmware_update_service_test.dart @@ -0,0 +1,200 @@ +import 'dart:async'; + +import 'package:abawo_bt_app/model/shifter_types.dart'; +import 'package:abawo_bt_app/service/firmware_update_service.dart'; +import 'package:anyhow/anyhow.dart'; +import 'package:flutter_test/flutter_test.dart'; + +void main() { + group('FirmwareUpdateService', () { + test('completes happy path with START, data frames, and FINISH', () async { + final transport = _FakeFirmwareUpdateTransport(); + final service = FirmwareUpdateService( + transport: transport, + defaultWindowSize: 4, + defaultAckTimeout: const Duration(milliseconds: 100), + ); + + final image = List.generate(130, (index) => index & 0xFF); + final result = await service.startUpdate( + imageBytes: image, + sessionId: 7, + ); + + expect(result.isOk(), isTrue); + expect(transport.controlWrites.length, 2); + expect( + transport.controlWrites.first.first, universalShifterDfuOpcodeStart); + expect(transport.controlWrites.last, [universalShifterDfuOpcodeFinish]); + expect(transport.dataWrites.length, greaterThanOrEqualTo(3)); + expect(service.currentProgress.state, DfuUpdateState.completed); + expect(service.currentProgress.sentBytes, image.length); + + await service.dispose(); + await transport.dispose(); + }); + + test('rewinds to ack+1 and retransmits after ACK stall', () async { + final transport = _FakeFirmwareUpdateTransport(dropFirstSequence: 1); + final service = FirmwareUpdateService( + transport: transport, + defaultWindowSize: 3, + defaultAckTimeout: const Duration(milliseconds: 100), + maxNoProgressRetries: 4, + ); + + final image = List.generate(190, (index) => index & 0xFF); + final result = await service.startUpdate( + imageBytes: image, + sessionId: 9, + ); + + expect(result.isOk(), isTrue); + expect(transport.dataWrites.length, greaterThan(4)); + expect(transport.sequenceWriteCount(1), greaterThan(1)); + expect(service.currentProgress.state, DfuUpdateState.completed); + + await service.dispose(); + await transport.dispose(); + }); + + test('cancel sends ABORT and reports aborted state', () async { + final firstFrameSent = Completer(); + final transport = _FakeFirmwareUpdateTransport( + onDataWrite: (frame) { + if (!firstFrameSent.isCompleted) { + firstFrameSent.complete(); + } + }, + suppressDataAcks: true, + ); + final service = FirmwareUpdateService( + transport: transport, + defaultWindowSize: 1, + defaultAckTimeout: const Duration(milliseconds: 500), + ); + + final future = service.startUpdate( + imageBytes: List.generate(90, (index) => index & 0xFF), + sessionId: 11, + ); + + await firstFrameSent.future.timeout(const Duration(seconds: 1)); + await service.cancelUpdate(); + final result = await future; + + expect(result.isErr(), isTrue); + expect(result.unwrapErr().toString(), contains('canceled')); + expect(transport.controlWrites.last, [universalShifterDfuOpcodeAbort]); + expect(service.currentProgress.state, DfuUpdateState.aborted); + + await service.dispose(); + await transport.dispose(); + }); + }); +} + +class _FakeFirmwareUpdateTransport implements FirmwareUpdateTransport { + _FakeFirmwareUpdateTransport({ + this.dropFirstSequence, + this.onDataWrite, + this.suppressDataAcks = false, + }); + + final int? dropFirstSequence; + final void Function(List frame)? onDataWrite; + final bool suppressDataAcks; + + final StreamController> _ackController = + StreamController>.broadcast(); + + final List> controlWrites = >[]; + final List> dataWrites = >[]; + final Set _droppedOnce = {}; + int _lastAck = 0xFF; + int _expectedSequence = 0; + + @override + Future> runPreflight({ + required int requestedMtu, + }) async { + return Ok( + DfuPreflightResult.ready( + requestedMtu: requestedMtu, + negotiatedMtu: 128, + ), + ); + } + + @override + Stream> subscribeToAck() => _ackController.stream; + + @override + Future> writeControl(List payload) async { + controlWrites.add(List.from(payload, growable: false)); + + final opcode = payload.isEmpty ? -1 : payload.first; + if (opcode == universalShifterDfuOpcodeStart) { + _lastAck = 0xFF; + _expectedSequence = 0; + scheduleMicrotask(() { + _ackController.add([0xFF]); + }); + } + + if (opcode == universalShifterDfuOpcodeAbort) { + _lastAck = 0xFF; + _expectedSequence = 0; + } + + return Ok(null); + } + + @override + Future> writeDataFrame(List frame) async { + dataWrites.add(List.from(frame, growable: false)); + onDataWrite?.call(frame); + + if (suppressDataAcks) { + return Ok(null); + } + + final sequence = frame.first; + final shouldDrop = dropFirstSequence != null && + sequence == dropFirstSequence && + !_droppedOnce.contains(sequence); + + if (shouldDrop) { + _droppedOnce.add(sequence); + scheduleMicrotask(() { + _ackController.add([_lastAck]); + }); + return Ok(null); + } + + if (sequence == _expectedSequence) { + _lastAck = sequence; + _expectedSequence = (_expectedSequence + 1) & 0xFF; + } + + scheduleMicrotask(() { + _ackController.add([_lastAck]); + }); + + return Ok(null); + } + + int sequenceWriteCount(int sequence) { + var count = 0; + for (final frame in dataWrites) { + if (frame.first == sequence) { + count += 1; + } + } + return count; + } + + Future dispose() async { + await _ackController.close(); + } +}