import 'dart:async'; import 'dart:convert'; import 'package:anyhow/anyhow.dart'; import 'package:flutter_bloc/flutter_bloc.dart'; import 'package:ot_viewer_app/owntracks_api.dart'; import 'package:ot_viewer_app/settings_page.dart'; import 'package:ws/ws.dart'; import 'package:rust_core/option.dart'; // Define the state. For simplicity, we're just using Map directly. // You might want to define a more specific state class based on your application's needs. abstract class LocationUpdateState {} class LocationUpdateUnconnected extends LocationUpdateState {} class LocationUpdateConnected extends LocationUpdateState {} class LocationUpdateReceived extends LocationUpdateState { Point position; (String, String) deviceId; LocationUpdateReceived(this.position, this.deviceId); } class LocationSubscribeCubit extends Cubit { Option _wsClient = None; Option> _connectionCompleter = None; String url = ''; String username = ''; String pass = ''; LocationSubscribeCubit() : super(LocationUpdateUnconnected()); // TODO: handle ongoing connection attempt by canceling? the loop? or smth subscribe(SettingsState settings) async { // check if resubscribe is necessary (different URL) if (settings.url == url && settings.username == username && settings.password == pass) { return; } else { url = settings.url; username = settings.username; pass = settings.password; } await _wsConnectionEstablish(); } reconnect() async { print("reconnecting..."); if (_wsClient.isSome()) { await _wsClient.unwrap().close(); _wsClient = None; } else { print("not connected, not reconnecting"); return; } await _wsConnectionEstablish(); } Future _wsConnectionEstablish() async { // If there's an ongoing connection attempt, wait for it if (_connectionCompleter.isSome()) { await _connectionCompleter.unwrap().future; return; } // Start new connection attempt _connectionCompleter = Some(Completer()); if (_wsClient.isSome()) { await _wsClient.unwrap().close(); _wsClient = None; } Result ws = bail('Not done yet'); while (!ws.isOk()) { ws = await OwntracksApi(baseUrl: url, username: username, pass: pass) .createWebSocketConnection( wsPath: 'last', onMessage: (msg) { if (msg is String) { if (msg == 'LAST') { return; } try { final Map map = jsonDecode(msg); if (map['_type'] == 'location') { // filter points (only the ones for this device pls!) final topic = (map['topic'] as String?)?.split('/'); if (topic == null || topic.length < 3) { // couldn't reconstruct ID, bail return; } // build device_id final deviceId = (topic[1], topic[2]); print(map); // build point final p = Point( lat: map['lat'] as double, lon: map['lon'] as double, timestamp: DateTime.fromMillisecondsSinceEpoch( (map['tst'] as int) * 1000)); print(p); emit(LocationUpdateReceived(p, deviceId)); } } catch (e) { print('BUG: Couldn\'t parse WS message: $msg ($e)'); } } }, onStateChange: (sc) { switch (sc) { case WebSocketClientState$Open(:final url): _wsClient.map((wsc) => wsc.add('LAST')); emit(LocationUpdateConnected()); break; default: emit(LocationUpdateUnconnected()); break; } print(sc); }, ); if (!ws.isOk()) { print( 'Failed to connect to WebSocket: ${ws.unwrapErr()}\n, retrying in 1s'); await Future.delayed(const Duration(seconds: 1)); } } _wsClient = ws.expect("Estabilshing Websocket Conenction failed").toOption(); _connectionCompleter.unwrap().complete(); _connectionCompleter = None; } @override void onChange(Change change) { print('loc_sub_cubit change: $change'); } @override Future close() async { await _wsClient.toFutureOption().map((conn) => conn.close()); return super.close(); } }