diff options
40 files changed, 1563 insertions, 486 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 56fc058..2beacb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Revision history for erebos +## 0.1.9 -- 2025-07-08 + +* Option to show details or delete a conversation by giving index parameter without first selecting it +* Improved handling of ICE connections +* Automatic discovery of peers for pending direct messages + +## 0.1.8.1 -- 2025-03-29 + +* Fix build from sdist (add missing include) + ## 0.1.8 -- 2025-03-28 * Discovery service without requiring ICE support @@ -102,11 +102,13 @@ Test chatroom [19:03] Some Name: Hi `<message>` : Send `<message>` to selected conversation. -`/history` -: Show message history of the selected conversation. +`/history [<number>]` +: Show message history of the selected conversation, or the one identified by + `<number>` if given. -`/details` -: Show information about the selected conversations, contact or peer. +`/details [<number>]` +: Show information about the selected conversations, contact or peer; or the + one identified by `<number>` if given. ### Chatrooms @@ -137,12 +139,13 @@ are signed, so message author can not be forged. : Leave the chatroom. User will no longer be listed as a member and erebos tool will no longer collect message of this chatroom. -`/delete` -: Delete the chatroom; this action is only synchronized with devices belonging -to the current user and does not affect the chatroom state for others. Due to -the storage design, the chatroom data will not be purged from the local state -history, but the chatroom will no longer be listed as available and no futher -updates for this chatroom will be collected or shared with other peers. +`/delete [<number>]` +: Delete the chatroom (currently selected one, or the one identified by + `<number>`); this action is only synchronized with devices belonging to the + current user and does not affect the chatroom state for others. Due to the + storage design, the chatroom data will not be purged from the local state + history, but the chatroom will no longer be listed as available and no futher + updates for this chatroom will be collected or shared with other peers. ### Add contacts diff --git a/erebos.cabal b/erebos.cabal index 4e45f3c..9a49273 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -1,7 +1,7 @@ Cabal-Version: 3.0 Name: erebos -Version: 0.1.8 +Version: 0.1.9 Synopsis: Decentralized messaging and synchronization Description: Library and simple CLI interface implementing the Erebos identity @@ -29,6 +29,7 @@ Extra-Doc-Files: CHANGELOG.md Extra-Source-Files: src/Erebos/ICE/pjproject.h + src/Erebos/Network/ifaddrs.h Flag ice Description: Enable peer discovery with ICE support using pjproject @@ -38,6 +39,10 @@ Flag ci default: False manual: True +Flag cryptonite + description: Use deprecated 'cryptonite' package + default: False + source-repository head type: git location: https://code.erebosprotocol.net/erebos @@ -103,12 +108,11 @@ library Erebos.Error Erebos.Identity Erebos.Network - Erebos.Network.Channel - Erebos.Network.Protocol Erebos.Object Erebos.Pairing Erebos.PubKey Erebos.Service + Erebos.Service.Stream Erebos.Set Erebos.State Erebos.Storable @@ -121,17 +125,22 @@ library other-modules: Erebos.Flow + Erebos.Network.Channel + Erebos.Network.Protocol Erebos.Object.Internal Erebos.Storage.Disk Erebos.Storage.Internal Erebos.Storage.Memory Erebos.Storage.Platform + Erebos.UUID Erebos.Util c-sources: src/Erebos/Network/ifaddrs.c include-dirs: src + includes: + src/Erebos/Network/ifaddrs.h if flag(ice) exposed-modules: @@ -151,7 +160,6 @@ library bytestring >=0.10 && <0.13, clock >=0.8 && < 0.9, containers ^>= { 0.6, 0.7, 0.8 }, - crypton ^>= { 0.34, 1.0 }, deepseq >= 1.4 && <1.6, directory >= 1.3 && <1.4, filepath >=1.4 && <1.6, @@ -165,9 +173,16 @@ library stm >=2.5 && <2.6, text >= 1.2 && <2.2, time ^>= { 1.8, 1.9, 1.10, 1.11, 1.12, 1.13, 1.14 }, - uuid >=1.3 && <1.4, + uuid-types ^>= { 1.0.4 }, zlib >=0.6 && <0.8 + if !flag(cryptonite) + build-depends: + crypton ^>= { 0.34, 1.0 }, + else + build-depends: + cryptonite >=0.25 && <0.31, + if os(windows) hs-source-dirs: src/windows build-depends: @@ -192,13 +207,13 @@ executable erebos Test.Service Version Version.Git + WebSocket autogen-modules: Paths_erebos build-depends: ansi-terminal ^>= { 0.11, 1.0, 1.1 }, bytestring, - crypton, directory, erebos, mtl, @@ -209,4 +224,12 @@ executable erebos text, time, transformers >= 0.5 && <0.7, - uuid, + uuid-types, + websockets ^>= { 0.12.7, 0.13 }, + + if !flag(cryptonite) + build-depends: + crypton, + else + build-depends: + cryptonite, diff --git a/main/Main.hs b/main/Main.hs index 3f78db1..403e5e9 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -11,6 +11,7 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State import Control.Monad.Trans.Maybe +import Control.Monad.Writer import Crypto.Random @@ -61,6 +62,7 @@ import State import Terminal import Test import Version +import WebSocket data Options = Options { optServer :: ServerOptions @@ -68,6 +70,7 @@ data Options = Options , optStorage :: StorageOption , optChatroomAutoSubscribe :: Maybe Int , optDmBotEcho :: Maybe Text + , optWebSocketServer :: Maybe Int , optShowHelp :: Bool , optShowVersion :: Bool } @@ -90,6 +93,7 @@ defaultOptions = Options , optStorage = DefaultStorage , optChatroomAutoSubscribe = Nothing , optDmBotEcho = Nothing + , optWebSocketServer = Nothing , optShowHelp = False , optShowVersion = False } @@ -110,7 +114,7 @@ availableServices = True "peer discovery" ] -options :: [OptDescr (Options -> Options)] +options :: [ OptDescr (Options -> Writer [ String ] Options) ] options = [ Option ['p'] ["port"] (ReqArg (\p -> so $ \opts -> opts { serverPort = read p }) "<port>") @@ -119,57 +123,81 @@ options = (NoArg (so $ \opts -> opts { serverLocalDiscovery = False })) "do not send announce packets for local discovery" , Option [] [ "storage" ] - (ReqArg (\path -> \opts -> opts { optStorage = FilesystemStorage path }) "<path>") + (ReqArg (\path -> \opts -> return opts { optStorage = FilesystemStorage path }) "<path>") "use storage in <path>" , Option [] [ "memory-storage" ] - (NoArg (\opts -> opts { optStorage = MemoryStorage })) + (NoArg (\opts -> return opts { optStorage = MemoryStorage })) "use memory storage" , Option [] ["chatroom-auto-subscribe"] - (ReqArg (\count -> \opts -> opts { optChatroomAutoSubscribe = Just (read count) }) "<count>") + (ReqArg (\count -> \opts -> return opts { optChatroomAutoSubscribe = Just (read count) }) "<count>") "automatically subscribe for up to <count> chatrooms" #ifdef ENABLE_ICE_SUPPORT , Option [] [ "discovery-stun-port" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunPort = Just (read value) }) "<port>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunPort = Just (read value) }) "<port>") "offer specified <port> to discovery peers for STUN protocol" , Option [] [ "discovery-stun-server" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunServer = Just (read value) }) "<server>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunServer = Just (read value) }) "<server>") "offer <server> (domain name or IP address) to discovery peers for STUN protocol" , Option [] [ "discovery-turn-port" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnPort = Just (read value) }) "<port>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnPort = Just (read value) }) "<port>") "offer specified <port> to discovery peers for TURN protocol" , Option [] [ "discovery-turn-server" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnServer = Just (read value) }) "<server>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnServer = Just (read value) }) "<server>") "offer <server> (domain name or IP address) to discovery peers for TURN protocol" #endif + , Option [] [ "discovery-tunnel" ] + (OptArg (\value -> \opts -> do + fun <- provideTunnelFun value + serviceAttr (\attrs -> return attrs { discoveryProvideTunnel = fun }) opts) "<peer-type>") + "offer to provide tunnel for peers of given <peer-type>, possible values: all, none, websocket" , Option [] ["dm-bot-echo"] - (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") + (ReqArg (\prefix -> \opts -> return opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") "automatically reply to direct messages with the same text prefixed with <prefix>" + , Option [] [ "websocket-server" ] + (ReqArg (\value -> \opts -> return opts { optWebSocketServer = Just (read value) }) "<port>") + "start WebSocket server on given port" , Option ['h'] ["help"] - (NoArg $ \opts -> opts { optShowHelp = True }) + (NoArg $ \opts -> return opts { optShowHelp = True }) "show this help and exit" , Option ['V'] ["version"] - (NoArg $ \opts -> opts { optShowVersion = True }) + (NoArg $ \opts -> return opts { optShowVersion = True }) "show version and exit" ] where - so f opts = opts { optServer = f $ optServer opts } + so f opts = return opts { optServer = f $ optServer opts } - updateService :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> SomeService -> SomeService + updateService :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> SomeService -> m SomeService updateService f some@(SomeService proxy attrs) - | Just f' <- cast f = SomeService proxy (f' attrs) - | otherwise = some - - serviceAttr :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> Options -> Options - serviceAttr f opts = opts { optServices = map (\sopt -> sopt { soptService = updateService f (soptService sopt) }) (optServices opts) } - -servicesOptions :: [OptDescr (Options -> Options)] + | Just f' <- cast f = SomeService proxy <$> f' attrs + | otherwise = return some + + serviceAttr :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> Options -> m Options + serviceAttr f opts = do + services' <- forM (optServices opts) $ \sopt -> do + service <- updateService f (soptService sopt) + return sopt { soptService = service } + return opts { optServices = services' } + + provideTunnelFun :: Maybe String -> Writer [ String ] (Peer -> Bool) + provideTunnelFun Nothing = return $ const True + provideTunnelFun (Just "all") = return $ const True + provideTunnelFun (Just "none") = return $ const False + provideTunnelFun (Just "websocket") = return $ \peer -> + case peerAddress peer of + CustomPeerAddress addr | Just WebSocketAddress {} <- cast addr -> True + _ -> False + provideTunnelFun (Just name) = do + tell [ "Invalid value of --discovery-tunnel: ‘" <> name <> "’\n" ] + return $ const False + +servicesOptions :: [ OptDescr (Options -> Writer [ String ] Options) ] servicesOptions = concatMap helper $ "all" : map soptName availableServices where helper name = - [ Option [] ["enable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) "" - , Option [] ["disable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) "" + [ Option [] [ "enable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) "" + , Option [] [ "disable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) "" ] - so f opts = opts { optServices = f $ optServices opts } + so f opts = return opts { optServices = f $ optServices opts } change :: String -> (ServiceOption -> ServiceOption) -> [ServiceOption] -> [ServiceOption] change name f (s : ss) | soptName s == name || name == "all" @@ -187,13 +215,16 @@ getDefaultStorageDir = do main :: IO () main = do - (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case - (o, args, []) -> do - return (foldl (flip id) defaultOptions o, args) - (_, _, errs) -> do + let printErrors errs = do progName <- getProgName hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information." exitFailure + (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case + (wo, args, []) -> + case runWriter (foldM (flip ($)) defaultOptions wo) of + ( o, [] ) -> return ( o, args ) + ( _, errs ) -> printErrors errs + (_, _, errs) -> printErrors errs st <- liftIO $ case optStorage opts of DefaultStorage -> openStorage =<< getDefaultStorageDir @@ -362,6 +393,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do startServer (optServer opts) erebosHead extPrintLn $ map soptService $ filter soptEnabled $ optServices opts + case optWebSocketServer opts of + Just port -> startWebsocketServer server "::" port extPrintLn + Nothing -> return () + void $ liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange server peerIdentity peer >>= \case @@ -493,7 +528,10 @@ getSelectedChatroom = gets csContext >>= \case _ -> throwOtherError "no chatroom selected" getSelectedConversation :: CommandM Conversation -getSelectedConversation = gets csContext >>= \case +getSelectedConversation = gets csContext >>= getConversationFromContext + +getConversationFromContext :: CommandContext -> CommandM Conversation +getConversationFromContext = \case SelectedPeer peer -> peerIdentity peer >>= \case PeerIdentityFull pid -> directMessageConversation $ finalOwner pid _ -> throwOtherError "incomplete peer identity" @@ -507,6 +545,13 @@ getSelectedConversation = gets csContext >>= \case SelectedConversation conv -> reloadConversation conv _ -> throwOtherError "no contact, peer or conversation selected" +getSelectedOrManualContext :: CommandM CommandContext +getSelectedOrManualContext = do + asks ciLine >>= \case + "" -> gets csContext + str | all isDigit str -> getContextByIndex (read str) + _ -> throwOtherError "invalid index" + commands :: [(String, Command)] commands = [ ("history", cmdHistory) @@ -628,19 +673,22 @@ cmdMembers = do forM_ (chatroomMembers room) $ \x -> do cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x +getContextByIndex :: Int -> CommandM CommandContext +getContextByIndex n = do + join (asks ciContextOptions) >>= \ctxs -> if + | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx + | otherwise -> throwOtherError "invalid index" cmdSelectContext :: Command cmdSelectContext = do n <- read <$> asks ciLine - join (asks ciContextOptions) >>= \ctxs -> if - | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do - modify $ \s -> s { csContext = ctx } - case ctx of - SelectedChatroom rstate -> do - when (not (roomStateSubscribe rstate)) $ do - chatroomSetSubscribe (head $ roomStateData rstate) True - _ -> return () - | otherwise -> throwOtherError "invalid index" + ctx <- getContextByIndex n + modify $ \s -> s { csContext = ctx } + case ctx of + SelectedChatroom rstate -> do + when (not (roomStateSubscribe rstate)) $ do + chatroomSetSubscribe (head $ roomStateData rstate) True + _ -> return () cmdSend :: Command cmdSend = void $ do @@ -654,12 +702,12 @@ cmdSend = void $ do cmdDelete :: Command cmdDelete = void $ do - deleteConversation =<< getSelectedConversation + deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext modify $ \s -> s { csContext = NoContext } cmdHistory :: Command cmdHistory = void $ do - conv <- getSelectedConversation + conv <- getConversationFromContext =<< getSelectedOrManualContext case conversationHistory conv of thread@(_:_) -> do tzone <- liftIO $ getCurrentTimeZone @@ -824,7 +872,7 @@ cmdConversations = do cmdDetails :: Command cmdDetails = do - gets csContext >>= \case + getSelectedOrManualContext >>= \case SelectedPeer peer -> do cmdPutStrLn $ unlines [ "Network peer:" @@ -900,17 +948,11 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer - st <- getStorage + server <- asks ciServer sref <- asks ciLine - eprint <- asks ciPrint - liftIO $ readRef st (BC.pack sref) >>= \case - Nothing -> error "ref does not exist" - Just ref -> do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref - case res of - Right _ -> return () - Left err -> eprint err + case readRefDigest (BC.pack sref) of + Nothing -> throwOtherError "failed to parse ref" + Just dgst -> discoverySearch server dgst #ifdef ENABLE_ICE_SUPPORT diff --git a/main/State.hs b/main/State.hs index 76441df..150178e 100644 --- a/main/State.hs +++ b/main/State.hs @@ -44,22 +44,22 @@ loadLocalStateHead term st = loadHeads st >>= \case , ssValue = [ storedRef $ idExtData $ fromMaybe identity owner ] } storeHead st $ LocalState - { lsIdentity = idExtData identity + { lsPrev = Nothing + , lsIdentity = idExtData identity , lsShared = [ shared ] , lsOther = [] } updateSharedIdentity :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Terminal -> m () -updateSharedIdentity term = updateLocalHead_ $ updateSharedState_ $ \case +updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case Just identity -> do Just . toComposedIdentity <$> interactiveIdentityUpdate term identity Nothing -> throwOtherError "no existing shared identity" interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => Terminal -> Identity f -> m UnifiedIdentity -interactiveIdentityUpdate term identity = do - let public = idKeyIdentity identity - +interactiveIdentityUpdate term fidentity = do + identity <- mergeIdentity fidentity name <- liftIO $ do setPrompt term $ T.unpack $ T.concat $ concat [ [ T.pack "Name" ] @@ -70,11 +70,11 @@ interactiveIdentityUpdate term identity = do ] getInputLine term $ KeepPrompt . maybe T.empty T.pack - if | T.null name -> mergeIdentity identity + if | T.null name -> return identity | otherwise -> do - secret <- loadKey public - maybe (throwOtherError "created invalid identity") return . validateIdentity =<< - mstore =<< sign secret =<< mstore (emptyIdentityData public) - { iddPrev = toList $ idDataF identity - , iddName = Just name + secret <- loadKey $ idKeyIdentity identity + maybe (throwOtherError "created invalid identity") return . validateExtendedIdentity =<< + mstore =<< sign secret =<< mstore . ExtendedIdentityData =<< return (emptyIdentityExtension $ idData identity) + { idePrev = toList $ idExtDataF identity + , ideName = Just name } diff --git a/main/Terminal.hs b/main/Terminal.hs index 5dc3612..150bd8c 100644 --- a/main/Terminal.hs +++ b/main/Terminal.hs @@ -31,8 +31,9 @@ import Data.List import Data.Text (Text) import Data.Text qualified as T -import System.IO import System.Console.ANSI +import System.IO +import System.IO.Error data Terminal = Terminal @@ -107,7 +108,7 @@ termPutStr Terminal {..} str = do getInput :: IO Input getInput = do - getChar >>= \case + handleJust (guard . isEOFError) (\() -> return InputEnd) $ getChar >>= \case '\ESC' -> do esc <- readEsc case parseEsc esc of diff --git a/main/Test.hs b/main/Test.hs index 08ad880..b07bd87 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -26,7 +26,7 @@ import Data.Text qualified as T import Data.Text.Encoding import Data.Text.IO qualified as T import Data.Typeable -import Data.UUID qualified as U +import Data.UUID.Types qualified as U import Network.Socket @@ -44,6 +44,7 @@ import Erebos.Object import Erebos.Pairing import Erebos.PubKey import Erebos.Service +import Erebos.Service.Stream import Erebos.Set import Erebos.State import Erebos.Storable @@ -66,10 +67,17 @@ data TestState = TestState data RunningServer = RunningServer { rsServer :: Server - , rsPeers :: MVar (Int, [(Int, Peer)]) + , rsPeers :: MVar ( Int, [ TestPeer ] ) , rsPeerThread :: ThreadId } +data TestPeer = TestPeer + { tpIndex :: Int + , tpPeer :: Peer + , tpStreamReaders :: MVar [ (Int, StreamReader ) ] + , tpStreamWriters :: MVar [ (Int, StreamWriter ) ] + } + initTestState :: TestState initTestState = TestState { tsHead = Nothing @@ -137,17 +145,20 @@ cmdOut line = do getPeer :: Text -> CommandM Peer -getPeer spidx = do +getPeer spidx = tpPeer <$> getTestPeer spidx + +getTestPeer :: Text -> CommandM TestPeer +getTestPeer spidx = do Just RunningServer {..} <- gets tsServer - Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers) + Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers) return peer -getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int +getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int getPeerIndex pmvar = do peer <- asks svcPeer - maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar) + maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar) -pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a +pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a pairingAttributes _ out peers prefix = PairingAttributes { pairingHookRequest = return () @@ -216,6 +227,11 @@ directMessageAttributes out = DirectMessageAttributes { dmOwnerMismatch = afterCommit $ outLine out "dm-owner-mismatch" } +discoveryAttributes :: DiscoveryAttributes +discoveryAttributes = (defaultServiceAttributes Proxy) + { discoveryProvideTunnel = const True + } + dmReceivedWatcher :: Output -> Stored DirectMessage -> IO () dmReceivedWatcher out smsg = do let msg = fromStored smsg @@ -267,6 +283,9 @@ commands = map (T.pack *** id) , ("peer-drop", cmdPeerDrop) , ("peer-list", cmdPeerList) , ("test-message-send", cmdTestMessageSend) + , ("test-stream-open", cmdTestStreamOpen) + , ("test-stream-close", cmdTestStreamClose) + , ("test-stream-send", cmdTestStreamSend) , ("local-state-get", cmdLocalStateGet) , ("local-state-replace", cmdLocalStateReplace) , ("local-state-wait", cmdLocalStateWait) @@ -286,6 +305,7 @@ commands = map (T.pack *** id) , ("contact-set-name", cmdContactSetName) , ("dm-send-peer", cmdDmSendPeer) , ("dm-send-contact", cmdDmSendContact) + , ("dm-send-identity", cmdDmSendIdentity) , ("dm-list-peer", cmdDmListPeer) , ("dm-list-contact", cmdDmListContact) , ("chatroom-create", cmdChatroomCreate) @@ -301,6 +321,7 @@ commands = map (T.pack *** id) , ("chatroom-leave", cmdChatroomLeave) , ("chatroom-message-send", cmdChatroomMessageSend) , ("discovery-connect", cmdDiscoveryConnect) + , ("discovery-tunnel", cmdDiscoveryTunnel) ] cmdStore :: Command @@ -454,7 +475,8 @@ cmdCreateIdentity = do _ -> return [] storeHead st $ LocalState - { lsIdentity = idExtData identity + { lsPrev = Nothing + , lsIdentity = idExtData identity , lsShared = shared , lsOther = [] } @@ -493,14 +515,27 @@ cmdStartServer = do "attach" -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" "chatroom" -> return $ someService @ChatroomService Proxy "contact" -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" - "discovery" -> return $ someService @DiscoveryService Proxy + "discovery" -> return $ someServiceAttr $ discoveryAttributes "dm" -> return $ someServiceAttr $ directMessageAttributes out "sync" -> return $ someService @SyncService Proxy "test" -> return $ someServiceAttr $ (defaultServiceAttributes Proxy) { testMessageReceived = \obj otype len sref -> do liftIO $ do void $ store (headStorage h) obj - outLine out $ unwords ["test-message-received", otype, len, sref] + outLine out $ unwords [ "test-message-received", otype, len, sref ] + , testStreamsReceived = \streams -> do + pidx <- getPeerIndex rsPeers + liftIO $ do + nums <- mapM getStreamReaderNumber streams + outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums + forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do + let go = readStreamPacket stream >>= \case + StreamData seqNum bytes -> do + outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ] + go + StreamClosed seqNum -> do + outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] + go } sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" @@ -509,15 +544,23 @@ cmdStartServer = do rsPeerThread <- liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange rsServer - let printPeer (idx, p) = do - params <- peerIdentity p >>= return . \case + let printPeer TestPeer {..} = do + params <- peerIdentity tpPeer >>= return . \case PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) - _ -> [ "addr", show (peerAddress p) ] - outLine out $ unwords $ [ "peer", show idx ] ++ params + _ -> [ "addr", show (peerAddress tpPeer) ] + outLine out $ unwords $ [ "peer", show tpIndex ] ++ params + + update ( tpIndex, [] ) = do + tpPeer <- return peer + tpStreamReaders <- newMVar [] + tpStreamWriters <- newMVar [] + let tp = TestPeer {..} + printPeer tp + return ( tpIndex + 1, [ tp ] ) - update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)]) - update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur - | otherwise = fmap (p:) <$> update (nid, ps) + update cur@( nid, p : ps ) + | tpPeer p == peer = printPeer p >> return cur + | otherwise = fmap (p :) <$> update ( nid, ps ) modifyMVar_ rsPeers update @@ -554,10 +597,10 @@ cmdPeerList = do peers <- liftIO $ getCurrentPeerList rsServer tpeers <- liftIO $ readMVar rsPeers forM_ peers $ \peer -> do - Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers + Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers mbpid <- peerIdentity peer cmdOut $ unwords $ concat - [ [ "peer-list-item", show n ] + [ [ "peer-list-item", show (tpIndex tp) ] , [ "addr", show (peerAddress peer) ] , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) _ -> [] @@ -574,6 +617,40 @@ cmdTestMessageSend = do sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" +cmdTestStreamOpen :: Command +cmdTestStreamOpen = do + spidx : rest <- asks tiParams + tp <- getTestPeer spidx + count <- case rest of + [] -> return 1 + tcount : _ -> return $ read $ T.unpack tcount + + out <- asks tiOutput + runPeerService (tpPeer tp) $ do + streams <- openTestStreams count + afterCommit $ do + nums <- mapM getStreamWriterNumber streams + modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams) + outLine out $ unwords $ "test-stream-open-done" + : T.unpack spidx + : map show nums + +cmdTestStreamClose :: Command +cmdTestStreamClose = do + [ spidx, sid ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ closeStream stream + cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ] + +cmdTestStreamSend :: Command +cmdTestStreamSend = do + [ spidx, sid, content ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ writeStream stream $ encodeUtf8 content + cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ] + cmdLocalStateGet :: Command cmdLocalStateGet = do h <- getHead @@ -646,7 +723,7 @@ cmdWatchSharedIdentity = do cmdUpdateLocalIdentity :: Command cmdUpdateLocalIdentity = do [name] <- asks tiParams - updateLocalHead_ $ \ls -> do + updateLocalState_ $ \ls -> do Just identity <- return $ validateExtendedIdentity $ lsIdentity $ fromStored ls let public = idKeyIdentity identity @@ -661,7 +738,7 @@ cmdUpdateLocalIdentity = do cmdUpdateSharedIdentity :: Command cmdUpdateSharedIdentity = do [name] <- asks tiParams - updateLocalHead_ $ updateSharedState_ $ \case + updateLocalState_ $ updateSharedState_ $ \case Nothing -> throwOtherError "no existing shared identity" Just identity -> do let public = idKeyIdentity identity @@ -731,7 +808,7 @@ cmdContactSetName :: Command cmdContactSetName = do [cid, name] <- asks tiParams contact <- getContact cid - updateLocalHead_ $ updateSharedState_ $ contactSetName contact name + updateLocalState_ $ updateSharedState_ $ contactSetName contact name cmdOut "contact-set-name-done" cmdDmSendPeer :: Command @@ -746,6 +823,14 @@ cmdDmSendContact = do Just to <- contactIdentity <$> getContact cid void $ sendDirectMessage to msg +cmdDmSendIdentity :: Command +cmdDmSendIdentity = do + st <- asks tiStorage + [ tid, msg ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just to <- return $ validateExtendedIdentity $ wrappedLoad ref + void $ sendDirectMessage to msg + dmList :: Foldable f => Identity f -> Command dmList peer = do threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead @@ -887,11 +972,14 @@ cmdChatroomMessageSend = do cmdDiscoveryConnect :: Command cmdDiscoveryConnect = do - st <- asks tiStorage [ tref ] <- asks tiParams - Just ref <- liftIO $ readRef st $ encodeUtf8 tref - + Just dgst <- return $ readRefDigest $ encodeUtf8 tref Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer dgst + +cmdDiscoveryTunnel :: Command +cmdDiscoveryTunnel = do + [ tvia, ttarget ] <- asks tiParams + via <- getPeer tvia + Just target <- return $ readRefDigest $ encodeUtf8 ttarget + liftIO $ discoverySetupTunnel via target diff --git a/main/Test/Service.hs b/main/Test/Service.hs index 8c58dee..c0be07d 100644 --- a/main/Test/Service.hs +++ b/main/Test/Service.hs @@ -1,8 +1,11 @@ module Test.Service ( TestMessage(..), TestMessageAttributes(..), + + openTestStreams, ) where +import Control.Monad import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as BL @@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable data TestMessage = TestMessage (Stored Object) data TestMessageAttributes = TestMessageAttributes { testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage () + , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage () } instance Storable TestMessage where @@ -26,7 +31,10 @@ instance Service TestMessage where serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8" type ServiceAttributes TestMessage = TestMessageAttributes - defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ()) + defaultServiceAttributes _ = TestMessageAttributes + { testMessageReceived = \_ _ _ _ -> return () + , testStreamsReceived = \_ -> return () + } serviceHandler smsg = do let TestMessage sobj = fromStored smsg @@ -36,3 +44,14 @@ instance Service TestMessage where cb <- asks $ testMessageReceived . svcAttributes cb obj otype len (show $ refDigest $ storedRef sobj) _ -> return () + + streams <- receivedStreams + when (not $ null streams) $ do + cb <- asks $ testStreamsReceived . svcAttributes + cb streams + + +openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ] +openTestStreams count = do + replyPacket . TestMessage =<< mstore (Rec []) + replicateM count openStream diff --git a/main/WebSocket.hs b/main/WebSocket.hs new file mode 100644 index 0000000..5c49aa1 --- /dev/null +++ b/main/WebSocket.hs @@ -0,0 +1,46 @@ +module WebSocket ( + WebSocketAddress(..), + startWebsocketServer, +) where + +import Control.Concurrent +import Control.Exception +import Control.Monad + +import Data.ByteString.Lazy qualified as BL +import Data.Unique + +import Erebos.Network + +import Network.WebSockets qualified as WS + + +data WebSocketAddress = WebSocketAddress Unique WS.Connection + +instance Eq WebSocketAddress where + WebSocketAddress u _ == WebSocketAddress u' _ = u == u' + +instance Ord WebSocketAddress where + compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u' + +instance Show WebSocketAddress where + show (WebSocketAddress _ _) = "websocket" + +instance PeerAddressType WebSocketAddress where + sendBytesToAddress (WebSocketAddress _ conn) msg = do + WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg + +startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO () +startWebsocketServer server addr port logd = do + void $ forkIO $ do + WS.runServer addr port $ \pending -> do + conn <- WS.acceptRequest pending + u <- newUnique + let paddr = WebSocketAddress u conn + void $ serverPeerCustom server paddr + handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do + WS.withPingThread conn 30 (return ()) $ do + forever $ do + WS.receiveDataMessage conn >>= \case + WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg + WS.Text {} -> logd $ "unexpected websocket text message" diff --git a/src/Erebos/Attach.hs b/src/Erebos/Attach.hs index df61406..fad6197 100644 --- a/src/Erebos/Attach.hs +++ b/src/Erebos/Attach.hs @@ -52,7 +52,7 @@ instance PairingResult AttachIdentity where guard $ iddPrev (fromSigned $ idData identity) == [eiddStoredBase curid] return (identity, keys) - pairingFinalizeRequest (identity, keys) = updateLocalHead_ $ \slocal -> do + pairingFinalizeRequest (identity, keys) = updateLocalState_ $ \slocal -> do let owner = finalOwner identity st <- getStorage pkeys <- mapM (copyStored st) [ idKeyIdentity owner, idKeyMessage owner ] diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index 2d4f272..74456ff 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -293,7 +293,7 @@ createChatroom rdName rdDescription = do , rsdSubscribe = Just True } - updateLocalHead $ updateSharedState $ \rooms -> do + updateLocalState $ updateSharedState $ \rooms -> do st <- getStorage (, cstate) <$> storeSetAdd st cstate rooms @@ -302,7 +302,7 @@ findAndUpdateChatroomState => (ChatroomState -> Maybe (m ChatroomState)) -> m (Maybe ChatroomState) findAndUpdateChatroomState f = do - updateLocalHead $ updateSharedState $ \roomSet -> do + updateLocalState $ updateSharedState $ \roomSet -> do let roomList = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet case catMaybes $ map (\x -> (x,) <$> f x) roomList of ((orig, act) : _) -> do @@ -523,7 +523,7 @@ instance Service ChatroomService where } when (not $ null chatRoomInfo) $ do - updateLocalHead_ $ updateSharedState_ $ \roomSet -> do + updateLocalState_ $ updateSharedState_ $ \roomSet -> do let rooms = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet upd set (roomInfo :: Stored (Signed ChatroomData)) = do let currentRoots = storedRoots roomInfo @@ -562,7 +562,7 @@ instance Service ChatroomService where svcModify $ \ps -> ps { psSubscribedTo = filter (/= leastRoot) (psSubscribedTo ps) } when (not (null chatRoomMessage)) $ do - updateLocalHead_ $ updateSharedState_ $ \roomSet -> do + updateLocalState_ $ updateSharedState_ $ \roomSet -> do let rooms = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet upd set (msgData :: Stored (Signed ChatMessageData)) | Just msg <- validateSingleMessage msgData = do diff --git a/src/Erebos/Contact.hs b/src/Erebos/Contact.hs index 25239b9..88e6c44 100644 --- a/src/Erebos/Contact.hs +++ b/src/Erebos/Contact.hs @@ -165,7 +165,7 @@ contactReject :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () contactReject = pairingReject @ContactAccepted Proxy finalizeContact :: MonadHead LocalState m => UnifiedIdentity -> m () -finalizeContact identity = updateLocalHead_ $ updateSharedState_ $ \contacts -> do +finalizeContact identity = updateLocalState_ $ updateSharedState_ $ \contacts -> do st <- getStorage cdata <- wrappedStore st ContactData { cdPrev = [] diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index dee6faa..187fddd 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con directMessageConversation peer = do (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case Just thread -> return $ DirectMessageConversation thread - Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] + Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] [] chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs index 28d8085..dc6724c 100644 --- a/src/Erebos/DirectMessage.hs +++ b/src/Erebos/DirectMessage.hs @@ -17,6 +17,7 @@ module Erebos.DirectMessage ( ) where import Control.Monad +import Control.Monad.Except import Control.Monad.Reader import Data.List @@ -27,8 +28,10 @@ import qualified Data.Text as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Discovery import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service import Erebos.State import Erebos.Storable @@ -102,8 +105,10 @@ instance Service DirectMessage where serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState @@ -158,7 +163,7 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m) => Identity f -> Text -> m (Stored DirectMessage) -sendDirectMessage pid text = updateLocalHead $ \ls -> do +sendDirectMessage pid text = updateLocalState $ \ls -> do let self = localIdentity $ fromStored ls powner = finalOwner pid flip updateSharedState ls $ \(DirectMessageThreads prev _) -> do @@ -188,7 +193,7 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do peer <- asks svcPeer let thread = messageThreadFor pid mss mapM_ (sendToPeerStored peer) $ msgHead thread - updateLocalHead_ $ \ls -> do + updateLocalState_ $ \ls -> do let powner = finalOwner pid flip updateSharedState_ ls $ \unchanged@(DirectMessageThreads prev _) -> do let ready = findMsgProperty powner msReady prev @@ -209,12 +214,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO () +findMissingPeers server threads = do + forM_ (toThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } threadToList :: DirectMessageThread -> [DirectMessage] @@ -248,6 +260,7 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 48500d7..ede9cc9 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -3,7 +3,10 @@ module Erebos.Discovery ( DiscoveryService(..), DiscoveryAttributes(..), - DiscoveryConnection(..) + DiscoveryConnection(..), + + discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -12,9 +15,13 @@ import Control.Monad.Except import Control.Monad.Reader import Data.IP qualified as IP +import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M import Data.Maybe +import Data.Proxy +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) import Data.Text qualified as T import Data.Word @@ -28,14 +35,22 @@ import Erebos.Identity import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable +#ifndef ENABLE_ICE_SUPPORT +type IceConfig = () +type IceSession = () +type IceRemoteInfo = Stored Object +#endif + + data DiscoveryService = DiscoverySelf [ Text ] (Maybe Int) | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) - | DiscoverySearch Ref - | DiscoveryResult Ref [ Text ] + | DiscoverySearch (Either Ref RefDigest) + | DiscoveryResult (Either Ref RefDigest) [ Text ] | DiscoveryConnectionRequest DiscoveryConnection | DiscoveryConnectionResponse DiscoveryConnection @@ -44,6 +59,7 @@ data DiscoveryAttributes = DiscoveryAttributes , discoveryStunServer :: Maybe Text , discoveryTurnPort :: Maybe Word16 , discoveryTurnServer :: Maybe Text + , discoveryProvideTunnel :: Peer -> Bool } defaultDiscoveryAttributes :: DiscoveryAttributes @@ -52,23 +68,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes , discoveryStunServer = Nothing , discoveryTurnPort = Nothing , discoveryTurnServer = Nothing + , discoveryProvideTunnel = const False } data DiscoveryConnection = DiscoveryConnection - { dconnSource :: Ref - , dconnTarget :: Ref + { dconnSource :: Either Ref RefDigest + , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text -#ifdef ENABLE_ICE_SUPPORT + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo -#else - , dconnIceInfo :: Maybe (Stored Object) -#endif } -emptyConnection :: Ref -> Ref -> DiscoveryConnection +emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -84,19 +99,21 @@ instance Storable DiscoveryService where storeMbInt "stun-port" stunPort storeMbText "turn-server" turnServer storeMbInt "turn-port" turnPort - DiscoverySearch ref -> storeRawRef "search" ref - DiscoveryResult ref addr -> do - storeRawRef "result" ref + DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst + DiscoveryResult edgst addr -> do + either (storeRawRef "result") (storeRawWeak "result") edgst mapM_ (storeText "address") addr DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn - where storeConnection ctype conn = do - storeText "connection" $ ctype - storeRawRef "source" $ dconnSource conn - storeRawRef "target" $ dconnTarget conn - storeMbText "address" $ dconnAddress conn - storeMbRef "ice-info" $ dconnIceInfo conn + where + storeConnection ctype DiscoveryConnection {..} = do + storeText "connection" $ ctype + either (storeRawRef "source") (storeRawWeak "source") dconnSource + either (storeRawRef "target") (storeRawWeak "target") dconnTarget + storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" + storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum [ do @@ -114,29 +131,64 @@ instance Storable DiscoveryService where <*> loadMbInt "stun-port" <*> loadMbText "turn-server" <*> loadMbInt "turn-port" - , DiscoverySearch <$> loadRawRef "search" + , DiscoverySearch <$> msum + [ Left <$> loadRawRef "search" + , Right <$> loadRawWeak "search" + ] , DiscoveryResult - <$> loadRawRef "result" + <$> msum + [ Left <$> loadRawRef "result" + , Right <$> loadRawWeak "result" + ] <*> loadTexts "address" , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] - where loadConnection ctype ctor = do - ctype' <- loadText "connection" - guard $ ctype == ctype' - return . ctor =<< DiscoveryConnection - <$> loadRawRef "source" - <*> loadRawRef "target" - <*> loadMbText "address" - <*> loadMbRef "ice-info" + where + loadConnection ctype ctor = do + ctype' <- loadText "connection" + guard $ ctype == ctype' + dconnSource <- msum + [ Left <$> loadRawRef "source" + , Right <$> loadRawWeak "source" + ] + dconnTarget <- msum + [ Left <$> loadRawRef "target" + , Right <$> loadRawWeak "target" + ] + dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" + dconnIceInfo <- loadMbRef "ice-info" + return $ ctor DiscoveryConnection {..} data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer , dpAddress :: [ Text ] -#ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession -#endif + } + +emptyPeer :: DiscoveryPeer +emptyPeer = DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Nothing + } + +data DiscoveryPeerState = DiscoveryPeerState + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) + , dpsTurnServer :: Maybe ( Text, Word16 ) + , dpsIceConfig :: Maybe IceConfig + } + +data DiscoveryGlobalState = DiscoveryGlobalState + { dgsPeers :: Map RefDigest DiscoveryPeer + , dgsSearchingFor :: Set RefDigest } instance Service DiscoveryService where @@ -145,13 +197,20 @@ instance Service DiscoveryService where type ServiceAttributes DiscoveryService = DiscoveryAttributes defaultServiceAttributes _ = defaultDiscoveryAttributes -#ifdef ENABLE_ICE_SUPPORT - type ServiceState DiscoveryService = Maybe IceConfig - emptyServiceState _ = Nothing -#endif - - type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer - emptyServiceGlobalState _ = M.empty + type ServiceState DiscoveryService = DiscoveryPeerState + emptyServiceState _ = DiscoveryPeerState + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing + , dpsTurnServer = Nothing + , dpsIceConfig = Nothing + } + + type ServiceGlobalState DiscoveryService = DiscoveryGlobalState + emptyServiceGlobalState _ = DiscoveryGlobalState + { dgsPeers = M.empty + , dgsSearchingFor = S.empty + } serviceHandler msg = case fromStored msg of DiscoverySelf addrs priority -> do @@ -172,15 +231,14 @@ instance Service DiscoveryService where | otherwise -> return Nothing - forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer - { dpPriority = fromMaybe 0 priority - , dpPeer = Just peer - , dpAddress = addrs -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do + let dp = DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs + , dpIceSession = Nothing + } + svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } attrs <- asks svcAttributes replyPacket $ DiscoveryAcknowledged matchedAddrs (discoveryStunServer attrs) @@ -189,7 +247,6 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do -#ifdef ENABLE_ICE_SUPPORT paddr <- asks (peerAddress . svcPeer) >>= return . \case (DatagramAddress saddr) -> case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, _) @@ -205,138 +262,207 @@ instance Service DiscoveryService where toIceServer (Just server) Nothing = Just ( server, 0 ) toIceServer (Just server) (Just port) = Just ( server, port ) - cfg <- liftIO $ iceCreateConfig - (toIceServer stunServer stunPort) - (toIceServer turnServer turnPort) - svcSet cfg -#endif - return () + svcModify $ \s -> s + { dpsStunServer = toIceServer stunServer stunPort + , dpsTurnServer = toIceServer turnServer turnPort + } - DiscoverySearch ref -> do - dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal - replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer + DiscoverySearch edgst -> do + dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal + replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer - DiscoveryResult ref [] -> do - svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" + DiscoveryResult _ [] -> do + -- not found + return () - DiscoveryResult ref addrs -> do + DiscoveryResult edgst addrs -> do + let dgst = either refDigest id edgst -- TODO: check if we really requested that server <- asks svcServer + st <- getStorage self <- svcSelf - mbIceConfig <- svcGet discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if + forM_ addrs $ \addr -> if | addr == T.pack "ICE" -#ifdef ENABLE_ICE_SUPPORT - , Just config <- mbIceConfig - -> do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Nothing - , dpAddress = [] - , dpIceSession = Just ice - } -#else -> do - return () +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do + return () #endif + return () | [ ipaddr, port ] <- words (T.unpack addr) -> do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer - { dpPriority = 0 - , dpPeer = Just peer - , dpAddress = [] -#ifdef ENABLE_ICE_SUPPORT - , dpIceSession = Nothing -#endif - } + void $ liftIO $ forkIO $ do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } | otherwise -> do - runAsService $ do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf + attrs <- asks svcAttributes let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) - if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + if either refDigest id (dconnTarget conn) `elem` identityDigests self + then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion + | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer - svcGet >>= \case + getIceConfig >>= \case Just config -> do liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do rinfo <- iceRemoteInfo ice res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } case res of - Right _ -> do - case dconnIceInfo conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err Nothing -> do - svcPrint $ "Discovery: ICE request from peer without ICE configuration" -#else - return () + return () #endif - else do - -- request to some of our peers, relay - mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal - case mbdp of + | otherwise -> do + svcPrint $ "Discovery: unsupported connection request" + + else do + -- request to some of our peers, relay + peer <- asks svcPeer + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams + case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | not (discoveryProvideTunnel attrs peer) -> do + replyPacket $ DiscoveryConnectionResponse rconn + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf - dpeers <- svcGetGlobal - if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) - then do + dps <- svcGet + dpeers <- dgsPeers <$> svcGetGlobal + + if either refDigest id (dconnSource conn) `elem` identityDigests self + then do -- response to our request, try to connect to the peer -#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer if | Just addr <- dconnAddress conn , [ipaddr, port] <- words (T.unpack addr) -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) [] Nothing + let upd dp = dp { dpPeer = Just peer } + svcModifyGlobal $ \s -> s + { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> svcPrint $ "Discovery: missing stream in tunnel response" - | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers +#ifdef ENABLE_ICE_SUPPORT + | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice +#endif | otherwise -> svcPrint $ "Discovery: connection request failed" -#else - return () -#endif - else do - -- response to relayed request - case M.lookup (refDigest $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + | otherwise -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer @@ -352,5 +478,118 @@ instance Service DiscoveryService where #endif ] + pid <- asks svcPeerIdentity + gs <- svcGetGlobal + let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid) + svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor } + when (not $ null addrs) $ do sendToPeer peer $ DiscoverySelf addrs Nothing + forM_ searchingFor $ \dgst -> do + sendToPeer peer $ DiscoverySearch (Right dgst) + +#ifdef ENABLE_ICE_SUPPORT + serviceStopServer _ _ _ pstates = do + forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do + mapM_ iceStopThread dpsIceConfig +#endif + + +identityDigests :: Foldable f => Identity f -> [ RefDigest ] +identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid + + +getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig) +getIceConfig = do + dpsIceConfig <$> svcGet >>= \case + Just cfg -> return $ Just cfg + Nothing -> do +#ifdef ENABLE_ICE_SUPPORT + stun <- dpsStunServer <$> svcGet + turn <- dpsTurnServer <$> svcGet + liftIO (iceCreateConfig stun turn) >>= \case + Just cfg -> do + svcModify $ \s -> s { dpsIceConfig = Just cfg } + return $ Just cfg + Nothing -> do + svcPrint $ "Discovery: failed to create ICE config" + return Nothing +#else + return Nothing +#endif + + +discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m () +discoverySearch server dgst = do + peers <- liftIO $ getCurrentPeerList server + match <- forM peers $ \peer -> do + peerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ dgst `elem` identityDigests pid + _ -> return False + when (not $ or match) $ do + modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s + { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s + } + forM_ peers $ \peer -> do + sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs index ba2607a..1e1a521 100644 --- a/src/Erebos/Flow.hs +++ b/src/Erebos/Flow.hs @@ -11,54 +11,53 @@ module Erebos.Flow ( import Control.Concurrent.STM -data Flow r w = Flow (TMVar [r]) (TMVar [w]) - | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') +data Flow r w + = Flow (TBQueue r) (TBQueue w) + | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w') type SymFlow a = Flow a a newFlow :: STM (Flow a b, Flow b a) newFlow = do - x <- newEmptyTMVar - y <- newEmptyTMVar + x <- newTBQueue 16 + y <- newTBQueue 16 return (Flow x y, Flow y x) newFlowIO :: IO (Flow a b, Flow b a) newFlowIO = atomically newFlow readFlow :: Flow r w -> STM r -readFlow (Flow rvar _) = takeTMVar rvar >>= \case - (x:[]) -> return x - (x:xs) -> putTMVar rvar xs >> return x - [] -> error "Flow: empty list" +readFlow (Flow rvar _) = readTBQueue rvar readFlow (MappedFlow f _ up) = f <$> readFlow up tryReadFlow :: Flow r w -> STM (Maybe r) -tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case - Just (x:[]) -> return (Just x) - Just (x:xs) -> putTMVar rvar xs >> return (Just x) - Just [] -> error "Flow: empty list" - Nothing -> return Nothing +tryReadFlow (Flow rvar _) = tryReadTBQueue rvar tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up canReadFlow :: Flow r w -> STM Bool -canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar +canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar canReadFlow (MappedFlow _ _ up) = canReadFlow up writeFlow :: Flow r w -> w -> STM () -writeFlow (Flow _ wvar) = putTMVar wvar . (:[]) +writeFlow (Flow _ wvar) = writeTBQueue wvar writeFlow (MappedFlow _ f up) = writeFlow up . f writeFlowBulk :: Flow r w -> [w] -> STM () writeFlowBulk _ [] = return () -writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs +writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs tryWriteFlow :: Flow r w -> w -> STM Bool -tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[]) -tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f +tryWriteFlow (Flow _ wvar) x = do + isFullTBQueue wvar >>= \case + True -> return False + False -> do + writeTBQueue wvar x + return True +tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x canWriteFlow :: Flow r w -> STM Bool -canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar +canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar canWriteFlow (MappedFlow _ _ up) = canWriteFlow up readFlowIO :: Flow r w -> IO r diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 2c6f500..dceeb2c 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -8,6 +8,7 @@ module Erebos.ICE ( IceRemoteInfo, iceCreateConfig, + iceStopThread, iceCreateSession, iceDestroy, iceRemoteInfo, @@ -139,6 +140,12 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg +foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread" + ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () + +iceStopThread :: IceConfig -> IO () +iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread + {#pointer *pj_ice_strans as ^ #} iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index e79fb9d..e9446fe 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -1,6 +1,7 @@ #include "pjproject.h" #include "Erebos/ICE_stub.h" +#include <stdatomic.h> #include <stdio.h> #include <stdlib.h> #include <stdbool.h> @@ -15,6 +16,13 @@ static struct pj_sockaddr def_addr; } ice; +struct erebos_ice_cfg +{ + pj_ice_strans_cfg cfg; + pj_thread_t * thread; + atomic_bool exit; +}; + struct user_data { pj_ice_sess_role role; @@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status) fprintf(stderr, "ICE: %s: %s\n", msg, err); } -static int ice_worker_thread(void * vcfg) +static int ice_worker_thread( void * vcfg ) { - pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg; + struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg ); - while (true) { + while( ! ecfg->exit ){ pj_time_val max_timeout = { 0, 0 }; pj_time_val timeout = { 0, 0 }; max_timeout.msec = 500; - pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout); + pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout ); pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) @@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg) if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; - int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout); + int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout ); if (c < 0) pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); } @@ -131,80 +139,91 @@ exit: pthread_mutex_unlock(&mutex); } -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ) { ice_init(); - pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) ); - pj_ice_strans_cfg_default( cfg ); + struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) ); + pj_ice_strans_cfg_default( &ecfg->cfg ); + ecfg->exit = false; + ecfg->thread = NULL; - cfg->stun_cfg.pf = &ice.cp.factory; + ecfg->cfg.stun_cfg.pf = &ice.cp.factory; if( pj_timer_heap_create( ice.pool, 100, - &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){ + &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){ fprintf( stderr, "pj_timer_heap_create failed\n" ); goto fail; } - if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){ + if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){ fprintf( stderr, "pj_ioqueue_create failed\n" ); goto fail; } - pj_thread_t * thread; if( pj_thread_create( ice.pool, NULL, &ice_worker_thread, - cfg, 0, 0, &thread ) != PJ_SUCCESS ){ + ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){ fprintf( stderr, "pj_thread_create failed\n" ); goto fail; } - cfg->af = pj_AF_INET(); - cfg->opt.aggressive = PJ_TRUE; + ecfg->cfg.af = pj_AF_INET(); + ecfg->cfg.opt.aggressive = PJ_TRUE; if( stun_server ){ - cfg->stun.server.ptr = malloc( strlen( stun_server )); - pj_strcpy2( &cfg->stun.server, stun_server ); + ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server )); + pj_strcpy2( &ecfg->cfg.stun.server, stun_server ); if( stun_port ) - cfg->stun.port = stun_port; + ecfg->cfg.stun.port = stun_port; } if( turn_server ){ - cfg->turn.server.ptr = malloc( strlen( turn_server )); - pj_strcpy2( &cfg->turn.server, turn_server ); + ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server )); + pj_strcpy2( &ecfg->cfg.turn.server, turn_server ); if( turn_port ) - cfg->turn.port = turn_port; - cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - cfg->turn.conn_type = PJ_TURN_TP_UDP; + ecfg->cfg.turn.port = turn_port; + ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP; } - return cfg; + return ecfg; fail: - ice_cfg_free( cfg ); + ice_cfg_free( ecfg ); return NULL; } -void ice_cfg_free( pj_ice_strans_cfg * cfg ) +void ice_cfg_free( struct erebos_ice_cfg * ecfg ) { - if( ! cfg ) + if( ! ecfg ) return; - if( cfg->turn.server.ptr ) - free( cfg->turn.server.ptr ); + ecfg->exit = true; + pj_thread_join( ecfg->thread ); - if( cfg->stun.server.ptr ) - free( cfg->stun.server.ptr ); + if( ecfg->cfg.turn.server.ptr ) + free( ecfg->cfg.turn.server.ptr ); - if( cfg->stun_cfg.ioqueue ) - pj_ioqueue_destroy( cfg->stun_cfg.ioqueue ); + if( ecfg->cfg.stun.server.ptr ) + free( ecfg->cfg.stun.server.ptr ); - if( cfg->stun_cfg.timer_heap ) - pj_timer_heap_destroy( cfg->stun_cfg.timer_heap ); + if( ecfg->cfg.stun_cfg.ioqueue ) + pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue ); - free( cfg ); + if( ecfg->cfg.stun_cfg.timer_heap ) + pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap ); + + free( ecfg ); +} + +void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +{ + if( ! ecfg ) + return; + ecfg->exit = true; } -pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { ice_init(); @@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role .on_ice_complete = cb_on_ice_complete, }; - pj_status_t status = pj_ice_strans_create( NULL, cfg, 1, + pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1, udata, &icecb, &res ); if (status != PJ_SUCCESS) diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index e4fcbdb..c31e227 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,11 +3,12 @@ #include <pjnath.h> #include <HsFFI.h> -pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); -void ice_cfg_free( pj_ice_strans_cfg * cfg ); +void ice_cfg_free( struct erebos_ice_cfg * cfg ); +void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); -pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role, +pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); void ice_destroy(pj_ice_strans * strans); diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 54658de..8da4c8d 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -14,7 +14,12 @@ module Erebos.Network ( PeerIdentity(..), peerIdentity, WaitingRef, wrDigest, Service(..), + + PeerAddressType(..), + receivedFromCustomAddress, + serverPeer, + serverPeerCustom, #ifdef ENABLE_ICE_SUPPORT serverPeerIce, #endif @@ -24,6 +29,7 @@ module Erebos.Network ( sendToPeerStored, sendManyToPeerStored, sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -36,13 +42,14 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State +import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List import Data.Map (Map) -import qualified Data.Map as M +import Data.Map qualified as M import Data.Maybe import Data.Typeable import Data.Word @@ -56,7 +63,7 @@ import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) -import qualified Network.Socket.ByteString as S +import Network.Socket.ByteString qualified as S import Erebos.Error #ifdef ENABLE_ICE_SUPPORT @@ -157,12 +164,19 @@ setPeerChannel Peer {..} ch = do instance Eq Peer where (==) = (==) `on` peerIdentityVar -data PeerAddress = DatagramAddress SockAddr +class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where + sendBytesToAddress :: addr -> ByteString -> IO () + +data PeerAddress + = forall addr. PeerAddressType addr => CustomPeerAddress addr + | DatagramAddress SockAddr #ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession + | PeerIceSession IceSession #endif instance Show PeerAddress where + show (CustomPeerAddress addr) = show addr + show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, port) | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 @@ -170,22 +184,32 @@ instance Show PeerAddress where Just (addr, port) -> [show addr, show port] _ -> [show saddr] + #ifdef ENABLE_ICE_SUPPORT show (PeerIceSession ice) = show ice #endif instance Eq PeerAddress where + CustomPeerAddress addr == CustomPeerAddress addr' + | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice == PeerIceSession ice' = ice == ice' - _ == _ = False #endif + _ == _ = False instance Ord PeerAddress where + compare (CustomPeerAddress addr) (CustomPeerAddress addr') + | Just addr'' <- cast addr' = compare addr addr'' + | otherwise = compare (typeOf addr) (typeOf addr') + compare (CustomPeerAddress _ ) _ = LT + compare _ (CustomPeerAddress _ ) = GT + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' #ifdef ENABLE_ICE_SUPPORT compare (DatagramAddress _ ) _ = LT compare _ (DatagramAddress _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' #endif @@ -198,9 +222,10 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] - | PeerConnected (Connection PeerAddress) - | PeerDropped +data PeerState + = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ] + | PeerConnected (Connection PeerAddress) + | PeerDropped lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID @@ -302,13 +327,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -316,8 +346,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do case paddr of + CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice -> iceSend ice msg @@ -385,16 +416,29 @@ startServer serverOptions serverOrigHead logd' serverServices = do bracket (open addr) close loop forkServerThread server $ forever $ do - (peer, svc, ref) <- atomically $ readTQueue chanSvc + ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do @@ -502,9 +546,7 @@ openStream = do conn <- readTVarP peerState >>= \case PeerConnected conn -> return conn _ -> throwError "can't open stream without established connection" - (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case - Right res -> return res - Left err -> throwError err + (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) addHeader hdr @@ -524,8 +566,8 @@ appendDistinct x (y:ys) | x == y = y : ys appendDistinct x [] = [x] handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO () + -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ] + -> TransportHeader -> [ PartialRef ] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer @@ -659,10 +701,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} - then do - void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show dgst + then do + streamReaders <- mapM acceptStream $ lookupNewStreams headers + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders ) + else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" @@ -787,9 +830,13 @@ notifyServicesOfPeer :: Peer -> STM () notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer +receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () +receivedFromCustomAddress Server {..} addr msg = do + writeFlowIO serverRawPath ( CustomPeerAddress addr, msg ) + mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do peerState <- newTVarIO (PeerInit []) @@ -808,6 +855,9 @@ serverPeer server paddr = do _ -> paddr serverPeer' server (DatagramAddress paddr') +serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer +serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) + #ifdef ENABLE_ICE_SUPPORT serverPeerIce :: Server -> IceSession -> IO Peer serverPeerIce server@Server {..} ice = do @@ -856,19 +906,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: []) sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) -sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m () sendToPeerList peer parts = do let st = peerStorage peer - srefs <- liftIO $ fmap catMaybes $ forM parts $ \case - ServiceReply (Left x) use -> Just . (,use) <$> store st x - ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) - ServiceFinally act -> act >> return Nothing - let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) - packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] - liftIO $ atomically $ sendToPeerS peer ackedBy packet + res <- runExceptT $ do + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + _ -> return Nothing + + streamHeaders <- concat <$> do + (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do + forM parts $ \case + ServiceOpenStream cb -> do + conn <- lift (readTVar (peerState peer)) >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) + + lift $ writeTQueue (serverIOActions (peerServer peer)) $ do + liftIO $ forkServerThread (peerServer peer) handler + return [ ( hdr, cb writer ) ] + _ -> return [] + liftIO $ sequence_ $ map snd streamHeaders + + liftIO $ forM_ parts $ \case + ServiceFinally act -> act + _ -> return () + + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU + header = TransportHeader $ concat + [ [ ServiceType (serviceID $ head parts) ] + , map ServiceRef dgsts + , map fst streamHeaders + ] + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + + case res of + Right () -> return () + Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $ + "failed to send packet to " <> show (peerAddress peer) <> ": " <> err sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do @@ -901,17 +981,17 @@ sendToPeerWith peer fobj = do Left err -> throwError $ fromErebosError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing +runPeerService = runPeerServiceOn Nothing [] -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -936,6 +1016,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd + , svcNewStreams = newStreams } reloadHead (serverOrigHead server) >>= \case Nothing -> atomically $ do @@ -956,6 +1037,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +modifyServiceGlobalState + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index c340503..025f52c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -3,6 +3,7 @@ module Erebos.Network.Protocol ( transportToObject, TransportHeader(..), TransportHeaderItem(..), + ServiceID(..), SecurityRequirement(..), WaitingRef(..), @@ -22,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -36,6 +38,7 @@ import Control.Applicative import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM +import Control.Exception import Control.Monad import Control.Monad.Except import Control.Monad.Trans @@ -68,9 +71,9 @@ import Erebos.Flow import Erebos.Identity import Erebos.Network.Channel import Erebos.Object -import Erebos.Service import Erebos.Storable import Erebos.Storage +import Erebos.UUID (UUID) protocolVersion :: Text @@ -107,6 +110,9 @@ data TransportHeaderItem | StreamOpen Word8 deriving (Eq, Show) +newtype ServiceID = ServiceID UUID + deriving (Eq, Ord, Show, StorableUUID) + newtype Cookie = Cookie ByteString deriving (Eq, Show) @@ -283,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go cGlobalState streamNumber stream + ) where go gs@GlobalState {..} streamNumber stream = do @@ -356,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) + +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -398,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do modifyTVar' cOutStreams $ filter ((snum /=) . fst) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) @@ -420,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest @@ -512,8 +529,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do race_ (waitTill next) waitForUpdate - race_ signalTimeouts $ forever $ join $ atomically $ - passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + race_ signalTimeouts $ forever $ do + io <- atomically $ do + passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e getConnection :: GlobalState addr -> addr -> STM (Connection addr) diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs index 26ca09f..f00b63d 100644 --- a/src/Erebos/Object.hs +++ b/src/Erebos/Object.hs @@ -13,8 +13,9 @@ module Erebos.Object ( RecItem, RecItem'(..), Ref, PartialRef, RefDigest, - refDigest, - readRef, showRef, showRefDigest, + refDigest, refFromDigest, + readRef, showRef, + readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, ) where diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs index 4bca49c..fdb587a 100644 --- a/src/Erebos/Object/Internal.hs +++ b/src/Erebos/Object/Internal.hs @@ -2,8 +2,9 @@ module Erebos.Object.Internal ( Storage, PartialStorage, StorageCompleteness, Ref, PartialRef, RefDigest, - refDigest, - readRef, showRef, showRefDigest, + refDigest, refFromDigest, + readRef, showRef, + readRefDigest, showRefDigest, refDigestFromByteString, hashToRefDigest, copyRef, partialRef, partialRefFromDigest, @@ -22,9 +23,9 @@ module Erebos.Object.Internal ( Store, StoreRec, evalStore, evalStoreObject, storeBlob, storeRec, storeZero, - storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, - storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, - storeZRef, + storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, storeWeak, storeRawWeak, + storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, storeMbWeak, storeMbRawWeak, + storeZRef, storeZWeak, storeRecItems, Load, LoadRec, @@ -33,9 +34,9 @@ module Erebos.Object.Internal ( loadRecCurrentRef, loadRecItems, loadBlob, loadRec, loadZero, - loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, - loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, - loadTexts, loadBinaries, loadRefs, loadRawRefs, + loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, loadRawWeak, + loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, loadMbRawWeak, + loadTexts, loadBinaries, loadRefs, loadRawRefs, loadRawWeaks, loadZRef, Stored, @@ -74,13 +75,14 @@ import Data.Time.Calendar import Data.Time.Clock import Data.Time.Format import Data.Time.LocalTime -import Data.UUID (UUID) -import qualified Data.UUID as U import System.IO.Unsafe import Erebos.Error import Erebos.Storage.Internal +import Erebos.UUID (UUID) +import Erebos.UUID qualified as U +import Erebos.Util zeroRef :: Storage' c -> Ref' c @@ -121,6 +123,7 @@ copyRecItem' st = \case RecDate x -> return $ return $ RecDate x RecUUID x -> return $ return $ RecUUID x RecRef x -> fmap RecRef <$> copyRef' st x + RecWeak x -> return $ return $ RecWeak x RecUnknown t x -> return $ return $ RecUnknown t x copyObject' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (c (Object' c')) @@ -164,6 +167,7 @@ data RecItem' c | RecDate ZonedTime | RecUUID UUID | RecRef (Ref' c) + | RecWeak RefDigest | RecUnknown ByteString ByteString deriving (Show) @@ -202,6 +206,7 @@ serializeRecItem name (RecBinary x) = [name, BC.pack ":b ", showHex x, BC.single serializeRecItem name (RecDate x) = [name, BC.pack ":d", BC.singleton ' ', BC.pack (formatTime defaultTimeLocale "%s %z" x), BC.singleton '\n'] serializeRecItem name (RecUUID x) = [name, BC.pack ":u", BC.singleton ' ', U.toASCIIBytes x, BC.singleton '\n'] serializeRecItem name (RecRef x) = [name, BC.pack ":r ", showRef x, BC.singleton '\n'] +serializeRecItem name (RecWeak x) = [name, BC.pack ":w ", showRefDigest x, BC.singleton '\n'] serializeRecItem name (RecUnknown t x) = [ name, BC.singleton ':', t, BC.singleton ' ', x, BC.singleton '\n' ] lazyLoadObject :: forall c. StorageCompleteness c => Ref' c -> LoadResult c (Object' c) @@ -268,6 +273,7 @@ unsafeDeserializeObject st bytes = "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content) "u" -> RecUUID <$> U.fromASCIIBytes content "r" -> RecRef . Ref st <$> readRefDigest content + "w" -> RecWeak <$> readRefDigest content _ -> Nothing return (name, val) @@ -518,6 +524,33 @@ storeZRef name x = StoreRecM $ do return $ if isZeroRef ref then [] else [(BC.pack name, RecRef ref)] +storeWeak :: Storable a => StorageCompleteness c => String -> a -> StoreRec c +storeWeak name x = StoreRecM $ do + s <- ask + tell $ (:[]) $ do + ref <- store s x + return [ ( BC.pack name, RecWeak $ refDigest ref ) ] + +storeMbWeak :: Storable a => StorageCompleteness c => String -> Maybe a -> StoreRec c +storeMbWeak name = maybe (return ()) (storeWeak name) + +storeRawWeak :: StorageCompleteness c => String -> RefDigest -> StoreRec c +storeRawWeak name dgst = StoreRecM $ do + tell $ (:[]) $ do + return [ ( BC.pack name, RecWeak dgst ) ] + +storeMbRawWeak :: StorageCompleteness c => String -> Maybe RefDigest -> StoreRec c +storeMbRawWeak name = maybe (return ()) (storeRawWeak name) + +storeZWeak :: (ZeroStorable a, StorageCompleteness c) => String -> a -> StoreRec c +storeZWeak name x = StoreRecM $ do + s <- ask + tell $ (:[]) $ do + ref <- store s x + return $ if isZeroRef ref then [] + else [ ( BC.pack name, RecWeak $ refDigest ref ) ] + + storeRecItems :: StorageCompleteness c => [ ( ByteString, RecItem ) ] -> StoreRec c storeRecItems items = StoreRecM $ do st <- ask @@ -654,8 +687,21 @@ loadZRef name = loadMbRef name >>= \case return $ fromZero st Just x -> return x +loadRawWeak :: String -> LoadRec RefDigest +loadRawWeak name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbRawWeak name + +loadMbRawWeak :: String -> LoadRec (Maybe RefDigest) +loadMbRawWeak name = listToMaybe <$> loadRawWeaks name + +loadRawWeaks :: String -> LoadRec [ RefDigest ] +loadRawWeaks name = mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecRef x ) | name' == bname = Just (refDigest x) + p ( name', RecWeak x ) | name' == bname = Just x + p _ = Nothing + -type Stored a = Stored' Complete a instance Storable a => Storable (Stored a) where store st = copyRef st . storedRef @@ -666,10 +712,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where fromZero st = Stored (zeroRef st) $ fromZero st fromStored :: Stored a -> a -fromStored (Stored _ x) = x +fromStored = storedObject' storedRef :: Stored a -> Ref -storedRef (Stored ref _) = ref +storedRef = storedRef' wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) wrappedStore st x = do ref <- liftIO $ store st x @@ -678,9 +724,8 @@ wrappedStore st x = do ref <- liftIO $ store st x wrappedLoad :: Storable a => Ref -> Stored a wrappedLoad ref = Stored ref (load ref) -copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => - Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a)) -copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref' +copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a) +copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref' -- |Passed function needs to preserve the object representation to be safe unsafeMapStored :: (a -> b) -> Stored a -> Stored b diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs index 703afcd..e3ebf2b 100644 --- a/src/Erebos/Pairing.hs +++ b/src/Erebos/Pairing.hs @@ -17,9 +17,10 @@ import Control.Monad.Reader import Crypto.Random import Data.Bits -import Data.ByteArray (Bytes, convert) -import qualified Data.ByteArray as BA -import qualified Data.ByteString.Char8 as BC +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Char8 qualified as BC import Data.Kind import Data.Maybe import Data.Typeable @@ -34,16 +35,16 @@ import Erebos.State import Erebos.Storable data PairingService a = PairingRequest (Stored (Signed IdentityData)) (Stored (Signed IdentityData)) RefDigest - | PairingResponse Bytes - | PairingRequestNonce Bytes + | PairingResponse ByteString + | PairingRequestNonce ByteString | PairingAccept a | PairingReject data PairingState a = NoPairing - | OurRequest UnifiedIdentity UnifiedIdentity Bytes + | OurRequest UnifiedIdentity UnifiedIdentity ByteString | OurRequestConfirm (Maybe (PairingVerifiedResult a)) | OurRequestReady - | PeerRequest UnifiedIdentity UnifiedIdentity Bytes RefDigest + | PeerRequest UnifiedIdentity UnifiedIdentity ByteString RefDigest | PeerRequestConfirm | PairingDone @@ -88,7 +89,7 @@ instance Storable a => Storable (PairingService a) where load' = do res <- loadRec $ do - (req :: Maybe Bytes) <- loadMbBinary "request" + (req :: Maybe ByteString) <- loadMbBinary "request" idReq <- loadMbRef "id-req" idRsp <- loadMbRef "id-rsp" rsp <- loadMbBinary "response" @@ -171,7 +172,7 @@ instance PairingResult a => Service (PairingService a) where x@(OurRequestReady, _) -> reject $ uncurry PairingUnexpectedMessage x (PeerRequest peer self nonce dgst, PairingRequestNonce pnonce) -> do - if dgst == nonceDigest peer self pnonce BA.empty + if dgst == nonceDigest peer self pnonce BS.empty then do hook <- asks $ pairingHookRequestNonce . svcAttributes hook $ confirmationNumber $ nonceDigest peer self pnonce nonce svcSet PeerRequestConfirm @@ -188,12 +189,12 @@ reject reason = do replyPacket PairingReject -nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> Bytes -> Bytes -> RefDigest +nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> ByteString -> ByteString -> RefDigest nonceDigest idReq idRsp nonceReq nonceRsp = hashToRefDigest $ serializeObject $ Rec [ (BC.pack "id-req", RecRef $ storedRef $ idData idReq) , (BC.pack "id-rsp", RecRef $ storedRef $ idData idRsp) - , (BC.pack "nonce-req", RecBinary $ convert nonceReq) - , (BC.pack "nonce-rsp", RecBinary $ convert nonceRsp) + , (BC.pack "nonce-req", RecBinary nonceReq) + , (BC.pack "nonce-rsp", RecBinary nonceRsp) ] confirmationNumber :: RefDigest -> String @@ -212,7 +213,7 @@ pairingRequest _ peer = do PeerIdentityFull pid -> return pid _ -> throwOtherError "incomplete peer identity" sendToPeerWith @(PairingService a) peer $ \case - NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BA.empty), OurRequest self pid nonce) + NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BS.empty), OurRequest self pid nonce) _ -> throwOtherError "already in progress" pairingAccept :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m () diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index e95e700..4499ef9 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -29,14 +29,14 @@ import Control.Monad.Writer import Data.Kind import Data.Typeable -import Data.UUID (UUID) -import qualified Data.UUID as U import Erebos.Identity import {-# SOURCE #-} Erebos.Network +import Erebos.Network.Protocol import Erebos.State import Erebos.Storable import Erebos.Storage.Head +import Erebos.UUID qualified as U class ( Typeable s, Storable s, @@ -72,6 +72,9 @@ class ( serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s] serviceStorageWatchers _ = [] + serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO () + serviceStopServer _ _ _ _ = return () + data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s) @@ -101,11 +104,10 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) -data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) - +data SomeStorageWatcher s + = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT ErebosError IO ()) -newtype ServiceID = ServiceID UUID - deriving (Eq, Ord, Show, StorableUUID) mkServiceID :: String -> ServiceID mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString @@ -116,10 +118,13 @@ data ServiceInput s = ServiceInput , svcPeerIdentity :: UnifiedIdentity , svcServer :: Server , svcPrintOp :: String -> IO () + , svcNewStreams :: [ RawStreamReader ] } -data ServiceReply s = ServiceReply (Either s (Stored s)) Bool - | ServiceFinally (IO ()) +data ServiceReply s + = ServiceReply (Either s (Stored s)) Bool + | ServiceOpenStream (RawStreamWriter -> IO ()) + | ServiceFinally (IO ()) data ServiceHandlerState s = ServiceHandlerState { svcValue :: ServiceState s diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs new file mode 100644 index 0000000..67df4d7 --- /dev/null +++ b/src/Erebos/Service/Stream.hs @@ -0,0 +1,74 @@ +module Erebos.Service.Stream ( + StreamPacket(..), + StreamReader, getStreamReaderNumber, + StreamWriter, getStreamWriterNumber, + openStream, receivedStreams, + readStreamPacket, writeStreamPacket, + writeStream, + closeStream, +) where + +import Control.Concurrent.MVar +import Control.Monad.Reader +import Control.Monad.Writer + +import Data.ByteString (ByteString) +import Data.Word + +import Erebos.Flow +import Erebos.Network +import Erebos.Network.Protocol +import Erebos.Service + + +data StreamReader = StreamReader RawStreamReader + +getStreamReaderNumber :: StreamReader -> IO Int +getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream + +data StreamWriter = StreamWriter (MVar StreamWriterData) + +data StreamWriterData = StreamWriterData + { swdStream :: RawStreamWriter + , swdSequence :: Maybe Word64 + } + +getStreamWriterNumber :: StreamWriter -> IO Int +getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream + + +openStream :: Service s => ServiceHandler s StreamWriter +openStream = do + mvar <- liftIO newEmptyMVar + tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ] + return $ StreamWriter mvar + +receivedStreams :: Service s => ServiceHandler s [ StreamReader ] +receivedStreams = do + map StreamReader <$> asks svcNewStreams + +readStreamPacket :: StreamReader -> IO StreamPacket +readStreamPacket (StreamReader stream) = do + readFlowIO (rsrFlow stream) + +writeStreamPacket :: StreamWriter -> StreamPacket -> IO () +writeStreamPacket (StreamWriter mvar) packet = do + withMVar mvar $ \swd -> do + writeFlowIO (rswFlow $ swdStream swd) packet + +writeStream :: StreamWriter -> ByteString -> IO () +writeStream (StreamWriter mvar) bytes = do + modifyMVar_ mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> do + writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes + return swd { swdSequence = Just (seqNum + 1) } + Nothing -> do + fail "writeStream: stream closed" + +closeStream :: StreamWriter -> IO () +closeStream (StreamWriter mvar) = do + withMVar mvar $ \swd -> do + case swdSequence swd of + Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum + Nothing -> fail "closeStream: stream already closed" diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs index a2ecb9e..7cd82de 100644 --- a/src/Erebos/State.hs +++ b/src/Erebos/State.hs @@ -7,6 +7,7 @@ module Erebos.State ( MonadHead(..), updateLocalHead_, + updateLocalState, updateLocalState_, updateSharedState, updateSharedState_, lookupSharedValue, makeSharedStateUpdate, @@ -22,8 +23,6 @@ import Control.Monad.Reader import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.Typeable -import Data.UUID (UUID) -import Data.UUID qualified as U import Erebos.Identity import Erebos.Object @@ -31,9 +30,12 @@ import Erebos.PubKey import Erebos.Storable import Erebos.Storage.Head import Erebos.Storage.Merge +import Erebos.UUID (UUID) +import Erebos.UUID qualified as U data LocalState = LocalState - { lsIdentity :: Stored (Signed ExtendedIdentityData) + { lsPrev :: Maybe RefDigest + , lsIdentity :: Stored (Signed ExtendedIdentityData) , lsShared :: [Stored SharedState] , lsOther :: [ ( ByteString, RecItem ) ] } @@ -55,14 +57,16 @@ class Mergeable a => SharedType a where instance Storable LocalState where store' LocalState {..} = storeRec $ do + mapM_ (storeRawWeak "PREV") lsPrev storeRef "id" lsIdentity mapM_ (storeRef "shared") lsShared storeRecItems lsOther load' = loadRec $ do + lsPrev <- loadMbRawWeak "PREV" lsIdentity <- loadRef "id" lsShared <- loadRefs "shared" - lsOther <- filter ((`notElem` [ BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems + lsOther <- filter ((`notElem` [ BC.pack "PREV", BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems return LocalState {..} instance HeadType LocalState where @@ -106,6 +110,17 @@ headLocalIdentity :: Head LocalState -> UnifiedIdentity headLocalIdentity = localIdentity . headObject +updateLocalState :: forall m b. MonadHead LocalState m => (Stored LocalState -> m ( Stored LocalState, b )) -> m b +updateLocalState f = updateLocalHead $ \ls -> do + ( ls', x ) <- f ls + (, x) <$> if ls' == ls + then return ls' + else mstore (fromStored ls') { lsPrev = Just $ refDigest (storedRef ls) } + +updateLocalState_ :: forall m. MonadHead LocalState m => (Stored LocalState -> m (Stored LocalState)) -> m () +updateLocalState_ f = updateLocalState (fmap (,()) . f) + + updateSharedState_ :: forall a m. (SharedType a, MonadHead LocalState m) => (a -> m a) -> Stored LocalState -> m (Stored LocalState) updateSharedState_ f = fmap fst <$> updateSharedState (fmap (,()) . f) @@ -135,7 +150,7 @@ makeSharedStateUpdate st val prev = liftIO $ wrappedStore st SharedState mergeSharedIdentity :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => m UnifiedIdentity -mergeSharedIdentity = updateLocalHead $ updateSharedState $ \case +mergeSharedIdentity = updateLocalState $ updateSharedState $ \case Just cidentity -> do identity <- mergeIdentity cidentity return (Just $ toComposedIdentity identity, identity) diff --git a/src/Erebos/Storable.hs b/src/Erebos/Storable.hs index b0795f9..caaf525 100644 --- a/src/Erebos/Storable.hs +++ b/src/Erebos/Storable.hs @@ -14,9 +14,9 @@ module Erebos.Storable ( Store, StoreRec, storeBlob, storeRec, storeZero, - storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, - storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, - storeZRef, + storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, storeWeak, storeRawWeak, + storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, storeMbWeak, storeMbRawWeak, + storeZRef, storeZWeak, storeRecItems, Load, LoadRec, @@ -24,9 +24,9 @@ module Erebos.Storable ( loadRecCurrentRef, loadRecItems, loadBlob, loadRec, loadZero, - loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, - loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, - loadTexts, loadBinaries, loadRefs, loadRawRefs, + loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, loadRawWeak, + loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, loadMbRawWeak, + loadTexts, loadBinaries, loadRefs, loadRawRefs, loadRawWeaks, loadZRef, Stored, diff --git a/src/Erebos/Storage/Disk.hs b/src/Erebos/Storage/Disk.hs index 370c584..8e35940 100644 --- a/src/Erebos/Storage/Disk.hs +++ b/src/Erebos/Storage/Disk.hs @@ -18,7 +18,6 @@ import Data.ByteString.Lazy.Char8 qualified as BLC import Data.Function import Data.List import Data.Maybe -import Data.UUID qualified as U import System.Directory import System.FSNotify @@ -31,6 +30,7 @@ import Erebos.Storage.Backend import Erebos.Storage.Head import Erebos.Storage.Internal import Erebos.Storage.Platform +import Erebos.UUID qualified as U data DiskStorage = StorageDir diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs index 8f8e009..285902d 100644 --- a/src/Erebos/Storage/Head.hs +++ b/src/Erebos/Storage/Head.hs @@ -28,13 +28,12 @@ import Control.Monad.Reader import Data.Bifunctor import Data.Typeable -import Data.UUID qualified as U -import Data.UUID.V4 qualified as U import Erebos.Object import Erebos.Storable import Erebos.Storage.Backend import Erebos.Storage.Internal +import Erebos.UUID qualified as U -- | Represents loaded Erebos storage head, along with the object it pointed to @@ -114,7 +113,7 @@ loadHeadRaw st@Storage {..} tid hid = do -- | Reload the given head from storage, returning `Head' with updated object, -- or `Nothing' if there is no longer head with the particular ID in storage. reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) -reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid +reloadHead (Head hid val) = loadHead (storedStorage val) hid -- | Store a new `Head' of type 'a' in the storage. storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) @@ -233,8 +232,8 @@ watchHeadWith -> (Head a -> b) -- ^ Selector function -> (b -> IO ()) -- ^ Callback -> IO WatchedHead -- ^ Watched head handle -watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do - watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb +watchHeadWith (Head hid val) sel cb = do + watchHeadRaw (storedStorage val) (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb -- | Watch the given head using raw IDs and a selector from `Ref'. watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs index 6df1410..db211bb 100644 --- a/src/Erebos/Storage/Internal.hs +++ b/src/Erebos/Storage/Internal.hs @@ -1,32 +1,55 @@ -module Erebos.Storage.Internal where +module Erebos.Storage.Internal ( + Storage'(..), Storage, PartialStorage, + Ref'(..), Ref, PartialRef, + RefDigest(..), + WatchID, startWatchID, nextWatchID, + WatchList(..), WatchListItem(..), watchListAdd, watchListDel, + + refStorage, + refDigest, refDigestFromByteString, + showRef, showRefDigest, showRefDigestParts, + readRefDigest, + hashToRefDigest, + + StorageCompleteness(..), + StorageBackend(..), + Complete, Partial, + + unsafeStoreRawBytes, + ioLoadBytesFromStorage, + + Generation(..), + HeadID(..), HeadTypeID(..), + Stored(..), storedStorage, +) where import Control.Arrow import Control.Concurrent import Control.DeepSeq import Control.Exception -import Control.Monad import Control.Monad.Identity import Crypto.Hash import Data.Bits -import Data.ByteArray (ByteArray, ByteArrayAccess, ScrubbedBytes) +import Data.ByteArray (ByteArrayAccess, ScrubbedBytes) import Data.ByteArray qualified as BA import Data.ByteString (ByteString) -import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL -import Data.Char +import Data.Function import Data.HashTable.IO qualified as HT import Data.Hashable import Data.Kind import Data.Typeable -import Data.UUID (UUID) import Foreign.Storable (peek) import System.IO.Unsafe (unsafePerformIO) +import Erebos.UUID (UUID) +import Erebos.Util + data Storage' c = forall bck. (StorageBackend bck, BackendCompleteness bck ~ c) => Storage { stBackend :: bck @@ -196,35 +219,15 @@ showRefDigest = showRefDigestParts >>> \(alg, hex) -> alg <> BC.pack "#" <> hex readRefDigest :: ByteString -> Maybe RefDigest readRefDigest x = case BC.split '#' x of [alg, dgst] | BA.convert alg == BC.pack "blake2" -> - refDigestFromByteString =<< readHex @ByteString dgst + refDigestFromByteString =<< readHex dgst _ -> Nothing -refDigestFromByteString :: ByteArrayAccess ba => ba -> Maybe RefDigest +refDigestFromByteString :: ByteString -> Maybe RefDigest refDigestFromByteString = fmap RefDigest . digestFromByteString hashToRefDigest :: BL.ByteString -> RefDigest hashToRefDigest = RefDigest . hashFinalize . hashUpdates hashInit . BL.toChunks -showHex :: ByteArrayAccess ba => ba -> ByteString -showHex = B.concat . map showHexByte . BA.unpack - where showHexChar x | x < 10 = x + o '0' - | otherwise = x + o 'a' - 10 - showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ] - o = fromIntegral . ord - -readHex :: ByteArray ba => ByteString -> Maybe ba -readHex = return . BA.concat <=< readHex' - where readHex' bs | B.null bs = Just [] - readHex' bs = do (bx, bs') <- B.uncons bs - (by, bs'') <- B.uncons bs' - x <- hexDigit bx - y <- hexDigit by - (B.singleton (x * 16 + y) :) <$> readHex' bs'' - hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0' - | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10 - | otherwise = Nothing - o = fromIntegral . ord - newtype Generation = Generation Int deriving (Eq, Show) @@ -237,17 +240,20 @@ newtype HeadID = HeadID UUID newtype HeadTypeID = HeadTypeID UUID deriving (Eq, Ord) -data Stored' c a = Stored (Ref' c) a +data Stored a = Stored + { storedRef' :: Ref + , storedObject' :: a + } deriving (Show) -instance Eq (Stored' c a) where - Stored r1 _ == Stored r2 _ = refDigest r1 == refDigest r2 +instance Eq (Stored a) where + (==) = (==) `on` (refDigest . storedRef') -instance Ord (Stored' c a) where - compare (Stored r1 _) (Stored r2 _) = compare (refDigest r1) (refDigest r2) +instance Ord (Stored a) where + compare = compare `on` (refDigest . storedRef') -storedStorage :: Stored' c a -> Storage' c -storedStorage (Stored (Ref st _) _) = st +storedStorage :: Stored a -> Storage +storedStorage = refStorage . storedRef' type Complete = Identity diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs index 41725af..a41a65f 100644 --- a/src/Erebos/Storage/Merge.hs +++ b/src/Erebos/Storage/Merge.hs @@ -52,7 +52,7 @@ merge xs = mergeSorted $ filterAncestors xs storeMerge :: (Mergeable a, Storable a) => [Stored (Component a)] -> IO (Stored a) storeMerge [] = error "merge: empty list" -storeMerge xs@(Stored ref _ : _) = wrappedStore (refStorage ref) $ mergeSorted $ filterAncestors xs +storeMerge xs@(x : _) = wrappedStore (storedStorage x) $ mergeSorted $ filterAncestors xs previous :: Storable a => Stored a -> [Stored a] previous (Stored ref _) = case load ref of diff --git a/src/Erebos/Sync.hs b/src/Erebos/Sync.hs index 32e2e22..d837a14 100644 --- a/src/Erebos/Sync.hs +++ b/src/Erebos/Sync.hs @@ -23,7 +23,7 @@ instance Service SyncService where pid <- asks svcPeerIdentity self <- svcSelf when (finalOwner pid `sameIdentity` finalOwner self) $ do - updateLocalHead_ $ \ls -> do + updateLocalState_ $ \ls -> do let current = sort $ lsShared $ fromStored ls updated = filterAncestors (added : current) if current /= updated diff --git a/src/Erebos/UUID.hs b/src/Erebos/UUID.hs new file mode 100644 index 0000000..128d450 --- /dev/null +++ b/src/Erebos/UUID.hs @@ -0,0 +1,24 @@ +module Erebos.UUID ( + UUID, + toString, fromString, + toText, fromText, + toASCIIBytes, fromASCIIBytes, + nextRandom, +) where + +import Crypto.Random.Entropy + +import Data.Bits +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as BSL +import Data.Maybe +import Data.UUID.Types + +nextRandom :: IO UUID +nextRandom = do + [ b0, b1, b2, b3, b4, b5, b6, b7, b8, b9, ba, bb, bc, bd, be, bf ] + <- BS.unpack <$> getEntropy 16 + let version = 4 + b6' = b6 .&. 0x0f .|. (version `shiftL` 4) + b8' = b8 .&. 0x3f .|. 0x80 + return $ fromJust $ fromByteString $ BSL.pack [ b0, b1, b2, b3, b4, b5, b6', b7, b8', b9, ba, bb, bc, bd, be, bf ] diff --git a/src/Erebos/Util.hs b/src/Erebos/Util.hs index ffca9c7..0381c3e 100644 --- a/src/Erebos/Util.hs +++ b/src/Erebos/Util.hs @@ -1,5 +1,14 @@ module Erebos.Util where +import Control.Monad + +import Data.ByteArray (ByteArray, ByteArrayAccess) +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) +import Data.ByteString qualified as B +import Data.Char + + uniq :: Eq a => [a] -> [a] uniq (x:y:xs) | x == y = uniq (x:xs) | otherwise = x : uniq (y:xs) @@ -35,3 +44,24 @@ intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys) | x > y = intersectsSorted (x:xs) ys | otherwise = True intersectsSorted _ _ = False + + +showHex :: ByteArrayAccess ba => ba -> ByteString +showHex = B.concat . map showHexByte . BA.unpack + where showHexChar x | x < 10 = x + o '0' + | otherwise = x + o 'a' - 10 + showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ] + o = fromIntegral . ord + +readHex :: ByteArray ba => ByteString -> Maybe ba +readHex = return . BA.concat <=< readHex' + where readHex' bs | B.null bs = Just [] + readHex' bs = do (bx, bs') <- B.uncons bs + (by, bs'') <- B.uncons bs' + x <- hexDigit bx + y <- hexDigit by + (B.singleton (x * 16 + y) :) <$> readHex' bs'' + hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0' + | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10 + | otherwise = Nothing + o = fromIntegral . ord diff --git a/test/common.test b/test/common.test new file mode 100644 index 0000000..89941f0 --- /dev/null +++ b/test/common.test @@ -0,0 +1,3 @@ +module common + +export def refpat = /blake2#[0-9a-f]*/ diff --git a/test/discovery.test b/test/discovery.test index f2dddb7..9453e65 100644 --- a/test/discovery.test +++ b/test/discovery.test @@ -1,8 +1,9 @@ module discovery +def refpat = /blake2#[0-9a-f]*/ + test ManualDiscovery: - let services = "discovery,test" - let refpat = /blake2#[0-9a-f]*/ + let services = "discovery" subnet sd subnet s1 @@ -24,27 +25,6 @@ test ManualDiscovery: expect /create-identity-done ref $refpat.*/ from p2 expect /create-identity-done ref $refpat.*/ from pd - # TODO: avoid the need to send identity objects with weak refs - for p in [ p1, p2 ]: - with p: - send "start-server services $services" - send "peer-add ${p2.node.ip}" to p1 - expect from p1: - /peer 1 addr ${p2.node.ip} 29665/ - /peer 1 id Device2 Owner2/ - expect from p2: - /peer 1 addr ${p1.node.ip} 29665/ - /peer 1 id Device1 Owner1/ - for r in [ p1base, p1obase ]: - with p1: - send "test-message-send 1 $r" - expect /test-message-send done/ - with p2: - expect /test-message-received rec [0-9]+ $r/ - for p in [ p1, p2 ]: - send "stop-server" to p - expect /stop-server-done/ from p - # Test discovery using owner and device identities: for id in [ p1obase, p1base ]: for p in [ pd, p1, p2 ]: @@ -73,3 +53,99 @@ test ManualDiscovery: send "stop-server" to p for p in [ pd, p1, p2 ]: expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p + + +test DiscoveryTunnel: + let services = "discovery" + + subnet sd + subnet s1 + subnet s2 + + spawn as pd on sd + spawn as p1 on s1 + spawn as p2 on s2 + + for n in [ p1.node, p2.node ]: + shell on n: + nft add table inet filter + nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }' + nft add rule inet filter input 'ct state { established, related } accept' + + send "create-identity Discovery" to pd + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + expect /create-identity-done ref ($refpat).*/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + expect /create-identity-done ref $refpat.*/ from p2 + expect /create-identity-done ref $refpat.*/ from pd + + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + for p in [ p1, p2 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p.node.ip} 29665/ + /peer [12] id .*/ + + send "discovery-tunnel 1 $id" to p2 + + expect from p1: + /peer [0-9]+ addr tunnel@.*/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr tunnel@.*/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p diff --git a/test/message.test b/test/message.test index c0e251b..2990d0f 100644 --- a/test/message.test +++ b/test/message.test @@ -1,3 +1,7 @@ +module message + +import common + test DirectMessage: let services = "contact,dm" @@ -149,3 +153,113 @@ test DirectMessage: send "start-server services $services" to p2 expect /dm-received from Owner1 text while_peer_offline/ from p2 + + +test DirectMessageDiscovery: + let services = "dm,discovery" + + subnet sd + subnet s1 + subnet s2 + subnet s3 + subnet s4 + + spawn on sd as pd + spawn on s1 as p1 + spawn on s2 as p2 + spawn on s3 as p3 + spawn on s4 as p4 + + send "create-identity Discovery" to pd + + send "create-identity Device1 Owner1" to p1 + expect /create-identity-done ref ($refpat)/ from p1 capture p1_id + send "identity-info $p1_id" to p1 + expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner + + send "create-identity Device2 Owner2" to p2 + expect /create-identity-done ref ($refpat)/ from p2 capture p2_id + send "identity-info $p2_id" to p2 + expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner + send "identity-info $p2_owner" to p2 + expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase + + send "create-identity Device3 Owner3" to p3 + expect /create-identity-done ref ($refpat)/ from p3 capture p3_id + send "identity-info $p3_id" to p3 + expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner + + send "create-identity Device4 Owner4" to p4 + expect /create-identity-done ref ($refpat)/ from p4 capture p4_id + send "identity-info $p4_id" to p4 + expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner + + + for p in [ p1, p2, p3, p4 ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4 ]: + with p1: + send "peer-add ${p.node.ip}" + expect: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + expect from p: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1 Owner1/ + + # Make sure p1 has other identities in storage: + for i in [ 1 .. 3 ]: + send "dm-send-peer $i init1" to p1 + for p in [ p2, p3, p4 ]: + expect /dm-received from Owner1 text init1/ from p + send "dm-send-identity $p1_owner init2" to p + expect /dm-received from Owner. text init2/ from p1 + + # Restart servers to remove peers: + for p in [ p1, p2, p3, p4 ]: + with p: + send "stop-server" + for p in [ p1, p2, p3, p4 ]: + with p: + expect /stop-server-done/ + + # Prepare message before peers connect to discovery + send "dm-send-identity $p4_owner hello_to_p4" to p1 + + for p in [ p1, p2, p3, p4, pd ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4, p1 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + + multiply_timeout by 2.0 + + # Connect via discovery manually, then send message + send "discovery-connect $p2_obase" to p1 + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + send "dm-send-identity $p2_owner hello_to_p2" to p1 + expect /dm-received from Owner1 text hello_to_p2/ from p2 + + # Send message, expect automatic discovery + send "dm-send-identity $p3_owner hello_to_p3" to p1 + expect /dm-received from Owner1 text hello_to_p3/ from p3 + + # Verify the first message + expect /dm-received from Owner1 text hello_to_p4/ from p4 + + for p in [ p1, p2, p3, p4, pd ]: + send "stop-server" to p + for p in [ p1, p2, p3, p4, pd ]: + expect /stop-server-done/ from p diff --git a/test/network.test b/test/network.test index 52fcbee..0f49a1e 100644 --- a/test/network.test +++ b/test/network.test @@ -182,6 +182,58 @@ test ManyStreams: expect /test-message-received blob 100[2-4] $ref/ from p2 +test ServiceStreams: + let services = "test" + + spawn as p1 + spawn as p2 + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server services $services" to p1 + send "start-server services $services" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + send "test-stream-open 1" to p1 + expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1 + expect /test-stream-open-from 1 $stream1/ from p2 + + send "test-stream-send 1 $stream1 hello" to p1 + expect /test-stream-send-done 1 $stream1/ from p1 + expect /test-stream-received 1 $stream1 0 hello/ from p2 + + send "test-stream-close 1 $stream1" to p1 + expect /test-stream-close-done 1 $stream1/ from p1 + expect /test-stream-closed-from 1 $stream1 1/ from p2 + + send "test-stream-open 1 8" to p2 + expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 + expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1 + + let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ] + with p2: + for i in [ 1..20 ]: + for s in streams2: + send "test-stream-send 1 $s hello$i" + for i in [ 1..20 ]: + for s in streams2: + expect /test-stream-send-done 1 $s/ + for s in streams2: + send "test-stream-close 1 $s" + for s in streams2: + expect /test-stream-close-done 1 $s/ + with p1: + for i in [ 1..20 ]: + for s in streams2: + expect /test-stream-received 1 $s ${i-1} hello$i/ + for s in streams2: + expect /test-stream-closed-from 1 $s 20/ + + test MultipleServiceRefs: let services = "test" diff --git a/test/storage.test b/test/storage.test index a5cca7f..2230eac 100644 --- a/test/storage.test +++ b/test/storage.test @@ -470,6 +470,7 @@ test LocalStateKeepUnknown: send "load $s3" expect /load-type rec [0-9]*/ + expect /load-line PREV:w $s2/ expect /load-line id:r ($refpat)/ capture id2 guard (id1 /= id2) expect /load-line TEST:i 123/ |