defmodule Localiser.MQTT.Connection do
  @moduledoc """
  GenServer responsible for maintaining a connection to the MQTT broker.

  On start the server sends itself a `:connect` message. Connecting starts a
  linked `:emqtt` client, connects it and subscribes to the sensor RSSI and
  announce topics. Inbound `{:publish, %{topic: _, payload: _}}` messages are
  forwarded to `Localiser.MQTT.Router.route/2`.

  The server traps exits so that the death of the linked client arrives as an
  `{:EXIT, pid, reason}` message; a failed connection attempt or a client exit
  schedules another `:connect` after a fixed delay.
  """

  use GenServer
  require Logger

  @broker_host Application.compile_env(:localiser, :mqtt_host, "localhost")
  @broker_port Application.compile_env(:localiser, :mqtt_port, 1883)
  @rssi_topic "localiser/sensor/+/rssi"
  @announce_topic "localiser/sensor/+/announce"

  # {topic_filter, qos} pairs subscribed to after every successful connect.
  @subscriptions [{@rssi_topic, 1}, {@announce_topic, 1}]

  # Delay before a reconnect attempt. The log messages below spell out
  # "5 seconds" literally, so keep them in sync when changing this value.
  @reconnect_delay_ms 5_000

  @doc """
  Starts the connection server, registered under the module name.

  `opts` is passed to `init/1`, which currently ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Publishes `payload` on `topic` through the current MQTT client.

  Returns `{:error, :not_connected}` while no broker connection is
  established; otherwise returns whatever `:emqtt.publish/3` returns.
  """
  @spec publish(topic :: term(), payload :: term()) :: {:error, :not_connected} | term()
  def publish(topic, payload) do
    GenServer.call(__MODULE__, {:publish, topic, payload})
  end

  # Server Callbacks

  @impl true
  def init(_opts) do
    # Trap exits so a crash of the linked emqtt client is delivered as an
    # {:EXIT, pid, reason} message instead of taking this server down.
    Process.flag(:trap_exit, true)
    send(self(), :connect)
    {:ok, %{client_id: nil, connected: false, client_pid: nil}}
  end

  @impl true
  def handle_info(:connect, state) do
    client_id = "localiser_#{:rand.uniform(1000)}"

    Logger.info(
      "[MQTT.Connection] Attempting to connect to MQTT broker at " <>
        "#{@broker_host}:#{@broker_port} with client ID #{client_id}"
    )

    case do_connect(client_id) do
      {:ok, pid} ->
        Logger.info("[MQTT.Connection] Connected to MQTT broker with client ID #{client_id}")
        {:noreply, %{state | client_id: client_id, connected: true, client_pid: pid}}

      {:error, reason} ->
        # TODO: Implement exponential backoff for retries
        Logger.error(
          "[MQTT.Connection] Failed to connect to MQTT broker: #{inspect(reason)}. " <>
            "Retrying in 5 seconds..."
        )

        Process.send_after(self(), :connect, @reconnect_delay_ms)
        {:noreply, %{state | connected: false, client_pid: nil}}
    end
  end

  # The tracked emqtt process exited (broker dropped the connection or the
  # client failed during startup): mark as disconnected and schedule a retry.
  def handle_info({:EXIT, pid, reason}, %{client_pid: pid} = state) do
    Logger.warning(
      "[MQTT.Connection] MQTT client exited (#{inspect(reason)}). " <>
        "Reconnecting in 5 seconds..."
    )

    Process.send_after(self(), :connect, @reconnect_delay_ms)
    {:noreply, %{state | connected: false, client_pid: nil}}
  end

  # Exit of any other linked process, e.g. a client that was already given up
  # on during a failed connection attempt. Nothing to do.
  def handle_info({:EXIT, _pid, _reason}, state), do: {:noreply, state}

  # Inbound MQTT message delivered by the client process.
  def handle_info({:publish, %{topic: topic, payload: payload}}, state) do
    Localiser.MQTT.Router.route(topic, payload)
    {:noreply, state}
  end

  # Any other message (the emqtt client may send further notifications such as
  # disconnect notices to its owner) is logged and ignored. Without this clause
  # an unmatched message raises FunctionClauseError and crashes the server.
  # Reconnection is driven by the {:EXIT, ...} clause above, so state is left
  # untouched here.
  def handle_info(message, state) do
    Logger.debug("[MQTT.Connection] Ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  @impl true
  def handle_call({:publish, _topic, _payload}, _from, %{connected: false} = state) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call({:publish, topic, payload}, _from, %{client_pid: pid} = state) do
    result = :emqtt.publish(pid, topic, payload)
    {:reply, result, state}
  end

  # Private

  # Starts a linked emqtt client for `client_id` and brings it to the
  # connected-and-subscribed state. Returns {:ok, pid} or {:error, reason}.
  defp do_connect(client_id) do
    options = [host: normalize_host(@broker_host), port: @broker_port, clientid: client_id]

    case :emqtt.start_link(options) do
      {:ok, pid} -> do_connect_pid(pid)
      {:error, reason} -> {:error, reason}
    end
  end

  # Connects an already started client and subscribes it to all topics. On any
  # failure (error tuple or exit raised by a call into the client) the client
  # is stopped and {:error, reason} is returned.
  defp do_connect_pid(pid) do
    with {:ok, _props} <- :emqtt.connect(pid),
         :ok <- subscribe_all(pid) do
      {:ok, pid}
    else
      {:error, reason} ->
        safe_stop(pid)
        {:error, reason}
    end
  catch
    :exit, reason ->
      safe_stop(pid)
      {:error, reason}
  end

  # Subscribes to every entry in @subscriptions, halting at the first
  # {:error, reason} so a client without its subscriptions is never reported
  # as successfully connected.
  defp subscribe_all(pid) do
    Enum.reduce_while(@subscriptions, :ok, fn {topic, _qos} = subscription, :ok ->
      case :emqtt.subscribe(pid, subscription) do
        {:error, reason} -> {:halt, {:error, {:subscribe_failed, topic, reason}}}
        _subscribed -> {:cont, :ok}
      end
    end)
  end

  # Stops the client if it is still alive. The process can die between the
  # liveness check and the stop call, which makes the call exit; that exit is
  # swallowed because the goal (client gone) is already met.
  defp safe_stop(pid) do
    if Process.alive?(pid), do: :emqtt.stop(pid)
    :ok
  catch
    :exit, _reason -> :ok
  end

  # Elixir binaries are converted to charlists before being handed to emqtt;
  # charlists, atoms and IP tuples pass through unchanged.
  # NOTE(review): assumes emqtt forwards the host to the Erlang socket layer,
  # whose hostname type does not include binaries - confirm against the emqtt
  # version in use.
  defp normalize_host(host) when is_binary(host), do: String.to_charlist(host)
  defp normalize_host(host), do: host
end