diff --git a/actors.cabal b/actors.cabal index 7d150d6..2b6afa1 100644 --- a/actors.cabal +++ b/actors.cabal @@ -62,7 +62,14 @@ library import: warnings -- Modules exported by the library. - exposed-modules: Control.Actor + exposed-modules: + Control.Actor + Control.Actor.Types + Control.Actor.Transport + Control.Actor.Runtime + Control.Actor.Core + Control.Actor.Supervision + Control.Actor.Network -- Modules included in this library but not exported. -- other-modules: @@ -71,7 +78,7 @@ library -- other-extensions: -- Other library packages from which modules are imported. - build-depends: base ^>=4.18.3.0, stm, uuid, bytestring ^>=0.12.0.0, containers ^>=0.8, mtl ^>=2.3.0, binary ^>=0.8.9 + build-depends: base ^>=4.18.3.0, stm, uuid, bytestring ^>=0.12.0.0, containers ^>=0.8, mtl ^>=2.3.0, binary ^>=0.8.9, network ^>=3.2 -- Directories containing source files. hs-source-dirs: src diff --git a/src/Control/Actor.hs b/src/Control/Actor.hs index 9e5e05b..0cf3958 100644 --- a/src/Control/Actor.hs +++ b/src/Control/Actor.hs @@ -1,439 +1,31 @@ -{-# LANGUAGE StrictData #-} +module Control.Actor + ( module Control.Actor.Types + , module Control.Actor.Transport + , module Control.Actor.Runtime + , module Control.Actor.Core + , module Control.Actor.Supervision + , module Control.Actor.Network + -- Demo + , pingActor + , forwardActorWithCell + , repeatActor + , system + ) where -module Control.Actor where +import Control.Actor.Core +import Control.Actor.Network +import Control.Actor.Runtime +import Control.Actor.Supervision +import Control.Actor.Transport +import Control.Actor.Types -import Control.Concurrent (ThreadId, forkIO, threadDelay) -import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar) +import Control.Concurrent (forkIO) import Control.Concurrent.STM - ( TMVar, - TQueue, - TVar, - atomically, - flushTQueue, - modifyTVar, - newEmptyTMVarIO, - newTQueueIO, - newTVar, - newTVarIO, - orElse, - putTMVar, - readTMVar, - readTQueue, - readTVar, - readTVarIO, - tryTakeTMVar, - writeTQueue, - writeTVar, - ) -import Control.Exception (AsyncException (..), SomeException, fromException, throwIO, try) -import Control.Monad (forM_, forever, void) -import Control.Monad.Reader (MonadIO (liftIO), MonadReader (ask), ReaderT (runReaderT), asks, lift, withReaderT) -import Data.Binary (Binary, decode, encode) -import Data.ByteString.Lazy (ByteString) -import Data.List (find) -import Data.Map qualified as Map -import Data.UUID (UUID) -import Data.UUID.V4 (nextRandom) -import GHC.Generics (Generic) + ( TMVar, atomically, newEmptyTMVarIO, readTMVar ) +import Control.Monad.Reader (MonadIO (..)) import Unsafe.Coerce (unsafeCoerce) -data NodeAddr = NodeAddr - { nodeHost :: String, - nodePort :: Integer - } - deriving (Eq, Show, Generic) - -instance Binary NodeAddr - -type NodeId = Integer - -thisNodeId :: NodeId -thisNodeId = 0 - -data ActorId = ActorId NodeId UUID - deriving (Eq, Ord, Show, Generic) - -instance Binary ActorId - -data ExitReason - = Normal - | Killed - | Exception SomeException - deriving (Show, Generic) - -data DeathMessage = DeathMessage - { dmActorId :: ActorId, - dmReason :: ExitReason - } - deriving (Show, Generic) - -data DeathTarget - = LocalTarget (TQueue DeathMessage) - | RemoteTarget ActorId - -data ActorState u = ActorState - { asId :: ActorId, - asLinks :: TVar [DeathTarget], - asEnv :: u - } - -newtype ActorM u r = ActorM - {unActorM :: ReaderT (ActorState u, Runtime) IO r} - deriving - ( Functor, - Applicative, - Monad, - MonadIO, - MonadReader (ActorState u, Runtime) - ) - -runActorM :: ActorM u r -> Runtime -> ActorState u -> IO r -runActorM m r s = runReaderT (unActorM m) (s, r) - -state :: ActorM u u -state = asks (asEnv . fst) - -getSelf :: ActorM u SomeActorRef -getSelf = do - (as, rt) <- ask - actors <- liftIO $ readTVarIO (rtActors rt) - let actorId = asId as - maybeRef = snd <$> Map.lookup actorId actors - case maybeRef of - Just ref -> return ref - Nothing -> error "getSelf: actor not found in runtime" - -data ActorRef msg reply - = forall u. LocalRef - { arMsgQ :: TQueue (Envelope msg reply), - arDeathQ :: TQueue DeathMessage, - arState :: ActorState u - } - | RemoteRef ActorId - -data SomeActorRef = forall msg reply. SomeActorRef (ActorRef msg reply) - -someActorId :: SomeActorRef -> ActorId -someActorId (SomeActorRef (LocalRef {arState})) = asId arState -someActorId (SomeActorRef (RemoteRef aid)) = aid - -data SupervisorAction u - = Stop - | Continue u - -type CorrelationId = Integer - -data Envelope msg reply - = Cast msg - | Call msg (MVar (Maybe reply)) - -data Runtime = Runtime - { rtNodeId :: NodeAddr, - rtActors :: TVar (Map.Map ActorId (ThreadId, SomeActorRef)), - rtPending :: TVar (Map.Map CorrelationId (MVar ByteString)), - rtNextCorr :: TVar CorrelationId, - rtNodeTable :: TVar (Map.Map NodeId NodeAddr), - rtTransport :: Transport - } - -data Transport = Transport - { sendBytes :: NodeAddr -> ByteString -> IO () - } - -data RemoteEnvelope - = RemoteCast UUID ByteString - | RemoteCall UUID CorrelationId NodeAddr ByteString - | RemoteReply CorrelationId ByteString - deriving (Generic) - -instance Binary RemoteEnvelope - -newtype RuntimeM a = RuntimeM - {unRuntimeM :: ReaderT Runtime IO a} - deriving - ( Functor, - Applicative, - Monad, - MonadIO, - MonadReader Runtime - ) - -liftRuntime :: RuntimeM a -> ActorM u a -liftRuntime = ActorM . withReaderT snd . unRuntimeM - -withRuntime :: Runtime -> RuntimeM a -> IO a -withRuntime = (. unRuntimeM) . flip runReaderT -{-# INLINE withRuntime #-} - -newRuntime :: IO Runtime -newRuntime = atomically $ do - actors <- newTVar Map.empty - pending <- newTVar Map.empty - nextCorr <- newTVar (0 :: Integer) - nodeTable <- newTVar Map.empty - return - Runtime - { rtNodeId = NodeAddr "localhost" 0, - rtActors = actors, - rtPending = pending, - rtNextCorr = nextCorr, - rtNodeTable = nodeTable, - rtTransport = Transport (\_ _ -> return ()) - } - -lookupNode :: NodeId -> RuntimeM (Maybe NodeAddr) -lookupNode nodeId = do - rt <- ask - liftIO $ atomically $ do - table <- readTVar $ rtNodeTable rt - return $ Map.lookup nodeId table - -cast' :: (Binary msg) => msg -> ActorRef msg reply -> RuntimeM () -cast' msg (LocalRef {arMsgQ}) = RuntimeM $ lift $ atomically $ writeTQueue arMsgQ (Cast msg) -cast' msg (RemoteRef (ActorId nodeId uuid)) = do - rt <- ask - maybeAddr <- lookupNode nodeId - let payload = encode (RemoteCast uuid (encode msg)) - case maybeAddr of - Just addr -> liftIO $ sendBytes (rtTransport rt) addr payload - Nothing -> liftIO $ putStrLn $ "cast: no node in lookup table with id " <> show nodeId - -cast :: (Binary msg) => msg -> ActorRef msg reply -> ActorM u () -cast = (liftRuntime .) . cast' - -castIn :: (Binary msg) => Int -> msg -> ActorRef msg reply -> ActorM u () -castIn ms msg ref = do - rt <- asks snd - liftIO $ void $ forkIO $ do - threadDelay (ms * 1000) - withRuntime rt $ cast' msg ref - -call' :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> RuntimeM (Maybe reply) -call' msg (LocalRef {arMsgQ}) = liftIO $ do - mv <- newEmptyMVar - atomically $ writeTQueue arMsgQ (Call msg mv) - takeMVar mv -call' msg (RemoteRef (ActorId nodeId uuid)) = do - rt <- ask - corrId <- liftIO $ atomically $ do - cid <- readTVar (rtNextCorr rt) - writeTVar (rtNextCorr rt) (cid + 1) - return cid - replyVar <- liftIO newEmptyMVar - liftIO $ atomically $ modifyTVar (rtPending rt) (Map.insert corrId replyVar) - let payload = encode (RemoteCall uuid corrId (rtNodeId rt) (encode msg)) - maybeAddr <- lookupNode nodeId - case maybeAddr of - Just addr -> do - liftIO $ sendBytes (rtTransport rt) addr payload - raw <- liftIO $ takeMVar replyVar - liftIO $ atomically $ modifyTVar (rtPending rt) (Map.delete corrId) - return $ decode raw - Nothing -> do - liftIO $ putStrLn $ "call: no node in lookup table with id " <> show nodeId - return Nothing - -call :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> ActorM u (Maybe reply) -call = (liftRuntime .) . call' - -type Actor u r = ActorM u (Maybe r, u) - -notifyOfDeath :: DeathMessage -> DeathTarget -> IO () -notifyOfDeath dm (LocalTarget q) = atomically $ writeTQueue q dm -notifyOfDeath _ (RemoteTarget _) = return () - -spawnActor :: - (m -> Actor u r) -> - (DeathMessage -> ActorM u (SupervisorAction u)) -> - u -> - RuntimeM (ActorRef m r) -spawnActor actorFn deathFn initState = do - rt <- ask - mailbox <- liftIO newTQueueIO - deathQ <- liftIO newTQueueIO - links <- liftIO $ newTVarIO [] - uuid <- liftIO nextRandom - let actorId = ActorId thisNodeId uuid - actorState = ActorState actorId links initState - actorRef = LocalRef mailbox deathQ actorState - - let loop as = do - event <- - atomically $ - (Left <$> readTQueue mailbox) - `orElse` (Right <$> readTQueue deathQ) - case event of - Left envelope -> - case envelope of - Cast msg -> do - (_, u') <- runActorM (actorFn msg) rt as - loop as {asEnv = u'} - Call msg mv -> do - (reply, u') <- runActorM (actorFn msg) rt as - putMVar mv reply - loop as {asEnv = u'} - Right dm -> do - action <- runActorM (deathFn dm) rt as - case action of - Stop -> return () - Continue u -> loop as {asEnv = u} - - tid <- liftIO $ forkIO $ do - result <- try (loop actorState) :: IO (Either SomeException ()) - let reason = case result of - Right () -> Normal - Left exc -> case fromException exc of - Just ThreadKilled -> Killed - _anyOtherExc -> Exception exc - links' <- readTVarIO (asLinks actorState) - let dm = DeathMessage actorId reason - forM_ links' (notifyOfDeath dm) - atomically $ modifyTVar (rtActors rt) (Map.delete actorId) - case result of - Left exc -> throwIO exc - Right () -> return () - - liftIO $ atomically $ modifyTVar (rtActors rt) (Map.insert actorId (tid, SomeActorRef actorRef)) - return actorRef - -linkActorTo :: DeathTarget -> ActorRef m r -> RuntimeM () -linkActorTo target (LocalRef {arState}) = - liftIO $ atomically $ modifyTVar (asLinks arState) (target :) -linkActorTo _ (RemoteRef _) = return () - -linkTo :: DeathTarget -> ActorM u () -linkTo target = do - as <- asks fst - liftIO $ atomically $ modifyTVar (asLinks as) (target :) - -killActor :: ActorRef m r -> ActorM u () -killActor (LocalRef {arDeathQ, arState}) = - liftIO $ atomically $ writeTQueue arDeathQ (DeathMessage (asId arState) Killed) -killActor (RemoteRef _) = - liftIO $ putStrLn "killActor: remote kill not yet implemented" - -stopOnDeath :: DeathMessage -> ActorM u (SupervisorAction u) -stopOnDeath _ = return Stop - --- Supervision - -data ChildSpec = forall m r. ChildSpec - { csRun :: DeathTarget -> RuntimeM (ActorRef m r), - csOnSpawn :: ActorRef m r -> IO () - } - -child :: - (m -> Actor u r) -> - (DeathMessage -> ActorM u (SupervisorAction u)) -> - u -> - ChildSpec -child msgFn deathFn initState = - ChildSpec - { csRun = \target -> do - ref <- spawnActor msgFn deathFn initState - linkActorTo target ref - return ref, - csOnSpawn = \_ -> return () - } - -childWithRef :: - (m -> Actor u r) -> - (DeathMessage -> ActorM u (SupervisorAction u)) -> - u -> - TMVar (ActorRef m r) -> - ChildSpec -childWithRef msgFn deathFn initState cell = - ChildSpec - { csRun = \target -> do - ref <- spawnActor msgFn deathFn initState - linkActorTo target ref - return ref, - csOnSpawn = \ref -> atomically $ do - void $ tryTakeTMVar cell - putTMVar cell ref - } - -data RestartStrategy = OneForOne | OneForAll | RestForOne - -data ChildSlot = forall m r. ChildSlot - { slotSpec :: ChildSpec, - slotRef :: ActorRef m r, - slotId :: ActorId - } - -spawnSlot :: DeathTarget -> ChildSpec -> RuntimeM ChildSlot -spawnSlot target spec@ChildSpec{csRun, csOnSpawn} = do - ref <- csRun target - _ <- liftIO $ csOnSpawn ref - return $ ChildSlot spec ref (someActorId (SomeActorRef ref)) - -supervise' :: RestartStrategy -> [ChildSpec] -> RuntimeM () -supervise' strategy specs = do - rt <- ask - supDeathQ <- liftIO newTQueueIO - let target = LocalTarget supDeathQ - slots <- mapM (spawnSlot target) specs - slotsVar <- liftIO $ newTVarIO slots - _ <- liftIO $ forkIO $ forever $ do - DeathMessage deadId _ <- atomically $ readTQueue supDeathQ - slots' <- readTVarIO slotsVar - case strategy of - OneForOne -> doOneForOne rt target slotsVar slots' deadId - OneForAll -> doOneForAll rt target slotsVar supDeathQ slots' - RestForOne -> doRestForOne rt target slotsVar supDeathQ slots' deadId - return () - -supervise :: RestartStrategy -> [ChildSpec] -> ActorM u () -supervise = (liftRuntime .) . supervise' - -doOneForOne :: - Runtime -> DeathTarget -> TVar [ChildSlot] -> [ChildSlot] -> ActorId -> IO () -doOneForOne rt target slotsVar slots deadId = - case find (\s -> slotId s == deadId) slots of - Nothing -> return () - Just slot -> do - newSlot <- withRuntime rt $ spawnSlot target (slotSpec slot) - atomically $ - modifyTVar slotsVar $ - map (\s -> if slotId s == deadId then newSlot else s) - -doOneForAll :: - Runtime -> - DeathTarget -> - TVar [ChildSlot] -> - TQueue DeathMessage -> - [ChildSlot] -> - IO () -doOneForAll rt target slotsVar supDeathQ slots = do - mapM_ (killSlot supDeathQ) slots - atomically $ void $ flushTQueue supDeathQ - newSlots <- withRuntime rt $ mapM (spawnSlot target . slotSpec) slots - atomically $ writeTVar slotsVar newSlots - -doRestForOne :: - Runtime -> - DeathTarget -> - TVar [ChildSlot] -> - TQueue DeathMessage -> - [ChildSlot] -> - ActorId -> - IO () -doRestForOne rt target slotsVar supDeathQ slots deadId = do - let (before, fromDead) = break (\s -> slotId s == deadId) slots - case fromDead of - [] -> return () - _nonempty -> do - mapM_ (killSlot supDeathQ) (drop 1 fromDead) - atomically $ void $ flushTQueue supDeathQ - newSlots <- withRuntime rt $ mapM (spawnSlot target . slotSpec) fromDead - atomically $ writeTVar slotsVar (before ++ newSlots) - -killSlot :: TQueue DeathMessage -> ChildSlot -> IO () -killSlot _ (ChildSlot {slotRef, slotId}) = case slotRef of - LocalRef {arDeathQ} -> atomically $ writeTQueue arDeathQ (DeathMessage slotId Killed) - RemoteRef _ -> return () - ------ Demo +-- Demo pingActor :: String -> Actor () String pingActor msg = return (Just ("Hello, " <> msg <> "!"), ()) @@ -441,15 +33,15 @@ pingActor msg = return (Just ("Hello, " <> msg <> "!"), ()) forwardActorWithCell :: TMVar (ActorRef String String) -> String -> Actor () String forwardActorWithCell cell msg = do pingRef <- liftIO $ atomically $ readTMVar cell - reply <- call msg pingRef + reply <- call msg pingRef case reply of Nothing -> liftIO $ putStrLn "forwardActorWithCell: received empty reply!" - Just x -> liftIO $ putStrLn ("forwardActorWithCell: received reply - " <> x) + Just x -> liftIO $ putStrLn ("forwardActorWithCell: received reply - " <> x) return (reply, ()) repeatActor :: String -> TMVar (ActorRef String String) -> () -> Actor () String repeatActor r cell () = do - ref <- liftIO $ atomically $ readTMVar cell + ref <- liftIO $ atomically $ readTMVar cell (SomeActorRef self) <- getSelf cast r ref castIn 1000 () (unsafeCoerce self) @@ -457,20 +49,20 @@ repeatActor r cell () = do system :: IO () system = do - rt <- newRuntime + rt <- initRuntime (NodeAddr "localhost" 9000) withRuntime rt $ do - pingCell <- liftIO newEmptyTMVarIO + pingCell <- liftIO newEmptyTMVarIO forwardCell <- liftIO newEmptyTMVarIO - repeatCell <- liftIO newEmptyTMVarIO + repeatCell <- liftIO newEmptyTMVarIO _ <- liftIO $ forkIO $ do repeatRef <- atomically $ readTMVar repeatCell - withRuntime rt $ do + withRuntime rt $ cast' () repeatRef supervise' OneForOne - [ childWithRef pingActor stopOnDeath () pingCell, - childWithRef (forwardActorWithCell pingCell) stopOnDeath () forwardCell, - childWithRef (repeatActor "repeaaat" forwardCell) stopOnDeath () repeatCell + [ childWithRef pingActor stopOnDeath () pingCell + , childWithRef (forwardActorWithCell pingCell) stopOnDeath () forwardCell + , childWithRef (repeatActor "repeaaat" forwardCell) stopOnDeath () repeatCell ] diff --git a/src/Control/Actor/Core.hs b/src/Control/Actor/Core.hs new file mode 100644 index 0000000..6273396 --- /dev/null +++ b/src/Control/Actor/Core.hs @@ -0,0 +1,211 @@ +{-# LANGUAGE StrictData #-} + +module Control.Actor.Core + ( ActorM (..) + , runActorM + , state + , getSelf + , Actor + , liftRuntime + , notifyOfDeath + , spawnActor + , linkActorTo + , linkTo + , killActor + , stopOnDeath + , cast' + , call' + , cast + , call + , castIn + ) where + +import Control.Actor.Runtime +import Control.Actor.Types +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) +import Control.Concurrent.STM + ( atomically + , modifyTVar + , newTQueueIO + , newTVarIO + , orElse + , readTQueue + , readTVar + , readTVarIO + , writeTQueue + , writeTVar + ) +import Control.Exception + ( AsyncException (..) + , SomeException + , fromException + , throwIO + , try + ) +import Control.Monad (forM_, void) +import Control.Monad.Reader + ( MonadIO (..) + , MonadReader (..) + , ReaderT (..) + , asks + , withReaderT + ) +import Data.Binary (Binary, decode, encode) +import Data.Map qualified as Map +import Data.UUID.V4 (nextRandom) + +newtype ActorM u r = ActorM + { unActorM :: ReaderT (ActorState u, Runtime) IO r } + deriving + ( Functor, Applicative, Monad, MonadIO + , MonadReader (ActorState u, Runtime) + ) + +runActorM :: ActorM u r -> Runtime -> ActorState u -> IO r +runActorM m rt s = runReaderT (unActorM m) (s, rt) + +state :: ActorM u u +state = asks (asEnv . fst) + +getSelf :: ActorM u SomeActorRef +getSelf = do + (as, rt) <- ask + actors <- liftIO $ readTVarIO (rtActors rt) + let actorId = asId as + maybeRef = snd <$> Map.lookup actorId actors + case maybeRef of + Just ref -> return ref + Nothing -> error "getSelf: actor not found in runtime" + +type Actor u r = ActorM u (Maybe r, u) + +liftRuntime :: RuntimeM a -> ActorM u a +liftRuntime = ActorM . withReaderT snd . unRuntimeM + +notifyOfDeath :: Runtime -> DeathMessage -> DeathTarget -> IO () +notifyOfDeath _ dm (LocalTarget q) = atomically $ writeTQueue q dm +notifyOfDeath rt dm (RemoteTarget _ peerAddr) = + withRuntime rt $ rtSendRemote rt peerAddr (NMDeath (dmActorId dm) (toRemoteExitReason (dmReason dm))) + +spawnActor :: + (Binary m, Binary r) => + (m -> Actor u r) -> + (DeathMessage -> ActorM u (SupervisorAction u)) -> + u -> + RuntimeM (ActorRef m r) +spawnActor actorFn deathFn initState = do + rt <- ask + mailbox <- liftIO newTQueueIO + deathQ <- liftIO newTQueueIO + links <- liftIO $ newTVarIO [] + uuid <- liftIO nextRandom + let actorId = ActorId thisNodeId uuid + actorState = ActorState actorId links initState + actorRef = LocalRef mailbox deathQ actorState + + let loop as = do + event <- + atomically $ + (Left <$> readTQueue mailbox) + `orElse` (Right <$> readTQueue deathQ) + case event of + Left envelope -> + case envelope of + Cast msg -> do + (_, u') <- runActorM (actorFn msg) rt as + loop as {asEnv = u'} + Call msg mv -> do + (reply, u') <- runActorM (actorFn msg) rt as + putMVar mv reply + loop as {asEnv = u'} + Right dm -> do + action <- runActorM (deathFn dm) rt as + case action of + Stop -> return () + Continue u -> loop as {asEnv = u} + + tid <- liftIO $ forkIO $ do + result <- try @SomeException (loop actorState) + let reason = case result of + Right () -> Normal + Left exc -> case fromException exc of + Just ThreadKilled -> Killed + _anyOtherExc -> Exception exc + links' <- readTVarIO (asLinks actorState) + forM_ links' (notifyOfDeath rt (DeathMessage actorId reason)) + atomically $ modifyTVar (rtActors rt) (Map.delete actorId) + case result of + Left exc -> throwIO exc + Right () -> return () + + liftIO $ atomically $ + modifyTVar (rtActors rt) (Map.insert actorId (tid, SomeActorRef actorRef)) + return actorRef + +linkActorTo :: DeathTarget -> ActorRef m r -> RuntimeM () +linkActorTo target (LocalRef {arState}) = + liftIO $ atomically $ modifyTVar (asLinks arState) (target :) +linkActorTo _ (RemoteRef _) = return () + +linkTo :: DeathTarget -> ActorM u () +linkTo target = do + as <- asks fst + liftIO $ atomically $ modifyTVar (asLinks as) (target :) + +killActor :: ActorRef m r -> ActorM u () +killActor (LocalRef {arDeathQ, arState}) = + liftIO $ atomically $ writeTQueue arDeathQ (DeathMessage (asId arState) Killed) +killActor (RemoteRef _) = + liftIO $ putStrLn "killActor: remote kill not yet implemented" + +stopOnDeath :: DeathMessage -> ActorM u (SupervisorAction u) +stopOnDeath _ = return Stop + +cast' :: (Binary msg) => msg -> ActorRef msg reply -> RuntimeM () +cast' msg (LocalRef {arMsgQ}) = + liftIO $ atomically $ writeTQueue arMsgQ (Cast msg) +cast' msg (RemoteRef (ActorId nodeId uuid)) = do + rt <- ask + maybeAddr <- lookupNode nodeId + case maybeAddr of + Nothing -> liftIO $ putStrLn $ "cast: no node in lookup table with id " <> show nodeId + Just addr -> rtSendRemote rt addr (NMCast uuid (encode msg)) + +call' :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> RuntimeM (Maybe reply) +call' msg (LocalRef {arMsgQ}) = liftIO $ do + mv <- newEmptyMVar + atomically $ writeTQueue arMsgQ (Call msg mv) + takeMVar mv +call' msg (RemoteRef (ActorId nodeId uuid)) = do + rt <- ask + corrId <- liftIO $ atomically $ do + cid <- readTVar (rtNextCorr rt) + writeTVar (rtNextCorr rt) (cid + 1) + return cid + replyVar <- liftIO newEmptyMVar + liftIO $ atomically $ modifyTVar (rtPending rt) (Map.insert corrId replyVar) + maybeAddr <- lookupNode nodeId + case maybeAddr of + Nothing -> liftIO $ do + putStrLn $ "call: no node in lookup table with id " <> show nodeId + atomically $ modifyTVar (rtPending rt) (Map.delete corrId) + return Nothing + Just addr -> do + rtSendRemote rt addr (NMCall uuid corrId (rtNodeId rt) (encode msg)) + raw <- liftIO $ takeMVar replyVar + liftIO $ atomically $ modifyTVar (rtPending rt) (Map.delete corrId) + return $ Just (decode raw) + +cast :: (Binary msg) => msg -> ActorRef msg reply -> ActorM u () +cast = (liftRuntime .) . cast' + +castIn :: (Binary msg) => Int -> msg -> ActorRef msg reply -> ActorM u () +castIn ms msg ref = do + rt <- asks snd + liftIO $ void $ forkIO $ do + threadDelay (ms * 1000) + withRuntime rt $ cast' msg ref + +call :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> ActorM u (Maybe reply) +call = (liftRuntime .) . call' diff --git a/src/Control/Actor/Network.hs b/src/Control/Actor/Network.hs new file mode 100644 index 0000000..0b3cb21 --- /dev/null +++ b/src/Control/Actor/Network.hs @@ -0,0 +1,254 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE StrictData #-} + +module Control.Actor.Network + ( spawnConnTree + , handleNewConn + , getOrCreateConn + , findByUUID + , routeRemoteDeath + , validateAndDispatch + , sysHandlerFn + , initRuntime + ) where + +import Control.Actor.Core + ( Actor, ActorM, cast', liftRuntime, linkActorTo, spawnActor, state, stopOnDeath ) +import Control.Actor.Runtime (Runtime (..), RuntimeM, newRuntime, withRuntime) +import Control.Actor.Supervision (ChildSpec (..), RestartStrategy (..), childWithRef, supervise') +import Control.Actor.Transport (ConnHandle (..), Transport (..), createTCPTransport) +import Control.Actor.Types +import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) +import Control.Concurrent.STM + ( atomically + , modifyTVar + , newEmptyTMVar + , newEmptyTMVarIO + , putTMVar + , readTMVar + , readTVar + , readTVarIO + , tryTakeTMVar + , writeTQueue + ) +import Control.Exception (SomeException, try) +import Control.Monad (forM_, forever, unless, void) +import Control.Monad.Reader (MonadIO (..), MonadReader (..)) +import Data.Binary (decode, decodeOrFail, encode) +import Data.ByteString.Lazy (ByteString) +import Data.Map qualified as Map +import Data.UUID (UUID) + +-- Connection actors + +connActorFn :: NetworkMessage -> Actor ConnHandle () +connActorFn nm = do + ch <- state + liftIO $ chSend ch (encode nm) + return (Nothing, ch) + +connDeathFn :: NodeAddr -> DeathMessage -> ActorM ConnHandle (SupervisorAction ConnHandle) +connDeathFn peer _ = do + ch <- state + rt <- liftRuntime ask + liftIO $ do + atomically $ modifyTVar (rtConnections rt) (Map.delete peer) + chClose ch + return Stop + +routerActorFn :: ByteString -> Actor () () +routerActorFn raw = do + let nm = decode raw :: NetworkMessage + valid <- liftRuntime $ validateAndDispatch nm + unless valid $ liftIO $ putStrLn "router: dropping invalid message" + return (Nothing, ()) + +-- Connection supervision tree + +spawnConnTree :: NodeAddr -> ConnHandle -> RuntimeM (ActorRef NetworkMessage ()) +spawnConnTree peer ch = do + rt <- ask + routerCell <- liftIO newEmptyTMVarIO + connCell <- liftIO newEmptyTMVarIO + supervise' OneForAll + [ childWithRef routerActorFn stopOnDeath () routerCell + , ChildSpec + { csRun = \target -> do + ref <- spawnActor connActorFn (connDeathFn peer) ch + linkActorTo target ref + return ref + , csOnSpawn = \ref -> case ref of + LocalRef { arDeathQ, arState } -> do + atomically $ do + void $ tryTakeTMVar connCell + putTMVar connCell ref + routerRef <- atomically $ readTMVar routerCell + void $ forkIO $ do + result <- try @SomeException $ forever $ + chRecv ch >>= \raw -> withRuntime rt $ cast' raw routerRef + case result of + Left _ -> atomically $ + writeTQueue arDeathQ (DeathMessage (asId arState) Killed) + Right () -> return () + RemoteRef _ -> return () + } + ] + liftIO $ atomically $ readTMVar connCell + +-- Incoming connection handler + +handleNewConn :: ConnHandle -> RuntimeM () +handleNewConn ch = do + rt <- ask + raw <- liftIO $ chRecv ch + case decode raw :: NetworkMessage of + NMHandshake peerAddr -> do + ref <- spawnConnTree peerAddr ch + liftIO $ atomically $ do + modifyTVar (rtConnections rt) (Map.insert peerAddr ref) + promises <- readTVar (rtConnPromises rt) + case Map.lookup peerAddr promises of + Nothing -> return () + Just p -> do + modifyTVar (rtConnPromises rt) (Map.delete peerAddr) + putTMVar p ref + _else -> liftIO $ chClose ch + +-- Connection pool + +getOrCreateConn :: NodeAddr -> RuntimeM (ActorRef NetworkMessage ()) +getOrCreateConn peer = do + rt <- ask + action <- liftIO $ atomically $ do + conns <- readTVar (rtConnections rt) + case Map.lookup peer conns of + Just ref -> return (Left ref) + Nothing -> do + promises <- readTVar (rtConnPromises rt) + case Map.lookup peer promises of + Just p -> return (Right (Left p)) + Nothing -> do + p <- newEmptyTMVar + modifyTVar (rtConnPromises rt) (Map.insert peer p) + return (Right (Right p)) + case action of + Left ref -> return ref + Right (Left promise) -> liftIO $ atomically $ readTMVar promise + Right (Right promise) -> do + ch <- liftIO $ tConnect (rtTransport rt) peer + liftIO $ chSend ch (encode (NMHandshake (rtNodeId rt))) + ref <- spawnConnTree peer ch + liftIO $ atomically $ do + modifyTVar (rtConnections rt) (Map.insert peer ref) + modifyTVar (rtConnPromises rt) (Map.delete peer) + putTMVar promise ref + return ref + +-- Message dispatch + +findByUUID :: UUID -> Map.Map ActorId (ThreadId, SomeActorRef) -> Maybe SomeActorRef +findByUUID uuid actors = + case Map.toList (Map.filterWithKey (\(ActorId _ u) _ -> u == uuid) actors) of + [] -> Nothing + (_, (_, r)):_ -> Just r + +routeRemoteDeath :: ActorId -> RemoteExitReason -> RuntimeM () +routeRemoteDeath deadId reason = do + rt <- ask + liftIO $ do + actors <- readTVarIO (rtActors rt) + let exitReason = case reason of + RNormal -> Normal + RKilled -> Killed + RException s -> Exception (error s) + dm = DeathMessage deadId exitReason + forM_ (Map.elems actors) $ \(_, SomeActorRef ref) -> + case ref of + LocalRef {arDeathQ, arState} -> do + links <- readTVarIO (asLinks arState) + forM_ links $ \case + RemoteTarget rid _ | rid == deadId -> + atomically $ writeTQueue arDeathQ dm + _else -> return () + RemoteRef _ -> return () + +validateAndDispatch :: NetworkMessage -> RuntimeM Bool +validateAndDispatch nm = case nm of + + NMHandshake _ -> return False + + NMReply corrId payload -> do + rt <- ask + liftIO $ do + pending <- readTVarIO (rtPending rt) + case Map.lookup corrId pending of + Nothing -> return False + Just mv -> putMVar mv payload >> return True + + NMCast uuid payload -> do + rt <- ask + liftIO $ do + actors <- readTVarIO (rtActors rt) + case findByUUID uuid actors of + Nothing -> + return False + Just (SomeActorRef (LocalRef {arMsgQ})) -> + case decodeOrFail payload of + Left _ -> return False + Right (_, _, msg) -> do + atomically $ writeTQueue arMsgQ (Cast msg) + return True + Just (SomeActorRef (RemoteRef _)) -> + return False + + NMCall uuid corrId returnAddr payload -> do + rt <- ask + liftIO $ do + actors <- readTVarIO (rtActors rt) + case findByUUID uuid actors of + Nothing -> + return False + Just (SomeActorRef (LocalRef {arMsgQ})) -> + case decodeOrFail payload of + Left _ -> return False + Right (_, _, msg) -> do + mv <- newEmptyMVar + atomically $ writeTQueue arMsgQ (Call msg mv) + void $ forkIO $ do + reply <- takeMVar mv + case reply of + Nothing -> return () + Just rv -> withRuntime rt $ do + connRef <- getOrCreateConn returnAddr + cast' (NMReply corrId (encode rv)) connRef + return True + Just (SomeActorRef (RemoteRef _)) -> + return False + + NMDeath deadId reason -> do + routeRemoteDeath deadId reason + return True + +-- System event handler + +sysHandlerFn :: NetworkMessage -> Actor () () +sysHandlerFn (NMDeath deadId reason) = do + liftRuntime $ routeRemoteDeath deadId reason + return (Nothing, ()) +sysHandlerFn _ = return (Nothing, ()) + +-- Runtime initialization + +initRuntime :: NodeAddr -> IO Runtime +initRuntime myAddr = do + transport <- createTCPTransport myAddr + rt0 <- newRuntime myAddr transport + let rt = rt0 { rtSendRemote = \addr nm -> getOrCreateConn addr >>= cast' nm } + withRuntime rt $ void $ spawnActor sysHandlerFn stopOnDeath () + tListen transport $ \ch -> do + result <- try @SomeException $ withRuntime rt $ handleNewConn ch + case result of + Left _ -> chClose ch + Right () -> return () + return rt diff --git a/src/Control/Actor/Runtime.hs b/src/Control/Actor/Runtime.hs new file mode 100644 index 0000000..ee7844a --- /dev/null +++ b/src/Control/Actor/Runtime.hs @@ -0,0 +1,67 @@ +{-# LANGUAGE StrictData #-} + +module Control.Actor.Runtime + ( Runtime (..) + , RuntimeM (..) + , newRuntime + , withRuntime + , lookupNode + ) where + +import Control.Actor.Transport (Transport) +import Control.Actor.Types +import Control.Concurrent (ThreadId) +import Control.Concurrent.MVar (MVar) +import Control.Concurrent.STM + ( TMVar, TVar, atomically, newTVarIO, readTVar ) +import Control.Monad.Reader + ( MonadIO (..), MonadReader (..), ReaderT (..), runReaderT ) +import Data.ByteString.Lazy (ByteString) +import Data.Map qualified as Map + +data Runtime = Runtime + { rtNodeId :: NodeAddr + , rtActors :: TVar (Map.Map ActorId (ThreadId, SomeActorRef)) + , rtPending :: TVar (Map.Map CorrelationId (MVar ByteString)) + , rtNextCorr :: TVar CorrelationId + , rtNodeTable :: TVar (Map.Map NodeId NodeAddr) + , rtTransport :: Transport + , rtConnections :: TVar (Map.Map NodeAddr (ActorRef NetworkMessage ())) + , rtConnPromises :: TVar (Map.Map NodeAddr (TMVar (ActorRef NetworkMessage ()))) + , rtSendRemote :: NodeAddr -> NetworkMessage -> RuntimeM () + } + +newtype RuntimeM a = RuntimeM + { unRuntimeM :: ReaderT Runtime IO a } + deriving (Functor, Applicative, Monad, MonadIO, MonadReader Runtime) + +newRuntime :: NodeAddr -> Transport -> IO Runtime +newRuntime myAddr transport = do + actors <- newTVarIO Map.empty + pending <- newTVarIO Map.empty + nextCorr <- newTVarIO (0 :: Integer) + nodeTable <- newTVarIO Map.empty + conns <- newTVarIO Map.empty + promises <- newTVarIO Map.empty + return Runtime + { rtNodeId = myAddr + , rtActors = actors + , rtPending = pending + , rtNextCorr = nextCorr + , rtNodeTable = nodeTable + , rtTransport = transport + , rtConnections = conns + , rtConnPromises = promises + , rtSendRemote = \_ _ -> return () + } + +withRuntime :: Runtime -> RuntimeM a -> IO a +withRuntime rt m = runReaderT (unRuntimeM m) rt +{-# INLINE withRuntime #-} + +lookupNode :: NodeId -> RuntimeM (Maybe NodeAddr) +lookupNode nodeId = do + rt <- ask + liftIO $ atomically $ do + table <- readTVar (rtNodeTable rt) + return $ Map.lookup nodeId table diff --git a/src/Control/Actor/Supervision.hs b/src/Control/Actor/Supervision.hs new file mode 100644 index 0000000..df58a30 --- /dev/null +++ b/src/Control/Actor/Supervision.hs @@ -0,0 +1,164 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE StrictData #-} + +module Control.Actor.Supervision + ( ChildSpec (..) + , child + , childWithRef + , RestartStrategy (..) + , ChildSlot (..) + , spawnSlot + , supervise' + , supervise + , doOneForOne + , doOneForAll + , doRestForOne + , killSlot + ) where + +import Control.Actor.Core (Actor, ActorM, liftRuntime, linkActorTo, spawnActor) +import Control.Actor.Runtime (Runtime (..), RuntimeM, withRuntime) +import Control.Actor.Types +import Data.Binary (Binary) +import Control.Concurrent (ThreadId, forkIO, killThread) +import Control.Concurrent.STM + ( TMVar + , TQueue + , TVar + , atomically + , flushTQueue + , modifyTVar + , newTQueueIO + , newTVarIO + , putTMVar + , readTQueue + , readTVarIO + , tryTakeTMVar + , writeTVar + ) +import Control.Monad (forever, void) +import Control.Monad.Reader (MonadIO (..), MonadReader (..)) +import Data.Map qualified as Map + +data ChildSpec = forall m r. (Binary m, Binary r) => ChildSpec + { csRun :: DeathTarget -> RuntimeM (ActorRef m r) + , csOnSpawn :: ActorRef m r -> IO () + } + +child :: + (Binary m, Binary r) => + (m -> Actor u r) -> + (DeathMessage -> ActorM u (SupervisorAction u)) -> + u -> + ChildSpec +child msgFn deathFn initState = + ChildSpec + { csRun = \target -> do + ref <- spawnActor msgFn deathFn initState + linkActorTo target ref + return ref + , csOnSpawn = \_ -> return () + } + +childWithRef :: + (Binary m, Binary r) => + (m -> Actor u r) -> + (DeathMessage -> ActorM u (SupervisorAction u)) -> + u -> + TMVar (ActorRef m r) -> + ChildSpec +childWithRef msgFn deathFn initState cell = + ChildSpec + { csRun = \target -> do + ref <- spawnActor msgFn deathFn initState + linkActorTo target ref + return ref + , csOnSpawn = \ref -> atomically $ do + void $ tryTakeTMVar cell + putTMVar cell ref + } + +data RestartStrategy = OneForOne | OneForAll | RestForOne + +data ChildSlot = forall m r. (Binary m, Binary r) => ChildSlot + { slotSpec :: ChildSpec + , slotRef :: ActorRef m r + , slotId :: ActorId + , slotTid :: ThreadId + } + +spawnSlot :: DeathTarget -> ChildSpec -> RuntimeM ChildSlot +spawnSlot target spec@ChildSpec {csRun, csOnSpawn} = do + rt <- ask + ref <- csRun target + _ <- liftIO $ csOnSpawn ref + let aid = someActorId (SomeActorRef ref) + tid <- liftIO $ do + actors <- readTVarIO (rtActors rt) + case Map.lookup aid actors of + Just (t, _) -> return t + Nothing -> error "spawnSlot: actor vanished immediately after spawn" + return $ ChildSlot spec ref aid tid + +killSlot :: ChildSlot -> IO () +killSlot ChildSlot {slotTid} = killThread slotTid + +supervise' :: RestartStrategy -> [ChildSpec] -> RuntimeM () +supervise' strategy specs = do + rt <- ask + supDeathQ <- liftIO newTQueueIO + let target = LocalTarget supDeathQ + slots <- mapM (spawnSlot target) specs + slotsVar <- liftIO $ newTVarIO slots + liftIO $ void $ forkIO $ forever $ do + DeathMessage deadId _ <- atomically $ readTQueue supDeathQ + slots' <- readTVarIO slotsVar + withRuntime rt $ case strategy of + OneForOne -> doOneForOne target slotsVar slots' deadId + OneForAll -> doOneForAll target slotsVar supDeathQ slots' + RestForOne -> doRestForOne target slotsVar supDeathQ slots' deadId + +supervise :: RestartStrategy -> [ChildSpec] -> ActorM u () +supervise = (liftRuntime .) . supervise' + +doOneForOne :: + DeathTarget -> TVar [ChildSlot] -> [ChildSlot] -> ActorId -> RuntimeM () +doOneForOne target slotsVar slots deadId = + case filter (\s -> slotId s == deadId) slots of + [] -> return () + slot:_ -> do + newSlot <- spawnSlot target (slotSpec slot) + liftIO $ atomically $ + modifyTVar slotsVar $ + map (\s -> if slotId s == deadId then newSlot else s) + +doOneForAll :: + DeathTarget -> + TVar [ChildSlot] -> + TQueue DeathMessage -> + [ChildSlot] -> + RuntimeM () +doOneForAll target slotsVar supDeathQ slots = do + liftIO $ do + mapM_ killSlot slots + atomically $ void $ flushTQueue supDeathQ + newSlots <- mapM (spawnSlot target . slotSpec) slots + liftIO $ atomically $ writeTVar slotsVar newSlots + +doRestForOne :: + DeathTarget -> + TVar [ChildSlot] -> + TQueue DeathMessage -> + [ChildSlot] -> + ActorId -> + RuntimeM () +doRestForOne target slotsVar supDeathQ slots deadId = do + let (before, fromDead) = break (\s -> slotId s == deadId) slots + case fromDead of + [] -> return () + _nonempty -> do + liftIO $ do + mapM_ killSlot (drop 1 fromDead) + atomically $ void $ flushTQueue supDeathQ + newSlots <- mapM (spawnSlot target . slotSpec) fromDead + liftIO $ atomically $ writeTVar slotsVar (before ++ newSlots) diff --git a/src/Control/Actor/Transport.hs b/src/Control/Actor/Transport.hs new file mode 100644 index 0000000..be98d21 --- /dev/null +++ b/src/Control/Actor/Transport.hs @@ -0,0 +1,106 @@ +{-# LANGUAGE StrictData #-} + +module Control.Actor.Transport + ( ConnHandle (..) + , Transport (..) + , createTCPTransport + ) where + +import Control.Actor.Types (NodeAddr (..)) +import Control.Concurrent (forkIO) +import Control.Monad (forever, void, when) +import Data.Binary (decode, encode) +import qualified Data.ByteString.Lazy as BL +import Data.ByteString.Lazy (ByteString) +import Data.Word (Word64) +import Network.Socket + ( AddrInfo (addrAddress, addrFamily, addrFlags, addrProtocol, addrSocketType) + , AddrInfoFlag (AI_PASSIVE) + , Socket + , SocketOption (NoDelay, ReuseAddr) + , SocketType (Stream) + , accept + , bind + , close + , connect + , defaultHints + , getAddrInfo + , listen + , setSocketOption + , socket + ) +import Network.Socket.ByteString.Lazy (recv, sendAll) + +data ConnHandle = ConnHandle + { chSend :: ByteString -> IO () + , chRecv :: IO ByteString + , chClose :: IO () + } + +data Transport = Transport + { tConnect :: NodeAddr -> IO ConnHandle + , tListen :: (ConnHandle -> IO ()) -> IO () + } + +sendFramed :: Socket -> ByteString -> IO () +sendFramed sock payload = + sendAll sock (encode (fromIntegral (BL.length payload) :: Word64) <> payload) + +recvExact :: Socket -> Int -> IO ByteString +recvExact sock = go [] + where + go acc 0 = return (BL.concat (reverse acc)) + go acc r = do + chunk <- recv sock (fromIntegral (min r 65536)) + when (BL.null chunk) $ fail "recvExact: connection closed" + go (chunk : acc) (r - fromIntegral (BL.length chunk)) + +recvFramed :: Socket -> IO ByteString +recvFramed sock = do + header <- recvExact sock 8 + recvExact sock (fromIntegral (decode header :: Word64)) + +connectTcp :: NodeAddr -> IO Socket +connectTcp (NodeAddr host port) = do + let hints = defaultHints {addrSocketType = Stream} + addrs <- getAddrInfo (Just hints) (Just host) (Just (show port)) + case addrs of + [] -> fail $ "connectTcp: no address for " <> host <> ":" <> show port + (a:_) -> do + sock <- socket (addrFamily a) Stream (addrProtocol a) + setSocketOption sock NoDelay 1 + connect sock (addrAddress a) + return sock + +listenTcp :: NodeAddr -> IO Socket +listenTcp (NodeAddr host port) = do + let hints = defaultHints {addrFlags = [AI_PASSIVE], addrSocketType = Stream} + addrs <- getAddrInfo (Just hints) (Just host) (Just (show port)) + case addrs of + [] -> fail "listenTcp: no address" + (a:_) -> do + sock <- socket (addrFamily a) Stream (addrProtocol a) + setSocketOption sock ReuseAddr 1 + bind sock (addrAddress a) + listen sock 128 + return sock + +createTCPTransport :: NodeAddr -> IO Transport +createTCPTransport myAddr = do + lsock <- listenTcp myAddr + return Transport + { tConnect = \peer -> do + sock <- connectTcp peer + return ConnHandle + { chSend = sendFramed sock + , chRecv = recvFramed sock + , chClose = close sock + } + , tListen = \callback -> void $ forkIO $ forever $ do + (csock, _) <- accept lsock + void $ forkIO $ callback ConnHandle + { chSend = sendFramed csock + , chRecv = recvFramed csock + , chClose = close csock + } + } diff --git a/src/Control/Actor/Types.hs b/src/Control/Actor/Types.hs new file mode 100644 index 0000000..16927b1 --- /dev/null +++ b/src/Control/Actor/Types.hs @@ -0,0 +1,115 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE StrictData #-} + +module Control.Actor.Types + ( NodeAddr (..) + , NodeId + , thisNodeId + , ActorId (..) + , ExitReason (..) + , DeathMessage (..) + , DeathTarget (..) + , RemoteExitReason (..) + , toRemoteExitReason + , NetworkMessage (..) + , ActorState (..) + , ActorRef (..) + , SomeActorRef (..) + , someActorId + , SupervisorAction (..) + , CorrelationId + , Envelope (..) + ) where + +import Control.Concurrent.MVar (MVar) +import Control.Concurrent.STM (TQueue, TVar) +import Control.Exception (SomeException) +import Data.Binary (Binary) +import Data.ByteString.Lazy (ByteString) +import Data.UUID (UUID) +import GHC.Generics (Generic) + +data NodeAddr = NodeAddr + { nodeHost :: String + , nodePort :: Integer + } deriving (Eq, Ord, Show, Generic) + +instance Binary NodeAddr + +type NodeId = Integer + +thisNodeId :: NodeId +thisNodeId = 0 + +data ActorId = ActorId NodeId UUID + deriving (Eq, Ord, Show, Generic) + +instance Binary ActorId + +data ExitReason + = Normal + | Killed + | Exception SomeException + deriving Show + +data DeathMessage = DeathMessage + { dmActorId :: ActorId + , dmReason :: ExitReason + } deriving Show + +data DeathTarget + = LocalTarget (TQueue DeathMessage) + | RemoteTarget ActorId NodeAddr + +data RemoteExitReason + = RNormal + | RKilled + | RException String + deriving (Show, Generic) + +instance Binary RemoteExitReason + +toRemoteExitReason :: ExitReason -> RemoteExitReason +toRemoteExitReason Normal = RNormal +toRemoteExitReason Killed = RKilled +toRemoteExitReason (Exception e) = RException (show e) + +data NetworkMessage + = NMHandshake NodeAddr + | NMCast UUID ByteString + | NMCall UUID CorrelationId NodeAddr ByteString + | NMReply CorrelationId ByteString + | NMDeath ActorId RemoteExitReason + deriving (Generic) + +instance Binary NetworkMessage + +data ActorState u = ActorState + { asId :: ActorId + , asLinks :: TVar [DeathTarget] + , asEnv :: u + } + +type CorrelationId = Integer + +data Envelope msg reply + = Cast msg + | Call msg (MVar (Maybe reply)) + +data ActorRef msg reply + = forall u. LocalRef + { arMsgQ :: TQueue (Envelope msg reply) + , arDeathQ :: TQueue DeathMessage + , arState :: ActorState u + } + | RemoteRef ActorId + +data SomeActorRef = forall msg reply. (Binary msg, Binary reply) => SomeActorRef (ActorRef msg reply) + +someActorId :: SomeActorRef -> ActorId +someActorId (SomeActorRef (LocalRef {arState})) = asId arState +someActorId (SomeActorRef (RemoteRef aid)) = aid + +data SupervisorAction u + = Stop + | Continue u