feat, refactor: implement networking, wire protocol

This commit is contained in:
2026-06-09 13:59:51 +02:00
parent bdda9763c6
commit 05ea32f006
8 changed files with 959 additions and 443 deletions
+9 -2
View File
@@ -62,7 +62,14 @@ library
import: warnings import: warnings
-- Modules exported by the library. -- Modules exported by the library.
exposed-modules: Control.Actor exposed-modules:
Control.Actor
Control.Actor.Types
Control.Actor.Transport
Control.Actor.Runtime
Control.Actor.Core
Control.Actor.Supervision
Control.Actor.Network
-- Modules included in this library but not exported. -- Modules included in this library but not exported.
-- other-modules: -- other-modules:
@@ -71,7 +78,7 @@ library
-- other-extensions: -- other-extensions:
-- Other library packages from which modules are imported. -- Other library packages from which modules are imported.
build-depends: base ^>=4.18.3.0, stm, uuid, bytestring ^>=0.12.0.0, containers ^>=0.8, mtl ^>=2.3.0, binary ^>=0.8.9 build-depends: base ^>=4.18.3.0, stm, uuid, bytestring ^>=0.12.0.0, containers ^>=0.8, mtl ^>=2.3.0, binary ^>=0.8.9, network ^>=3.2
-- Directories containing source files. -- Directories containing source files.
hs-source-dirs: src hs-source-dirs: src
+28 -436
View File
@@ -1,439 +1,31 @@
{-# LANGUAGE StrictData #-} module Control.Actor
( module Control.Actor.Types
, module Control.Actor.Transport
, module Control.Actor.Runtime
, module Control.Actor.Core
, module Control.Actor.Supervision
, module Control.Actor.Network
-- Demo
, pingActor
, forwardActorWithCell
, repeatActor
, system
) where
module Control.Actor where import Control.Actor.Core
import Control.Actor.Network
import Control.Actor.Runtime
import Control.Actor.Supervision
import Control.Actor.Transport
import Control.Actor.Types
import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM import Control.Concurrent.STM
( TMVar, ( TMVar, atomically, newEmptyTMVarIO, readTMVar )
TQueue, import Control.Monad.Reader (MonadIO (..))
TVar,
atomically,
flushTQueue,
modifyTVar,
newEmptyTMVarIO,
newTQueueIO,
newTVar,
newTVarIO,
orElse,
putTMVar,
readTMVar,
readTQueue,
readTVar,
readTVarIO,
tryTakeTMVar,
writeTQueue,
writeTVar,
)
import Control.Exception (AsyncException (..), SomeException, fromException, throwIO, try)
import Control.Monad (forM_, forever, void)
import Control.Monad.Reader (MonadIO (liftIO), MonadReader (ask), ReaderT (runReaderT), asks, lift, withReaderT)
import Data.Binary (Binary, decode, encode)
import Data.ByteString.Lazy (ByteString)
import Data.List (find)
import Data.Map qualified as Map
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import GHC.Generics (Generic)
import Unsafe.Coerce (unsafeCoerce) import Unsafe.Coerce (unsafeCoerce)
data NodeAddr = NodeAddr -- Demo
{ nodeHost :: String,
nodePort :: Integer
}
deriving (Eq, Show, Generic)
instance Binary NodeAddr
type NodeId = Integer
thisNodeId :: NodeId
thisNodeId = 0
data ActorId = ActorId NodeId UUID
deriving (Eq, Ord, Show, Generic)
instance Binary ActorId
data ExitReason
= Normal
| Killed
| Exception SomeException
deriving (Show, Generic)
data DeathMessage = DeathMessage
{ dmActorId :: ActorId,
dmReason :: ExitReason
}
deriving (Show, Generic)
data DeathTarget
= LocalTarget (TQueue DeathMessage)
| RemoteTarget ActorId
data ActorState u = ActorState
{ asId :: ActorId,
asLinks :: TVar [DeathTarget],
asEnv :: u
}
newtype ActorM u r = ActorM
{unActorM :: ReaderT (ActorState u, Runtime) IO r}
deriving
( Functor,
Applicative,
Monad,
MonadIO,
MonadReader (ActorState u, Runtime)
)
runActorM :: ActorM u r -> Runtime -> ActorState u -> IO r
runActorM m r s = runReaderT (unActorM m) (s, r)
state :: ActorM u u
state = asks (asEnv . fst)
getSelf :: ActorM u SomeActorRef
getSelf = do
(as, rt) <- ask
actors <- liftIO $ readTVarIO (rtActors rt)
let actorId = asId as
maybeRef = snd <$> Map.lookup actorId actors
case maybeRef of
Just ref -> return ref
Nothing -> error "getSelf: actor not found in runtime"
data ActorRef msg reply
= forall u. LocalRef
{ arMsgQ :: TQueue (Envelope msg reply),
arDeathQ :: TQueue DeathMessage,
arState :: ActorState u
}
| RemoteRef ActorId
data SomeActorRef = forall msg reply. SomeActorRef (ActorRef msg reply)
someActorId :: SomeActorRef -> ActorId
someActorId (SomeActorRef (LocalRef {arState})) = asId arState
someActorId (SomeActorRef (RemoteRef aid)) = aid
data SupervisorAction u
= Stop
| Continue u
type CorrelationId = Integer
data Envelope msg reply
= Cast msg
| Call msg (MVar (Maybe reply))
data Runtime = Runtime
{ rtNodeId :: NodeAddr,
rtActors :: TVar (Map.Map ActorId (ThreadId, SomeActorRef)),
rtPending :: TVar (Map.Map CorrelationId (MVar ByteString)),
rtNextCorr :: TVar CorrelationId,
rtNodeTable :: TVar (Map.Map NodeId NodeAddr),
rtTransport :: Transport
}
data Transport = Transport
{ sendBytes :: NodeAddr -> ByteString -> IO ()
}
data RemoteEnvelope
= RemoteCast UUID ByteString
| RemoteCall UUID CorrelationId NodeAddr ByteString
| RemoteReply CorrelationId ByteString
deriving (Generic)
instance Binary RemoteEnvelope
newtype RuntimeM a = RuntimeM
{unRuntimeM :: ReaderT Runtime IO a}
deriving
( Functor,
Applicative,
Monad,
MonadIO,
MonadReader Runtime
)
liftRuntime :: RuntimeM a -> ActorM u a
liftRuntime = ActorM . withReaderT snd . unRuntimeM
withRuntime :: Runtime -> RuntimeM a -> IO a
withRuntime = (. unRuntimeM) . flip runReaderT
{-# INLINE withRuntime #-}
newRuntime :: IO Runtime
newRuntime = atomically $ do
actors <- newTVar Map.empty
pending <- newTVar Map.empty
nextCorr <- newTVar (0 :: Integer)
nodeTable <- newTVar Map.empty
return
Runtime
{ rtNodeId = NodeAddr "localhost" 0,
rtActors = actors,
rtPending = pending,
rtNextCorr = nextCorr,
rtNodeTable = nodeTable,
rtTransport = Transport (\_ _ -> return ())
}
lookupNode :: NodeId -> RuntimeM (Maybe NodeAddr)
lookupNode nodeId = do
rt <- ask
liftIO $ atomically $ do
table <- readTVar $ rtNodeTable rt
return $ Map.lookup nodeId table
cast' :: (Binary msg) => msg -> ActorRef msg reply -> RuntimeM ()
cast' msg (LocalRef {arMsgQ}) = RuntimeM $ lift $ atomically $ writeTQueue arMsgQ (Cast msg)
cast' msg (RemoteRef (ActorId nodeId uuid)) = do
rt <- ask
maybeAddr <- lookupNode nodeId
let payload = encode (RemoteCast uuid (encode msg))
case maybeAddr of
Just addr -> liftIO $ sendBytes (rtTransport rt) addr payload
Nothing -> liftIO $ putStrLn $ "cast: no node in lookup table with id " <> show nodeId
cast :: (Binary msg) => msg -> ActorRef msg reply -> ActorM u ()
cast = (liftRuntime .) . cast'
castIn :: (Binary msg) => Int -> msg -> ActorRef msg reply -> ActorM u ()
castIn ms msg ref = do
rt <- asks snd
liftIO $ void $ forkIO $ do
threadDelay (ms * 1000)
withRuntime rt $ cast' msg ref
call' :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> RuntimeM (Maybe reply)
call' msg (LocalRef {arMsgQ}) = liftIO $ do
mv <- newEmptyMVar
atomically $ writeTQueue arMsgQ (Call msg mv)
takeMVar mv
call' msg (RemoteRef (ActorId nodeId uuid)) = do
rt <- ask
corrId <- liftIO $ atomically $ do
cid <- readTVar (rtNextCorr rt)
writeTVar (rtNextCorr rt) (cid + 1)
return cid
replyVar <- liftIO newEmptyMVar
liftIO $ atomically $ modifyTVar (rtPending rt) (Map.insert corrId replyVar)
let payload = encode (RemoteCall uuid corrId (rtNodeId rt) (encode msg))
maybeAddr <- lookupNode nodeId
case maybeAddr of
Just addr -> do
liftIO $ sendBytes (rtTransport rt) addr payload
raw <- liftIO $ takeMVar replyVar
liftIO $ atomically $ modifyTVar (rtPending rt) (Map.delete corrId)
return $ decode raw
Nothing -> do
liftIO $ putStrLn $ "call: no node in lookup table with id " <> show nodeId
return Nothing
call :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> ActorM u (Maybe reply)
call = (liftRuntime .) . call'
type Actor u r = ActorM u (Maybe r, u)
notifyOfDeath :: DeathMessage -> DeathTarget -> IO ()
notifyOfDeath dm (LocalTarget q) = atomically $ writeTQueue q dm
notifyOfDeath _ (RemoteTarget _) = return ()
spawnActor ::
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
RuntimeM (ActorRef m r)
spawnActor actorFn deathFn initState = do
rt <- ask
mailbox <- liftIO newTQueueIO
deathQ <- liftIO newTQueueIO
links <- liftIO $ newTVarIO []
uuid <- liftIO nextRandom
let actorId = ActorId thisNodeId uuid
actorState = ActorState actorId links initState
actorRef = LocalRef mailbox deathQ actorState
let loop as = do
event <-
atomically $
(Left <$> readTQueue mailbox)
`orElse` (Right <$> readTQueue deathQ)
case event of
Left envelope ->
case envelope of
Cast msg -> do
(_, u') <- runActorM (actorFn msg) rt as
loop as {asEnv = u'}
Call msg mv -> do
(reply, u') <- runActorM (actorFn msg) rt as
putMVar mv reply
loop as {asEnv = u'}
Right dm -> do
action <- runActorM (deathFn dm) rt as
case action of
Stop -> return ()
Continue u -> loop as {asEnv = u}
tid <- liftIO $ forkIO $ do
result <- try (loop actorState) :: IO (Either SomeException ())
let reason = case result of
Right () -> Normal
Left exc -> case fromException exc of
Just ThreadKilled -> Killed
_anyOtherExc -> Exception exc
links' <- readTVarIO (asLinks actorState)
let dm = DeathMessage actorId reason
forM_ links' (notifyOfDeath dm)
atomically $ modifyTVar (rtActors rt) (Map.delete actorId)
case result of
Left exc -> throwIO exc
Right () -> return ()
liftIO $ atomically $ modifyTVar (rtActors rt) (Map.insert actorId (tid, SomeActorRef actorRef))
return actorRef
linkActorTo :: DeathTarget -> ActorRef m r -> RuntimeM ()
linkActorTo target (LocalRef {arState}) =
liftIO $ atomically $ modifyTVar (asLinks arState) (target :)
linkActorTo _ (RemoteRef _) = return ()
linkTo :: DeathTarget -> ActorM u ()
linkTo target = do
as <- asks fst
liftIO $ atomically $ modifyTVar (asLinks as) (target :)
killActor :: ActorRef m r -> ActorM u ()
killActor (LocalRef {arDeathQ, arState}) =
liftIO $ atomically $ writeTQueue arDeathQ (DeathMessage (asId arState) Killed)
killActor (RemoteRef _) =
liftIO $ putStrLn "killActor: remote kill not yet implemented"
stopOnDeath :: DeathMessage -> ActorM u (SupervisorAction u)
stopOnDeath _ = return Stop
-- Supervision
data ChildSpec = forall m r. ChildSpec
{ csRun :: DeathTarget -> RuntimeM (ActorRef m r),
csOnSpawn :: ActorRef m r -> IO ()
}
child ::
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
ChildSpec
child msgFn deathFn initState =
ChildSpec
{ csRun = \target -> do
ref <- spawnActor msgFn deathFn initState
linkActorTo target ref
return ref,
csOnSpawn = \_ -> return ()
}
childWithRef ::
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
TMVar (ActorRef m r) ->
ChildSpec
childWithRef msgFn deathFn initState cell =
ChildSpec
{ csRun = \target -> do
ref <- spawnActor msgFn deathFn initState
linkActorTo target ref
return ref,
csOnSpawn = \ref -> atomically $ do
void $ tryTakeTMVar cell
putTMVar cell ref
}
data RestartStrategy = OneForOne | OneForAll | RestForOne
data ChildSlot = forall m r. ChildSlot
{ slotSpec :: ChildSpec,
slotRef :: ActorRef m r,
slotId :: ActorId
}
spawnSlot :: DeathTarget -> ChildSpec -> RuntimeM ChildSlot
spawnSlot target spec@ChildSpec{csRun, csOnSpawn} = do
ref <- csRun target
_ <- liftIO $ csOnSpawn ref
return $ ChildSlot spec ref (someActorId (SomeActorRef ref))
supervise' :: RestartStrategy -> [ChildSpec] -> RuntimeM ()
supervise' strategy specs = do
rt <- ask
supDeathQ <- liftIO newTQueueIO
let target = LocalTarget supDeathQ
slots <- mapM (spawnSlot target) specs
slotsVar <- liftIO $ newTVarIO slots
_ <- liftIO $ forkIO $ forever $ do
DeathMessage deadId _ <- atomically $ readTQueue supDeathQ
slots' <- readTVarIO slotsVar
case strategy of
OneForOne -> doOneForOne rt target slotsVar slots' deadId
OneForAll -> doOneForAll rt target slotsVar supDeathQ slots'
RestForOne -> doRestForOne rt target slotsVar supDeathQ slots' deadId
return ()
supervise :: RestartStrategy -> [ChildSpec] -> ActorM u ()
supervise = (liftRuntime .) . supervise'
doOneForOne ::
Runtime -> DeathTarget -> TVar [ChildSlot] -> [ChildSlot] -> ActorId -> IO ()
doOneForOne rt target slotsVar slots deadId =
case find (\s -> slotId s == deadId) slots of
Nothing -> return ()
Just slot -> do
newSlot <- withRuntime rt $ spawnSlot target (slotSpec slot)
atomically $
modifyTVar slotsVar $
map (\s -> if slotId s == deadId then newSlot else s)
doOneForAll ::
Runtime ->
DeathTarget ->
TVar [ChildSlot] ->
TQueue DeathMessage ->
[ChildSlot] ->
IO ()
doOneForAll rt target slotsVar supDeathQ slots = do
mapM_ (killSlot supDeathQ) slots
atomically $ void $ flushTQueue supDeathQ
newSlots <- withRuntime rt $ mapM (spawnSlot target . slotSpec) slots
atomically $ writeTVar slotsVar newSlots
doRestForOne ::
Runtime ->
DeathTarget ->
TVar [ChildSlot] ->
TQueue DeathMessage ->
[ChildSlot] ->
ActorId ->
IO ()
doRestForOne rt target slotsVar supDeathQ slots deadId = do
let (before, fromDead) = break (\s -> slotId s == deadId) slots
case fromDead of
[] -> return ()
_nonempty -> do
mapM_ (killSlot supDeathQ) (drop 1 fromDead)
atomically $ void $ flushTQueue supDeathQ
newSlots <- withRuntime rt $ mapM (spawnSlot target . slotSpec) fromDead
atomically $ writeTVar slotsVar (before ++ newSlots)
killSlot :: TQueue DeathMessage -> ChildSlot -> IO ()
killSlot _ (ChildSlot {slotRef, slotId}) = case slotRef of
LocalRef {arDeathQ} -> atomically $ writeTQueue arDeathQ (DeathMessage slotId Killed)
RemoteRef _ -> return ()
----- Demo
pingActor :: String -> Actor () String pingActor :: String -> Actor () String
pingActor msg = return (Just ("Hello, " <> msg <> "!"), ()) pingActor msg = return (Just ("Hello, " <> msg <> "!"), ())
@@ -457,7 +49,7 @@ repeatActor r cell () = do
system :: IO () system :: IO ()
system = do system = do
rt <- newRuntime rt <- initRuntime (NodeAddr "localhost" 9000)
withRuntime rt $ do withRuntime rt $ do
pingCell <- liftIO newEmptyTMVarIO pingCell <- liftIO newEmptyTMVarIO
forwardCell <- liftIO newEmptyTMVarIO forwardCell <- liftIO newEmptyTMVarIO
@@ -465,12 +57,12 @@ system = do
_ <- liftIO $ forkIO $ do _ <- liftIO $ forkIO $ do
repeatRef <- atomically $ readTMVar repeatCell repeatRef <- atomically $ readTMVar repeatCell
withRuntime rt $ do withRuntime rt $
cast' () repeatRef cast' () repeatRef
supervise' supervise'
OneForOne OneForOne
[ childWithRef pingActor stopOnDeath () pingCell, [ childWithRef pingActor stopOnDeath () pingCell
childWithRef (forwardActorWithCell pingCell) stopOnDeath () forwardCell, , childWithRef (forwardActorWithCell pingCell) stopOnDeath () forwardCell
childWithRef (repeatActor "repeaaat" forwardCell) stopOnDeath () repeatCell , childWithRef (repeatActor "repeaaat" forwardCell) stopOnDeath () repeatCell
] ]
+211
View File
@@ -0,0 +1,211 @@
{-# LANGUAGE StrictData #-}
module Control.Actor.Core
( ActorM (..)
, runActorM
, state
, getSelf
, Actor
, liftRuntime
, notifyOfDeath
, spawnActor
, linkActorTo
, linkTo
, killActor
, stopOnDeath
, cast'
, call'
, cast
, call
, castIn
) where
import Control.Actor.Runtime
import Control.Actor.Types
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
( atomically
, modifyTVar
, newTQueueIO
, newTVarIO
, orElse
, readTQueue
, readTVar
, readTVarIO
, writeTQueue
, writeTVar
)
import Control.Exception
( AsyncException (..)
, SomeException
, fromException
, throwIO
, try
)
import Control.Monad (forM_, void)
import Control.Monad.Reader
( MonadIO (..)
, MonadReader (..)
, ReaderT (..)
, asks
, withReaderT
)
import Data.Binary (Binary, decode, encode)
import Data.Map qualified as Map
import Data.UUID.V4 (nextRandom)
newtype ActorM u r = ActorM
{ unActorM :: ReaderT (ActorState u, Runtime) IO r }
deriving
( Functor, Applicative, Monad, MonadIO
, MonadReader (ActorState u, Runtime)
)
runActorM :: ActorM u r -> Runtime -> ActorState u -> IO r
runActorM m rt s = runReaderT (unActorM m) (s, rt)
state :: ActorM u u
state = asks (asEnv . fst)
getSelf :: ActorM u SomeActorRef
getSelf = do
(as, rt) <- ask
actors <- liftIO $ readTVarIO (rtActors rt)
let actorId = asId as
maybeRef = snd <$> Map.lookup actorId actors
case maybeRef of
Just ref -> return ref
Nothing -> error "getSelf: actor not found in runtime"
type Actor u r = ActorM u (Maybe r, u)
liftRuntime :: RuntimeM a -> ActorM u a
liftRuntime = ActorM . withReaderT snd . unRuntimeM
notifyOfDeath :: Runtime -> DeathMessage -> DeathTarget -> IO ()
notifyOfDeath _ dm (LocalTarget q) = atomically $ writeTQueue q dm
notifyOfDeath rt dm (RemoteTarget _ peerAddr) =
withRuntime rt $ rtSendRemote rt peerAddr (NMDeath (dmActorId dm) (toRemoteExitReason (dmReason dm)))
spawnActor ::
(Binary m, Binary r) =>
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
RuntimeM (ActorRef m r)
spawnActor actorFn deathFn initState = do
rt <- ask
mailbox <- liftIO newTQueueIO
deathQ <- liftIO newTQueueIO
links <- liftIO $ newTVarIO []
uuid <- liftIO nextRandom
let actorId = ActorId thisNodeId uuid
actorState = ActorState actorId links initState
actorRef = LocalRef mailbox deathQ actorState
let loop as = do
event <-
atomically $
(Left <$> readTQueue mailbox)
`orElse` (Right <$> readTQueue deathQ)
case event of
Left envelope ->
case envelope of
Cast msg -> do
(_, u') <- runActorM (actorFn msg) rt as
loop as {asEnv = u'}
Call msg mv -> do
(reply, u') <- runActorM (actorFn msg) rt as
putMVar mv reply
loop as {asEnv = u'}
Right dm -> do
action <- runActorM (deathFn dm) rt as
case action of
Stop -> return ()
Continue u -> loop as {asEnv = u}
tid <- liftIO $ forkIO $ do
result <- try @SomeException (loop actorState)
let reason = case result of
Right () -> Normal
Left exc -> case fromException exc of
Just ThreadKilled -> Killed
_anyOtherExc -> Exception exc
links' <- readTVarIO (asLinks actorState)
forM_ links' (notifyOfDeath rt (DeathMessage actorId reason))
atomically $ modifyTVar (rtActors rt) (Map.delete actorId)
case result of
Left exc -> throwIO exc
Right () -> return ()
liftIO $ atomically $
modifyTVar (rtActors rt) (Map.insert actorId (tid, SomeActorRef actorRef))
return actorRef
linkActorTo :: DeathTarget -> ActorRef m r -> RuntimeM ()
linkActorTo target (LocalRef {arState}) =
liftIO $ atomically $ modifyTVar (asLinks arState) (target :)
linkActorTo _ (RemoteRef _) = return ()
linkTo :: DeathTarget -> ActorM u ()
linkTo target = do
as <- asks fst
liftIO $ atomically $ modifyTVar (asLinks as) (target :)
killActor :: ActorRef m r -> ActorM u ()
killActor (LocalRef {arDeathQ, arState}) =
liftIO $ atomically $ writeTQueue arDeathQ (DeathMessage (asId arState) Killed)
killActor (RemoteRef _) =
liftIO $ putStrLn "killActor: remote kill not yet implemented"
stopOnDeath :: DeathMessage -> ActorM u (SupervisorAction u)
stopOnDeath _ = return Stop
cast' :: (Binary msg) => msg -> ActorRef msg reply -> RuntimeM ()
cast' msg (LocalRef {arMsgQ}) =
liftIO $ atomically $ writeTQueue arMsgQ (Cast msg)
cast' msg (RemoteRef (ActorId nodeId uuid)) = do
rt <- ask
maybeAddr <- lookupNode nodeId
case maybeAddr of
Nothing -> liftIO $ putStrLn $ "cast: no node in lookup table with id " <> show nodeId
Just addr -> rtSendRemote rt addr (NMCast uuid (encode msg))
call' :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> RuntimeM (Maybe reply)
call' msg (LocalRef {arMsgQ}) = liftIO $ do
mv <- newEmptyMVar
atomically $ writeTQueue arMsgQ (Call msg mv)
takeMVar mv
call' msg (RemoteRef (ActorId nodeId uuid)) = do
rt <- ask
corrId <- liftIO $ atomically $ do
cid <- readTVar (rtNextCorr rt)
writeTVar (rtNextCorr rt) (cid + 1)
return cid
replyVar <- liftIO newEmptyMVar
liftIO $ atomically $ modifyTVar (rtPending rt) (Map.insert corrId replyVar)
maybeAddr <- lookupNode nodeId
case maybeAddr of
Nothing -> liftIO $ do
putStrLn $ "call: no node in lookup table with id " <> show nodeId
atomically $ modifyTVar (rtPending rt) (Map.delete corrId)
return Nothing
Just addr -> do
rtSendRemote rt addr (NMCall uuid corrId (rtNodeId rt) (encode msg))
raw <- liftIO $ takeMVar replyVar
liftIO $ atomically $ modifyTVar (rtPending rt) (Map.delete corrId)
return $ Just (decode raw)
cast :: (Binary msg) => msg -> ActorRef msg reply -> ActorM u ()
cast = (liftRuntime .) . cast'
castIn :: (Binary msg) => Int -> msg -> ActorRef msg reply -> ActorM u ()
castIn ms msg ref = do
rt <- asks snd
liftIO $ void $ forkIO $ do
threadDelay (ms * 1000)
withRuntime rt $ cast' msg ref
call :: (Binary msg, Binary reply) => msg -> ActorRef msg reply -> ActorM u (Maybe reply)
call = (liftRuntime .) . call'
+254
View File
@@ -0,0 +1,254 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE StrictData #-}
module Control.Actor.Network
( spawnConnTree
, handleNewConn
, getOrCreateConn
, findByUUID
, routeRemoteDeath
, validateAndDispatch
, sysHandlerFn
, initRuntime
) where
import Control.Actor.Core
( Actor, ActorM, cast', liftRuntime, linkActorTo, spawnActor, state, stopOnDeath )
import Control.Actor.Runtime (Runtime (..), RuntimeM, newRuntime, withRuntime)
import Control.Actor.Supervision (ChildSpec (..), RestartStrategy (..), childWithRef, supervise')
import Control.Actor.Transport (ConnHandle (..), Transport (..), createTCPTransport)
import Control.Actor.Types
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
( atomically
, modifyTVar
, newEmptyTMVar
, newEmptyTMVarIO
, putTMVar
, readTMVar
, readTVar
, readTVarIO
, tryTakeTMVar
, writeTQueue
)
import Control.Exception (SomeException, try)
import Control.Monad (forM_, forever, unless, void)
import Control.Monad.Reader (MonadIO (..), MonadReader (..))
import Data.Binary (decode, decodeOrFail, encode)
import Data.ByteString.Lazy (ByteString)
import Data.Map qualified as Map
import Data.UUID (UUID)
-- Connection actors
connActorFn :: NetworkMessage -> Actor ConnHandle ()
connActorFn nm = do
ch <- state
liftIO $ chSend ch (encode nm)
return (Nothing, ch)
connDeathFn :: NodeAddr -> DeathMessage -> ActorM ConnHandle (SupervisorAction ConnHandle)
connDeathFn peer _ = do
ch <- state
rt <- liftRuntime ask
liftIO $ do
atomically $ modifyTVar (rtConnections rt) (Map.delete peer)
chClose ch
return Stop
routerActorFn :: ByteString -> Actor () ()
routerActorFn raw = do
let nm = decode raw :: NetworkMessage
valid <- liftRuntime $ validateAndDispatch nm
unless valid $ liftIO $ putStrLn "router: dropping invalid message"
return (Nothing, ())
-- Connection supervision tree
spawnConnTree :: NodeAddr -> ConnHandle -> RuntimeM (ActorRef NetworkMessage ())
spawnConnTree peer ch = do
rt <- ask
routerCell <- liftIO newEmptyTMVarIO
connCell <- liftIO newEmptyTMVarIO
supervise' OneForAll
[ childWithRef routerActorFn stopOnDeath () routerCell
, ChildSpec
{ csRun = \target -> do
ref <- spawnActor connActorFn (connDeathFn peer) ch
linkActorTo target ref
return ref
, csOnSpawn = \ref -> case ref of
LocalRef { arDeathQ, arState } -> do
atomically $ do
void $ tryTakeTMVar connCell
putTMVar connCell ref
routerRef <- atomically $ readTMVar routerCell
void $ forkIO $ do
result <- try @SomeException $ forever $
chRecv ch >>= \raw -> withRuntime rt $ cast' raw routerRef
case result of
Left _ -> atomically $
writeTQueue arDeathQ (DeathMessage (asId arState) Killed)
Right () -> return ()
RemoteRef _ -> return ()
}
]
liftIO $ atomically $ readTMVar connCell
-- Incoming connection handler
handleNewConn :: ConnHandle -> RuntimeM ()
handleNewConn ch = do
rt <- ask
raw <- liftIO $ chRecv ch
case decode raw :: NetworkMessage of
NMHandshake peerAddr -> do
ref <- spawnConnTree peerAddr ch
liftIO $ atomically $ do
modifyTVar (rtConnections rt) (Map.insert peerAddr ref)
promises <- readTVar (rtConnPromises rt)
case Map.lookup peerAddr promises of
Nothing -> return ()
Just p -> do
modifyTVar (rtConnPromises rt) (Map.delete peerAddr)
putTMVar p ref
_else -> liftIO $ chClose ch
-- Connection pool
getOrCreateConn :: NodeAddr -> RuntimeM (ActorRef NetworkMessage ())
getOrCreateConn peer = do
rt <- ask
action <- liftIO $ atomically $ do
conns <- readTVar (rtConnections rt)
case Map.lookup peer conns of
Just ref -> return (Left ref)
Nothing -> do
promises <- readTVar (rtConnPromises rt)
case Map.lookup peer promises of
Just p -> return (Right (Left p))
Nothing -> do
p <- newEmptyTMVar
modifyTVar (rtConnPromises rt) (Map.insert peer p)
return (Right (Right p))
case action of
Left ref -> return ref
Right (Left promise) -> liftIO $ atomically $ readTMVar promise
Right (Right promise) -> do
ch <- liftIO $ tConnect (rtTransport rt) peer
liftIO $ chSend ch (encode (NMHandshake (rtNodeId rt)))
ref <- spawnConnTree peer ch
liftIO $ atomically $ do
modifyTVar (rtConnections rt) (Map.insert peer ref)
modifyTVar (rtConnPromises rt) (Map.delete peer)
putTMVar promise ref
return ref
-- Message dispatch
findByUUID :: UUID -> Map.Map ActorId (ThreadId, SomeActorRef) -> Maybe SomeActorRef
findByUUID uuid actors =
case Map.toList (Map.filterWithKey (\(ActorId _ u) _ -> u == uuid) actors) of
[] -> Nothing
(_, (_, r)):_ -> Just r
routeRemoteDeath :: ActorId -> RemoteExitReason -> RuntimeM ()
routeRemoteDeath deadId reason = do
rt <- ask
liftIO $ do
actors <- readTVarIO (rtActors rt)
let exitReason = case reason of
RNormal -> Normal
RKilled -> Killed
RException s -> Exception (error s)
dm = DeathMessage deadId exitReason
forM_ (Map.elems actors) $ \(_, SomeActorRef ref) ->
case ref of
LocalRef {arDeathQ, arState} -> do
links <- readTVarIO (asLinks arState)
forM_ links $ \case
RemoteTarget rid _ | rid == deadId ->
atomically $ writeTQueue arDeathQ dm
_else -> return ()
RemoteRef _ -> return ()
validateAndDispatch :: NetworkMessage -> RuntimeM Bool
validateAndDispatch nm = case nm of
NMHandshake _ -> return False
NMReply corrId payload -> do
rt <- ask
liftIO $ do
pending <- readTVarIO (rtPending rt)
case Map.lookup corrId pending of
Nothing -> return False
Just mv -> putMVar mv payload >> return True
NMCast uuid payload -> do
rt <- ask
liftIO $ do
actors <- readTVarIO (rtActors rt)
case findByUUID uuid actors of
Nothing ->
return False
Just (SomeActorRef (LocalRef {arMsgQ})) ->
case decodeOrFail payload of
Left _ -> return False
Right (_, _, msg) -> do
atomically $ writeTQueue arMsgQ (Cast msg)
return True
Just (SomeActorRef (RemoteRef _)) ->
return False
NMCall uuid corrId returnAddr payload -> do
rt <- ask
liftIO $ do
actors <- readTVarIO (rtActors rt)
case findByUUID uuid actors of
Nothing ->
return False
Just (SomeActorRef (LocalRef {arMsgQ})) ->
case decodeOrFail payload of
Left _ -> return False
Right (_, _, msg) -> do
mv <- newEmptyMVar
atomically $ writeTQueue arMsgQ (Call msg mv)
void $ forkIO $ do
reply <- takeMVar mv
case reply of
Nothing -> return ()
Just rv -> withRuntime rt $ do
connRef <- getOrCreateConn returnAddr
cast' (NMReply corrId (encode rv)) connRef
return True
Just (SomeActorRef (RemoteRef _)) ->
return False
NMDeath deadId reason -> do
routeRemoteDeath deadId reason
return True
-- System event handler
sysHandlerFn :: NetworkMessage -> Actor () ()
sysHandlerFn (NMDeath deadId reason) = do
liftRuntime $ routeRemoteDeath deadId reason
return (Nothing, ())
sysHandlerFn _ = return (Nothing, ())
-- Runtime initialization
initRuntime :: NodeAddr -> IO Runtime
initRuntime myAddr = do
transport <- createTCPTransport myAddr
rt0 <- newRuntime myAddr transport
let rt = rt0 { rtSendRemote = \addr nm -> getOrCreateConn addr >>= cast' nm }
withRuntime rt $ void $ spawnActor sysHandlerFn stopOnDeath ()
tListen transport $ \ch -> do
result <- try @SomeException $ withRuntime rt $ handleNewConn ch
case result of
Left _ -> chClose ch
Right () -> return ()
return rt
+67
View File
@@ -0,0 +1,67 @@
{-# LANGUAGE StrictData #-}
module Control.Actor.Runtime
( Runtime (..)
, RuntimeM (..)
, newRuntime
, withRuntime
, lookupNode
) where
import Control.Actor.Transport (Transport)
import Control.Actor.Types
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.STM
( TMVar, TVar, atomically, newTVarIO, readTVar )
import Control.Monad.Reader
( MonadIO (..), MonadReader (..), ReaderT (..), runReaderT )
import Data.ByteString.Lazy (ByteString)
import Data.Map qualified as Map
data Runtime = Runtime
{ rtNodeId :: NodeAddr
, rtActors :: TVar (Map.Map ActorId (ThreadId, SomeActorRef))
, rtPending :: TVar (Map.Map CorrelationId (MVar ByteString))
, rtNextCorr :: TVar CorrelationId
, rtNodeTable :: TVar (Map.Map NodeId NodeAddr)
, rtTransport :: Transport
, rtConnections :: TVar (Map.Map NodeAddr (ActorRef NetworkMessage ()))
, rtConnPromises :: TVar (Map.Map NodeAddr (TMVar (ActorRef NetworkMessage ())))
, rtSendRemote :: NodeAddr -> NetworkMessage -> RuntimeM ()
}
newtype RuntimeM a = RuntimeM
{ unRuntimeM :: ReaderT Runtime IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader Runtime)
newRuntime :: NodeAddr -> Transport -> IO Runtime
newRuntime myAddr transport = do
actors <- newTVarIO Map.empty
pending <- newTVarIO Map.empty
nextCorr <- newTVarIO (0 :: Integer)
nodeTable <- newTVarIO Map.empty
conns <- newTVarIO Map.empty
promises <- newTVarIO Map.empty
return Runtime
{ rtNodeId = myAddr
, rtActors = actors
, rtPending = pending
, rtNextCorr = nextCorr
, rtNodeTable = nodeTable
, rtTransport = transport
, rtConnections = conns
, rtConnPromises = promises
, rtSendRemote = \_ _ -> return ()
}
withRuntime :: Runtime -> RuntimeM a -> IO a
withRuntime rt m = runReaderT (unRuntimeM m) rt
{-# INLINE withRuntime #-}
lookupNode :: NodeId -> RuntimeM (Maybe NodeAddr)
lookupNode nodeId = do
rt <- ask
liftIO $ atomically $ do
table <- readTVar (rtNodeTable rt)
return $ Map.lookup nodeId table
+164
View File
@@ -0,0 +1,164 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE StrictData #-}
module Control.Actor.Supervision
( ChildSpec (..)
, child
, childWithRef
, RestartStrategy (..)
, ChildSlot (..)
, spawnSlot
, supervise'
, supervise
, doOneForOne
, doOneForAll
, doRestForOne
, killSlot
) where
import Control.Actor.Core (Actor, ActorM, liftRuntime, linkActorTo, spawnActor)
import Control.Actor.Runtime (Runtime (..), RuntimeM, withRuntime)
import Control.Actor.Types
import Data.Binary (Binary)
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.STM
( TMVar
, TQueue
, TVar
, atomically
, flushTQueue
, modifyTVar
, newTQueueIO
, newTVarIO
, putTMVar
, readTQueue
, readTVarIO
, tryTakeTMVar
, writeTVar
)
import Control.Monad (forever, void)
import Control.Monad.Reader (MonadIO (..), MonadReader (..))
import Data.Map qualified as Map
data ChildSpec = forall m r. (Binary m, Binary r) => ChildSpec
{ csRun :: DeathTarget -> RuntimeM (ActorRef m r)
, csOnSpawn :: ActorRef m r -> IO ()
}
child ::
(Binary m, Binary r) =>
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
ChildSpec
child msgFn deathFn initState =
ChildSpec
{ csRun = \target -> do
ref <- spawnActor msgFn deathFn initState
linkActorTo target ref
return ref
, csOnSpawn = \_ -> return ()
}
childWithRef ::
(Binary m, Binary r) =>
(m -> Actor u r) ->
(DeathMessage -> ActorM u (SupervisorAction u)) ->
u ->
TMVar (ActorRef m r) ->
ChildSpec
childWithRef msgFn deathFn initState cell =
ChildSpec
{ csRun = \target -> do
ref <- spawnActor msgFn deathFn initState
linkActorTo target ref
return ref
, csOnSpawn = \ref -> atomically $ do
void $ tryTakeTMVar cell
putTMVar cell ref
}
data RestartStrategy = OneForOne | OneForAll | RestForOne
data ChildSlot = forall m r. (Binary m, Binary r) => ChildSlot
{ slotSpec :: ChildSpec
, slotRef :: ActorRef m r
, slotId :: ActorId
, slotTid :: ThreadId
}
spawnSlot :: DeathTarget -> ChildSpec -> RuntimeM ChildSlot
spawnSlot target spec@ChildSpec {csRun, csOnSpawn} = do
rt <- ask
ref <- csRun target
_ <- liftIO $ csOnSpawn ref
let aid = someActorId (SomeActorRef ref)
tid <- liftIO $ do
actors <- readTVarIO (rtActors rt)
case Map.lookup aid actors of
Just (t, _) -> return t
Nothing -> error "spawnSlot: actor vanished immediately after spawn"
return $ ChildSlot spec ref aid tid
killSlot :: ChildSlot -> IO ()
killSlot ChildSlot {slotTid} = killThread slotTid
supervise' :: RestartStrategy -> [ChildSpec] -> RuntimeM ()
supervise' strategy specs = do
rt <- ask
supDeathQ <- liftIO newTQueueIO
let target = LocalTarget supDeathQ
slots <- mapM (spawnSlot target) specs
slotsVar <- liftIO $ newTVarIO slots
liftIO $ void $ forkIO $ forever $ do
DeathMessage deadId _ <- atomically $ readTQueue supDeathQ
slots' <- readTVarIO slotsVar
withRuntime rt $ case strategy of
OneForOne -> doOneForOne target slotsVar slots' deadId
OneForAll -> doOneForAll target slotsVar supDeathQ slots'
RestForOne -> doRestForOne target slotsVar supDeathQ slots' deadId
supervise :: RestartStrategy -> [ChildSpec] -> ActorM u ()
supervise = (liftRuntime .) . supervise'
doOneForOne ::
DeathTarget -> TVar [ChildSlot] -> [ChildSlot] -> ActorId -> RuntimeM ()
doOneForOne target slotsVar slots deadId =
case filter (\s -> slotId s == deadId) slots of
[] -> return ()
slot:_ -> do
newSlot <- spawnSlot target (slotSpec slot)
liftIO $ atomically $
modifyTVar slotsVar $
map (\s -> if slotId s == deadId then newSlot else s)
doOneForAll ::
DeathTarget ->
TVar [ChildSlot] ->
TQueue DeathMessage ->
[ChildSlot] ->
RuntimeM ()
doOneForAll target slotsVar supDeathQ slots = do
liftIO $ do
mapM_ killSlot slots
atomically $ void $ flushTQueue supDeathQ
newSlots <- mapM (spawnSlot target . slotSpec) slots
liftIO $ atomically $ writeTVar slotsVar newSlots
doRestForOne ::
DeathTarget ->
TVar [ChildSlot] ->
TQueue DeathMessage ->
[ChildSlot] ->
ActorId ->
RuntimeM ()
doRestForOne target slotsVar supDeathQ slots deadId = do
let (before, fromDead) = break (\s -> slotId s == deadId) slots
case fromDead of
[] -> return ()
_nonempty -> do
liftIO $ do
mapM_ killSlot (drop 1 fromDead)
atomically $ void $ flushTQueue supDeathQ
newSlots <- mapM (spawnSlot target . slotSpec) fromDead
liftIO $ atomically $ writeTVar slotsVar (before ++ newSlots)
+106
View File
@@ -0,0 +1,106 @@
{-# LANGUAGE StrictData #-}
module Control.Actor.Transport
( ConnHandle (..)
, Transport (..)
, createTCPTransport
) where
import Control.Actor.Types (NodeAddr (..))
import Control.Concurrent (forkIO)
import Control.Monad (forever, void, when)
import Data.Binary (decode, encode)
import qualified Data.ByteString.Lazy as BL
import Data.ByteString.Lazy (ByteString)
import Data.Word (Word64)
import Network.Socket
( AddrInfo (addrAddress, addrFamily, addrFlags, addrProtocol, addrSocketType)
, AddrInfoFlag (AI_PASSIVE)
, Socket
, SocketOption (NoDelay, ReuseAddr)
, SocketType (Stream)
, accept
, bind
, close
, connect
, defaultHints
, getAddrInfo
, listen
, setSocketOption
, socket
)
import Network.Socket.ByteString.Lazy (recv, sendAll)
data ConnHandle = ConnHandle
{ chSend :: ByteString -> IO ()
, chRecv :: IO ByteString
, chClose :: IO ()
}
data Transport = Transport
{ tConnect :: NodeAddr -> IO ConnHandle
, tListen :: (ConnHandle -> IO ()) -> IO ()
}
sendFramed :: Socket -> ByteString -> IO ()
sendFramed sock payload =
sendAll sock (encode (fromIntegral (BL.length payload) :: Word64) <> payload)
recvExact :: Socket -> Int -> IO ByteString
recvExact sock = go []
where
go acc 0 = return (BL.concat (reverse acc))
go acc r = do
chunk <- recv sock (fromIntegral (min r 65536))
when (BL.null chunk) $ fail "recvExact: connection closed"
go (chunk : acc) (r - fromIntegral (BL.length chunk))
recvFramed :: Socket -> IO ByteString
recvFramed sock = do
header <- recvExact sock 8
recvExact sock (fromIntegral (decode header :: Word64))
connectTcp :: NodeAddr -> IO Socket
connectTcp (NodeAddr host port) = do
let hints = defaultHints {addrSocketType = Stream}
addrs <- getAddrInfo (Just hints) (Just host) (Just (show port))
case addrs of
[] -> fail $ "connectTcp: no address for " <> host <> ":" <> show port
(a:_) -> do
sock <- socket (addrFamily a) Stream (addrProtocol a)
setSocketOption sock NoDelay 1
connect sock (addrAddress a)
return sock
listenTcp :: NodeAddr -> IO Socket
listenTcp (NodeAddr host port) = do
let hints = defaultHints {addrFlags = [AI_PASSIVE], addrSocketType = Stream}
addrs <- getAddrInfo (Just hints) (Just host) (Just (show port))
case addrs of
[] -> fail "listenTcp: no address"
(a:_) -> do
sock <- socket (addrFamily a) Stream (addrProtocol a)
setSocketOption sock ReuseAddr 1
bind sock (addrAddress a)
listen sock 128
return sock
createTCPTransport :: NodeAddr -> IO Transport
createTCPTransport myAddr = do
lsock <- listenTcp myAddr
return Transport
{ tConnect = \peer -> do
sock <- connectTcp peer
return ConnHandle
{ chSend = sendFramed sock
, chRecv = recvFramed sock
, chClose = close sock
}
, tListen = \callback -> void $ forkIO $ forever $ do
(csock, _) <- accept lsock
void $ forkIO $ callback ConnHandle
{ chSend = sendFramed csock
, chRecv = recvFramed csock
, chClose = close csock
}
}
+115
View File
@@ -0,0 +1,115 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE StrictData #-}
module Control.Actor.Types
( NodeAddr (..)
, NodeId
, thisNodeId
, ActorId (..)
, ExitReason (..)
, DeathMessage (..)
, DeathTarget (..)
, RemoteExitReason (..)
, toRemoteExitReason
, NetworkMessage (..)
, ActorState (..)
, ActorRef (..)
, SomeActorRef (..)
, someActorId
, SupervisorAction (..)
, CorrelationId
, Envelope (..)
) where
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.STM (TQueue, TVar)
import Control.Exception (SomeException)
import Data.Binary (Binary)
import Data.ByteString.Lazy (ByteString)
import Data.UUID (UUID)
import GHC.Generics (Generic)
data NodeAddr = NodeAddr
{ nodeHost :: String
, nodePort :: Integer
} deriving (Eq, Ord, Show, Generic)
instance Binary NodeAddr
type NodeId = Integer
thisNodeId :: NodeId
thisNodeId = 0
data ActorId = ActorId NodeId UUID
deriving (Eq, Ord, Show, Generic)
instance Binary ActorId
data ExitReason
= Normal
| Killed
| Exception SomeException
deriving Show
data DeathMessage = DeathMessage
{ dmActorId :: ActorId
, dmReason :: ExitReason
} deriving Show
data DeathTarget
= LocalTarget (TQueue DeathMessage)
| RemoteTarget ActorId NodeAddr
data RemoteExitReason
= RNormal
| RKilled
| RException String
deriving (Show, Generic)
instance Binary RemoteExitReason
toRemoteExitReason :: ExitReason -> RemoteExitReason
toRemoteExitReason Normal = RNormal
toRemoteExitReason Killed = RKilled
toRemoteExitReason (Exception e) = RException (show e)
data NetworkMessage
= NMHandshake NodeAddr
| NMCast UUID ByteString
| NMCall UUID CorrelationId NodeAddr ByteString
| NMReply CorrelationId ByteString
| NMDeath ActorId RemoteExitReason
deriving (Generic)
instance Binary NetworkMessage
data ActorState u = ActorState
{ asId :: ActorId
, asLinks :: TVar [DeathTarget]
, asEnv :: u
}
type CorrelationId = Integer
data Envelope msg reply
= Cast msg
| Call msg (MVar (Maybe reply))
data ActorRef msg reply
= forall u. LocalRef
{ arMsgQ :: TQueue (Envelope msg reply)
, arDeathQ :: TQueue DeathMessage
, arState :: ActorState u
}
| RemoteRef ActorId
data SomeActorRef = forall msg reply. (Binary msg, Binary reply) => SomeActorRef (ActorRef msg reply)
someActorId :: SomeActorRef -> ActorId
someActorId (SomeActorRef (LocalRef {arState})) = asId arState
someActorId (SomeActorRef (RemoteRef aid)) = aid
data SupervisorAction u
= Stop
| Continue u