diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Erebos/Conversation.hs | 2 | ||||
| -rw-r--r-- | src/Erebos/Discovery.hs | 11 | ||||
| -rw-r--r-- | src/Erebos/ICE.chs | 7 | ||||
| -rw-r--r-- | src/Erebos/ICE/pjproject.c | 7 | ||||
| -rw-r--r-- | src/Erebos/ICE/pjproject.h | 1 | ||||
| -rw-r--r-- | src/Erebos/Message.hs | 21 | ||||
| -rw-r--r-- | src/Erebos/Network.hs | 36 | ||||
| -rw-r--r-- | src/Erebos/Service.hs | 7 |
8 files changed, 74 insertions, 18 deletions
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index f0ffa70..c165343 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con directMessageConversation peer = do (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case Just thread -> return $ DirectMessageConversation thread - Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] + Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] [] chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index d203866..ef289cb 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -244,8 +244,9 @@ instance Service DiscoveryService where dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer - DiscoveryResult ref [] -> do - svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" + DiscoveryResult _ [] -> do + -- not found + return () DiscoveryResult ref addrs -> do let dgst = refDigest ref @@ -384,6 +385,12 @@ instance Service DiscoveryService where Just ref -> sendToPeer peer $ DiscoverySearch ref Nothing -> return () +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + identityDigests :: Foldable f => Identity f -> [ RefDigest ] identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 6f61451..cc2fdcc 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -8,6 +8,7 @@ module Erebos.ICE ( IceRemoteInfo, iceCreateConfig, + iceStopThread, iceCreateSession, iceDestroy, iceRemoteInfo, @@ -138,6 +139,12 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg +foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread" + ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () + +iceStopThread :: IceConfig -> IO () +iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread + {#pointer *pj_ice_strans as ^ #} iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index 54da58d..e9446fe 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -216,6 +216,13 @@ void ice_cfg_free( struct erebos_ice_cfg * ecfg ) free( ecfg ); } +void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +{ + if( ! ecfg ) + return; + ecfg->exit = true; +} + pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index 1d20891..c31e227 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -6,6 +6,7 @@ struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); void ice_cfg_free( struct erebos_ice_cfg * cfg ); +void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); diff --git a/src/Erebos/Message.hs b/src/Erebos/Message.hs index 5ef27f3..78fb5e7 100644 --- a/src/Erebos/Message.hs +++ b/src/Erebos/Message.hs @@ -29,6 +29,7 @@ import qualified Data.Text as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Discovery import Erebos.Identity import Erebos.Network import Erebos.Service @@ -103,8 +104,10 @@ instance Service DirectMessage where serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState @@ -210,12 +213,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT String IO () +findMissingPeers server threads = do + forM_ (toThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map storedRef $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } threadToList :: DirectMessageThread -> [DirectMessage] @@ -249,6 +259,7 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 32d06f2..fb2b5e9 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -301,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -392,8 +397,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index d1943e1..b5e52dd 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -71,6 +71,9 @@ class ( serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] serviceStorageWatchers _ = [] + serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO () + serviceStopServer _ _ _ _ = return () + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -100,7 +103,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) -data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) +data SomeStorageWatcher s + = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT String IO ()) newtype ServiceID = ServiceID UUID |