Files

73 lines
2.1 KiB
Elixir

defmodule Localiser.RSSI.Buffer do
  @moduledoc """
  Buffers incoming RSSI readings per tag and flushes them in batches.

  Readings are submitted with `push/1` and accumulated in the server state, keyed
  by `tag_id`. Every `@flush_interval_ms` milliseconds the whole buffer is drained:
  each reading is routed through its sensor, and the resulting measurements for a
  tag are passed to `Localiser.Localisation.Tag.Filter.ingest/2` when a filter is
  registered for that tag. Within a batch, readings are processed in the order
  they were pushed (oldest first).
  """

  use GenServer

  alias Localiser.Localisation.Sensor.Server, as: SensorServer
  alias Localiser.Localisation.Tag.Filter, as: TagFilter

  @flush_interval_ms 500

  @typedoc "A single RSSI observation of a tag made by a sensor."
  @type reading :: %{
          sensor_id: String.t(),
          tag_id: String.t(),
          rssi: integer(),
          tx_power: integer() | nil
        }

  # Client API

  @doc """
  Starts the buffer, registered under the module name, with an empty state.

  The argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_args) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  Queues `reading` for the next flush.

  This is an asynchronous cast and always returns `:ok`. The reading must contain
  a `:tag_id` key; the server has no clause for any other shape.
  """
  @spec push(reading()) :: :ok
  def push(reading) do
    GenServer.cast(__MODULE__, {:push, reading})
  end

  # Server callbacks

  @impl true
  def init(state) do
    schedule_flush()
    {:ok, state}
  end

  @impl true
  def handle_cast({:push, %{tag_id: tag_id} = reading}, state) do
    # Prepending keeps insertion O(1); the batch is put back into arrival order
    # once, at flush time.
    {:noreply, Map.update(state, tag_id, [reading], &[reading | &1])}
  end

  @impl true
  def handle_info(:flush, state) do
    flush_batches(state)
    schedule_flush()
    {:noreply, %{}}
  end

  # Internals

  # Drains every per-tag batch. Calibration routing runs regardless of whether the
  # tag has a filter registered, so scan-mode readings from unknown tags still
  # reach the sensor server.
  defp flush_batches(batches) do
    Enum.each(batches, fn {tag_id, newest_first} ->
      measurements =
        newest_first
        # Batches are accumulated by prepending, so reverse them to route and
        # ingest readings in the order they were pushed.
        |> Enum.reverse()
        |> Enum.flat_map(&route_reading(tag_id, &1))

      ingest(tag_id, measurements)
    end)
  end

  # Passes a tag's measurements to its filter, if one is registered. Nothing is
  # looked up or sent when the batch produced no normal measurements.
  defp ingest(_tag_id, []), do: :ok

  defp ingest(tag_id, measurements) do
    case Registry.lookup(Localiser.Registry, {:filter, tag_id}) do
      [{_pid, _}] -> TagFilter.ingest(tag_id, measurements)
      [] -> :ok
    end
  end

  # Routes one reading. If the sensor is in any calibration state, forwards to
  # Sensor.Server (with tag_id so scan-mode can advertise it) and returns [] to
  # exclude the reading from Tag.Filter. Otherwise returns a one-element list
  # holding the result of SensorServer.measure/3. Readings whose sensor is not
  # registered are dropped.
  defp route_reading(tag_id, %{sensor_id: sensor_id, rssi: rssi, tx_power: tx_power}) do
    case Registry.lookup(Localiser.Registry, {:sensor, sensor_id}) do
      [{_pid, _}] ->
        if SensorServer.in_calibration_mode?(sensor_id) do
          SensorServer.calibration_reading(sensor_id, tag_id, rssi)
          []
        else
          [SensorServer.measure(sensor_id, rssi, tx_power)]
        end

      [] ->
        []
    end
  end

  # Arms the timer that delivers the next :flush message to this process.
  defp schedule_flush do
    Process.send_after(self(), :flush, @flush_interval_ms)
  end
end