diff options
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | main/Main.hs | 43 | ||||
-rw-r--r-- | main/Test.hs | 14 | ||||
-rw-r--r-- | src/Erebos/Conversation.hs | 2 | ||||
-rw-r--r-- | src/Erebos/Discovery.hs | 278 | ||||
-rw-r--r-- | src/Erebos/Flow.hs | 37 | ||||
-rw-r--r-- | src/Erebos/ICE.chs | 7 | ||||
-rw-r--r-- | src/Erebos/ICE/pjproject.c | 97 | ||||
-rw-r--r-- | src/Erebos/ICE/pjproject.h | 7 | ||||
-rw-r--r-- | src/Erebos/Message.hs | 21 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 60 | ||||
-rw-r--r-- | src/Erebos/Service.hs | 7 | ||||
-rw-r--r-- | src/Erebos/Storage.hs | 2 | ||||
-rw-r--r-- | test/common.test | 3 | ||||
-rw-r--r-- | test/discovery.test | 37 | ||||
-rw-r--r-- | test/message.test | 114 |
16 files changed, 543 insertions, 209 deletions
@@ -102,11 +102,13 @@ Test chatroom [19:03] Some Name: Hi `<message>` : Send `<message>` to selected conversation. -`/history` -: Show message history of the selected conversation. +`/history [<number>]` +: Show message history of the selected conversation, or the one identified by + `<number>` if given. -`/details` -: Show information about the selected conversations, contact or peer. +`/details [<number>]` +: Show information about the selected conversations, contact or peer; or the + one identified by `<number>` if given. ### Chatrooms @@ -137,12 +139,13 @@ are signed, so message author can not be forged. : Leave the chatroom. User will no longer be listed as a member and erebos tool will no longer collect message of this chatroom. -`/delete` -: Delete the chatroom; this action is only synchronized with devices belonging -to the current user and does not affect the chatroom state for others. Due to -the storage design, the chatroom data will not be purged from the local state -history, but the chatroom will no longer be listed as available and no futher -updates for this chatroom will be collected or shared with other peers. +`/delete [<number>]` +: Delete the chatroom (currently selected one, or the one identified by + `<number>`); this action is only synchronized with devices belonging to the + current user and does not affect the chatroom state for others. Due to the + storage design, the chatroom data will not be purged from the local state + history, but the chatroom will no longer be listed as available and no futher + updates for this chatroom will be collected or shared with other peers. ### Add contacts diff --git a/main/Main.hs b/main/Main.hs index 8a4729f..59ea7c3 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -479,7 +479,10 @@ getSelectedChatroom = gets csContext >>= \case _ -> throwError "no chatroom selected" getSelectedConversation :: CommandM Conversation -getSelectedConversation = gets csContext >>= \case +getSelectedConversation = gets csContext >>= getConversationFromContext + +getConversationFromContext :: CommandContext -> CommandM Conversation +getConversationFromContext = \case SelectedPeer peer -> peerIdentity peer >>= \case PeerIdentityFull pid -> directMessageConversation $ finalOwner pid _ -> throwError "incomplete peer identity" @@ -493,6 +496,13 @@ getSelectedConversation = gets csContext >>= \case SelectedConversation conv -> reloadConversation conv _ -> throwError "no contact, peer or conversation selected" +getSelectedOrManualContext :: CommandM CommandContext +getSelectedOrManualContext = do + asks ciLine >>= \case + "" -> gets csContext + str | all isDigit str -> getContextByIndex (read str) + _ -> throwError "invalid index" + commands :: [(String, Command)] commands = [ ("history", cmdHistory) @@ -609,19 +619,22 @@ cmdMembers = do forM_ (chatroomMembers room) $ \x -> do liftIO $ putStrLn $ maybe "<unnamed>" T.unpack $ idName x +getContextByIndex :: Int -> CommandM CommandContext +getContextByIndex n = do + join (asks ciContextOptions) >>= \ctxs -> if + | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx + | otherwise -> throwError "invalid index" cmdSelectContext :: Command cmdSelectContext = do n <- read <$> asks ciLine - join (asks ciContextOptions) >>= \ctxs -> if - | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do - modify $ \s -> s { csContext = ctx } - case ctx of - SelectedChatroom rstate -> do - when (not (roomStateSubscribe rstate)) $ do - chatroomSetSubscribe (head $ roomStateData rstate) True - _ -> return () - | otherwise -> throwError "invalid index" + ctx <- getContextByIndex n + modify $ \s -> s { csContext = ctx } + case ctx of + SelectedChatroom rstate -> do + when (not (roomStateSubscribe rstate)) $ do + chatroomSetSubscribe (head $ roomStateData rstate) True + _ -> return () cmdSend :: Command cmdSend = void $ do @@ -635,12 +648,12 @@ cmdSend = void $ do cmdDelete :: Command cmdDelete = void $ do - deleteConversation =<< getSelectedConversation + deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext modify $ \s -> s { csContext = NoContext } cmdHistory :: Command cmdHistory = void $ do - conv <- getSelectedConversation + conv <- getConversationFromContext =<< getSelectedOrManualContext case conversationHistory conv of thread@(_:_) -> do tzone <- liftIO $ getCurrentTimeZone @@ -804,7 +817,7 @@ cmdConversations = do cmdDetails :: Command cmdDetails = do - gets csContext >>= \case + getSelectedOrManualContext >>= \case SelectedPeer peer -> do liftIO $ putStr $ unlines [ "Network peer:" @@ -880,14 +893,14 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer + server <- asks ciServer st <- getStorage sref <- asks ciLine eprint <- asks ciPrint liftIO $ readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" Just ref -> do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref + res <- runExceptT $ discoverySearch server ref case res of Right _ -> return () Left err -> eprint err diff --git a/main/Test.hs b/main/Test.hs index 0181575..adb3c39 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -284,6 +284,7 @@ commands = map (T.pack *** id) , ("contact-set-name", cmdContactSetName) , ("dm-send-peer", cmdDmSendPeer) , ("dm-send-contact", cmdDmSendContact) + , ("dm-send-identity", cmdDmSendIdentity) , ("dm-list-peer", cmdDmListPeer) , ("dm-list-contact", cmdDmListContact) , ("chatroom-create", cmdChatroomCreate) @@ -736,6 +737,14 @@ cmdDmSendContact = do Just to <- contactIdentity <$> getContact cid void $ sendDirectMessage to msg +cmdDmSendIdentity :: Command +cmdDmSendIdentity = do + st <- asks tiStorage + [ tid, msg ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just to <- return $ validateExtendedIdentity $ wrappedLoad ref + void $ sendDirectMessage to msg + dmList :: Foldable f => Identity f -> Command dmList peer = do threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead @@ -880,8 +889,5 @@ cmdDiscoveryConnect = do st <- asks tiStorage [ tref ] <- asks tiParams Just ref <- liftIO $ readRef st $ encodeUtf8 tref - Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer ref diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index f0ffa70..c165343 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con directMessageConversation peer = do (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case Just thread -> return $ DirectMessageConversation thread - Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] + Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] [] chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index f156c85..d098f98 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -3,7 +3,9 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), - DiscoveryConnection(..) + DiscoveryConnection(..), + + discoverySearch, ) where import Control.Concurrent @@ -12,9 +14,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -30,6 +36,13 @@ import Erebos.Service import Erebos.Storage +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif + + data DiscoveryService = DiscoverySelf [ Text ] (Maybe Int) | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) @@ -57,11 +70,7 @@ data DiscoveryConnection = DiscoveryConnection { dconnSource :: Ref , dconnTarget :: Ref , dconnAddress :: Maybe Text -#ifdef ENABLE_ICE_SUPPORT , dconnIceInfo :: Maybe IceRemoteInfo -#else - , dconnIceInfo :: Maybe (Stored Object) -#endif } emptyConnection :: Ref -> Ref -> DiscoveryConnection @@ -90,12 +99,13 @@ instance Storable DiscoveryService where DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-info" $ dconnIceInfo conn + where + storeConnection ctype DiscoveryConnection {..} = do + storeText "connection" $ ctype + storeRawRef "source" dconnSource + storeRawRef "target" dconnTarget + storeMbText "address" dconnAddress + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum [ do @@ -120,22 +130,40 @@ instance Storable DiscoveryService where , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-info" + where + loadConnection ctype ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- loadRawRef "source" + dconnTarget <- loadRawRef "target" + dconnAddress <- loadMbText "address" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer , dpAddress :: [ Text ] -#ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession -#endif + } + +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + +data DiscoveryPeerState = DiscoveryPeerState + { dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -144,13 +172,18 @@ instance Service DiscoveryService where type ServiceAttributes DiscoveryService = DiscoveryAttributes defaultServiceAttributes _ = defaultDiscoveryAttributes -#ifdef ENABLE_ICE_SUPPORT - type ServiceState DiscoveryService = Maybe IceConfig - emptyServiceState _ = Nothing -#endif + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing + } - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + , dgsSearchingFor = S.empty + } serviceHandler msg = case fromStored msg of DiscoverySelf addrs priority -> do @@ -171,15 +204,14 @@ instance Service DiscoveryService where | otherwise -> return Nothing - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer - { dpPriority = fromMaybe 0 priority - , dpPeer = Just peer - , dpAddress = addrs -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } attrs <- asks svcAttributes replyPacket $ DiscoveryAcknowledged matchedAddrs (discoveryStunServer attrs) @@ -188,7 +220,6 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do -#ifdef ENABLE_ICE_SUPPORT paddr <- asks (peerAddress . svcPeer) >>= return . \case (DatagramAddress saddr) -> case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, _) @@ -204,100 +235,90 @@ instance Service DiscoveryService where toIceServer (Just server) Nothing = Just ( server, 0 ) toIceServer (Just server) (Just port) = Just ( server, port ) - cfg <- liftIO $ iceCreateConfig - (toIceServer stunServer stunPort) - (toIceServer turnServer turnPort) - svcSet cfg -#endif - return () + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal + dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer DiscoveryResult ref [] -> do svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" DiscoveryResult ref addrs -> do + let dgst = refDigest ref -- TODO: check if we really requested that server <- asks svcServer self <- svcSelf - mbIceConfig <- svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if + forM_ addrs $ \addr -> if | addr == T.pack "ICE" -#ifdef ENABLE_ICE_SUPPORT - , Just config <- mbIceConfig -> do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } -#else - -> do - return () +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do + return () #endif + return () | [ ipaddr, port ] <- words (T.unpack addr) -> do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + void $ liftIO $ forkIO $ do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } | otherwise -> do - runAsService $ do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + if refDigest (dconnTarget conn) `elem` identityDigests self + then if #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion + -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - svcGet >>= \case + getIceConfig >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } case res of - Right _ -> do - case dconnIceInfo conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do - svcPrint $ "Discovery: ICE request from peer without ICE configuration" -#else - return () + return () #endif - else do + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal + mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp @@ -307,29 +328,28 @@ instance Service DiscoveryService where DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) + dpeers <- dgsPeers <$> svcGetGlobal + if refDigest (dconnSource conn) `elem` identityDigests self then do -- response to our request, try to connect to the peer -#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer if | Just addr <- dconnAddress conn , [ipaddr, port] <- words (T.unpack addr) -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) [] Nothing + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s } +#ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" -#else - return () -#endif else do -- response to relayed request case M.lookup (refDigest $ dconnSource conn) dpeers of @@ -340,6 +360,7 @@ instance Service DiscoveryService where serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer + st <- getStorage let addrToText saddr = do ( addr, port ) <- IP.fromSockAddr saddr @@ -351,5 +372,60 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + liftIO (refFromDigest st dgst) >>= \case + Just ref -> sendToPeer peer $ DiscoverySearch ref + Nothing -> return () + +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + +discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m () +discoverySearch server ref = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ refDigest ref `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch ref diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs index ba2607a..1e1a521 100644 --- a/src/Erebos/Flow.hs +++ b/src/Erebos/Flow.hs @@ -11,54 +11,53 @@ module Erebos.Flow ( import Control.Concurrent.STM -data Flow r w = Flow (TMVar [r]) (TMVar [w]) - | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') +data Flow r w + = Flow (TBQueue r) (TBQueue w) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') type SymFlow a = Flow a a newFlow :: STM (Flow a b, Flow b a) newFlow = do - x <- newEmptyTMVar - y <- newEmptyTMVar + x <- newTBQueue 16 + y <- newTBQueue 16 return (Flow x y, Flow y x) newFlowIO :: IO (Flow a b, Flow b a) newFlowIO = atomically newFlow readFlow :: Flow r w -> STM r -readFlow (Flow rvar _) = takeTMVar rvar >>= \case - (x:[]) -> return x - (x:xs) -> putTMVar rvar xs >> return x - [] -> error "Flow: empty list" +readFlow (Flow rvar _) = readTBQueue rvar readFlow (MappedFlow f _ up) = f <$> readFlow up tryReadFlow :: Flow r w -> STM (Maybe r) -tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case - Just (x:[]) -> return (Just x) - Just (x:xs) -> putTMVar rvar xs >> return (Just x) - Just [] -> error "Flow: empty list" - Nothing -> return Nothing +tryReadFlow (Flow rvar _) = tryReadTBQueue rvar tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up canReadFlow :: Flow r w -> STM Bool -canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar +canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar canReadFlow (MappedFlow _ _ up) = canReadFlow up writeFlow :: Flow r w -> w -> STM () -writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (Flow _ wvar) = writeTBQueue wvar writeFlow (MappedFlow _ f up) = writeFlow up . f writeFlowBulk :: Flow r w -> [w] -> STM () writeFlowBulk _ [] = return () -writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs tryWriteFlow :: Flow r w -> w -> STM Bool -tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[]) -tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f +tryWriteFlow (Flow _ wvar) x = do + isFullTBQueue wvar >>= \case + True -> return False + False -> do + writeTBQueue wvar x + return True +tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x canWriteFlow :: Flow r w -> STM Bool -canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar +canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar canWriteFlow (MappedFlow _ _ up) = canWriteFlow up readFlowIO :: Flow r w -> IO r diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 6f61451..cc2fdcc 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -8,6 +8,7 @@ module Erebos.ICE ( IceRemoteInfo, iceCreateConfig, + iceStopThread, iceCreateSession, iceDestroy, iceRemoteInfo, @@ -138,6 +139,12 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg +foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread" + ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () + +iceStopThread :: IceConfig -> IO () +iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread + {#pointer *pj_ice_strans as ^ #} iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index e79fb9d..e9446fe 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -1,6 +1,7 @@ #include "pjproject.h" #include "Erebos/ICE_stub.h" +#include <stdatomic.h> #include <stdio.h> #include <stdlib.h> #include <stdbool.h> @@ -15,6 +16,13 @@ static struct pj_sockaddr def_addr; } ice; +struct erebos_ice_cfg +{ + pj_ice_strans_cfg cfg; + pj_thread_t * thread; + atomic_bool exit; +}; + struct user_data { pj_ice_sess_role role; @@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status) fprintf(stderr, "ICE: %s: %s\n", msg, err); } -static int ice_worker_thread(void * vcfg) +static int ice_worker_thread( void * vcfg ) { - pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg; + struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg ); - while (true) { + while( ! ecfg->exit ){ pj_time_val max_timeout = { 0, 0 }; pj_time_val timeout = { 0, 0 }; max_timeout.msec = 500; - pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout); + pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout ); pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) @@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg) if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; - int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout); + int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout ); if (c < 0) pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); } @@ -131,80 +139,91 @@ exit: pthread_mutex_unlock(&mutex); } -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ) { ice_init(); - pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) ); - pj_ice_strans_cfg_default( cfg ); + struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) ); + pj_ice_strans_cfg_default( &ecfg->cfg ); + ecfg->exit = false; + ecfg->thread = NULL; - cfg->stun_cfg.pf = &ice.cp.factory; + ecfg->cfg.stun_cfg.pf = &ice.cp.factory; if( pj_timer_heap_create( ice.pool, 100, - &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){ + &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){ fprintf( stderr, "pj_timer_heap_create failed\n" ); goto fail; } - if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){ + if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){ fprintf( stderr, "pj_ioqueue_create failed\n" ); goto fail; } - pj_thread_t * thread; if( pj_thread_create( ice.pool, NULL, &ice_worker_thread, - cfg, 0, 0, &thread ) != PJ_SUCCESS ){ + ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){ fprintf( stderr, "pj_thread_create failed\n" ); goto fail; } - cfg->af = pj_AF_INET(); - cfg->opt.aggressive = PJ_TRUE; + ecfg->cfg.af = pj_AF_INET(); + ecfg->cfg.opt.aggressive = PJ_TRUE; if( stun_server ){ - cfg->stun.server.ptr = malloc( strlen( stun_server )); - pj_strcpy2( &cfg->stun.server, stun_server ); + ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server )); + pj_strcpy2( &ecfg->cfg.stun.server, stun_server ); if( stun_port ) - cfg->stun.port = stun_port; + ecfg->cfg.stun.port = stun_port; } if( turn_server ){ - cfg->turn.server.ptr = malloc( strlen( turn_server )); - pj_strcpy2( &cfg->turn.server, turn_server ); + ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server )); + pj_strcpy2( &ecfg->cfg.turn.server, turn_server ); if( turn_port ) - cfg->turn.port = turn_port; - cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - cfg->turn.conn_type = PJ_TURN_TP_UDP; + ecfg->cfg.turn.port = turn_port; + ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP; } - return cfg; + return ecfg; fail: - ice_cfg_free( cfg ); + ice_cfg_free( ecfg ); return NULL; } -void ice_cfg_free( pj_ice_strans_cfg * cfg ) +void ice_cfg_free( struct erebos_ice_cfg * ecfg ) { - if( ! cfg ) + if( ! ecfg ) return; - if( cfg->turn.server.ptr ) - free( cfg->turn.server.ptr ); + ecfg->exit = true; + pj_thread_join( ecfg->thread ); - if( cfg->stun.server.ptr ) - free( cfg->stun.server.ptr ); + if( ecfg->cfg.turn.server.ptr ) + free( ecfg->cfg.turn.server.ptr ); - if( cfg->stun_cfg.ioqueue ) - pj_ioqueue_destroy( cfg->stun_cfg.ioqueue ); + if( ecfg->cfg.stun.server.ptr ) + free( ecfg->cfg.stun.server.ptr ); - if( cfg->stun_cfg.timer_heap ) - pj_timer_heap_destroy( cfg->stun_cfg.timer_heap ); + if( ecfg->cfg.stun_cfg.ioqueue ) + pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue ); - free( cfg ); + if( ecfg->cfg.stun_cfg.timer_heap ) + pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap ); + + free( ecfg ); +} + +void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +{ + if( ! ecfg ) + return; + ecfg->exit = true; } -pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { ice_init(); @@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role .on_ice_complete = cb_on_ice_complete, }; - pj_status_t status = pj_ice_strans_create( NULL, cfg, 1, + pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1, udata, &icecb, &res ); if (status != PJ_SUCCESS) diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index e4fcbdb..c31e227 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,11 +3,12 @@ #include <pjnath.h> #include <HsFFI.h> -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); -void ice_cfg_free( pj_ice_strans_cfg * cfg ); +void ice_cfg_free( struct erebos_ice_cfg * cfg ); +void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); -pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); void ice_destroy(pj_ice_strans * strans); diff --git a/src/Erebos/Message.hs b/src/Erebos/Message.hs index 5ef27f3..78fb5e7 100644 --- a/src/Erebos/Message.hs +++ b/src/Erebos/Message.hs @@ -29,6 +29,7 @@ import qualified Data.Text as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Discovery import Erebos.Identity import Erebos.Network import Erebos.Service @@ -103,8 +104,10 @@ instance Service DirectMessage where serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState @@ -210,12 +213,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT String IO () +findMissingPeers server threads = do + forM_ (toThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map storedRef $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } threadToList :: DirectMessageThread -> [DirectMessage] @@ -249,6 +259,7 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index d8e868a..fb2b5e9 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -24,6 +24,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -300,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -391,8 +397,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do @@ -899,7 +918,7 @@ sendToPeerWith peer fobj = do Left err -> throwError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest @@ -954,6 +973,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m proxy. (Service s, MonadIO m, MonadError String m) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index d1943e1..b5e52dd 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -71,6 +71,9 @@ class ( serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] serviceStorageWatchers _ = [] + serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO () + serviceStopServer _ _ _ _ = return () + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -100,7 +103,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) -data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) +data SomeStorageWatcher s + = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT String IO ()) newtype ServiceID = ServiceID UUID diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index c1e9664..9ccfdde 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -4,7 +4,7 @@ module Erebos.Storage ( deriveEphemeralStorage, derivePartialStorage, Ref, PartialRef, RefDigest, - refDigest, + refDigest, refFromDigest, readRef, showRef, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, diff --git a/test/common.test b/test/common.test new file mode 100644 index 0000000..89941f0 --- /dev/null +++ b/test/common.test @@ -0,0 +1,3 @@ +module common + +export def refpat = /blake2#[0-9a-f]*/ diff --git a/test/discovery.test b/test/discovery.test index f2dddb7..5f6c443 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -73,3 +73,40 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p diff --git a/test/message.test b/test/message.test index c0e251b..2990d0f 100644 --- a/test/message.test +++ b/test/message.test @@ -1,3 +1,7 @@ +module message + +import common + test DirectMessage: let services = "contact,dm" @@ -149,3 +153,113 @@ test DirectMessage: send "start-server services $services" to p2 expect /dm-received from Owner1 text while_peer_offline/ from p2 + + +test DirectMessageDiscovery: + let services = "dm,discovery" + + subnet sd + subnet s1 + subnet s2 + subnet s3 + subnet s4 + + spawn on sd as pd + spawn on s1 as p1 + spawn on s2 as p2 + spawn on s3 as p3 + spawn on s4 as p4 + + send "create-identity Discovery" to pd + + send "create-identity Device1 Owner1" to p1 + expect /create-identity-done ref ($refpat)/ from p1 capture p1_id + send "identity-info $p1_id" to p1 + expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner + + send "create-identity Device2 Owner2" to p2 + expect /create-identity-done ref ($refpat)/ from p2 capture p2_id + send "identity-info $p2_id" to p2 + expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner + send "identity-info $p2_owner" to p2 + expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase + + send "create-identity Device3 Owner3" to p3 + expect /create-identity-done ref ($refpat)/ from p3 capture p3_id + send "identity-info $p3_id" to p3 + expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner + + send "create-identity Device4 Owner4" to p4 + expect /create-identity-done ref ($refpat)/ from p4 capture p4_id + send "identity-info $p4_id" to p4 + expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner + + + for p in [ p1, p2, p3, p4 ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4 ]: + with p1: + send "peer-add ${p.node.ip}" + expect: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + expect from p: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1 Owner1/ + + # Make sure p1 has other identities in storage: + for i in [ 1 .. 3 ]: + send "dm-send-peer $i init1" to p1 + for p in [ p2, p3, p4 ]: + expect /dm-received from Owner1 text init1/ from p + send "dm-send-identity $p1_owner init2" to p + expect /dm-received from Owner. text init2/ from p1 + + # Restart servers to remove peers: + for p in [ p1, p2, p3, p4 ]: + with p: + send "stop-server" + for p in [ p1, p2, p3, p4 ]: + with p: + expect /stop-server-done/ + + # Prepare message before peers connect to discovery + send "dm-send-identity $p4_owner hello_to_p4" to p1 + + for p in [ p1, p2, p3, p4, pd ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4, p1 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + + multiply_timeout by 2.0 + + # Connect via discovery manually, then send message + send "discovery-connect $p2_obase" to p1 + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + send "dm-send-identity $p2_owner hello_to_p2" to p1 + expect /dm-received from Owner1 text hello_to_p2/ from p2 + + # Send message, expect automatic discovery + send "dm-send-identity $p3_owner hello_to_p3" to p1 + expect /dm-received from Owner1 text hello_to_p3/ from p3 + + # Verify the first message + expect /dm-received from Owner1 text hello_to_p4/ from p4 + + for p in [ p1, p2, p3, p4, pd ]: + send "stop-server" to p + for p in [ p1, p2, p3, p4, pd ]: + expect /stop-server-done/ from p |