diff options
-rw-r--r-- | CHANGELOG.md | 14 | ||||
-rw-r--r-- | README.md | 109 | ||||
-rw-r--r-- | erebos.cabal | 17 | ||||
-rw-r--r-- | main/Main.hs | 277 | ||||
-rw-r--r-- | main/Test.hs | 41 | ||||
-rw-r--r-- | src/Erebos/Chatroom.hs | 216 | ||||
-rw-r--r-- | src/Erebos/Conversation.hs | 36 | ||||
-rw-r--r-- | src/Erebos/Identity.hs | 33 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 120 | ||||
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 150 | ||||
-rw-r--r-- | src/Erebos/Network/ifaddrs.c | 132 | ||||
-rw-r--r-- | src/Erebos/Network/ifaddrs.h | 2 | ||||
-rw-r--r-- | src/Erebos/Storage/Internal.hs | 6 | ||||
-rw-r--r-- | src/Erebos/Storage/Key.hs | 7 | ||||
-rw-r--r-- | src/Erebos/Storage/Merge.hs | 7 | ||||
-rw-r--r-- | src/windows/Erebos/Storage/Platform.hs | 13 | ||||
-rw-r--r-- | test/chatroom.test | 326 | ||||
-rw-r--r-- | test/network.test | 56 |
18 files changed, 1344 insertions, 218 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index bc1ea1d..de69a6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Revision history for erebos +## 0.1.6 -- 2024-08-12 + +* Chatroom members list and join/leave commands +* Fix sending multiple data responses in a stream +* Added `--storage`/`--memory-storage` command-line options +* Compatibility with GHC up to 9.10 +* Local discovery with IPv6 + +## 0.1.5 -- 2024-07-16 + +* Public chatrooms for multiple participants +* Send keep-alive packets on idle connection +* Windows support + ## 0.1.4 -- 2024-06-11 * Added `/conversations` command to list and select conversations @@ -60,7 +60,8 @@ Add public peer: [1] PEER UPD discovery1.erebosprotocol.net [37.221.243.57 29665] ``` -Select the peer and send it a message, the public server just responds with automatic echo message: +Select the peer and send it a message, the public server just responds with +automatic echo message: ``` > /1 discovery1.erebosprotocol.net> hello @@ -68,30 +69,68 @@ discovery1.erebosprotocol.net> hello [18:55] discovery1.erebosprotocol.net: Echo: hello ``` +List chatrooms known to the peers: +``` +> /chatrooms +[1] Test chatroom +[2] Second test chatroom +``` + +Enter a chatroom and send a message there: +``` +> /1 +Test chatroom> Hi +Test chatroom [19:03] Some Name: Hi +``` + ### Messaging `/peers` -List peers with direct network connection. Peers are discovered automatically -on local network or can be manually added. +: List peers with direct network connection. Peers are discovered automatically + on local network or can be manually added. `/contacts` -List known contacts (see below). +: List known contacts (see below). `/conversations` -List started conversations with contacts or other peers. +: List started conversations with contacts or other peers. `/<number>` -Select conversation, contact or peer `<number>` based on the last -`/conversations`, `/contacts` or `/peers` output list. +: Select conversation, contact or peer `<number>` based on the last + `/conversations`, `/contacts` or `/peers` output list. `<message>` -Send `<message>` to selected conversation. +: Send `<message>` to selected conversation. `/history` -Show message history of the selected conversation. +: Show message history of the selected conversation. `/details` -Show information about the selected conversations, contact or peer. +: Show information about the selected conversations, contact or peer. + +### Chatrooms + +Currently only public unmoderated chatrooms are supported, which means that any +network peer is allowed to read and post to the chatroom. Individual messages +are signed, so message author can not be forged. + +`/chatrooms` +: List known chatrooms. + +`/chatroom-create-public [<name>]` +: Create public unmoderated chatroom. Room name can be passed as command + argument or entered interactively. + +`/members` +: List members of the chatroom – usesers who sent any message or joined via the +`join` command. + +`/join` +: Join chatroom without sending text message. + +`/leave` +: Leave the chatroom. User will no longer be listed as a member and erebos tool + will no longer collect message of this chatroom. ### Add contacts @@ -101,21 +140,22 @@ contacts to contact list (similar to bluetooth device pairing). Before adding new contact, list peers using `/peers` command and select one with `/<number>`. `/contacts` -List already added contacts. +: List already added contacts. `/contact-add` -Add selected peer as contact. Six-digit verification code will be computed -based on peer keys, which will be displayed on both devices and needs to be -checked that both numbers are same. After that it needs to be confirmed using -`/contact-accept` to finish the process. +: Add selected peer as contact. Six-digit verification code will be computed + based on peer keys, which will be displayed on both devices and needs to be + checked that both numbers are same. After that it needs to be confirmed using + `/contact-accept` to finish the process. `/contact-accept` -Confirm that displayed verification codes are same on both devices and add the -selected peer as contact. The side, which did not initiate the contact adding -process, needs to select the corresponding peer with `/<number>` command first. +: Confirm that displayed verification codes are same on both devices and add + the selected peer as contact. The side, which did not initiate the contact + adding process, needs to select the corresponding peer with `/<number>` + command first. `/contact-reject` -Reject contact request or verification code of selected peer. +: Reject contact request or verification code of selected peer. ### Attach other devices @@ -134,37 +174,38 @@ Before attaching device, list peers using `/peers` command and select the target device with `/<number>`. `/attach` -Attach current device to the selected peer. After the process completes the -owner of the selected peer will become owner of this device as well. Six-digit -verification code will be displayed on both devices and the user needs to check -that both are the same before confirmation using the `/attach-accept` command. +: Attach current device to the selected peer. After the process completes the + owner of the selected peer will become owner of this device as well. + Six-digit verification code will be displayed on both devices and the user + needs to check that both are the same before confirmation using the + `/attach-accept` command. `/attach-accept` -Confirm that displayed verification codes are same on both devices and complete -the attachment process (or wait for the confirmation on the peer device). The -side, which did not initiate the attachment process, needs to select the -corresponding peer with `/<number>` command first. +: Confirm that displayed verification codes are same on both devices and + complete the attachment process (or wait for the confirmation on the peer + device). The side, which did not initiate the attachment process, needs to + select the corresponding peer with `/<number>` command first. `/attach-reject` -Reject device attachment request or verification code of selected peer. +: Reject device attachment request or verification code of selected peer. ### Other `/peer-add <host> [<port>]` -Manually add network peer with given hostname or IP address. +: Manually add network peer with given hostname or IP address. `/peer-add-public` -Add known public network peer(s). +: Add known public network peer(s). `/peer-drop` -Drop the currently selected peer. Afterwards, the connection can be -re-established by either side. +: Drop the currently selected peer. Afterwards, the connection can be + re-established by either side. `/update-identity` -Interactively update current identity information +: Interactively update current identity information `/quit` -Quit the erebos tool. +: Quit the erebos tool. Storage diff --git a/erebos.cabal b/erebos.cabal index 472811f..2629048 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -1,7 +1,7 @@ Cabal-Version: 3.0 Name: erebos -Version: 0.1.4 +Version: 0.1.6 Synopsis: Decentralized messaging and synchronization Description: Library and simple CLI interface implementing the Erebos identity @@ -54,7 +54,7 @@ common common -Wno-error=unused-imports build-depends: - base >=4.13 && <4.20, + base ^>= { 4.15, 4.16, 4.17, 4.18, 4.19, 4.20 }, default-extensions: DefaultSignatures @@ -116,7 +116,6 @@ library Erebos.Storage.Internal other-modules: Erebos.Flow - Erebos.Storage.List Erebos.Storage.Platform Erebos.Util @@ -144,7 +143,7 @@ library bytestring >=0.10 && <0.13, clock >=0.8 && < 0.9, containers >= 0.6 && <0.8, - cryptonite >=0.25 && <0.31, + crypton ^>= { 1.0 }, deepseq >= 1.4 && <1.6, directory >= 1.3 && <1.4, filepath >=1.4 && <1.6, @@ -161,7 +160,11 @@ library uuid >=1.3 && <1.4, zlib >=0.6 && <0.8 - if true + if os(windows) + hs-source-dirs: src/windows + build-depends: + Win32 ^>= { 2.14 }, + else hs-source-dirs: src/unix build-depends: unix ^>= { 2.7, 2.8 }, @@ -184,14 +187,14 @@ executable erebos build-depends: bytestring, - cryptonite, + crypton, directory, erebos, haskeline >=0.7 && <0.9, mtl, network, process >=1.6 && <1.7, - template-haskell >=2.17 && <2.22, + template-haskell ^>= { 2.17, 2.18, 2.19, 2.20, 2.21, 2.22 }, text, time, transformers >= 0.5 && <0.7, diff --git a/main/Main.hs b/main/Main.hs index 0eb414c..94c0418 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -24,6 +24,7 @@ import Data.Text (Text) import Data.Text qualified as T import Data.Text.Encoding qualified as T import Data.Text.IO qualified as T +import Data.Time.Format import Data.Time.LocalTime import Data.Typeable @@ -37,6 +38,7 @@ import System.IO import Erebos.Attach import Erebos.Contact +import Erebos.Chatroom import Erebos.Conversation #ifdef ENABLE_ICE_SUPPORT import Erebos.Discovery @@ -59,11 +61,17 @@ import Version data Options = Options { optServer :: ServerOptions , optServices :: [ServiceOption] + , optStorage :: StorageOption + , optChatroomAutoSubscribe :: Maybe Int , optDmBotEcho :: Maybe Text , optShowHelp :: Bool , optShowVersion :: Bool } +data StorageOption = DefaultStorage + | FilesystemStorage FilePath + | MemoryStorage + data ServiceOption = ServiceOption { soptName :: String , soptService :: SomeService @@ -75,6 +83,8 @@ defaultOptions :: Options defaultOptions = Options { optServer = defaultServerOptions , optServices = availableServices + , optStorage = DefaultStorage + , optChatroomAutoSubscribe = Nothing , optDmBotEcho = Nothing , optShowHelp = False , optShowVersion = False @@ -86,6 +96,8 @@ availableServices = True "attach (to) other devices" , ServiceOption "sync" (someService @SyncService Proxy) True "synchronization with attached devices" + , ServiceOption "chatroom" (someService @ChatroomService Proxy) + True "chatrooms with multiple participants" , ServiceOption "contact" (someService @ContactService Proxy) True "create contacts with network peers" , ServiceOption "dm" (someService @DirectMessage Proxy) @@ -104,6 +116,15 @@ options = , Option ['s'] ["silent"] (NoArg (so $ \opts -> opts { serverLocalDiscovery = False })) "do not send announce packets for local discovery" + , Option [] [ "storage" ] + (ReqArg (\path -> \opts -> opts { optStorage = FilesystemStorage path }) "<path>") + "use storage in <path>" + , Option [] [ "memory-storage" ] + (NoArg (\opts -> opts { optStorage = MemoryStorage })) + "use memory storage" + , Option [] ["chatroom-auto-subscribe"] + (ReqArg (\count -> \opts -> opts { optChatroomAutoSubscribe = Just (read count) }) "<count>") + "automatically subscribe for up to <count> chatrooms" , Option [] ["dm-bot-echo"] (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") "automatically reply to direct messages with the same text prefixed with <prefix>" @@ -133,8 +154,20 @@ servicesOptions = concatMap helper $ "all" : map soptName availableServices main :: IO () main = do - st <- liftIO $ openStorage . fromMaybe "./.erebos" =<< lookupEnv "EREBOS_DIR" - getArgs >>= \case + (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case + (o, args, []) -> do + return (foldl (flip id) defaultOptions o, args) + (_, _, errs) -> do + progName <- getProgName + hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information." + exitFailure + + st <- liftIO $ case optStorage opts of + DefaultStorage -> openStorage . fromMaybe "./.erebos" =<< lookupEnv "EREBOS_DIR" + FilesystemStorage path -> openStorage path + MemoryStorage -> memoryStorage + + case args of ["cat-file", sref] -> do readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" @@ -150,7 +183,7 @@ main = do forM_ (signedSignature signed) $ \sig -> do putStr $ "SIG " BC.putStrLn $ showRef $ storedRef $ sigKey $ fromStored sig - "identity" -> case validateIdentityF (wrappedLoad <$> refs) of + "identity" -> case validateExtendedIdentityF (wrappedLoad <$> refs) of Just identity -> do let disp :: Identity m -> IO () disp idt = do @@ -160,7 +193,7 @@ main = do case idOwner idt of Nothing -> return () Just owner -> do - mapM_ (putStrLn . ("OWNER " ++) . BC.unpack . showRefDigest . refDigest . storedRef) $ idDataF owner + mapM_ (putStrLn . ("OWNER " ++) . BC.unpack . showRefDigest . refDigest . storedRef) $ idExtDataF owner disp owner disp identity Nothing -> putStrLn $ "Identity verification failed" @@ -184,32 +217,30 @@ main = do ["test"] -> runTestTool st - args -> case getOpt Permute (options ++ servicesOptions) args of - (o, [], []) -> do - let opts = foldl (flip id) defaultOptions o - header = "Usage: erebos [OPTION...]" - serviceDesc ServiceOption {..} = padService (" " <> soptName) <> soptDescription - - padTo n str = str <> replicate (n - length str) ' ' - padOpt = padTo 37 - padService = padTo 16 - - if | optShowHelp opts -> putStr $ usageInfo header options <> unlines - ( - [ padOpt " --enable-<service>" <> "enable network service <service>" - , padOpt " --disable-<service>" <> "disable network service <service>" - , padOpt " --enable-all" <> "enable all network services" - , padOpt " --disable-all" <> "disable all network services" - , "" - , "Available network services:" - ] ++ map serviceDesc availableServices - ) - | optShowVersion opts -> putStrLn versionLine - | otherwise -> interactiveLoop st opts - (_, _, errs) -> do - progName <- getProgName - hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information." - exitFailure + [] -> do + let header = "Usage: erebos [OPTION...]" + serviceDesc ServiceOption {..} = padService (" " <> soptName) <> soptDescription + + padTo n str = str <> replicate (n - length str) ' ' + padOpt = padTo 37 + padService = padTo 16 + + if | optShowHelp opts -> putStr $ usageInfo header options <> unlines + ( + [ padOpt " --enable-<service>" <> "enable network service <service>" + , padOpt " --disable-<service>" <> "disable network service <service>" + , padOpt " --enable-all" <> "enable all network services" + , padOpt " --disable-all" <> "disable all network services" + , "" + , "Available network services:" + ] ++ map serviceDesc availableServices + ) + | optShowVersion opts -> putStrLn versionLine + | otherwise -> interactiveLoop st opts + + (cmdname : _) -> do + hPutStrLn stderr $ "Unknown command `" <> cmdname <> "'" + exitFailure inputSettings :: Settings IO @@ -222,8 +253,10 @@ interactiveLoop st opts = runInputT inputSettings $ do tui <- haveTerminalUI extPrint <- getExternalPrint - let extPrintLn str = extPrint $ case reverse str of ('\n':_) -> str - _ -> str ++ "\n"; + let extPrintLn str = do + let str' = case reverse str of ('\n':_) -> str + _ -> str ++ "\n"; + extPrint $! str' -- evaluate str before calling extPrint to avoid blinking let getInputLinesTui eprompt = do prompt <- case eprompt of @@ -235,6 +268,7 @@ interactiveLoop st opts = runInputT inputSettings $ do PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" PeerIdentityUnknown _ -> "<unknown>" SelectedContact contact -> return $ T.unpack $ contactName contact + SelectedChatroom rstate -> return $ T.unpack $ fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate SelectedConversation conv -> return $ T.unpack $ conversationName conv return $ pname ++ "> " Right prompt -> return prompt @@ -281,13 +315,20 @@ interactiveLoop st opts = runInputT inputSettings $ do Right reply -> extPrintLn $ formatDirectMessage tzone $ fromStored reply Left err -> extPrintLn $ "Failed to send dm echo: " <> err + peers <- liftIO $ newMVar [] + contextOptions <- liftIO $ newMVar [] + chatroomSetVar <- liftIO $ newEmptyMVar + + let autoSubscribe = optChatroomAutoSubscribe opts + chatroomList = fromSetBy (comparing roomStateData) . lookupSharedValue . lsShared . headObject $ erebosHead + watched <- if isJust autoSubscribe || any roomStateSubscribe chatroomList + then fmap Just $ liftIO $ watchChatroomsForCli extPrintLn erebosHead chatroomSetVar contextOptions autoSubscribe + else return Nothing + server <- liftIO $ do startServer (optServer opts) erebosHead extPrintLn $ map soptService $ filter soptEnabled $ optServices opts - peers <- liftIO $ newMVar [] - contextOptions <- liftIO $ newMVar [] - void $ liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange server peerIdentity peer >>= \case @@ -320,11 +361,14 @@ interactiveLoop st opts = runInputT inputSettings $ do { ciServer = server , ciLine = line , ciPrint = extPrintLn + , ciOptions = opts , ciPeers = liftIO $ modifyMVar peers $ \ps -> do ps' <- filterM (fmap not . isPeerDropped . fst) ps return (ps', ps') , ciContextOptions = liftIO $ readMVar contextOptions , ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs + , ciContextOptionsVar = contextOptions + , ciChatroomSetVar = chatroomSetVar } case res of Right cstate' @@ -343,6 +387,7 @@ interactiveLoop st opts = runInputT inputSettings $ do , csIceSessions = [] #endif , csIcePeer = Nothing + , csWatchChatrooms = watched , csQuit = False } @@ -351,9 +396,12 @@ data CommandInput = CommandInput { ciServer :: Server , ciLine :: String , ciPrint :: String -> IO () + , ciOptions :: Options , ciPeers :: CommandM [(Peer, String)] , ciContextOptions :: CommandM [CommandContext] , ciSetContextOptions :: [CommandContext] -> Command + , ciContextOptionsVar :: MVar [ CommandContext ] + , ciChatroomSetVar :: MVar (Set ChatroomState) } data CommandState = CommandState @@ -363,12 +411,14 @@ data CommandState = CommandState , csIceSessions :: [IceSession] #endif , csIcePeer :: Maybe Peer + , csWatchChatrooms :: Maybe WatchedHead , csQuit :: Bool } data CommandContext = NoContext | SelectedPeer Peer | SelectedContact Contact + | SelectedChatroom ChatroomState | SelectedConversation Conversation newtype CommandM a = CommandM (ReaderT CommandInput (StateT CommandState (ExceptT String IO)) a) @@ -402,6 +452,11 @@ getSelectedPeer = gets csContext >>= \case SelectedPeer peer -> return peer _ -> throwError "no peer selected" +getSelectedChatroom :: CommandM ChatroomState +getSelectedChatroom = gets csContext >>= \case + SelectedChatroom rstate -> return rstate + _ -> throwError "no chatroom selected" + getSelectedConversation :: CommandM Conversation getSelectedConversation = gets csContext >>= \case SelectedPeer peer -> peerIdentity peer >>= \case @@ -410,6 +465,10 @@ getSelectedConversation = gets csContext >>= \case SelectedContact contact -> case contactIdentity contact of Just cid -> directMessageConversation cid Nothing -> throwError "contact without erebos identity" + SelectedChatroom rstate -> + chatroomConversation rstate >>= \case + Just conv -> return conv + Nothing -> throwError "invalid chatroom" SelectedConversation conv -> reloadConversation conv _ -> throwError "no contact, peer or conversation selected" @@ -425,6 +484,8 @@ commands = , ("attach", cmdAttach) , ("attach-accept", cmdAttachAccept) , ("attach-reject", cmdAttachReject) + , ("chatrooms", cmdChatrooms) + , ("chatroom-create-public", cmdChatroomCreatePublic) , ("contacts", cmdContacts) , ("contact-add", cmdContactAdd) , ("contact-accept", cmdContactAccept) @@ -440,6 +501,9 @@ commands = , ("ice-connect", cmdIceConnect) , ("ice-send", cmdIceSend) #endif + , ("join", cmdJoin) + , ("leave", cmdLeave) + , ("members", cmdMembers) , ("select", cmdSelectContext) , ("quit", cmdQuit) ] @@ -492,20 +556,41 @@ showPeer pidentity paddr = PeerIdentityFull pid -> T.unpack $ displayIdentity pid in name ++ " [" ++ show paddr ++ "]" +cmdJoin :: Command +cmdJoin = joinChatroom =<< getSelectedChatroom + +cmdLeave :: Command +cmdLeave = leaveChatroom =<< getSelectedChatroom + +cmdMembers :: Command +cmdMembers = do + Just room <- findChatroomByStateData . head . roomStateData =<< getSelectedChatroom + forM_ (chatroomMembers room) $ \x -> do + liftIO $ putStrLn $ maybe "<unnamed>" T.unpack $ idName x + + cmdSelectContext :: Command cmdSelectContext = do n <- read <$> asks ciLine join (asks ciContextOptions) >>= \ctxs -> if - | n > 0, (ctx : _) <- drop (n - 1) ctxs -> modify $ \s -> s { csContext = ctx } + | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do + modify $ \s -> s { csContext = ctx } + case ctx of + SelectedChatroom rstate -> do + when (not (roomStateSubscribe rstate)) $ do + chatroomSetSubscribe (head $ roomStateData rstate) True + _ -> return () | otherwise -> throwError "invalid index" cmdSend :: Command cmdSend = void $ do text <- asks ciLine conv <- getSelectedConversation - msg <- sendMessage conv $ T.pack text - tzone <- liftIO $ getCurrentTimeZone - liftIO $ putStrLn $ formatMessage tzone msg + sendMessage conv (T.pack text) >>= \case + Just msg -> do + tzone <- liftIO $ getCurrentTimeZone + liftIO $ putStrLn $ formatMessage tzone msg + Nothing -> return () cmdHistory :: Command cmdHistory = void $ do @@ -530,6 +615,110 @@ cmdAttachAccept = attachAccept =<< getSelectedPeer cmdAttachReject :: Command cmdAttachReject = attachReject =<< getSelectedPeer +watchChatroomsForCli :: (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState) -> MVar [ CommandContext ] -> Maybe Int -> IO WatchedHead +watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do + subscribedNumVar <- newEmptyMVar + + let ctxUpdate updateType (idx :: Int) rstate = \case + SelectedChatroom rstate' : rest + | currentRoots <- filterAncestors (concatMap storedRoots $ roomStateData rstate) + , any ((`intersectsSorted` currentRoots) . storedRoots) $ roomStateData rstate' + -> do + eprint $ "[" <> show idx <> "] CHATROOM " <> updateType <> " " <> name + return (SelectedChatroom rstate : rest) + selected : rest + -> do + (selected : ) <$> ctxUpdate updateType (idx + 1) rstate rest + [] + -> do + eprint $ "[" <> show idx <> "] CHATROOM " <> updateType <> " " <> name + return [ SelectedChatroom rstate ] + where + name = maybe "<unnamed>" T.unpack $ roomName =<< roomStateRoom rstate + + watchChatrooms h $ \set -> \case + Nothing -> do + let chatroomList = fromSetBy (comparing roomStateData) set + (subscribed, notSubscribed) = partition roomStateSubscribe chatroomList + subscribedNum = length subscribed + + putMVar chatroomSetVar set + putMVar subscribedNumVar subscribedNum + + case autoSubscribe of + Nothing -> return () + Just num -> do + forM_ (take (num - subscribedNum) notSubscribed) $ \rstate -> do + (runExceptT $ flip runReaderT h $ chatroomSetSubscribe (head $ roomStateData rstate) True) >>= \case + Right () -> return () + Left err -> eprint err + + Just diff -> do + modifyMVar_ chatroomSetVar $ return . const set + forM_ diff $ \case + AddedChatroom rstate -> do + modifyMVar_ contextVar $ ctxUpdate "NEW" 1 rstate + modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then (+ 1) else id + + RemovedChatroom rstate -> do + modifyMVar_ contextVar $ ctxUpdate "DEL" 1 rstate + modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then subtract 1 else id + + UpdatedChatroom oldroom rstate -> do + when (any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate)) $ do + modifyMVar_ contextVar $ ctxUpdate "UPD" 1 rstate + when (any (not . null . rsdMessages . fromStored) (roomStateData rstate)) $ do + tzone <- getCurrentTimeZone + forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do + eprint $ concat $ + [ maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg + , formatTime defaultTimeLocale " [%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg + , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg + , if cmsgLeave msg then " left" else "" + , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg + ] + modifyMVar_ subscribedNumVar $ return + . (if roomStateSubscribe rstate then (+ 1) else id) + . (if roomStateSubscribe oldroom then subtract 1 else id) + +ensureWatchedChatrooms :: Command +ensureWatchedChatrooms = do + gets csWatchChatrooms >>= \case + Nothing -> do + eprint <- asks ciPrint + h <- gets csHead + chatroomSetVar <- asks ciChatroomSetVar + contextVar <- asks ciContextOptionsVar + autoSubscribe <- asks $ optChatroomAutoSubscribe . ciOptions + watched <- liftIO $ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe + modify $ \s -> s { csWatchChatrooms = Just watched } + Just _ -> return () + +cmdChatrooms :: Command +cmdChatrooms = do + ensureWatchedChatrooms + chatroomSetVar <- asks ciChatroomSetVar + chatroomList <- fromSetBy (comparing roomStateData) <$> liftIO (readMVar chatroomSetVar) + set <- asks ciSetContextOptions + set $ map SelectedChatroom chatroomList + forM_ (zip [1..] chatroomList) $ \(i :: Int, rstate) -> do + liftIO $ putStrLn $ "[" ++ show i ++ "] " ++ maybe "<unnamed>" T.unpack (roomName =<< roomStateRoom rstate) + +cmdChatroomCreatePublic :: Command +cmdChatroomCreatePublic = do + name <- asks ciLine >>= \case + line | not (null line) -> return $ T.pack line + _ -> liftIO $ do + T.putStr $ T.pack "Name: " + hFlush stdout + T.getLine + + ensureWatchedChatrooms + void $ createChatroom + (if T.null name then Nothing else Just name) + Nothing + + cmdContacts :: Command cmdContacts = do args <- words <$> asks ciLine @@ -586,6 +775,9 @@ cmdDetails = do SelectedContact contact -> do printContactDetails contact + SelectedChatroom rstate -> do + liftIO $ putStrLn $ "Chatroom: " <> (T.unpack $ fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate) + SelectedConversation conv -> do case conversationPeer conv of Just pid -> printContactOrIdentityDetails pid @@ -703,3 +895,10 @@ cmdIceSend = void $ do cmdQuit :: Command cmdQuit = modify $ \s -> s { csQuit = True } + + +intersectsSorted :: Ord a => [a] -> [a] -> Bool +intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys) + | x > y = intersectsSorted (x:xs) ys + | otherwise = True +intersectsSorted _ _ = False diff --git a/main/Test.hs b/main/Test.hs index d5737c2..c6448b8 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -97,7 +97,7 @@ runTestTool st = do Nothing -> return () runExceptT (evalStateT testLoop initTestState) >>= \case - Left x -> hPutStrLn stderr x + Left x -> B.hPutStr stderr $ (`BC.snoc` '\n') $ BC.pack x Right () -> return () getLineMb :: MonadIO m => m (Maybe Text) @@ -121,7 +121,7 @@ outLine :: Output -> String -> IO () outLine mvar line = do evaluate $ foldl' (flip seq) () line withMVar mvar $ \() -> do - putStrLn line + B.putStr $ (`BC.snoc` '\n') $ BC.pack line hFlush stdout cmdOut :: String -> Command @@ -283,6 +283,9 @@ commands = map (T.pack *** id) , ("chatroom-set-name", cmdChatroomSetName) , ("chatroom-subscribe", cmdChatroomSubscribe) , ("chatroom-unsubscribe", cmdChatroomUnsubscribe) + , ("chatroom-members", cmdChatroomMembers) + , ("chatroom-join", cmdChatroomJoin) + , ("chatroom-leave", cmdChatroomLeave) , ("chatroom-message-send", cmdChatroomMessageSend) ] @@ -428,7 +431,7 @@ cmdStartServer = do h <- getOrLoadHead rsPeers <- liftIO $ newMVar (1, []) - rsServer <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr) + rsServer <- liftIO $ startServer defaultServerOptions h (B.hPutStr stderr . (`BC.snoc` '\n') . BC.pack) [ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" , someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" , someServiceAttr $ directMessageAttributes out @@ -501,11 +504,11 @@ cmdPeerList = do cmdTestMessageSend :: Command cmdTestMessageSend = do - [spidx, tref] <- asks tiParams + spidx : trefs <- asks tiParams st <- asks tiStorage - Just ref <- liftIO $ readRef st (encodeUtf8 tref) + Just refs <- liftIO $ fmap sequence $ mapM (readRef st . encodeUtf8) trefs peer <- getPeer spidx - sendToPeer peer $ TestMessage $ wrappedLoad ref + sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" cmdSharedStateGet :: Command @@ -726,11 +729,13 @@ cmdChatroomWatchLocal = do , [ "new" ], map (show . refDigest . storedRef) (roomStateData room) ] when (any (not . null . rsdMessages . fromStored) (roomStateData room)) $ do - forM_ (getMessagesSinceState room oldroom) $ \msg -> do + forM_ (reverse $ getMessagesSinceState room oldroom) $ \msg -> do outLine out $ unwords $ concat [ [ "chatroom-message-new" ] , [ show . refDigest . storedRef . head . filterAncestors . concatMap storedRoots . toComponents $ room ] + , [ "room", maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg ] , [ "from", maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg ] + , if cmsgLeave msg then [ "leave" ] else [] , maybe [] (("text":) . (:[]) . T.unpack) $ cmsgText msg ] @@ -753,8 +758,28 @@ cmdChatroomUnsubscribe = do to <- getChatroomStateData cid void $ chatroomSetSubscribe to False +cmdChatroomMembers :: Command +cmdChatroomMembers = do + [ cid ] <- asks tiParams + Just chatroom <- findChatroomByStateData =<< getChatroomStateData cid + forM_ (chatroomMembers chatroom) $ \user -> do + cmdOut $ unwords [ "chatroom-members-item", maybe "<unnamed>" T.unpack $ idName user ] + cmdOut "chatroom-members-done" + +cmdChatroomJoin :: Command +cmdChatroomJoin = do + [ cid ] <- asks tiParams + joinChatroomByStateData =<< getChatroomStateData cid + cmdOut "chatroom-join-done" + +cmdChatroomLeave :: Command +cmdChatroomLeave = do + [ cid ] <- asks tiParams + leaveChatroomByStateData =<< getChatroomStateData cid + cmdOut "chatroom-leave-done" + cmdChatroomMessageSend :: Command cmdChatroomMessageSend = do [cid, msg] <- asks tiParams to <- getChatroomStateData cid - void $ chatroomMessageByStateData to msg + void $ sendChatroomMessageByStateData to msg diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index 673c59f..c8b5805 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -11,14 +11,20 @@ module Erebos.Chatroom ( findChatroomByRoomData, findChatroomByStateData, chatroomSetSubscribe, + chatroomMembers, + joinChatroom, joinChatroomByStateData, + leaveChatroom, leaveChatroomByStateData, getMessagesSinceState, ChatroomSetChange(..), watchChatrooms, - ChatMessage, cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave, + ChatMessage, + cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave, + cmsgRoom, cmsgRoomData, ChatMessageData(..), - chatroomMessageByStateData, + sendChatroomMessage, + sendChatroomMessageByStateData, ChatroomService(..), ) where @@ -29,6 +35,9 @@ import Control.Monad.Except import Control.Monad.IO.Class import Data.Bool +import Data.Either +import Data.Foldable +import Data.Function import Data.IORef import Data.List import Data.Maybe @@ -111,6 +120,11 @@ data ChatMessage = ChatMessage { cmsgData :: Stored (Signed ChatMessageData) } +validateSingleMessage :: Stored (Signed ChatMessageData) -> Maybe ChatMessage +validateSingleMessage sdata = do + guard $ fromStored sdata `isSignedBy` idKeyMessage (mdFrom (fromSigned sdata)) + return $ ChatMessage sdata + cmsgFrom :: ChatMessage -> ComposedIdentity cmsgFrom = mdFrom . fromSigned . cmsgData @@ -126,6 +140,12 @@ cmsgText = mdText . fromSigned . cmsgData cmsgLeave :: ChatMessage -> Bool cmsgLeave = mdLeave . fromSigned . cmsgData +cmsgRoom :: ChatMessage -> Maybe Chatroom +cmsgRoom = either (const Nothing) Just . runExcept . validateChatroom . cmsgRoomData + +cmsgRoomData :: ChatMessage -> [ Stored (Signed ChatroomData) ] +cmsgRoomData = concat . findProperty ((\case [] -> Nothing; xs -> Just xs) . mdRoom . fromStored . signedData) . (: []) . cmsgData + instance Storable ChatMessageData where store' ChatMessageData {..} = storeRec $ do mapM_ (storeRef "SPREV") mdPrev @@ -146,37 +166,42 @@ instance Storable ChatMessageData where mdLeave <- isJust <$> loadMbEmpty "leave" return ChatMessageData {..} -threadToList :: [Stored (Signed ChatMessageData)] -> [ChatMessage] -threadToList thread = helper S.empty $ thread +threadToListSince :: [ Stored (Signed ChatMessageData) ] -> [ Stored (Signed ChatMessageData) ] -> [ ChatMessage ] +threadToListSince since thread = helper (S.fromList since) thread where helper :: S.Set (Stored (Signed ChatMessageData)) -> [Stored (Signed ChatMessageData)] -> [ChatMessage] helper seen msgs | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = - messageFromData msg : helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg)) + maybe id (:) (validateSingleMessage msg) $ + helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg)) | otherwise = [] cmpView msg = (zonedTimeToUTC $ mdTime $ fromSigned msg, msg) - messageFromData :: Stored (Signed ChatMessageData) -> ChatMessage - messageFromData sdata = ChatMessage { cmsgData = sdata } +sendChatroomMessage + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => ChatroomState -> Text -> m () +sendChatroomMessage rstate msg = sendChatroomMessageByStateData (head $ roomStateData rstate) msg -chatroomMessageByStateData +sendChatroomMessageByStateData :: (MonadStorage m, MonadHead LocalState m, MonadError String m) => Stored ChatroomStateData -> Text -> m () -chatroomMessageByStateData lookupData msg = void $ findAndUpdateChatroomState $ \cstate -> do +sendChatroomMessageByStateData lookupData msg = sendRawChatroomMessageByStateData lookupData Nothing (Just msg) False + +sendRawChatroomMessageByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => Stored ChatroomStateData -> Maybe (Stored (Signed ChatMessageData)) -> Maybe Text -> Bool -> m () +sendRawChatroomMessageByStateData lookupData mdReplyTo mdText mdLeave = void $ findAndUpdateChatroomState $ \cstate -> do guard $ any (lookupData `precedesOrEquals`) $ roomStateData cstate Just $ do - self <- finalOwner . localIdentity . fromStored <$> getLocalHead - secret <- loadKey $ idKeyMessage self - time <- liftIO getZonedTime - mdata <- mstore =<< sign secret =<< mstore ChatMessageData - { mdPrev = roomStateMessageData cstate - , mdRoom = [] - , mdFrom = self - , mdReplyTo = Nothing - , mdTime = time - , mdText = Just msg - , mdLeave = False - } + mdFrom <- finalOwner . localIdentity . fromStored <$> getLocalHead + secret <- loadKey $ idKeyMessage mdFrom + mdTime <- liftIO getZonedTime + let mdPrev = roomStateMessageData cstate + mdRoom = if null (roomStateMessageData cstate) + then maybe [] roomData (roomStateRoom cstate) + else [] + + mdata <- mstore =<< sign secret =<< mstore ChatMessageData {..} mergeSorted . (:[]) <$> mstore ChatroomStateData { rsdPrev = roomStateData cstate , rsdRoom = [] @@ -224,7 +249,7 @@ instance Mergeable ChatroomState where ChatroomStateData {..} | null rsdMessages -> Nothing | otherwise -> Just rsdMessages roomStateSubscribe = fromMaybe False $ findPropertyFirst rsdSubscribe roomStateData - roomStateMessages = threadToList $ concatMap (rsdMessages . fromStored) roomStateData + roomStateMessages = threadToListSince [] $ concatMap (rsdMessages . fromStored) roomStateData in ChatroomState {..} toComponents = roomStateData @@ -321,11 +346,38 @@ chatroomSetSubscribe lookupData subscribe = void $ findAndUpdateChatroomState $ , rsdMessages = [] } +chatroomMembers :: ChatroomState -> [ ComposedIdentity ] +chatroomMembers ChatroomState {..} = + map (mdFrom . fromSigned . head) $ + filter (any $ not . mdLeave . fromSigned) $ -- keep only users that hasn't left + map (filterAncestors . map snd) $ -- gather message data per each identity and filter ancestors + groupBy ((==) `on` fst) $ -- group on identity root + sortBy (comparing fst) $ -- sort by first root of identity data + map (\x -> ( head . filterAncestors . concatMap storedRoots . idDataF . mdFrom . fromSigned $ x, x )) $ + toList $ ancestors $ roomStateMessageData + +joinChatroom + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => ChatroomState -> m () +joinChatroom rstate = joinChatroomByStateData (head $ roomStateData rstate) + +joinChatroomByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => Stored ChatroomStateData -> m () +joinChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing False + +leaveChatroom + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => ChatroomState -> m () +leaveChatroom rstate = leaveChatroomByStateData (head $ roomStateData rstate) + +leaveChatroomByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + => Stored ChatroomStateData -> m () +leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing True + getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage] -getMessagesSinceState cur old = takeWhile notOld (roomStateMessages cur) - where - notOld msg = cmsgData msg `notElem` roomStateMessageData old - -- TODO: parallel message threads +getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur) data ChatroomSetChange = AddedChatroom ChatroomState @@ -365,13 +417,18 @@ makeChatroomDiff [] ys = map (AddedChatroom . snd) ys data ChatroomService = ChatroomService { chatRoomQuery :: Bool , chatRoomInfo :: [Stored (Signed ChatroomData)] + , chatRoomSubscribe :: [Stored (Signed ChatroomData)] + , chatRoomUnsubscribe :: [Stored (Signed ChatroomData)] , chatRoomMessage :: [Stored (Signed ChatMessageData)] } + deriving (Eq) emptyPacket :: ChatroomService emptyPacket = ChatroomService { chatRoomQuery = False , chatRoomInfo = [] + , chatRoomSubscribe = [] + , chatRoomUnsubscribe = [] , chatRoomMessage = [] } @@ -379,17 +436,22 @@ instance Storable ChatroomService where store' ChatroomService {..} = storeRec $ do when chatRoomQuery $ storeEmpty "room-query" forM_ chatRoomInfo $ storeRef "room-info" + forM_ chatRoomSubscribe $ storeRef "room-subscribe" + forM_ chatRoomUnsubscribe $ storeRef "room-unsubscribe" forM_ chatRoomMessage $ storeRef "room-message" load' = loadRec $ do chatRoomQuery <- isJust <$> loadMbEmpty "room-query" chatRoomInfo <- loadRefs "room-info" + chatRoomSubscribe <- loadRefs "room-subscribe" + chatRoomUnsubscribe <- loadRefs "room-unsubscribe" chatRoomMessage <- loadRefs "room-message" return ChatroomService {..} data PeerState = PeerState { psSendRoomUpdates :: Bool , psLastList :: [(Stored ChatroomStateData, ChatroomState)] + , psSubscribedTo :: [ Stored (Signed ChatroomData) ] -- least root for each room } instance Service ChatroomService where @@ -399,12 +461,18 @@ instance Service ChatroomService where emptyServiceState _ = PeerState { psSendRoomUpdates = False , psLastList = [] + , psSubscribedTo = [] } serviceHandler spacket = do let ChatroomService {..} = fromStored spacket + + previouslyUpdated <- psSendRoomUpdates <$> svcGet svcModify $ \s -> s { psSendRoomUpdates = True } + when (not previouslyUpdated) $ do + syncChatroomsToPeer . lookupSharedValue . lsShared . fromStored =<< getLocalHead + when chatRoomQuery $ do rooms <- listChatrooms replyPacket emptyPacket @@ -420,7 +488,7 @@ instance Service ChatroomService where maybe [] roomData . roomStateRoom let prev = concatMap roomStateData $ filter isCurrentRoom rooms - prevRoom = concatMap (rsdRoom . fromStored) prev + prevRoom = filterAncestors $ concat $ findProperty ((\case [] -> Nothing; xs -> Just xs) . rsdRoom) prev room = filterAncestors $ (roomInfo : ) prevRoom -- update local state only if we got roomInfo not present there @@ -436,6 +504,51 @@ instance Service ChatroomService where else return set foldM upd roomSet chatRoomInfo + forM_ chatRoomSubscribe $ \subscribeData -> do + mbRoomState <- findChatroomByRoomData subscribeData + forM_ mbRoomState $ \roomState -> + forM (roomStateRoom roomState) $ \room -> do + let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room + svcModify $ \ps -> ps { psSubscribedTo = leastRoot : psSubscribedTo ps } + replyPacket emptyPacket + { chatRoomMessage = roomStateMessageData roomState + } + + forM_ chatRoomUnsubscribe $ \unsubscribeData -> do + mbRoomState <- findChatroomByRoomData unsubscribeData + forM_ (mbRoomState >>= roomStateRoom) $ \room -> do + let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room + svcModify $ \ps -> ps { psSubscribedTo = filter (/= leastRoot) (psSubscribedTo ps) } + + when (not (null chatRoomMessage)) $ do + updateLocalHead_ $ updateSharedState_ $ \roomSet -> do + let rooms = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet + upd set (msgData :: Stored (Signed ChatMessageData)) + | Just msg <- validateSingleMessage msgData = do + let roomInfo = cmsgRoomData msg + currentRoots = filterAncestors $ concatMap storedRoots roomInfo + isCurrentRoom = any ((`intersectsSorted` currentRoots) . storedRoots) . + maybe [] roomData . roomStateRoom + + let prevData = concatMap roomStateData $ filter isCurrentRoom rooms + prev = mergeSorted prevData + prevMessages = roomStateMessageData prev + messages = filterAncestors $ msgData : prevMessages + + -- update local state only if subscribed and we got some new messages + if roomStateSubscribe prev && messages /= prevMessages + then do + sdata <- mstore ChatroomStateData + { rsdPrev = prevData + , rsdRoom = [] + , rsdSubscribe = Nothing + , rsdMessages = messages + } + storeSetAddComponent sdata set + else return set + | otherwise = return set + foldM upd roomSet chatRoomMessage + serviceNewPeer = do replyPacket emptyPacket { chatRoomQuery = True } @@ -447,11 +560,50 @@ syncChatroomsToPeer set = do ps@PeerState {..} <- svcGet when psSendRoomUpdates $ do let curList = chatroomSetToList set - updates <- fmap (concat . catMaybes) $ - forM (makeChatroomDiff psLastList curList) $ return . \case + diff = makeChatroomDiff psLastList curList + + roomUpdates <- fmap (concat . catMaybes) $ + forM diff $ return . \case AddedChatroom room -> roomData <$> roomStateRoom room RemovedChatroom {} -> Nothing - UpdatedChatroom _ room -> roomData <$> roomStateRoom room - when (not $ null updates) $ do - replyPacket $ emptyPacket { chatRoomInfo = updates } + UpdatedChatroom oldroom room + | roomStateData oldroom /= roomStateData room -> roomData <$> roomStateRoom room + | otherwise -> Nothing + + (subscribe, unsubscribe) <- fmap (partitionEithers . concat . catMaybes) $ + forM diff $ return . \case + AddedChatroom room + | roomStateSubscribe room + -> map Left . roomData <$> roomStateRoom room + RemovedChatroom oldroom + | roomStateSubscribe oldroom + -> map Right . roomData <$> roomStateRoom oldroom + UpdatedChatroom oldroom room + | roomStateSubscribe oldroom /= roomStateSubscribe room + -> map (if roomStateSubscribe room then Left else Right) . roomData <$> roomStateRoom room + _ -> Nothing + + messages <- fmap concat $ do + let leastRootFor = head . filterAncestors . concatMap storedRoots . roomData + forM diff $ return . \case + AddedChatroom rstate + | Just room <- roomStateRoom rstate + , leastRootFor room `elem` psSubscribedTo + -> roomStateMessageData rstate + UpdatedChatroom oldstate rstate + | Just room <- roomStateRoom rstate + , leastRootFor room `elem` psSubscribedTo + , roomStateMessageData oldstate /= roomStateMessageData rstate + -> roomStateMessageData rstate + _ -> [] + + let packet = emptyPacket + { chatRoomInfo = roomUpdates + , chatRoomSubscribe = subscribe + , chatRoomUnsubscribe = unsubscribe + , chatRoomMessage = messages + } + + when (packet /= emptyPacket) $ do + replyPacket packet svcSet $ ps { psLastList = curList } diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index 94d2399..63475bd 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -1,12 +1,15 @@ module Erebos.Conversation ( Message, messageFrom, + messageTime, messageText, messageUnread, formatMessage, Conversation, directMessageConversation, + chatroomConversation, + chatroomConversationByStateData, reloadConversation, lookupConversations, @@ -23,30 +26,45 @@ import Data.List import Data.Maybe import Data.Text (Text) import Data.Text qualified as T +import Data.Time.Format import Data.Time.LocalTime import Erebos.Identity +import Erebos.Chatroom import Erebos.Message hiding (formatMessage) import Erebos.State import Erebos.Storage data Message = DirectMessageMessage DirectMessage Bool + | ChatroomMessage ChatMessage Bool messageFrom :: Message -> ComposedIdentity messageFrom (DirectMessageMessage msg _) = msgFrom msg +messageFrom (ChatroomMessage msg _) = cmsgFrom msg + +messageTime :: Message -> ZonedTime +messageTime (DirectMessageMessage msg _) = msgTime msg +messageTime (ChatroomMessage msg _) = cmsgTime msg messageText :: Message -> Maybe Text messageText (DirectMessageMessage msg _) = Just $ msgText msg +messageText (ChatroomMessage msg _) = cmsgText msg messageUnread :: Message -> Bool messageUnread (DirectMessageMessage _ unread) = unread +messageUnread (ChatroomMessage _ unread) = unread formatMessage :: TimeZone -> Message -> String -formatMessage tzone (DirectMessageMessage msg _) = formatDirectMessage tzone msg +formatMessage tzone msg = concat + [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ messageTime msg + , maybe "<unnamed>" T.unpack $ idName $ messageFrom msg + , maybe "" ((": "<>) . T.unpack) $ messageText msg + ] data Conversation = DirectMessageConversation DirectMessageThread + | ChatroomConversation ChatroomState directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation directMessageConversation peer = do @@ -54,8 +72,16 @@ directMessageConversation peer = do Just thread -> return $ DirectMessageConversation thread Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] +chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) +chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) + +chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation) +chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata + reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread) +reloadConversation cur@(ChatroomConversation rstate) = + fromMaybe cur <$> chatroomConversation rstate lookupConversations :: MonadHead LocalState m => m [Conversation] lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead @@ -63,13 +89,17 @@ lookupConversations = map DirectMessageConversation . toThreadList . lookupShare conversationName :: Conversation -> Text conversationName (DirectMessageConversation thread) = fromMaybe (T.pack "<unnamed>") $ idName $ msgPeer thread +conversationName (ChatroomConversation rstate) = fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate conversationPeer :: Conversation -> Maybe ComposedIdentity conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread +conversationPeer (ChatroomConversation _) = Nothing conversationHistory :: Conversation -> [Message] conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread +conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate -sendMessage :: (MonadHead LocalState m, MonadError String m) => Conversation -> Text -> m Message -sendMessage (DirectMessageConversation thread) text = DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False +sendMessage :: (MonadHead LocalState m, MonadError String m) => Conversation -> Text -> m (Maybe Message) +sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False +sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs index 8761fde..f2094f6 100644 --- a/src/Erebos/Identity.hs +++ b/src/Erebos/Identity.hs @@ -35,7 +35,6 @@ import Data.Foldable import Data.Function import Data.List import Data.Maybe -import Data.Ord import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -304,25 +303,18 @@ verifySignatures sidd = do throwError "signature verification failed" lookupProperty :: forall a m. Foldable m => (ExtendedIdentityData -> Maybe a) -> m (Stored (Signed ExtendedIdentityData)) -> Maybe a -lookupProperty sel topHeads = findResult filteredLayers - where findPropHeads :: Stored (Signed ExtendedIdentityData) -> [(Stored (Signed ExtendedIdentityData), a)] - findPropHeads sobj | Just x <- sel $ fromSigned sobj = [(sobj, x)] - | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj) +lookupProperty sel topHeads = findResult propHeads + where + findPropHeads :: Stored (Signed ExtendedIdentityData) -> [ Stored (Signed ExtendedIdentityData) ] + findPropHeads sobj | Just _ <- sel $ fromSigned sobj = [ sobj ] + | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj) - propHeads :: [(Stored (Signed ExtendedIdentityData), a)] - propHeads = findPropHeads =<< toList topHeads + propHeads :: [ Stored (Signed ExtendedIdentityData) ] + propHeads = filterAncestors $ findPropHeads =<< toList topHeads - historyLayers :: [Set (Stored (Signed ExtendedIdentityData))] - historyLayers = generations $ map fst propHeads - - filteredLayers :: [[(Stored (Signed ExtendedIdentityData), a)]] - filteredLayers = scanl (\cur obsolete -> filter ((`S.notMember` obsolete) . fst) cur) propHeads historyLayers - - findResult ([(_, x)] : _) = Just x - findResult ([] : _) = Nothing - findResult [] = Nothing - findResult [xs] = Just $ snd $ minimumBy (comparing fst) xs - findResult (_:rest) = findResult rest + findResult :: [ Stored (Signed ExtendedIdentityData) ] -> Maybe a + findResult [] = Nothing + findResult xs = sel $ fromSigned $ minimum xs mergeIdentity :: (MonadStorage m, MonadError String m, MonadIO m) => Identity f -> m UnifiedIdentity mergeIdentity idt | Just idt' <- toUnifiedIdentity idt = return idt' @@ -385,8 +377,9 @@ updateOwners updates orig@Identity { idOwner_ = Just owner, idUpdates_ = cupdate updateOwners _ orig@Identity { idOwner_ = Nothing } = orig sameIdentity :: (Foldable m, Foldable m') => Identity m -> Identity m' -> Bool -sameIdentity x y = not $ S.null $ S.intersection (refset x) (refset y) - where refset idt = foldr S.insert (ancestors $ toList $ idDataF idt) (idDataF idt) +sameIdentity x y = intersectsSorted (roots x) (roots y) + where + roots idt = uniq $ sort $ concatMap storedRoots $ toList $ idDataF idt unfoldOwners :: (Foldable m) => Identity m -> [ComposedIdentity] diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 41b6279..2064d1c 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -19,7 +19,9 @@ module Erebos.Network ( #endif dropPeer, isPeerDropped, - sendToPeer, sendToPeerStored, sendToPeerWith, + sendToPeer, sendManyToPeer, + sendToPeerStored, sendManyToPeerStored, + sendToPeerWith, runPeerService, discoveryPort, @@ -52,6 +54,9 @@ import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) import qualified Network.Socket.ByteString as S +import Foreign.C.Types +import Foreign.Marshal.Alloc + import Erebos.Channel #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE @@ -69,6 +74,9 @@ import Erebos.Storage.Merge discoveryPort :: PortNumber discoveryPort = 29665 +discoveryMulticastGroup :: HostAddress6 +discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1 + announceIntervalSeconds :: Int announceIntervalSeconds = 60 @@ -247,8 +255,6 @@ startServer opt serverOrigHead logd' serverServices = do either (atomically . logd) return =<< runExceptT =<< atomically (readTQueue serverIOActions) - broadcastAddreses <- getBroadcastAddresses discoveryPort - let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) putMVar serverSocket sock @@ -259,9 +265,14 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do - atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses - threadDelay $ announceIntervalSeconds * 1000 * 1000 + when (serverLocalDiscovery opt) $ forkServerThread server $ do + announceAddreses <- fmap concat $ sequence $ + [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock + , getBroadcastAddresses discoveryPort + ] + forever $ do + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses + threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage @@ -301,10 +312,11 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - case paddr of - DatagramAddress addr -> void $ S.sendTo sock msg addr + handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + case paddr of + DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg + PeerIceSession ice -> iceSend ice msg #endif forkServerThread server $ forever $ do @@ -421,12 +433,18 @@ instance MonadFail PacketHandler where runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () runPacketHandler secure peer@Peer {..} act = do let logd = writeTQueue $ serverErrorLog peerServer_ - runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case + runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case Left err -> do logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err Right ph -> do when (not $ null $ phHead ph) $ do - let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + body <- case phBodyStream ph of + Nothing -> return $ phBody ph + Just stream -> do + writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph + return [] + let packet = TransportPacket (TransportHeader $ phHead ph) body secreq = case (secure, phPlaintextReply ph) of (True, _) -> EncryptedOnly (False, False) -> PlaintextAllowed @@ -450,6 +468,7 @@ data PacketHandlerState = PacketHandlerState , phHead :: [TransportHeaderItem] , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] + , phBodyStream :: Maybe RawStreamWriter , phPlaintextReply :: Bool } @@ -462,6 +481,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +sendBodyAsStream :: PacketHandler () +sendBodyAsStream = do + gets phBodyStream >>= \case + Nothing -> do + stream <- openStream + modify $ \ph -> ph { phBodyStream = Just stream } + Just _ -> return () + keepPlaintextReply :: PacketHandler () keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } @@ -517,8 +544,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected dgst -> do - logd $ "rejected by peer: " ++ show dgst + Rejected dgst + | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers + , peerRequest < dgst + -> return () -- Our request was rejected due to lower priority + + | otherwise -> logd $ "rejected by peer: " ++ show dgst DataRequest dgst | secure || dgst `elem` plaintextRefs -> do @@ -532,15 +563,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = -- otherwise lost the channel, so keep the reply plaintext as well. when (not secure) keepPlaintextReply - let bytes = lazyLoadBytes mref -- TODO: MTU - if (secure && BL.length bytes > 500) - then do - stream <- openStream - liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do - writeByteStringToStream stream bytes - else do - addBody $ mref + when (secure && BL.length (lazyLoadBytes mref) > 500) + sendBodyAsStream + + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -593,9 +620,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieWait {} -> return () ChannelCookieReceived {} -> process ChannelCookieConfirmed {} -> process - ChannelOurRequest our | dgst < refDigest (storedRef our) -> process - | otherwise -> reject - ChannelPeerRequest {} -> process + ChannelOurRequest our + | dgst < refDigest (storedRef our) -> process + | otherwise -> do + -- Reject peer channel request with lower priority + addHeader $ TrChannelRequest $ refDigest $ storedRef our + reject + ChannelPeerRequest prev + | dgst == wrDigest prev -> addHeader $ Acknowledged dgst + | otherwise -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process ChannelClosed {} -> return () @@ -647,12 +680,14 @@ setupChannel identity peer upid = do [ TrChannelRequest reqref , AnnounceSelf $ refDigest $ storedRef $ idData identity ] + let sendChannelRequest = do + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] + setPeerChannel peer $ ChannelOurRequest req liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed -> do - sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ - TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest req + ChannelCookieReceived -> sendChannelRequest + ChannelCookieConfirmed -> sendChannelRequest _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -806,10 +841,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () -sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] +sendToPeer peer = sendManyToPeer peer . (: []) + +sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m () +sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True) sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () -sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] +sendToPeerStored peer = sendManyToPeerStored peer . (: []) + +sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () +sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do @@ -912,9 +953,19 @@ runPeerServiceOn mbservice peer handler = liftIO $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () +joinMulticast :: Socket -> IO [ Word32 ] +joinMulticast sock = + withFdSocket sock $ \fd -> + alloca $ \pcount -> do + ptr <- cJoinMulticast fd pcount + count <- fromIntegral <$> peek pcount + forM [ 0 .. count - 1 ] $ \i -> + peekElemOff ptr i + getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do ptr <- cBroadcastAddresses @@ -922,6 +973,9 @@ getBroadcastAddresses port = do w <- peekElemOff ptr i if w == 0 then return [] else (SockAddrInet port w:) <$> parse (i + 1) - addrs <- parse 0 - cFree ptr - return addrs + if ptr == nullPtr + then return [] + else do + addrs <- parse 0 + cFree ptr + return addrs diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index a009ad1..cfbaea3 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -40,7 +40,17 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Trans +import Crypto.Cipher.ChaChaPoly1305 qualified as C +import Crypto.MAC.Poly1305 qualified as C (Auth(..), authTag) +import Crypto.Error +import Crypto.Random + +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put import Data.Bits +import Data.ByteArray (Bytes, ScrubbedBytes) +import Data.ByteArray qualified as BA import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC @@ -51,7 +61,6 @@ import Data.Maybe import Data.Text (Text) import Data.Text qualified as T import Data.Void -import Data.Word import System.Clock @@ -68,6 +77,9 @@ protocolVersion = T.pack "0.1" protocolVersions :: [Text] protocolVersions = [protocolVersion] +keepAliveInternal :: TimeSpec +keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int) + data TransportPacket a = TransportPacket TransportHeader [a] @@ -101,6 +113,35 @@ data SecurityRequirement = PlaintextOnly | EncryptedOnly deriving (Eq, Ord) +data ParsedCookie = ParsedCookie + { cookieNonce :: C.Nonce + , cookieValidity :: Word32 + , cookieContent :: ByteString + , cookieMac :: C.Auth + } + +instance Eq ParsedCookie where + (==) = (==) `on` (\c -> ( BA.convert (cookieNonce c) :: ByteString, cookieValidity c, cookieContent c, cookieMac c )) + +instance Show ParsedCookie where + show ParsedCookie {..} = show (nonce, cookieValidity, cookieContent, mac) + where C.Auth mac = cookieMac + nonce = BA.convert cookieNonce :: ByteString + +instance Binary ParsedCookie where + put ParsedCookie {..} = do + putByteString $ BA.convert cookieNonce + putWord32be cookieValidity + putByteString $ BA.convert cookieMac + putByteString cookieContent + + get = do + Just cookieNonce <- maybeCryptoError . C.nonce12 <$> getByteString 12 + cookieValidity <- getWord32be + Just cookieMac <- maybeCryptoError . C.authTag <$> getByteString 16 + cookieContent <- BL.toStrict <$> getRemainingLazyByteString + return ParsedCookie {..} + isHeaderItemAcknowledged :: TransportHeaderItem -> Bool isHeaderItemAcknowledged = \case Acknowledged {} -> False @@ -165,9 +206,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () , gStorage :: PartialStorage + , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec , gNextTimeout :: TVar TimeSpec , gInitConfig :: Ref + , gCookieKey :: ScrubbedBytes + , gCookieStartTime :: Word32 } data Connection addr = Connection @@ -186,6 +230,7 @@ data Connection addr = Connection , cReservedPackets :: TVar Int , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] + , cNextKeepAlive :: TVar (Maybe TimeSpec) , cInStreams :: TVar [(Word8, Stream)] , cOutStreams :: TVar [(Word8, Stream)] } @@ -440,15 +485,18 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage - startTime <- getTime MonotonicRaw - gNowVar <- newTVarIO startTime - gNextTimeout <- newTVarIO startTime + gStartTime <- getTime Monotonic + gNowVar <- newTVarIO gStartTime + gNextTimeout <- newTVarIO gStartTime gInitConfig <- store mStorage $ (Rec [] :: Object) + gCookieKey <- getRandomBytes 32 + gCookieStartTime <- runGet getWord32host . BL.pack . BA.unpack @ScrubbedBytes <$> getRandomBytes 4 + let gs = GlobalState {..} let signalTimeouts = forever $ do - now <- getTime MonotonicRaw + now <- getTime Monotonic next <- atomically $ do writeTVar gNowVar now readTVar gNextTimeout @@ -487,6 +535,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cReservedPackets <- newTVar 0 cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] + cNextKeepAlive <- newTVar Nothing cInStreams <- newTVar [] cOutStreams <- newTVar [] let conn = Connection {..} @@ -548,6 +597,7 @@ processIncoming gs@GlobalState {..} = do Nothing -> throwError "empty packet" + now <- getTime Monotonic runExceptT parse >>= \case Right (Left (secure, objs, mbcounter)) | hobj:content <- objs @@ -562,6 +612,7 @@ processIncoming gs@GlobalState {..} = do case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () + updateKeepAlive conn now processAcknowledgements gs conn items ioAfter Nothing -> return () @@ -571,8 +622,9 @@ processIncoming gs@GlobalState {..} = do gLog $ show objs Right (Right (snum, seq8, content, counter)) - | Just Connection {..} <- mbconn + | Just conn@Connection {..} <- mbconn -> atomically $ do + updateKeepAlive conn now (lookup snum <$> readTVar cInStreams) >>= \case Nothing -> gLog $ "unexpected stream number " ++ show snum @@ -694,11 +746,38 @@ generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie -createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) +createCookie GlobalState {..} addr = do + (nonceBytes :: Bytes) <- getRandomBytes 12 + validUntil <- (fromNanoSecs (60 * 10^(9 :: Int)) +) <$> getTime Monotonic + let validSecondsFromStart = fromIntegral $ toNanoSecs (validUntil - gStartTime) `div` (10^(9 :: Int)) + cookieValidity = validSecondsFromStart - gCookieStartTime + plainContent = BC.pack (show addr) + throwCryptoErrorIO $ do + cookieNonce <- C.nonce12 nonceBytes + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (cookieContent, st3) = C.encrypt plainContent st2 + cookieMac = C.finalize st3 + return $ Cookie $ BL.toStrict $ encode $ ParsedCookie {..} verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool -verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie - +verifyCookie GlobalState {..} addr (Cookie cookie) = do + ctime <- getTime Monotonic + return $ fromMaybe False $ do + ( _, _, ParsedCookie {..} ) <- either (const Nothing) Just $ decodeOrFail $ BL.fromStrict cookie + maybeCryptoError $ do + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (plainContent, st3) = C.decrypt cookieContent st2 + mac = C.finalize st3 + + validSecondsFromStart = fromIntegral $ cookieValidity + gCookieStartTime + validUntil = gStartTime + fromNanoSecs (validSecondsFromStart * (10^(9 :: Int))) + return $ and + [ mac == cookieMac + , ctime <= validUntil + , show addr == BC.unpack plainContent + ] reservePacket :: Connection addr -> STM ReservedToSend reservePacket conn@Connection {..} = do @@ -713,9 +792,9 @@ reservePacket conn@Connection {..} = do return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () -resendBytes Connection {..} reserved sp = do +resendBytes conn@Connection {..} reserved sp = do let GlobalState {..} = cGlobalState - now <- getTime MonotonicRaw + now <- getTime Monotonic atomically $ do when (isJust reserved) $ do modifyTVar' cReservedPackets (subtract 1) @@ -726,6 +805,7 @@ resendBytes Connection {..} reserved sp = do , spRetryCount = spRetryCount sp + 1 } writeFlow gDataFlow (cAddress, spData sp) + updateKeepAlive conn now sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () sendBytes conn reserved bs = resendBytes conn reserved @@ -738,6 +818,12 @@ sendBytes conn reserved bs = resendBytes conn reserved , spData = bs } +updateKeepAlive :: Connection addr -> TimeSpec -> STM () +updateKeepAlive Connection {..} now = do + let next = now + keepAliveInternal + writeTVar cNextKeepAlive $ Just next + + processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) processOutgoing gs@GlobalState {..} = do @@ -777,11 +863,12 @@ processOutgoing gs@GlobalState {..} = do let onAck = sequence_ $ map (streamAccepted conn) $ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let mkPlain extraHeaders = - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems - in BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders + | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems = + BL.concat $ + (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems) + : map lazyLoadBytes content + | otherwise = BL.empty let usePlaintext = do plain <- mkPlain <$> generateCookieHeaders conn channel @@ -811,6 +898,13 @@ processOutgoing gs@GlobalState {..} = do sendBytes conn mbReserved' bs Nothing -> return () + let waitUntil :: TimeSpec -> TimeSpec -> STM () + waitUntil now till = do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || till < nextTimeout + then writeTVar gNextTimeout till + else retry + let retransmitPacket :: Connection addr -> STM (IO ()) retransmitPacket conn@Connection {..} = do now <- readTVar gNowVar @@ -819,11 +913,8 @@ processOutgoing gs@GlobalState {..} = do _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 if | now < nextTry -> do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry + waitUntil now nextTry + return $ return () | spRetryCount sp < 2 -> do reserved <- reservePacket conn writeTVar cSentPackets rest @@ -863,11 +954,28 @@ processOutgoing gs@GlobalState {..} = do writeTVar gIdentity (nid, cur : past) return $ return () + let sendKeepAlive :: Connection addr -> STM (IO ()) + sendKeepAlive Connection {..} = do + readTVar cNextKeepAlive >>= \case + Nothing -> retry + Just next -> do + now <- readTVar gNowVar + if next <= now + then do + writeTVar cNextKeepAlive Nothing + identity <- fst <$> readTVar gIdentity + let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] + writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], []) + else do + waitUntil now next + return $ return () + conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns , [ handleControlRequests ] + , map sendKeepAlive conns ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c index 37c3e00..70685bc 100644 --- a/src/Erebos/Network/ifaddrs.c +++ b/src/Erebos/Network/ifaddrs.c @@ -1,11 +1,89 @@ #include "ifaddrs.h" +#include <errno.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifndef _WIN32 #include <arpa/inet.h> -#include <ifaddrs.h> #include <net/if.h> -#include <stdlib.h> -#include <sys/types.h> +#include <ifaddrs.h> #include <endian.h> +#include <sys/types.h> +#include <sys/socket.h> +#else +#include <winsock2.h> +#include <ws2ipdef.h> +#include <ws2tcpip.h> +#endif + +#define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1" + +uint32_t * join_multicast(int fd, size_t * count) +{ + size_t capacity = 16; + *count = 0; + uint32_t * interfaces = malloc(sizeof(uint32_t) * capacity); + +#ifdef _WIN32 + interfaces[0] = 0; + *count = 1; +#else + struct ifaddrs * addrs; + if (getifaddrs(&addrs) < 0) + return 0; + + for (struct ifaddrs * ifa = addrs; ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET6 && + !(ifa->ifa_flags & IFF_LOOPBACK)) { + int idx = if_nametoindex(ifa->ifa_name); + + bool seen = false; + for (size_t i = 0; i < *count; i++) { + if (interfaces[i] == idx) { + seen = true; + break; + } + } + if (seen) + continue; + + if (*count + 1 >= capacity) { + capacity *= 2; + uint32_t * nret = realloc(interfaces, sizeof(uint32_t) * capacity); + if (nret) { + interfaces = nret; + } else { + free(interfaces); + *count = 0; + return NULL; + } + } + + interfaces[*count] = idx; + (*count)++; + } + } + + freeifaddrs(addrs); +#endif + + for (size_t i = 0; i < *count; i++) { + struct ipv6_mreq group; + group.ipv6mr_interface = interfaces[i]; + inet_pton(AF_INET6, DISCOVERY_MULTICAST_GROUP, &group.ipv6mr_multiaddr); + int ret = setsockopt(fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + (const void *) &group, sizeof(group)); + if (ret < 0) + fprintf(stderr, "IPV6_ADD_MEMBERSHIP failed: %s\n", strerror(errno)); + } + + return interfaces; +} + +#ifndef _WIN32 uint32_t * broadcast_addresses(void) { @@ -39,3 +117,51 @@ uint32_t * broadcast_addresses(void) ret[count] = 0; return ret; } + +#else // _WIN32 + +#include <winsock2.h> +#include <ws2tcpip.h> + +#pragma comment(lib, "ws2_32.lib") + +uint32_t * broadcast_addresses(void) +{ + uint32_t * ret = NULL; + SOCKET wsock = INVALID_SOCKET; + + struct WSAData wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) + return NULL; + + wsock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0); + if (wsock == INVALID_SOCKET) + goto cleanup; + + INTERFACE_INFO InterfaceList[32]; + unsigned long nBytesReturned; + + if (WSAIoctl(wsock, SIO_GET_INTERFACE_LIST, 0, 0, + InterfaceList, sizeof(InterfaceList), + &nBytesReturned, 0, 0) == SOCKET_ERROR) + goto cleanup; + + int numInterfaces = nBytesReturned / sizeof(INTERFACE_INFO); + + size_t capacity = 16, count = 0; + ret = malloc(sizeof(uint32_t) * capacity); + + for (int i = 0; i < numInterfaces && count < capacity - 1; i++) + if (InterfaceList[i].iiFlags & IFF_BROADCAST) + ret[count++] = InterfaceList[i].iiBroadcastAddress.AddressIn.sin_addr.s_addr; + + ret[count] = 0; +cleanup: + if (wsock != INVALID_SOCKET) + closesocket(wsock); + WSACleanup(); + + return ret; +} + +#endif diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h index 06d26ec..8852ec6 100644 --- a/src/Erebos/Network/ifaddrs.h +++ b/src/Erebos/Network/ifaddrs.h @@ -1,3 +1,5 @@ +#include <stddef.h> #include <stdint.h> +uint32_t * join_multicast(int fd, size_t * count); uint32_t * broadcast_addresses(void); diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs index d419a5e..8b794d8 100644 --- a/src/Erebos/Storage/Internal.hs +++ b/src/Erebos/Storage/Internal.hs @@ -241,7 +241,7 @@ writeFileOnce file content = bracket (openLockFile locked) doesFileExist file >>= \case True -> removeFile locked False -> do BL.hPut h content - hFlush h + hClose h renameFile locked file where locked = file ++ ".lock" @@ -254,13 +254,13 @@ writeFileChecked file prev content = bracket (openLockFile locked) removeFile locked return $ Left $ Just current (Nothing, False) -> do B.hPut h content - hFlush h + hClose h renameFile locked file return $ Right () (Just expected, True) -> do current <- B.readFile file if current == expected then do B.hPut h content - hFlush h + hClose h renameFile locked file return $ return () else do removeFile locked diff --git a/src/Erebos/Storage/Key.hs b/src/Erebos/Storage/Key.hs index b6afc20..5da79e3 100644 --- a/src/Erebos/Storage/Key.hs +++ b/src/Erebos/Storage/Key.hs @@ -80,6 +80,7 @@ moveKeys from to = liftIO $ do return M.empty (StorageMemory { memKeys = fromKeys }, StorageMemory { memKeys = toKeys }) -> do - modifyMVar_ fromKeys $ \fkeys -> do - modifyMVar_ toKeys $ return . M.union fkeys - return M.empty + when (fromKeys /= toKeys) $ do + modifyMVar_ fromKeys $ \fkeys -> do + modifyMVar_ toKeys $ return . M.union fkeys + return M.empty diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs index 9d9db13..a3b0fd7 100644 --- a/src/Erebos/Storage/Merge.hs +++ b/src/Erebos/Storage/Merge.hs @@ -97,13 +97,16 @@ storedGeneration x = doLookup x +-- |Returns list of sets starting with the set of given objects and +-- intcrementally adding parents. generations :: Storable a => [Stored a] -> [Set (Stored a)] generations = unfoldr gen . (,S.empty) - where gen (hs, cur) = case filter (`S.notMember` cur) $ previous =<< hs of + where gen (hs, cur) = case filter (`S.notMember` cur) hs of [] -> Nothing added -> let next = foldr S.insert cur added - in Just (next, (added, next)) + in Just (next, (previous =<< added, next)) +-- |Returns set containing all given objects and their ancestors ancestors :: Storable a => [Stored a] -> Set (Stored a) ancestors = last . (S.empty:) . generations diff --git a/src/windows/Erebos/Storage/Platform.hs b/src/windows/Erebos/Storage/Platform.hs new file mode 100644 index 0000000..76c940b --- /dev/null +++ b/src/windows/Erebos/Storage/Platform.hs @@ -0,0 +1,13 @@ +module Erebos.Storage.Platform ( + createFileExclusive, +) where + +import Data.Bits + +import System.IO +import System.Win32.File +import System.Win32.Types + +createFileExclusive :: FilePath -> IO Handle +createFileExclusive path = do + hANDLEToHandle =<< createFile path gENERIC_WRITE (fILE_SHARE_READ .|. fILE_SHARE_DELETE) Nothing cREATE_NEW fILE_ATTRIBUTE_NORMAL Nothing diff --git a/test/chatroom.test b/test/chatroom.test index 9be5665..93de1ff 100644 --- a/test/chatroom.test +++ b/test/chatroom.test @@ -98,25 +98,331 @@ test ChatroomSetup: test ChatroomMessages: spawn as p1 + spawn as p2 send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 - for p in [ p1 ]: + for p in [ p1, p2 ]: with p: send "chatroom-watch-local" send "start-server" - send "chatroom-create room" to p1 - expect /chatroom-create-done ([a-z0-9#]+) room.*/ from p1 capture room + send "chatroom-create first_room" to p1 + expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1 - for p in [ p1 ]: - with p: - expect /chatroom-watched-added [a-z0-9#]+ room sub [a-z]+/ + expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2 - send "chatroom-message-send $room message1" to p1 - expect /chatroom-message-new $room from Owner1 text message1/ from p1 + send "chatroom-message-send $room1_p1 message1" to p1 + expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message1/ from p1 - send "chatroom-message-send $room message2" to p1 + send "chatroom-message-send $room1_p1 message2" to p1 local: - expect /chatroom-message-new $room from Owner1 text (.*)/ from p1 capture msg + expect /chatroom-message-new $room1_p1 room first_room from Owner1 text (.*)/ from p1 capture msg + guard (msg == "message2") + + # Subscribe to chatroom + + send "chatroom-subscribe $room1_p2" to p2 + expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/ from p2 + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner1 text (.*)/ capture msg + guard (msg == "message1") + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner1 text (.*)/ capture msg guard (msg == "message2") + + send "chatroom-message-send $room1_p2 message3" to p2 + for p in [ p1, p2 ]: + with p: + expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner2 text message3/ + + send "chatroom-message-send $room1_p1 message4" to p1 + for p in [ p1, p2 ]: + with p: + expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner1 text message4/ + + # Multiple rooms + + send "chatroom-create second_room" to p1 + expect /chatroom-create-done ([a-z0-9#]+) second_room.*/ from p1 capture room2_p1 + + send "chatroom-create third_room" to p2 + expect /chatroom-create-done ([a-z0-9#]+) third_room.*/ from p2 capture room3_p2 + + expect /chatroom-watched-added $room2_p1 second_room sub true/ from p1 + expect /chatroom-watched-added $room3_p2 third_room sub true/ from p2 + expect /chatroom-watched-added ([a-z0-9#]+) second_room sub false/ from p2 capture room2_p2 + expect /chatroom-watched-added ([a-z0-9#]+) third_room sub false/ from p1 capture room3_p1 + + spawn as p3 + send "create-identity Device3 Owner3" to p3 + send "chatroom-watch-local" to p3 + send "start-server" to p3 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p3 capture room1_p3 + expect /chatroom-watched-added ([a-z0-9#]+) second_room sub false/ from p3 capture room2_p3 + expect /chatroom-watched-added ([a-z0-9#]+) third_room sub false/ from p3 capture room3_p3 + + with p3: + for room in [ room1_p3, room2_p3, room3_p3 ]: + send "chatroom-subscribe $room" + expect /chatroom-watched-updated $room [a-z_]+ sub true .*/ + for i in [1..4]: + expect /chatroom-message-new $room1_p3 room first_room from Owner. text (.*)/ capture message + guard (message == "message$i") + + with p2: + send "chatroom-message-send $room2_p2 msg_r2_1" + send "chatroom-message-send $room2_p2 msg_r2_2" + send "chatroom-message-send $room2_p2 msg_r2_3" + expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_1/ + expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_2/ + expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_3/ + + send "chatroom-message-send $room3_p2 msg_r3_1" + send "chatroom-message-send $room3_p2 msg_r3_2" + send "chatroom-message-send $room3_p2 msg_r3_3" + expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_1/ + expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_2/ + expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_3/ + + with p1: + local: + expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message + guard (room == "second_room") + guard (message == "msg_r2_1") + local: + expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message + guard (room == "second_room") + guard (message == "msg_r2_2") + local: + expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message + guard (room == "second_room") + guard (message == "msg_r2_3") + + with p3: + expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_1/ + expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_2/ + expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_3/ + expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_1/ + expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_2/ + expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_3/ + + # Unsubscribe + + send "chatroom-unsubscribe $room1_p1" to p1 + expect /chatroom-watched-updated $room1_p1 [a-z_]+ sub false .*/ from p1 + send "chatroom-unsubscribe $room1_p3" to p3 + expect /chatroom-watched-updated $room1_p3 [a-z_]+ sub false .*/ from p3 + send "chatroom-unsubscribe $room2_p3" to p3 + expect /chatroom-watched-updated $room2_p3 [a-z_]+ sub false .*/ from p3 + + with p2: + send "chatroom-message-send $room1_p2 msg_r1_4" + expect /chatroom-message-new $room1_p2 room first_room from Owner2 text msg_r1_4/ + + send "chatroom-message-send $room2_p2 msg_r2_4" + expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_4/ + + send "chatroom-message-send $room3_p2 msg_r3_4" + expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_4/ + + with p1: + local: + expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message + guard (room == "second_room") + guard (message == "msg_r2_4") + + with p3: + local: + expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message + guard (room == "third_room") + guard (message == "msg_r3_4") + + +test ChatroomSubscribedBeforeStart: + spawn as p1 + spawn as p2 + + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + for p in [ p1, p2 ]: + with p: + send "chatroom-watch-local" + send "start-server" + + send "chatroom-create first_room" to p1 + expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1 + + expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2 + + with p2: + send "chatroom-subscribe $room1_p2" + expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/ + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + for p in [p1, p2]: + with p: + send "start-server" + + send "chatroom-message-send $room1_p1 message1" to p1 + expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message1/ from p1 + expect /chatroom-message-new $room1_p2 room first_room from Owner1 text message1/ from p2 + + send "chatroom-message-send $room1_p2 message2" to p2 + expect /chatroom-message-new $room1_p1 room first_room from Owner2 text message2/ from p1 + expect /chatroom-message-new $room1_p2 room first_room from Owner2 text message2/ from p2 + + +test ParallelThreads: + spawn as p1 + spawn as p2 + + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + for p in [ p1, p2 ]: + with p: + send "chatroom-watch-local" + send "start-server" + + send "chatroom-create first_room" to p1 + expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1 + + expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2 + + with p2: + send "chatroom-subscribe $room1_p2" + expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/ + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + + send "chatroom-message-send $room1_p1 message1A" to p1 + send "chatroom-message-send $room1_p1 message1B" to p1 + send "chatroom-message-send $room1_p2 message2A" to p2 + send "chatroom-message-send $room1_p2 message2B" to p2 + with p1: + expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg + guard (msg == "1A") + with p1: + expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg + guard (msg == "1B") + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg + guard (msg == "2A") + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg + guard (msg == "2B") + + for p in [p1, p2]: + with p: + send "start-server" + + with p1: + expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg + guard (msg == "2A") + with p1: + expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg + guard (msg == "2B") + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg + guard (msg == "1A") + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg + guard (msg == "1B") + + +test ChatroomMembers: + spawn as p1 + spawn as p2 + spawn as p3 + + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + send "create-identity Device3 Owner3" to p3 + + for p in [ p1, p2, p3 ]: + with p: + send "chatroom-watch-local" + send "start-server" + + send "chatroom-create first_room" to p1 + expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1 + + expect /chatroom-watched-added $room1_p1 first_room sub true/ from p1 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2 + expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p3 capture room1_p3 + + local: + send "chatroom-members $room1_p1" to p1 + expect /chatroom-members-([a-z]+)/ from p1 capture done + guard (done == "done") + local: + send "chatroom-members $room1_p2" to p2 + expect /chatroom-members-([a-z]+)/ from p2 capture done + guard (done == "done") + + send "chatroom-message-send $room1_p1 message1" to p1 + send "chatroom-message-send $room1_p1 message2" to p1 + send "chatroom-join $room1_p2" to p2 + send "chatroom-message-send $room1_p2 message3" to p2 + send "chatroom-join $room1_p3" to p3 + + with p1: + expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message2/ + expect /chatroom-message-new $room1_p1 room first_room from Owner2 text message3/ + expect /chatroom-message-new $room1_p1 room first_room from Owner3/ + with p2: + expect /chatroom-message-new $room1_p2 room first_room from Owner1 text message2/ + expect /chatroom-message-new $room1_p2 room first_room from Owner2 text message3/ + expect /chatroom-message-new $room1_p2 room first_room from Owner3/ + with p3: + expect /chatroom-message-new $room1_p3 room first_room from Owner1 text message2/ + expect /chatroom-message-new $room1_p3 room first_room from Owner2 text message3/ + expect /chatroom-message-new $room1_p3 room first_room from Owner3/ + + local: + send "chatroom-members $room1_p1" to p1 + expect /chatroom-members-item Owner1/ from p1 + expect /chatroom-members-item Owner2/ from p1 + expect /chatroom-members-item Owner3/ from p1 + expect /chatroom-members-([a-z]+)/ from p1 capture done + guard (done == "done") + local: + send "chatroom-members $room1_p2" to p2 + expect /chatroom-members-item Owner1/ from p2 + expect /chatroom-members-item Owner2/ from p2 + expect /chatroom-members-item Owner3/ from p2 + expect /chatroom-members-([a-z]+)/ from p2 capture done + guard (done == "done") + + send "chatroom-leave $room1_p1" to p1 + send "chatroom-leave $room1_p3" to p3 + + for p in [ p1, p2, p3 ]: + with p: + expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner1 leave/ + expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner3 leave/ + + send "chatroom-members $room1_p1" to p1 + send "chatroom-members $room1_p2" to p2 + send "chatroom-members $room1_p3" to p3 + for p in [ p1, p2, p3 ]: + with p: + expect /chatroom-members-item Owner2/ + expect /chatroom-members-([a-z]+)/ capture done + guard (done == "done") diff --git a/test/network.test b/test/network.test index 9540bf6..efd508f 100644 --- a/test/network.test +++ b/test/network.test @@ -178,6 +178,62 @@ test ManyStreams: expect /test-message-received blob 100[2-4] $ref/ from p2 +test MultipleServiceRefs: + spawn as p1 + spawn as p2 + send "create-identity Device1" to p1 + send "create-identity Device2" to p2 + send "start-server" to p1 + send "start-server" to p2 + expect from p1: + /peer 1 addr ${p2.node.ip} 29665/ + /peer 1 id Device2/ + expect from p2: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1/ + + let kbytes = 2 + + with p1: + send "store blob" + send "A" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_a + + # Create blobs with (kbytes * 1000) bytes each + + send "store blob" + send "B" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_b + + send "store blob" + send "C" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_c + + send "store blob" + send "D" + for j in [1 .. kbytes * 10]: + # 100 bytes each line + send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789" + send "" + expect /store-done (blake2#[0-9a-f]*)/ capture ref_d + + send "test-message-send 1 $ref_a $ref_b $ref_c $ref_d" + expect /test-message-send done/ + expect /test-message-received blob [0-9]+ $ref_a/ from p2 + expect /test-message-received blob [0-9]+ $ref_b/ from p2 + expect /test-message-received blob [0-9]+ $ref_c/ from p2 + expect /test-message-received blob [0-9]+ $ref_d/ from p2 + + test Reconnection: spawn as p1 with p1: |