import 'package:phoenix_socket/phoenix_socket.dart';

import '../../../domain/models/server_config.dart';

/// Wraps a single [PhoenixSocket] and the channels joined on it.
///
/// Call [connect] before [channel], [channelMessages] or [push]. Joined
/// channels are cached per topic, so asking for the same topic twice reuses
/// the existing channel rather than joining again.
class RealtimeDataClient {
  RealtimeDataClient({required this.config, required this.token});

  /// Event name of Phoenix reply frames, which are filtered out of the
  /// streams returned by [channel] and [channelMessages].
  static const _replyEvent = 'phx_reply';

  /// Server settings; [connect] reads the WebSocket URL from `wsUrl`.
  final ServerConfig config;

  /// Token sent to the server as the `token` socket parameter.
  final String token;

  PhoenixSocket? _socket;

  /// Channels joined so far, keyed by topic.
  final _channels = <String, PhoenixChannel>{};

  /// Opens the socket to `config.wsUrl`, passing [token] as a parameter.
  ///
  /// If a socket from an earlier [connect] call is still held, it is torn
  /// down through [disconnect] first so that neither the old socket nor its
  /// cached channels are leaked.
  Future<void> connect() async {
    if (_socket != null) await disconnect();

    final socket = PhoenixSocket(
      config.wsUrl,
      socketOptions: PhoenixSocketOptions(params: {'token': token}),
    );
    // Store before awaiting so [isConnected] and [disconnect] can see the
    // socket while the connection attempt is still in flight.
    _socket = socket;
    await socket.connect();
  }

  /// Leaves every joined channel, forgets them, and closes the socket.
  ///
  /// Safe to call when not connected; it is then a no-op.
  Future<void> disconnect() async {
    for (final joined in _channels.values) {
      joined.leave();
    }
    _channels.clear();
    _socket?.close();
    _socket = null;
  }

  /// Whether a socket exists and reports itself as connected.
  bool get isConnected => _socket?.isConnected ?? false;

  /// Joins [topic] (if not already joined) and returns a stream of message
  /// payloads for that topic. The stream stays open until [disconnect] is
  /// called or the underlying socket closes.
  ///
  /// [params] are only sent when the topic is joined for the first time;
  /// they are ignored if the topic is already cached. Reply frames
  /// (`phx_reply`) are dropped and a missing payload is emitted as an empty
  /// map.
  ///
  /// Throws a [StateError] if [connect] has not been called.
  Stream<Map<String, dynamic>> channel(
    String topic, {
    Map<String, dynamic> params = const {},
  }) {
    return _joinedChannel(topic, params)
        .messages
        .where((msg) => msg.event.value != _replyEvent)
        .map((msg) => msg.payload ?? const <String, dynamic>{});
  }

  /// Like [channel], but includes the Phoenix event name in each emission.
  ///
  /// Throws a [StateError] if [connect] has not been called.
  Stream<({String event, Map<String, dynamic> payload})> channelMessages(
    String topic, {
    Map<String, dynamic> params = const {},
  }) {
    return _joinedChannel(topic, params)
        .messages
        .where((msg) => msg.event.value != _replyEvent)
        .map(
          (msg) => (
            event: msg.event.value,
            payload: msg.payload ?? const <String, dynamic>{},
          ),
        );
  }

  /// Pushes [event] on [topic] and waits for the server reply.
  /// The channel must have been joined first via [channel] or
  /// [channelMessages].
  ///
  /// Returns the reply's response map, or an empty map when the reply has no
  /// response. The reply status is not inspected here.
  ///
  /// Throws a [StateError] if [topic] has not been joined.
  Future<Map<String, dynamic>> push(
    String topic,
    String event,
    Map<String, dynamic> payload,
  ) async {
    final joined = _channels[topic];
    if (joined == null) {
      throw StateError('Channel $topic has not been joined');
    }
    final reply = await joined.push(event, payload).future;
    return (reply.response as Map<String, dynamic>?) ?? const {};
  }

  /// Returns the cached channel for [topic], joining it with [params] on
  /// first use.
  ///
  /// Throws a [StateError] if there is no socket yet.
  PhoenixChannel _joinedChannel(String topic, Map<String, dynamic> params) {
    final socket = _socket;
    if (socket == null) {
      throw StateError('RealtimeDataClient not connected');
    }
    return _channels.putIfAbsent(
      topic,
      () => socket.addChannel(topic: topic, parameters: params)..join(),
    );
  }
}