From 0cc2b3ee83e46608495f4fb92ea8c2ca48b4e306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 31 May 2025 20:35:48 +0200 Subject: Refactor discovery state to custom data types --- src/Erebos/Discovery.hs | 146 +++++++++++++++++++++++++++--------------------- 1 file changed, 81 insertions(+), 65 deletions(-) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index f156c85..63400cb 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -3,7 +3,7 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), - DiscoveryConnection(..) + DiscoveryConnection(..), ) where import Control.Concurrent @@ -90,12 +90,13 @@ instance Storable DiscoveryService where DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-info" $ dconnIceInfo conn + where + storeConnection ctype DiscoveryConnection {..} = do + storeText "connection" $ ctype + storeRawRef "source" dconnSource + storeRawRef "target" dconnTarget + storeMbText "address" dconnAddress + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum [ do @@ -120,14 +121,15 @@ instance Storable DiscoveryService where , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-info" + where + loadConnection ctype ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- loadRawRef "source" + dconnTarget <- loadRawRef "target" + dconnAddress <- loadMbText "address" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int @@ -135,22 +137,38 @@ data DiscoveryPeer = DiscoveryPeer , dpAddress :: [ Text ] #ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession +#else + , dpIceSession :: Maybe () #endif } +data DiscoveryPeerState = DiscoveryPeerState +#ifdef ENABLE_ICE_SUPPORT + { dpsIceConfig :: Maybe IceConfig +#else + { dpsIceConfig :: Maybe () +#endif + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + } + instance Service DiscoveryService where serviceID _ = mkServiceID "dd59c89c-69cc-4703-b75b-4ddcd4b3c23c" type ServiceAttributes DiscoveryService = DiscoveryAttributes defaultServiceAttributes _ = defaultDiscoveryAttributes -#ifdef ENABLE_ICE_SUPPORT - type ServiceState DiscoveryService = Maybe IceConfig - emptyServiceState _ = Nothing -#endif + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsIceConfig = Nothing + } - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + } serviceHandler msg = case fromStored msg of DiscoverySelf addrs priority -> do @@ -171,15 +189,14 @@ instance Service DiscoveryService where | otherwise -> return Nothing - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer - { dpPriority = fromMaybe 0 priority - , dpPeer = Just peer - , dpAddress = addrs -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } attrs <- asks svcAttributes replyPacket $ DiscoveryAcknowledged matchedAddrs (discoveryStunServer attrs) @@ -207,22 +224,23 @@ instance Service DiscoveryService where cfg <- liftIO $ iceCreateConfig (toIceServer stunServer stunPort) (toIceServer turnServer turnPort) - svcSet cfg + svcModify $ \s -> s { dpsIceConfig = cfg } #endif return () DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal + dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer DiscoveryResult ref [] -> do svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" DiscoveryResult ref addrs -> do + let dgst = refDigest ref -- TODO: check if we really requested that server <- asks svcServer self <- svcSelf - mbIceConfig <- svcGet + mbIceConfig <- dpsIceConfig <$> svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer @@ -240,12 +258,13 @@ instance Service DiscoveryService where Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } + let dp = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Just ice + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } #else -> do return () @@ -256,14 +275,13 @@ instance Service DiscoveryService where getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- serverPeer server (addrAddress saddr) runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + let dp = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Just peer + , dpAddress = [] + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } | otherwise -> do runAsService $ do @@ -273,31 +291,30 @@ instance Service DiscoveryService where self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + then if #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion + -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - svcGet >>= \case + dpsIceConfig <$> svcGet >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } case res of - Right _ -> do - case dconnIceInfo conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do svcPrint $ "Discovery: ICE request from peer without ICE configuration" -#else - return () #endif - else do + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal + mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp @@ -307,29 +324,28 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal + dpeers <- dgsPeers <$> svcGetGlobal if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) then do -- response to our request, try to connect to the peer -#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer if | Just addr <- dconnAddress conn , [ipaddr, port] <- words (T.unpack addr) -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) [] Nothing + svcModifyGlobal $ \s -> s + { dgsPeers = M.insert (refDigest $ dconnTarget conn) + (DiscoveryPeer 0 (Just peer) [] Nothing) $ dgsPeers s } +#ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" -#else - return () -#endif else do -- response to relayed request case M.lookup (refDigest $ dconnSource conn) dpeers of -- cgit v1.2.3 From e8f3cbe08071c0507abafa76d0bf9d32908bbd7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 1 Jun 2025 17:39:58 +0200 Subject: Discovery search using global state to ask new peers --- main/Main.hs | 4 ++-- main/Test.hs | 5 +---- src/Erebos/Discovery.hs | 42 ++++++++++++++++++++++++++++++++++++++++-- src/Erebos/Network.hs | 24 +++++++++++++++++++++++- src/Erebos/Storage.hs | 2 +- test/discovery.test | 37 +++++++++++++++++++++++++++++++++++++ 6 files changed, 104 insertions(+), 10 deletions(-) diff --git a/main/Main.hs b/main/Main.hs index e055275..59ea7c3 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -893,14 +893,14 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer + server <- asks ciServer st <- getStorage sref <- asks ciLine eprint <- asks ciPrint liftIO $ readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" Just ref -> do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref + res <- runExceptT $ discoverySearch server ref case res of Right _ -> return () Left err -> eprint err diff --git a/main/Test.hs b/main/Test.hs index 0181575..75eaaaf 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -880,8 +880,5 @@ cmdDiscoveryConnect = do st <- asks tiStorage [ tref ] <- asks tiParams Just ref <- liftIO $ readRef st $ encodeUtf8 tref - Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer ref diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 63400cb..1691ad9 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -4,6 +4,8 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -152,6 +158,7 @@ data DiscoveryPeerState = DiscoveryPeerState data DiscoveryGlobalState = DiscoveryGlobalState { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -168,6 +175,7 @@ instance Service DiscoveryService where type ServiceGlobalState DiscoveryService = DiscoveryGlobalState emptyServiceGlobalState _ = DiscoveryGlobalState { dgsPeers = M.empty + , dgsSearchingFor = S.empty } serviceHandler msg = case fromStored msg of @@ -290,7 +298,7 @@ instance Service DiscoveryService where DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if refDigest (dconnTarget conn) `elem` identityDigests self then if #ifdef ENABLE_ICE_SUPPORT -- request for us, create ICE sesssion @@ -325,7 +333,7 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf dpeers <- dgsPeers <$> svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + if refDigest (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer server <- asks svcServer @@ -356,6 +364,7 @@ instance Service DiscoveryService where serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer + st <- getStorage let addrToText saddr = do ( addr, port ) <- IP.fromSockAddr saddr @@ -367,5 +376,34 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + liftIO (refFromDigest st dgst) >>= \case + Just ref -> sendToPeer peer $ DiscoverySearch ref + Nothing -> return () + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () +discoverySearch server ref = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ refDigest ref `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch ref diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index d8e868a..32d06f2 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -24,6 +24,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -899,7 +900,7 @@ sendToPeerWith peer fobj = do Left err -> throwError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest @@ -954,6 +955,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m proxy. (Service s, MonadIO m, MonadError String m) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index c1e9664..9ccfdde 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -4,7 +4,7 @@ module Erebos.Storage ( deriveEphemeralStorage, derivePartialStorage, Ref, PartialRef, RefDigest, - refDigest, + refDigest, refFromDigest, readRef, showRef, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, diff --git a/test/discovery.test b/test/discovery.test index f2dddb7..5f6c443 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -73,3 +73,40 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p -- cgit v1.2.3 From 17febf69d09832cd82edc2048afcb90b2520d936 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Thu, 5 Jun 2025 22:08:12 +0200 Subject: Join pjproject worker thread before freeing config struct --- src/Erebos/ICE/pjproject.c | 90 ++++++++++++++++++++++++++-------------------- src/Erebos/ICE/pjproject.h | 6 ++-- 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index e79fb9d..54da58d 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -1,6 +1,7 @@ #include "pjproject.h" #include "Erebos/ICE_stub.h" +#include #include #include #include @@ -15,6 +16,13 @@ static struct pj_sockaddr def_addr; } ice; +struct erebos_ice_cfg +{ + pj_ice_strans_cfg cfg; + pj_thread_t * thread; + atomic_bool exit; +}; + struct user_data { pj_ice_sess_role role; @@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status) fprintf(stderr, "ICE: %s: %s\n", msg, err); } -static int ice_worker_thread(void * vcfg) +static int ice_worker_thread( void * vcfg ) { - pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg; + struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg ); - while (true) { + while( ! ecfg->exit ){ pj_time_val max_timeout = { 0, 0 }; pj_time_val timeout = { 0, 0 }; max_timeout.msec = 500; - pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout); + pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout ); pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) @@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg) if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; - int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout); + int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout ); if (c < 0) pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); } @@ -131,80 +139,84 @@ exit: pthread_mutex_unlock(&mutex); } -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ) { ice_init(); - pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) ); - pj_ice_strans_cfg_default( cfg ); + struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) ); + pj_ice_strans_cfg_default( &ecfg->cfg ); + ecfg->exit = false; + ecfg->thread = NULL; - cfg->stun_cfg.pf = &ice.cp.factory; + ecfg->cfg.stun_cfg.pf = &ice.cp.factory; if( pj_timer_heap_create( ice.pool, 100, - &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){ + &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){ fprintf( stderr, "pj_timer_heap_create failed\n" ); goto fail; } - if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){ + if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){ fprintf( stderr, "pj_ioqueue_create failed\n" ); goto fail; } - pj_thread_t * thread; if( pj_thread_create( ice.pool, NULL, &ice_worker_thread, - cfg, 0, 0, &thread ) != PJ_SUCCESS ){ + ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){ fprintf( stderr, "pj_thread_create failed\n" ); goto fail; } - cfg->af = pj_AF_INET(); - cfg->opt.aggressive = PJ_TRUE; + ecfg->cfg.af = pj_AF_INET(); + ecfg->cfg.opt.aggressive = PJ_TRUE; if( stun_server ){ - cfg->stun.server.ptr = malloc( strlen( stun_server )); - pj_strcpy2( &cfg->stun.server, stun_server ); + ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server )); + pj_strcpy2( &ecfg->cfg.stun.server, stun_server ); if( stun_port ) - cfg->stun.port = stun_port; + ecfg->cfg.stun.port = stun_port; } if( turn_server ){ - cfg->turn.server.ptr = malloc( strlen( turn_server )); - pj_strcpy2( &cfg->turn.server, turn_server ); + ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server )); + pj_strcpy2( &ecfg->cfg.turn.server, turn_server ); if( turn_port ) - cfg->turn.port = turn_port; - cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - cfg->turn.conn_type = PJ_TURN_TP_UDP; + ecfg->cfg.turn.port = turn_port; + ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP; } - return cfg; + return ecfg; fail: - ice_cfg_free( cfg ); + ice_cfg_free( ecfg ); return NULL; } -void ice_cfg_free( pj_ice_strans_cfg * cfg ) +void ice_cfg_free( struct erebos_ice_cfg * ecfg ) { - if( ! cfg ) + if( ! ecfg ) return; - if( cfg->turn.server.ptr ) - free( cfg->turn.server.ptr ); + ecfg->exit = true; + pj_thread_join( ecfg->thread ); + + if( ecfg->cfg.turn.server.ptr ) + free( ecfg->cfg.turn.server.ptr ); - if( cfg->stun.server.ptr ) - free( cfg->stun.server.ptr ); + if( ecfg->cfg.stun.server.ptr ) + free( ecfg->cfg.stun.server.ptr ); - if( cfg->stun_cfg.ioqueue ) - pj_ioqueue_destroy( cfg->stun_cfg.ioqueue ); + if( ecfg->cfg.stun_cfg.ioqueue ) + pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue ); - if( cfg->stun_cfg.timer_heap ) - pj_timer_heap_destroy( cfg->stun_cfg.timer_heap ); + if( ecfg->cfg.stun_cfg.timer_heap ) + pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap ); - free( cfg ); + free( ecfg ); } -pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { ice_init(); @@ -221,7 +233,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role .on_ice_complete = cb_on_ice_complete, }; - pj_status_t status = pj_ice_strans_create( NULL, cfg, 1, + pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1, udata, &icecb, &res ); if (status != PJ_SUCCESS) diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index e4fcbdb..1d20891 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,11 +3,11 @@ #include #include -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); -void ice_cfg_free( pj_ice_strans_cfg * cfg ); +void ice_cfg_free( struct erebos_ice_cfg * cfg ); -pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); void ice_destroy(pj_ice_strans * strans); -- cgit v1.2.3 From cc7ac84b840973f602c4e59cca22bf304321db6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sat, 7 Jun 2025 20:36:11 +0200 Subject: Delay ICE config initialization until connection request --- src/Erebos/Discovery.hs | 136 +++++++++++++++++++++++++++--------------------- 1 file changed, 77 insertions(+), 59 deletions(-) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 1691ad9..9b6eccf 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -36,6 +36,13 @@ import Erebos.Service import Erebos.Storage +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif + + data DiscoveryService = DiscoverySelf [ Text ] (Maybe Int) | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) @@ -63,11 +70,7 @@ data DiscoveryConnection = DiscoveryConnection { dconnSource :: Ref , dconnTarget :: Ref , dconnAddress :: Maybe Text -#ifdef ENABLE_ICE_SUPPORT , dconnIceInfo :: Maybe IceRemoteInfo -#else - , dconnIceInfo :: Maybe (Stored Object) -#endif } emptyConnection :: Ref -> Ref -> DiscoveryConnection @@ -141,19 +144,13 @@ data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer , dpAddress :: [ Text ] -#ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession -#else - , dpIceSession :: Maybe () -#endif } data DiscoveryPeerState = DiscoveryPeerState -#ifdef ENABLE_ICE_SUPPORT - { dpsIceConfig :: Maybe IceConfig -#else - { dpsIceConfig :: Maybe () -#endif + { dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig } data DiscoveryGlobalState = DiscoveryGlobalState @@ -169,7 +166,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsIceConfig = Nothing + { dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing } type ServiceGlobalState DiscoveryService = DiscoveryGlobalState @@ -213,7 +212,6 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do -#ifdef ENABLE_ICE_SUPPORT paddr <- asks (peerAddress . svcPeer) >>= return . \case (DatagramAddress saddr) -> case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, _) @@ -229,12 +227,10 @@ instance Service DiscoveryService where toIceServer (Just server) Nothing = Just ( server, 0 ) toIceServer (Just server) (Just port) = Just ( server, port ) - cfg <- liftIO $ iceCreateConfig - (toIceServer stunServer stunPort) - (toIceServer turnServer turnPort) - svcModify $ \s -> s { dpsIceConfig = cfg } -#endif - return () + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } DiscoverySearch ref -> do dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal @@ -248,52 +244,54 @@ instance Service DiscoveryService where -- TODO: check if we really requested that server <- asks svcServer self <- svcSelf - mbIceConfig <- dpsIceConfig <$> svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if + forM_ addrs $ \addr -> if | addr == T.pack "ICE" -#ifdef ENABLE_ICE_SUPPORT - , Just config <- mbIceConfig -> do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - runAsService $ do - let dp = DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } - svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } -#else - -> do - return () +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let dp = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Just ice + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } + + Nothing -> do + return () #endif + return () | [ ipaddr, port ] <- words (T.unpack addr) -> do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - let dp = DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] - , dpIceSession = Nothing - } - svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } + void $ liftIO $ forkIO $ do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + let dp = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Just peer + , dpAddress = [] + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } | otherwise -> do - runAsService $ do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf @@ -305,7 +303,7 @@ instance Service DiscoveryService where | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - dpsIceConfig <$> svcGet >>= \case + getIceConfig >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice @@ -314,7 +312,7 @@ instance Service DiscoveryService where Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do - svcPrint $ "Discovery: ICE request from peer without ICE configuration" + return () #endif | otherwise -> do @@ -393,6 +391,26 @@ identityDigests :: Foldable f => Identity f -> [ RefDigest ] identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () discoverySearch server ref = do peers <- liftIO $ getCurrentPeerList server -- cgit v1.2.3 From 934d7c13bc144e8db23c53058e605f508c1fda77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 8 Jun 2025 22:03:17 +0200 Subject: Update instead of replacing discovery peer info --- src/Erebos/Discovery.hs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 9b6eccf..d203866 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -147,6 +147,14 @@ data DiscoveryPeer = DiscoveryPeer , dpIceSession :: Maybe IceSession } +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + data DiscoveryPeerState = DiscoveryPeerState { dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) @@ -263,13 +271,8 @@ instance Service DiscoveryService where Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err runAsService $ do - let dp = DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } - svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } Nothing -> do return () @@ -282,13 +285,8 @@ instance Service DiscoveryService where getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- serverPeer server (addrAddress saddr) runAsService $ do - let dp = DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] - , dpIceSession = Nothing - } - svcModifyGlobal $ \s -> s { dgsPeers = M.insert dgst dp $ dgsPeers s } + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } | otherwise -> do svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr @@ -340,9 +338,9 @@ instance Service DiscoveryService where saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) + let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s - { dgsPeers = M.insert (refDigest $ dconnTarget conn) - (DiscoveryPeer 0 (Just peer) [] Nothing) $ dgsPeers s } + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s } #ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers -- cgit v1.2.3 From 7fa7cdeb5a8c0c784145df852da8ec4c18cb6e6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Tue, 10 Jun 2025 22:17:47 +0200 Subject: Flow: use bounded queues instead of MVars --- src/Erebos/Flow.hs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs index ba2607a..1e1a521 100644 --- a/src/Erebos/Flow.hs +++ b/src/Erebos/Flow.hs @@ -11,54 +11,53 @@ module Erebos.Flow ( import Control.Concurrent.STM -data Flow r w = Flow (TMVar [r]) (TMVar [w]) - | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') +data Flow r w + = Flow (TBQueue r) (TBQueue w) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') type SymFlow a = Flow a a newFlow :: STM (Flow a b, Flow b a) newFlow = do - x <- newEmptyTMVar - y <- newEmptyTMVar + x <- newTBQueue 16 + y <- newTBQueue 16 return (Flow x y, Flow y x) newFlowIO :: IO (Flow a b, Flow b a) newFlowIO = atomically newFlow readFlow :: Flow r w -> STM r -readFlow (Flow rvar _) = takeTMVar rvar >>= \case - (x:[]) -> return x - (x:xs) -> putTMVar rvar xs >> return x - [] -> error "Flow: empty list" +readFlow (Flow rvar _) = readTBQueue rvar readFlow (MappedFlow f _ up) = f <$> readFlow up tryReadFlow :: Flow r w -> STM (Maybe r) -tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case - Just (x:[]) -> return (Just x) - Just (x:xs) -> putTMVar rvar xs >> return (Just x) - Just [] -> error "Flow: empty list" - Nothing -> return Nothing +tryReadFlow (Flow rvar _) = tryReadTBQueue rvar tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up canReadFlow :: Flow r w -> STM Bool -canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar +canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar canReadFlow (MappedFlow _ _ up) = canReadFlow up writeFlow :: Flow r w -> w -> STM () -writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (Flow _ wvar) = writeTBQueue wvar writeFlow (MappedFlow _ f up) = writeFlow up . f writeFlowBulk :: Flow r w -> [w] -> STM () writeFlowBulk _ [] = return () -writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs tryWriteFlow :: Flow r w -> w -> STM Bool -tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[]) -tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f +tryWriteFlow (Flow _ wvar) x = do + isFullTBQueue wvar >>= \case + True -> return False + False -> do + writeTBQueue wvar x + return True +tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x canWriteFlow :: Flow r w -> STM Bool -canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar +canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar canWriteFlow (MappedFlow _ _ up) = canWriteFlow up readFlowIO :: Flow r w -> IO r -- cgit v1.2.3 From 5b3c52c50b5301d009fb9a0d44b4cf91d50f6de4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Fri, 13 Jun 2025 22:49:29 +0200 Subject: Terminate ICE thread when server is stopped Changelog: Improved handling of ICE connections --- src/Erebos/Discovery.hs | 6 ++++++ src/Erebos/ICE.chs | 7 +++++++ src/Erebos/ICE/pjproject.c | 7 +++++++ src/Erebos/ICE/pjproject.h | 1 + src/Erebos/Network.hs | 17 +++++++++++++++-- src/Erebos/Service.hs | 3 +++ 6 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index d203866..d098f98 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -384,6 +384,12 @@ instance Service DiscoveryService where Just ref -> sendToPeer peer $ DiscoverySearch ref Nothing -> return () +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + identityDigests :: Foldable f => Identity f -> [ RefDigest ] identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 6f61451..cc2fdcc 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -8,6 +8,7 @@ module Erebos.ICE ( IceRemoteInfo, iceCreateConfig, + iceStopThread, iceCreateSession, iceDestroy, iceRemoteInfo, @@ -138,6 +139,12 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg +foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread" + ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () + +iceStopThread :: IceConfig -> IO () +iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread + {#pointer *pj_ice_strans as ^ #} iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index 54da58d..e9446fe 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -216,6 +216,13 @@ void ice_cfg_free( struct erebos_ice_cfg * ecfg ) free( ecfg ); } +void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +{ + if( ! ecfg ) + return; + ecfg->exit = true; +} + pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index 1d20891..c31e227 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -6,6 +6,7 @@ struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); void ice_cfg_free( struct erebos_ice_cfg * cfg ); +void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 32d06f2..ed68ed4 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -392,8 +392,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index d1943e1..9cf71e7 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -71,6 +71,9 @@ class ( serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] serviceStorageWatchers _ = [] + serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO () + serviceStopServer _ _ _ _ = return () + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) -- cgit v1.2.3