Files
companion/lib/data/sources/localiser/realtime_data_client.dart
T
2026-05-07 18:35:58 +02:00

73 lines
2.0 KiB
Dart

import 'package:phoenix_socket/phoenix_socket.dart';
import '../../../domain/models/server_config.dart';
/// Client for realtime data delivered over a Phoenix socket.
///
/// Owns a single [PhoenixSocket] and keeps track of the channels joined on
/// it, keyed by topic. Call [connect] before [channel] or [push], and
/// [disconnect] to leave every channel and close the socket.
class RealtimeDataClient {
  /// Creates a client for the server described by [config], authenticating
  /// with [token]. No connection is opened until [connect] is called.
  RealtimeDataClient({required this.config, required this.token});

  /// Server settings; [ServerConfig.wsUrl] is used as the socket endpoint.
  final ServerConfig config;

  /// Token sent to the server as the `token` socket parameter.
  final String token;

  /// The active socket, or `null` when [connect] has not been called or
  /// [disconnect] has run.
  PhoenixSocket? _socket;

  /// Channels created on [_socket], keyed by topic.
  final _channels = <String, PhoenixChannel>{};

  /// Opens a new socket to [ServerConfig.wsUrl] and waits for it to connect.
  ///
  /// If a socket from an earlier [connect] call is still held, it is torn
  /// down via [disconnect] first, so streams previously obtained from
  /// [channel] must be re-requested after reconnecting.
  Future<void> connect() async {
    // Replacing the socket without tearing it down would orphan the old
    // connection and leave `_channels` holding channels bound to it, which
    // `channel` and `push` would then keep handing out.
    if (_socket != null) await disconnect();

    final socket = PhoenixSocket(
      config.wsUrl,
      socketOptions: PhoenixSocketOptions(
        params: {'token': token},
      ),
    );
    // Assigned before awaiting, so the socket is reachable while connecting.
    _socket = socket;
    await socket.connect();
  }

  /// Leaves every joined channel, closes the socket and forgets both.
  ///
  /// Safe to call when not connected.
  Future<void> disconnect() async {
    for (final channel in _channels.values) {
      channel.leave();
    }
    _channels.clear();
    _socket?.close();
    _socket = null;
  }

  /// Whether a socket exists and reports itself as connected.
  bool get isConnected => _socket?.isConnected ?? false;

  /// Joins [topic] (if not already joined) and returns a stream of message
  /// payloads for that topic. The stream stays open until [disconnect] is
  /// called or the underlying socket closes.
  ///
  /// [params] are sent as the join parameters and only take effect on the
  /// first call for a given [topic]; later calls reuse the existing channel
  /// and ignore them. Messages whose event is `phx_reply` are filtered out,
  /// and a missing payload is delivered as an empty map.
  ///
  /// Throws a [StateError] if [connect] has not been called.
  Stream<Map<String, dynamic>> channel(
    String topic, {
    Map<String, dynamic> params = const {},
  }) {
    final socket = _socket;
    if (socket == null) throw StateError('RealtimeDataClient not connected');

    final channel = _channels.putIfAbsent(
      topic,
      () {
        final ch = socket.addChannel(topic: topic, parameters: params);
        ch.join();
        return ch;
      },
    );
    return channel.messages
        .where((msg) => msg.event.value != 'phx_reply')
        .map((msg) => msg.payload ?? const {});
  }

  /// Pushes [event] on [topic] and waits for the server reply.
  /// The channel must have been joined first via [channel].
  ///
  /// Returns the reply's response payload, or an empty map when the reply
  /// carries none. The reply status is not inspected here. A response that
  /// is not a `Map<String, dynamic>` makes the cast below throw.
  ///
  /// Throws a [StateError] if [topic] has not been joined.
  Future<Map<String, dynamic>> push(
    String topic,
    String event,
    Map<String, dynamic> payload,
  ) async {
    final ch = _channels[topic];
    if (ch == null) throw StateError('Channel $topic has not been joined');

    final reply = await ch.push(event, payload).future;
    return (reply.response as Map<String, dynamic>?) ?? const {};
  }
}