summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs335
1 files changed, 211 insertions, 124 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 54658de..6265bbf 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -1,5 +1,3 @@
-{-# LANGUAGE CPP #-}
-
module Erebos.Network (
Server,
startServer,
@@ -10,20 +8,24 @@ module Erebos.Network (
ServerOptions(..), serverIdentity, defaultServerOptions,
Peer, peerServer, peerStorage,
- PeerAddress(..), peerAddress,
- PeerIdentity(..), peerIdentity,
+ PeerAddress(..), getPeerAddress, getPeerAddresses,
+ PeerIdentity(..), getPeerIdentity,
WaitingRef, wrDigest,
Service(..),
+
+ PeerAddressType(..),
+ receivedFromCustomAddress,
+
serverPeer,
-#ifdef ENABLE_ICE_SUPPORT
- serverPeerIce,
-#endif
+ serverPeerCustom,
+ findPeer,
dropPeer,
isPeerDropped,
sendToPeer, sendManyToPeer,
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -36,13 +38,14 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
+import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
-import qualified Data.Map as M
+import Data.Map qualified as M
import Data.Maybe
import Data.Typeable
import Data.Word
@@ -56,13 +59,11 @@ import Foreign.Storable as F
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
-import qualified Network.Socket.ByteString as S
+import Network.Socket.ByteString qualified as S
import Erebos.Error
-#ifdef ENABLE_ICE_SUPPORT
-import Erebos.ICE
-#endif
import Erebos.Identity
+import Erebos.Network.Address
import Erebos.Network.Channel
import Erebos.Network.Protocol
import Erebos.Object.Internal
@@ -114,12 +115,16 @@ getNextPeerChange = atomically . readTChan . serverChanPeer
data ServerOptions = ServerOptions
{ serverPort :: PortNumber
, serverLocalDiscovery :: Bool
+ , serverErrorPrefix :: String
+ , serverTestLog :: Bool
}
defaultServerOptions :: ServerOptions
defaultServerOptions = ServerOptions
{ serverPort = discoveryPort
, serverLocalDiscovery = True
+ , serverErrorPrefix = ""
+ , serverTestLog = False
}
@@ -134,6 +139,14 @@ data Peer = Peer
, peerWaitingRefs :: TMVar [WaitingRef]
}
+-- | Get current main address of the peer (used to send new packets).
+getPeerAddress :: MonadIO m => Peer -> m PeerAddress
+getPeerAddress = liftIO . return . peerAddress
+
+-- | Get all known addresses of given peer.
+getPeerAddresses :: MonadIO m => Peer -> m [ PeerAddress ]
+getPeerAddresses = fmap (: []) . getPeerAddress
+
peerServer :: Peer -> Server
peerServer = peerServer_
@@ -157,50 +170,52 @@ setPeerChannel Peer {..} ch = do
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
-data PeerAddress = DatagramAddress SockAddr
-#ifdef ENABLE_ICE_SUPPORT
- | PeerIceSession IceSession
-#endif
+class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where
+ sendBytesToAddress :: addr -> ByteString -> IO ()
+ connectionToAddressClosed :: addr -> IO ()
+
+data PeerAddress
+ = forall addr. PeerAddressType addr => CustomPeerAddress addr
+ | DatagramAddress SockAddr
instance Show PeerAddress where
- show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
- Just (IP.IPv6 ipv6, port)
- | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
- -> [show (IP.toIPv4w ipv4), show port]
- Just (addr, port)
- -> [show addr, show port]
- _ -> [show saddr]
-#ifdef ENABLE_ICE_SUPPORT
- show (PeerIceSession ice) = show ice
-#endif
+ show (CustomPeerAddress addr) = show addr
+
+ show (DatagramAddress saddr) =
+ case inetFromSockAddr saddr of
+ Just ( addr, port ) -> unwords [ show addr, show port ]
+ _ -> show saddr
instance Eq PeerAddress where
+ CustomPeerAddress addr == CustomPeerAddress addr'
+ | Just addr'' <- cast addr' = addr == addr''
DatagramAddress addr == DatagramAddress addr' = addr == addr'
-#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice == PeerIceSession ice' = ice == ice'
_ == _ = False
-#endif
instance Ord PeerAddress where
+ compare (CustomPeerAddress addr) (CustomPeerAddress addr')
+ | Just addr'' <- cast addr' = compare addr addr''
+ | otherwise = compare (typeOf addr) (typeOf addr')
+ compare (CustomPeerAddress _ ) _ = LT
+ compare _ (CustomPeerAddress _ ) = GT
+
compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
-#ifdef ENABLE_ICE_SUPPORT
- compare (DatagramAddress _ ) _ = LT
- compare _ (DatagramAddress _ ) = GT
- compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
-#endif
-data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
- | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
- | PeerIdentityFull UnifiedIdentity
+data PeerIdentity
+ = PeerIdentityUnknown (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ])
+ | PeerIdentityRef WaitingRef (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ])
+ | PeerIdentityFull UnifiedIdentity
-peerIdentity :: MonadIO m => Peer -> m PeerIdentity
-peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
+-- | Get currently known identity of the given peer
+getPeerIdentity :: MonadIO m => Peer -> m PeerIdentity
+getPeerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
- | PeerConnected (Connection PeerAddress)
- | PeerDropped
+data PeerState
+ = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
@@ -252,7 +267,16 @@ startServer serverOptions serverOrigHead logd' serverServices = do
let logd = writeTQueue serverErrorLog
forkServerThread server $ forever $ do
- logd' =<< atomically (readTQueue serverErrorLog)
+ logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog)
+
+ logt <- if
+ | serverTestLog serverOptions -> do
+ serverTestLog <- newTQueueIO
+ forkServerThread server $ forever $ do
+ logd' =<< atomically (readTQueue serverTestLog)
+ return $ writeTQueue serverTestLog
+ | otherwise -> do
+ return $ \_ -> return ()
forkServerThread server $ dataResponseWorker server
forkServerThread server $ forever $ do
@@ -302,13 +326,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -316,12 +345,10 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
case paddr of
+ CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
-#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice -> iceSend ice msg
-#endif
forkServerThread server $ forever $ do
readFlowIO serverControlFlow >>= \case
@@ -363,9 +390,13 @@ startServer serverOptions serverOrigHead logd' serverServices = do
prefs <- forM objs $ storeObject $ peerInStorage peer
identity <- readMVar serverIdentity_
let svcs = map someServiceID serverServices
- handlePacket identity secure peer chanSvc svcs header prefs
+ handlePacket paddr identity secure peer chanSvc svcs header prefs
peerLoop
Nothing -> do
+ case paddr of
+ DatagramAddress _ -> return ()
+ CustomPeerAddress caddr -> connectionToAddressClosed caddr
+
dropPeer peer
atomically $ writeTChan serverChanPeer peer
peerLoop
@@ -373,7 +404,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
ReceivedAnnounce addr _ -> do
void $ serverPeer' server addr
- erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow
+ erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow
forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
@@ -385,16 +416,29 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -502,9 +546,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -523,10 +565,10 @@ appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
-handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
-handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+handlePacket :: PeerAddress -> UnifiedIdentity -> Bool
+ -> Peer -> TQueue ( Peer, PeerAddress, ServiceID, Ref, [ RawStreamReader ] ) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
+handlePacket paddr identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
let sidentity = idData identity
@@ -659,10 +701,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, paddr, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -741,7 +784,7 @@ finalizedChannel peer@Peer {..} ch self = do
-- Notify services about new peer
readTVar peerIdentityVar >>= \case
- PeerIdentityFull _ -> notifyServicesOfPeer peer
+ PeerIdentityFull _ -> notifyServicesOfPeer True peer
_ -> return ()
@@ -767,7 +810,7 @@ handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
PeerIdentityFull pid
| idData pid `precedes` wrappedLoad ref
-> validateAndUpdate (idUpdates pid) $ \_ -> do
- notifyServicesOfPeer peer
+ notifyServicesOfPeer False peer
_ -> return ()
@@ -779,16 +822,23 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do
-> do
writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid'
writeTChan (serverChanPeer $ peerServer peer) peer
- when (idData pid /= idData pid') $ notifyServicesOfPeer peer
+ when (pid /= pid') $ do
+ notifyServicesOfPeer False peer
| otherwise -> return ()
-notifyServicesOfPeer :: Peer -> STM ()
-notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
+notifyServicesOfPeer :: Bool -> Peer -> STM ()
+notifyServicesOfPeer new peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
+ paddr <- getPeerAddress peer
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] paddr peer $
+ if new then serviceNewPeer else serviceUpdatedPeer
+
+receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
+receivedFromCustomAddress Server {..} addr msg = do
+ writeFlowIO serverRawPath ( CustomPeerAddress addr, msg )
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
@@ -808,14 +858,8 @@ serverPeer server paddr = do
_ -> paddr
serverPeer' server (DatagramAddress paddr')
-#ifdef ENABLE_ICE_SUPPORT
-serverPeerIce :: Server -> IceSession -> IO Peer
-serverPeerIce server@Server {..} ice = do
- let paddr = PeerIceSession ice
- peer <- serverPeer' server paddr
- iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath
- return peer
-#endif
+serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer
+serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr)
serverPeer' :: Server -> PeerAddress -> IO Peer
serverPeer' server paddr = do
@@ -829,6 +873,13 @@ serverPeer' server paddr = do
writeFlow (serverControlFlow server) (RequestConnection paddr)
return peer
+findPeer :: Server -> (Peer -> IO Bool) -> IO (Maybe Peer)
+findPeer server test = withMVar (serverPeers server) (helper . M.elems)
+ where
+ helper (p : ps) = test p >>= \case True -> return (Just p)
+ False -> helper ps
+ helper [] = return Nothing
+
dropPeer :: MonadIO m => Peer -> m ()
dropPeer peer = liftIO $ do
modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do
@@ -856,19 +907,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -901,17 +982,19 @@ sendToPeerWith peer fobj = do
Left err -> throwError $ fromErebosError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService peer handler = do
+ paddr <- getPeerAddress peer
+ runPeerServiceOn Nothing [] paddr peer handler
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> PeerAddress -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams paddr peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -933,9 +1016,11 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
let inp = ServiceInput
{ svcAttributes = attr
, svcPeer = peer
+ , svcPeerAddress = paddr
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
@@ -951,35 +1036,37 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global
_ -> do
- atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer)
+ atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show paddr
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
-foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
-foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
-foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
-foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO ()
-
-data InetAddress = InetAddress { fromInetAddress :: IP.IP }
-
-instance F.Storable InetAddress where
- sizeOf _ = sizeOf (undefined :: CInt) + 16
- alignment _ = 8
-
- peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case
- AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt))
- AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8)
- _ -> fail "InetAddress: unknown family"
- poke ptr (InetAddress addr) = case addr of
- IP.IPv4 ip -> do
- pokeByteOff ptr 0 (packFamily AF_INET)
- pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip)
- IP.IPv6 ip -> do
- pokeByteOff ptr 0 (packFamily AF_INET6)
- pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
+foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO ()
joinMulticast :: Socket -> IO [ Word32 ]
joinMulticast sock =
@@ -1007,7 +1094,7 @@ getServerAddresses Server {..} = do
count <- fromIntegral <$> peek pcount
res <- peekArray count ptr
cFree ptr
- return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res
+ return $ map (inetToSockAddr . (, serverPort serverOptions )) res
getBroadcastAddresses :: PortNumber -> IO [SockAddr]
getBroadcastAddresses port = do