defmodule Localiser.RSSI.Buffer do
  @moduledoc """
  Buffers incoming RSSI readings per tag and flushes them in periodic batches.

  Readings are accumulated in the server state as a map of `tag_id => [reading]`.
  On every flush tick each batch is routed reading-by-reading:

    * readings from a sensor that reports being in calibration mode are forwarded
      to `Localiser.Localisation.Sensor.Server` and excluded from the tag filter;
    * readings from any other registered sensor are converted with
      `Sensor.Server.measure/3` and handed to `Localiser.Localisation.Tag.Filter`,
      provided a filter is registered for the tag;
    * readings from sensors that are not registered are dropped.
  """

  use GenServer

  require Logger

  alias Localiser.Localisation.Tag.Filter, as: TagFilter
  alias Localiser.Localisation.Sensor.Server, as: SensorServer

  @flush_interval_ms 500

  @typedoc "A single RSSI observation of a tag made by a sensor."
  @type reading :: %{
          sensor_id: String.t(),
          tag_id: String.t(),
          rssi: integer(),
          tx_power: integer() | nil
        }

  ## Client API

  @doc """
  Queues `reading` for the next flush.

  Asynchronous (`GenServer.cast/2`); always returns `:ok`.
  """
  @spec push(reading()) :: :ok
  def push(reading) do
    GenServer.cast(__MODULE__, {:push, reading})
  end

  @doc """
  Starts the buffer, registered under the module name, with an empty state.

  The argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_args) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  ## Server callbacks

  @impl true
  def init(state) do
    schedule_flush()
    {:ok, state}
  end

  @impl true
  def handle_cast({:push, %{tag_id: tag_id} = reading}, state) do
    # Prepend for O(1) insertion; batches are therefore stored newest-first and
    # are reversed once at flush time.
    {:noreply, Map.update(state, tag_id, [reading], &[reading | &1])}
  end

  @impl true
  def handle_info(:flush, state) do
    flush_batches(state)
    schedule_flush()
    {:noreply, %{}}
  end

  # Defining handle_info/2 above replaces the default clause injected by
  # `use GenServer`, so without this catch-all any stray message would raise
  # FunctionClauseError and discard every buffered reading.
  def handle_info(msg, state) do
    Logger.warning("#{inspect(__MODULE__)} ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  ## Internals

  # Routes every buffered batch and hands the non-calibration measurements of
  # each tag to its filter, if one is registered.
  defp flush_batches(batches) do
    Enum.each(batches, fn {tag_id, readings} ->
      # Calibration routing runs regardless of whether the tag has a filter registered,
      # so scan-mode readings from unknown tags still reach the sensor server.
      normal =
        readings
        # Stored newest-first (see handle_cast/2); restore arrival order so the
        # downstream consumers see readings in the order they were pushed.
        |> Enum.reverse()
        |> Enum.flat_map(&route_reading(tag_id, &1))

      if normal != [] do
        case Registry.lookup(Localiser.Registry, {:filter, tag_id}) do
          [{_pid, _}] -> TagFilter.ingest(tag_id, normal)
          [] -> :ok
        end
      end
    end)
  end

  # Routes one reading. If the sensor is in any calibration state, forwards to Sensor.Server
  # (with tag_id so scan-mode can advertise it) and returns [] to exclude from Tag.Filter.
  # Otherwise returns a single-element list holding the result of Sensor.Server.measure/3.
  # Readings from sensors with no registry entry yield [].
  defp route_reading(tag_id, %{sensor_id: sensor_id, rssi: rssi, tx_power: tx_power}) do
    case Registry.lookup(Localiser.Registry, {:sensor, sensor_id}) do
      [{_pid, _}] ->
        if SensorServer.in_calibration_mode?(sensor_id) do
          SensorServer.calibration_reading(sensor_id, tag_id, rssi)
          []
        else
          [SensorServer.measure(sensor_id, rssi, tx_power)]
        end

      [] ->
        []
    end
  end

  # Arms the timer for the next :flush tick.
  defp schedule_flush do
    Process.send_after(self(), :flush, @flush_interval_ms)
  end
end