diff options
53 files changed, 2881 insertions, 1132 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index cddb159..5295389 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,39 @@ # Revision history for erebos +## 0.2.0 -- 2025-08-06 + +* Weak references in records +* Use XDG data directory for default storage path +* Added `/identity` command to show details of current identity +* Support tunnel for peers in discovery service +* New CLI prompt implementation providing cleaner interface + * Avoids displaying sent messages twice – both in previous prompt and in message history + * Print received messages only for selected conversation + * Clear tab completion options after use + +* API + * Split `Erebos.Storage` into multiple modules + * Removed deprecated `Message.formatMessage` alias + * Renamed `Erebos.Message` module to `Erebos.DirectMessage` + * Added `StorageBackend` type class to allow custom storage implementation + * `MonadError` constraints use generic error type + * Replaced `Erebos.Network.peerAddress` with `getPeerAddress` and added `getPeerAddresses` + * Renamed `Erebos.Network.peerIdentity` to `getPeerIdentity` + * Renamed some functions in `Erebos.DirectMessage` module to make clear they are related only to direct messages + * `Erebos.Storage.Merge.generations`/`generationsBy` return `NonEmpty` + * Replaced `watchReceivedDirectMessages` with `watchDirectMessageThreads` + * Return type of `sendMessage` and `sendDirectMessage` is now `()` + * Some functions use `MonadStorage` instead of explicit `Storage` parameter: + * `Erebos.Set.storeSetAdd` + * `Erebos.State.makeSharedStateUpdate` + * `Erebos.Identity.createIdentity` + +## 0.1.9 -- 2025-07-08 + +* Option to show details or delete a conversation by giving index parameter without first selecting it +* Improved handling of ICE connections +* Automatic discovery of peers for pending direct messages + ## 0.1.8.1 -- 2025-03-29 * Fix build from sdist (add missing include) @@ -95,6 +95,9 @@ Test chatroom [19:03] Some Name: Hi `/conversations` : List started conversations with contacts or other peers. +`/new` +: List conversations with new (unread) messages. + `/<number>` : Select conversation, contact or peer `<number>` based on the last `/conversations`, `/contacts` or `/peers` output list. @@ -216,6 +219,9 @@ target device with `/<number>`. : Drop the currently selected peer. Afterwards, the connection can be re-established by either side. +`/identity` +: Show details of current identity + `/update-identity` : Interactively update current identity information diff --git a/erebos-tester.yaml b/erebos-tester.yaml index a44f080..c8239df 100644 --- a/erebos-tester.yaml +++ b/erebos-tester.yaml @@ -1 +1 @@ -tests: test/**/*.test +tests: test/**/*.et diff --git a/erebos.cabal b/erebos.cabal index f001a24..1937b97 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -1,7 +1,7 @@ Cabal-Version: 3.0 Name: erebos -Version: 0.1.8.1 +Version: 0.2.0 Synopsis: Decentralized messaging and synchronization Description: Library and simple CLI interface implementing the Erebos identity @@ -107,6 +107,7 @@ library Erebos.Discovery Erebos.Error Erebos.Identity + Erebos.Invite Erebos.Network Erebos.Object Erebos.Pairing @@ -124,7 +125,9 @@ library Erebos.Sync other-modules: + Erebos.Conversation.Class Erebos.Flow + Erebos.Network.Address Erebos.Network.Channel Erebos.Network.Protocol Erebos.Object.Internal @@ -143,7 +146,7 @@ library src/Erebos/Network/ifaddrs.h if flag(ice) - exposed-modules: + other-modules: Erebos.ICE c-sources: src/Erebos/ICE/pjproject.c diff --git a/main/Main.hs b/main/Main.hs index 26f4b12..a876d7b 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -1,9 +1,7 @@ -{-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} module Main (main) where -import Control.Arrow (first) import Control.Concurrent import Control.Exception import Control.Monad @@ -11,11 +9,13 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State import Control.Monad.Trans.Maybe +import Control.Monad.Writer import Crypto.Random -import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL +import Data.Bifunctor +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL import Data.Char import Data.List import Data.Maybe @@ -42,9 +42,6 @@ import Erebos.Chatroom import Erebos.Conversation import Erebos.DirectMessage import Erebos.Discovery -#ifdef ENABLE_ICE_SUPPORT -import Erebos.ICE -#endif import Erebos.Identity import Erebos.Network import Erebos.Object @@ -67,6 +64,7 @@ data Options = Options { optServer :: ServerOptions , optServices :: [ServiceOption] , optStorage :: StorageOption + , optCreateIdentity :: Maybe ( Maybe Text, [ Maybe Text ] ) , optChatroomAutoSubscribe :: Maybe Int , optDmBotEcho :: Maybe Text , optWebSocketServer :: Maybe Int @@ -74,9 +72,10 @@ data Options = Options , optShowVersion :: Bool } -data StorageOption = DefaultStorage - | FilesystemStorage FilePath - | MemoryStorage +data StorageOption + = DefaultStorage + | FilesystemStorage FilePath + | MemoryStorage data ServiceOption = ServiceOption { soptName :: String @@ -90,6 +89,7 @@ defaultOptions = Options { optServer = defaultServerOptions , optServices = availableServices , optStorage = DefaultStorage + , optCreateIdentity = Nothing , optChatroomAutoSubscribe = Nothing , optDmBotEcho = Nothing , optWebSocketServer = Nothing @@ -113,69 +113,101 @@ availableServices = True "peer discovery" ] -options :: [OptDescr (Options -> Options)] +options :: [ OptDescr (Options -> Writer [ String ] Options) ] options = - [ Option ['p'] ["port"] + [ Option [ 'p' ] [ "port" ] (ReqArg (\p -> so $ \opts -> opts { serverPort = read p }) "<port>") "local port to bind" - , Option ['s'] ["silent"] + , Option [ 's' ] [ "silent" ] (NoArg (so $ \opts -> opts { serverLocalDiscovery = False })) "do not send announce packets for local discovery" , Option [] [ "storage" ] - (ReqArg (\path -> \opts -> opts { optStorage = FilesystemStorage path }) "<path>") + (ReqArg (\path -> \opts -> return opts { optStorage = FilesystemStorage path }) "<path>") "use storage in <path>" , Option [] [ "memory-storage" ] - (NoArg (\opts -> opts { optStorage = MemoryStorage })) + (NoArg (\opts -> return opts { optStorage = MemoryStorage })) "use memory storage" - , Option [] ["chatroom-auto-subscribe"] - (ReqArg (\count -> \opts -> opts { optChatroomAutoSubscribe = Just (read count) }) "<count>") + , Option [] [ "create-identity" ] + (OptArg (\value -> \opts -> return opts + { optCreateIdentity = + let devName = T.pack <$> value + in maybe (Just ( devName, [] )) (Just . first (const devName)) (optCreateIdentity opts) + }) "<name>") + "create a new (device) identity in a new local state" + , Option [] [ "create-owner" ] + (OptArg (\value -> \opts -> return opts + { optCreateIdentity = + let ownerName = T.pack <$> value + in maybe (Just ( Nothing, [ ownerName ] )) (Just . second (ownerName :)) (optCreateIdentity opts) + }) "<name>") + "create owner for a new device identity" + , Option [] [ "chatroom-auto-subscribe" ] + (ReqArg (\count -> \opts -> return opts { optChatroomAutoSubscribe = Just (read count) }) "<count>") "automatically subscribe for up to <count> chatrooms" -#ifdef ENABLE_ICE_SUPPORT , Option [] [ "discovery-stun-port" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunPort = Just (read value) }) "<port>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunPort = Just (read value) }) "<port>") "offer specified <port> to discovery peers for STUN protocol" , Option [] [ "discovery-stun-server" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunServer = Just (read value) }) "<server>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunServer = Just (read value) }) "<server>") "offer <server> (domain name or IP address) to discovery peers for STUN protocol" , Option [] [ "discovery-turn-port" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnPort = Just (read value) }) "<port>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnPort = Just (read value) }) "<port>") "offer specified <port> to discovery peers for TURN protocol" , Option [] [ "discovery-turn-server" ] - (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnServer = Just (read value) }) "<server>") + (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnServer = Just (read value) }) "<server>") "offer <server> (domain name or IP address) to discovery peers for TURN protocol" -#endif - , Option [] ["dm-bot-echo"] - (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") + , Option [] [ "discovery-tunnel" ] + (OptArg (\value -> \opts -> do + fun <- provideTunnelFun value + serviceAttr (\attrs -> return attrs { discoveryProvideTunnel = fun }) opts) "<peer-type>") + "offer to provide tunnel for peers of given <peer-type>, possible values: all, none, websocket" + , Option [] [ "dm-bot-echo" ] + (ReqArg (\prefix -> \opts -> return opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") "automatically reply to direct messages with the same text prefixed with <prefix>" , Option [] [ "websocket-server" ] - (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>") + (ReqArg (\value -> \opts -> return opts { optWebSocketServer = Just (read value) }) "<port>") "start WebSocket server on given port" - , Option ['h'] ["help"] - (NoArg $ \opts -> opts { optShowHelp = True }) + , Option [ 'h' ] [ "help" ] + (NoArg $ \opts -> return opts { optShowHelp = True }) "show this help and exit" - , Option ['V'] ["version"] - (NoArg $ \opts -> opts { optShowVersion = True }) + , Option [ 'V' ] [ "version" ] + (NoArg $ \opts -> return opts { optShowVersion = True }) "show version and exit" ] where - so f opts = opts { optServer = f $ optServer opts } + so f opts = return opts { optServer = f $ optServer opts } - updateService :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> SomeService -> SomeService + updateService :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> SomeService -> m SomeService updateService f some@(SomeService proxy attrs) - | Just f' <- cast f = SomeService proxy (f' attrs) - | otherwise = some - - serviceAttr :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> Options -> Options - serviceAttr f opts = opts { optServices = map (\sopt -> sopt { soptService = updateService f (soptService sopt) }) (optServices opts) } - -servicesOptions :: [OptDescr (Options -> Options)] + | Just f' <- cast f = SomeService proxy <$> f' attrs + | otherwise = return some + + serviceAttr :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> Options -> m Options + serviceAttr f opts = do + services' <- forM (optServices opts) $ \sopt -> do + service <- updateService f (soptService sopt) + return sopt { soptService = service } + return opts { optServices = services' } + + provideTunnelFun :: Maybe String -> Writer [ String ] (Peer -> PeerAddress -> Bool) + provideTunnelFun Nothing = return $ \_ _ -> True + provideTunnelFun (Just "all") = return $ \_ _ -> True + provideTunnelFun (Just "none") = return $ \_ _ -> False + provideTunnelFun (Just "websocket") = return $ \_ -> \case + CustomPeerAddress addr | Just WebSocketAddress {} <- cast addr -> True + _ -> False + provideTunnelFun (Just name) = do + tell [ "Invalid value of --discovery-tunnel: ‘" <> name <> "’\n" ] + return $ \_ _ -> False + +servicesOptions :: [ OptDescr (Options -> Writer [ String ] Options) ] servicesOptions = concatMap helper $ "all" : map soptName availableServices where helper name = - [ Option [] ["enable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) "" - , Option [] ["disable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) "" + [ Option [] [ "enable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) "" + , Option [] [ "disable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) "" ] - so f opts = opts { optServices = f $ optServices opts } + so f opts = return opts { optServices = f $ optServices opts } change :: String -> (ServiceOption -> ServiceOption) -> [ServiceOption] -> [ServiceOption] change name f (s : ss) | soptName s == name || name == "all" @@ -193,13 +225,16 @@ getDefaultStorageDir = do main :: IO () main = do - (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case - (o, args, []) -> do - return (foldl (flip id) defaultOptions o, args) - (_, _, errs) -> do + let printErrors errs = do progName <- getProgName hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information." exitFailure + (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case + (wo, args, []) -> + case runWriter (foldM (flip ($)) defaultOptions wo) of + ( o, [] ) -> return ( o, args ) + ( _, errs ) -> printErrors errs + (_, _, errs) -> printErrors errs st <- liftIO $ case optStorage opts of DefaultStorage -> openStorage =<< getDefaultStorageDir @@ -207,7 +242,7 @@ main = do MemoryStorage -> memoryStorage case args of - ["cat-file", sref] -> do + [ "cat-file", sref ] -> do readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" Just ref -> BL.putStr $ lazyLoadBytes ref @@ -238,14 +273,22 @@ main = do Nothing -> putStrLn $ "Identity verification failed" _ -> error $ "unknown object type '" ++ objtype ++ "'" - ["show-generation", sref] -> readRef st (BC.pack sref) >>= \case + [ "show-generation", sref ] -> readRef st (BC.pack sref) >>= \case Nothing -> error "ref does not exist" Just ref -> print $ storedGeneration (wrappedLoad ref :: Stored Object) - ["update-identity"] -> do + [ "identity" ] -> do + loadHeads st >>= \case + (h : _) -> do + T.putStr $ showIdentityDetails $ headLocalIdentity h + [] -> do + T.putStrLn "no local state head" + exitFailure + + [ "update-identity" ] -> do withTerminal noCompletion $ \term -> do either (fail . showErebosError) return <=< runExceptT $ do - runReaderT (updateSharedIdentity term) =<< loadLocalStateHead term st + runReaderT (updateSharedIdentity term) =<< runReaderT (loadLocalStateHead term) st ("update-identity" : srefs) -> do withTerminal noCompletion $ \term -> do @@ -257,7 +300,7 @@ main = do (either (fail . showErebosError) return <=< runExceptT $ runReaderT (interactiveIdentityUpdate term idt) st) | otherwise -> error "invalid identity" - ["test"] -> runTestTool st + [ "test" ] -> runTestTool st [] -> do let header = "Usage: erebos [OPTION...]" @@ -287,7 +330,10 @@ main = do interactiveLoop :: Storage -> Options -> IO () interactiveLoop st opts = withTerminal commandCompletion $ \term -> do - erebosHead <- liftIO $ loadLocalStateHead term st + erebosHead <- either (fail . showErebosError) return <=< runExceptT . flip runReaderT st $ do + case optCreateIdentity opts of + Nothing -> loadLocalStateHead term + Just ( devName, names ) -> createLocalStateHead (names ++ [ devName ]) void $ printLine term $ T.unpack $ displayIdentity $ headLocalIdentity erebosHead let tui = hasTerminalUI term @@ -299,7 +345,7 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do Left cstate -> do pname <- case csContext cstate of NoContext -> return "" - SelectedPeer peer -> peerIdentity peer >>= return . \case + SelectedPeer peer -> getPeerIdentity peer >>= return . \case PeerIdentityFull pid -> maybe "<unnamed>" T.unpack $ idName $ finalOwner pid PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" PeerIdentityUnknown _ -> "<unknown>" @@ -315,54 +361,70 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do _ | all isSpace input -> getInputLinesTui eprompt '\\':rest -> (reverse ('\n':rest) ++) <$> getInputLinesTui (Right ">> ") _ -> return input - Nothing -> KeepPrompt mzero + Nothing + | tui -> KeepPrompt mzero + | otherwise -> KeepPrompt $ liftIO $ forever $ threadDelay 100000000 getInputCommandTui cstate = do - input <- getInputLinesTui cstate - let (CommandM cmd, line) = case input of - '/':rest -> let (scmd, args) = dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') rest - in if not (null scmd) && all isDigit scmd - then (cmdSelectContext, scmd) - else (fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args) - _ -> (cmdSend, input) - return (cmd, line) - - getInputLinesPipe = do - join $ lift $ getInputLine term $ KeepPrompt . \case - Just input -> return input - Nothing -> liftIO $ forever $ threadDelay 100000000 - - getInputCommandPipe _ = do - input <- getInputLinesPipe - let (scmd, args) = dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') input - let (CommandM cmd, line) = (fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args) - return (cmd, line) - - let getInputCommand = if tui then getInputCommandTui . Left - else getInputCommandPipe + let parseCommand cmdline = + case dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') cmdline of + ( scmd, args ) + | not (null scmd) && all isDigit scmd + -> ( cmdSelectContext, scmd ) + + | otherwise + -> ( fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args ) + + ( CommandM cmd, line ) <- getInputLinesTui cstate >>= return . \case + '/' : input -> parseCommand input + input | not tui -> parseCommand input + input -> ( cmdSend, input ) + return ( cmd, line ) + + let getInputCommand = getInputCommandTui . Left + + contextVar <- liftIO $ newMVar NoContext _ <- liftIO $ do tzone <- getCurrentTimeZone - watchReceivedMessages erebosHead $ \smsg -> do - let msg = fromStored smsg - extPrintLn $ formatDirectMessage tzone msg - case optDmBotEcho opts of - Nothing -> return () - Just prefix -> do - res <- runExceptT $ flip runReaderT erebosHead $ sendDirectMessage (msgFrom msg) (prefix <> msgText msg) - case res of - Right reply -> extPrintLn $ formatDirectMessage tzone $ fromStored reply - Left err -> extPrintLn $ "Failed to send dm echo: " <> err + let self = finalOwner $ headLocalIdentity erebosHead + watchDirectMessageThreads erebosHead $ \prev cur -> do + forM_ (reverse $ dmThreadToListSince prev cur) $ \msg -> do + withMVar contextVar $ \ctx -> do + mbpid <- case ctx of + SelectedPeer peer -> getPeerIdentity peer >>= return . \case + PeerIdentityFull pid -> Just $ finalOwner pid + _ -> Nothing + SelectedContact contact + | Just cid <- contactIdentity contact -> return (Just cid) + SelectedConversation conv -> return $ conversationPeer conv + _ -> return Nothing + when (not tui || maybe False (msgPeer cur `sameIdentity`) mbpid) $ do + extPrintLn $ formatDirectMessage tzone msg + + case optDmBotEcho opts of + Just prefix + | not (msgFrom msg `sameIdentity` self) + -> do + void $ forkIO $ do + res <- runExceptT $ flip runReaderT erebosHead $ sendDirectMessage (msgFrom msg) (prefix <> msgText msg) + case res of + Right _ -> return () + Left err -> extPrintLn $ "Failed to send dm echo: " <> err + _ -> return () peers <- liftIO $ newMVar [] - contextOptions <- liftIO $ newMVar [] + contextOptions <- liftIO $ newMVar ( Nothing, [] ) chatroomSetVar <- liftIO $ newEmptyMVar let autoSubscribe = optChatroomAutoSubscribe opts chatroomList = fromSetBy (comparing roomStateData) . lookupSharedValue . lsShared . headObject $ erebosHead watched <- if isJust autoSubscribe || any roomStateSubscribe chatroomList - then fmap Just $ liftIO $ watchChatroomsForCli extPrintLn erebosHead chatroomSetVar contextOptions autoSubscribe - else return Nothing + then do + fmap Just $ liftIO $ watchChatroomsForCli tui extPrintLn erebosHead + chatroomSetVar contextVar contextOptions autoSubscribe + else do + return Nothing server <- liftIO $ do startServer (optServer opts) erebosHead extPrintLn $ @@ -374,10 +436,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do void $ liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange server - peerIdentity peer >>= \case + getPeerIdentity peer >>= \case pid@(PeerIdentityFull _) -> do dropped <- isPeerDropped peer - let shown = showPeer pid $ peerAddress peer + shown <- showPeer pid <$> getPeerAddress peer let update [] = ([(peer, shown)], (Nothing, "NEW")) update ((p,s):ps) | p == peer && dropped = (ps, (Nothing, "DEL")) @@ -389,8 +451,15 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do | otherwise = first (ctx:) $ ctxUpdate (n + 1) ctxs (op, updateType) <- modifyMVar peers (return . update) let updateType' = if dropped then "DEL" else updateType - idx <- modifyMVar contextOptions (return . ctxUpdate (1 :: Int)) - when (Just shown /= op) $ extPrintLn $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown + modifyMVar_ contextOptions $ \case + ( watch, clist ) + | watch == Just WatchPeers || not tui + -> do + let ( clist', idx ) = ctxUpdate (1 :: Int) clist + when (Just shown /= op) $ do + extPrintLn $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown + return ( Just WatchPeers, clist' ) + cur -> return cur _ -> return () let process :: CommandState -> MaybeT IO CommandState @@ -400,20 +469,23 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do Just h -> return h Nothing -> do lift $ extPrintLn "current head deleted" mzero - res <- liftIO $ runExceptT $ flip execStateT cstate { csHead = h } $ runReaderT cmd CommandInput - { ciServer = server - , ciTerminal = term - , ciLine = line - , ciPrint = extPrintLn - , ciOptions = opts - , ciPeers = liftIO $ modifyMVar peers $ \ps -> do - ps' <- filterM (fmap not . isPeerDropped . fst) ps - return (ps', ps') - , ciContextOptions = liftIO $ readMVar contextOptions - , ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs - , ciContextOptionsVar = contextOptions - , ciChatroomSetVar = chatroomSetVar - } + res <- liftIO $ modifyMVar contextVar $ \ctx -> do + res <- runExceptT $ flip execStateT cstate { csHead = h, csContext = ctx } $ runReaderT cmd CommandInput + { ciServer = server + , ciTerminal = term + , ciLine = line + , ciPrint = extPrintLn + , ciOptions = opts + , ciPeers = liftIO $ modifyMVar peers $ \ps -> do + ps' <- filterM (fmap not . isPeerDropped . fst) ps + return (ps', ps') + , ciContextOptions = liftIO $ snd <$> readMVar contextOptions + , ciSetContextOptions = \watch ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ( Just watch, ctxs ) + , ciContextVar = contextVar + , ciContextOptionsVar = contextOptions + , ciChatroomSetVar = chatroomSetVar + } + return ( either (const ctx) csContext res, res ) case res of Right cstate' | csQuit cstate' -> mzero @@ -427,10 +499,6 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do loop $ Just $ CommandState { csHead = erebosHead , csContext = NoContext -#ifdef ENABLE_ICE_SUPPORT - , csIceSessions = [] -#endif - , csIcePeer = Nothing , csWatchChatrooms = watched , csQuit = False } @@ -443,28 +511,33 @@ data CommandInput = CommandInput , ciPrint :: String -> IO () , ciOptions :: Options , ciPeers :: CommandM [(Peer, String)] - , ciContextOptions :: CommandM [CommandContext] - , ciSetContextOptions :: [CommandContext] -> Command - , ciContextOptionsVar :: MVar [ CommandContext ] + , ciContextOptions :: CommandM [ CommandContext ] + , ciSetContextOptions :: ContextWatchOptions -> [ CommandContext ] -> Command + , ciContextVar :: MVar CommandContext + , ciContextOptionsVar :: MVar ( Maybe ContextWatchOptions, [ CommandContext ] ) , ciChatroomSetVar :: MVar (Set ChatroomState) } data CommandState = CommandState { csHead :: Head LocalState , csContext :: CommandContext -#ifdef ENABLE_ICE_SUPPORT - , csIceSessions :: [IceSession] -#endif - , csIcePeer :: Maybe Peer , csWatchChatrooms :: Maybe WatchedHead , csQuit :: Bool } -data CommandContext = NoContext - | SelectedPeer Peer - | SelectedContact Contact - | SelectedChatroom ChatroomState - | SelectedConversation Conversation +data CommandContext + = NoContext + | SelectedPeer Peer + | SelectedContact Contact + | SelectedChatroom ChatroomState + | SelectedConversation Conversation + +data ContextWatchOptions + = WatchPeers + | WatchContacts + | WatchChatrooms + | WatchConversations + deriving (Eq) newtype CommandM a = CommandM (ReaderT CommandInput (StateT CommandState (ExceptT ErebosError IO)) a) deriving (Functor, Applicative, Monad, MonadReader CommandInput, MonadState CommandState, MonadError ErebosError) @@ -507,7 +580,7 @@ getSelectedConversation = gets csContext >>= getConversationFromContext getConversationFromContext :: CommandContext -> CommandM Conversation getConversationFromContext = \case - SelectedPeer peer -> peerIdentity peer >>= \case + SelectedPeer peer -> getPeerIdentity peer >>= \case PeerIdentityFull pid -> directMessageConversation $ finalOwner pid _ -> throwOtherError "incomplete peer identity" SelectedContact contact -> case contactIdentity contact of @@ -524,45 +597,39 @@ getSelectedOrManualContext :: CommandM CommandContext getSelectedOrManualContext = do asks ciLine >>= \case "" -> gets csContext - str | all isDigit str -> getContextByIndex (read str) + str | all isDigit str -> getContextByIndex id (read str) _ -> throwOtherError "invalid index" commands :: [(String, Command)] commands = - [ ("history", cmdHistory) - , ("peers", cmdPeers) - , ("peer-add", cmdPeerAdd) - , ("peer-add-public", cmdPeerAddPublic) - , ("peer-drop", cmdPeerDrop) - , ("send", cmdSend) - , ("delete", cmdDelete) - , ("update-identity", cmdUpdateIdentity) - , ("attach", cmdAttach) - , ("attach-accept", cmdAttachAccept) - , ("attach-reject", cmdAttachReject) - , ("chatrooms", cmdChatrooms) - , ("chatroom-create-public", cmdChatroomCreatePublic) - , ("contacts", cmdContacts) - , ("contact-add", cmdContactAdd) - , ("contact-accept", cmdContactAccept) - , ("contact-reject", cmdContactReject) - , ("conversations", cmdConversations) - , ("details", cmdDetails) - , ("discovery-init", cmdDiscoveryInit) - , ("discovery", cmdDiscovery) -#ifdef ENABLE_ICE_SUPPORT - , ("ice-create", cmdIceCreate) - , ("ice-destroy", cmdIceDestroy) - , ("ice-show", cmdIceShow) - , ("ice-connect", cmdIceConnect) - , ("ice-send", cmdIceSend) -#endif - , ("join", cmdJoin) - , ("join-as", cmdJoinAs) - , ("leave", cmdLeave) - , ("members", cmdMembers) - , ("select", cmdSelectContext) - , ("quit", cmdQuit) + [ ( "history", cmdHistory ) + , ( "identity", cmdIdentity ) + , ( "peers", cmdPeers ) + , ( "peer-add", cmdPeerAdd ) + , ( "peer-add-public", cmdPeerAddPublic ) + , ( "peer-drop", cmdPeerDrop ) + , ( "send", cmdSend ) + , ( "delete", cmdDelete ) + , ( "update-identity", cmdUpdateIdentity ) + , ( "attach", cmdAttach ) + , ( "attach-accept", cmdAttachAccept ) + , ( "attach-reject", cmdAttachReject ) + , ( "chatrooms", cmdChatrooms ) + , ( "chatroom-create-public", cmdChatroomCreatePublic ) + , ( "contacts", cmdContacts ) + , ( "contact-add", cmdContactAdd ) + , ( "contact-accept", cmdContactAccept ) + , ( "contact-reject", cmdContactReject ) + , ( "conversations", cmdConversations ) + , ( "new", cmdNew ) + , ( "details", cmdDetails ) + , ( "discovery", cmdDiscovery ) + , ( "join", cmdJoin ) + , ( "join-as", cmdJoinAs ) + , ( "leave", cmdLeave ) + , ( "members", cmdMembers ) + , ( "select", cmdSelectContext ) + , ( "quit", cmdQuit ) ] commandCompletion :: CompletionFunc IO @@ -585,7 +652,7 @@ cmdPeers :: Command cmdPeers = do peers <- join $ asks ciPeers set <- asks ciSetContextOptions - set $ map (SelectedPeer . fst) peers + set WatchPeers $ map (SelectedPeer . fst) peers forM_ (zip [1..] peers) $ \(i :: Int, (_, name)) -> do cmdPutStrLn $ "[" ++ show i ++ "] " ++ name @@ -597,11 +664,15 @@ cmdPeerAdd = void $ do [hostname] -> return (hostname, show discoveryPort) [] -> throwOtherError "missing peer address" addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port) + contextOptsVar <- asks ciContextOptionsVar + liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchPeers) liftIO $ serverPeer server (addrAddress addr) cmdPeerAddPublic :: Command cmdPeerAddPublic = do server <- asks ciServer + contextOptsVar <- asks ciContextOptionsVar + liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchPeers) liftIO $ mapM_ (serverPeer server . addrAddress) =<< gather 'a' where gather c @@ -635,8 +706,7 @@ cmdJoin = joinChatroom =<< getSelectedChatroom cmdJoinAs :: Command cmdJoinAs = do name <- asks ciLine - st <- getStorage - identity <- liftIO $ createIdentity st (Just $ T.pack name) Nothing + identity <- createIdentity (Just $ T.pack name) Nothing joinChatroomAs identity =<< getSelectedChatroom cmdLeave :: Command @@ -648,32 +718,36 @@ cmdMembers = do forM_ (chatroomMembers room) $ \x -> do cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x -getContextByIndex :: Int -> CommandM CommandContext -getContextByIndex n = do - join (asks ciContextOptions) >>= \ctxs -> if - | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx - | otherwise -> throwOtherError "invalid index" +getContextByIndex :: (Maybe ContextWatchOptions -> Maybe ContextWatchOptions) -> Int -> CommandM CommandContext +getContextByIndex f n = do + contextOptsVar <- asks ciContextOptionsVar + join $ liftIO $ modifyMVar contextOptsVar $ \cur@( watch, ctxs ) -> if + | n > 0, (ctx : _) <- drop (n - 1) ctxs + -> return ( ( f watch, ctxs ), return ctx ) + + | otherwise + -> return ( cur, throwOtherError "invalid index" ) cmdSelectContext :: Command cmdSelectContext = do n <- read <$> asks ciLine - ctx <- getContextByIndex n + ctx <- getContextByIndex (const Nothing) n modify $ \s -> s { csContext = ctx } case ctx of SelectedChatroom rstate -> do when (not (roomStateSubscribe rstate)) $ do chatroomSetSubscribe (head $ roomStateData rstate) True _ -> return () + handleError (\_ -> return ()) $ do + conv <- getConversationFromContext ctx + tzone <- liftIO $ getCurrentTimeZone + mapM_ (cmdPutStrLn . formatMessage tzone) $ takeWhile messageUnread $ conversationHistory conv cmdSend :: Command cmdSend = void $ do text <- asks ciLine conv <- getSelectedConversation - sendMessage conv (T.pack text) >>= \case - Just msg -> do - tzone <- liftIO $ getCurrentTimeZone - cmdPutStrLn $ formatMessage tzone msg - Nothing -> return () + sendMessage conv (T.pack text) cmdDelete :: Command cmdDelete = void $ do @@ -690,6 +764,24 @@ cmdHistory = void $ do [] -> do cmdPutStrLn $ "<empty history>" +showIdentityDetails :: Foldable f => Identity f -> Text +showIdentityDetails identity = T.unlines $ go $ reverse $ unfoldOwners identity + where + go (i : is) = concat + [ maybeToList $ ("Name: " <>) <$> idName i + , map (("Ref: " <>) . T.pack . show . refDigest . storedRef) $ idDataF i + , map (("ExtRef: " <>) . T.pack . show . refDigest . storedRef) $ filter isExtension $ idExtDataF i + , do guard $ not (null is) + "" : "Device:" : map (" " <>) (go is) + ] + go [] = [] + isExtension x = case fromSigned x of BaseIdentityData {} -> False + _ -> True + +cmdIdentity :: Command +cmdIdentity = do + cmdPutStrLn . T.unpack . showIdentityDetails . localIdentity . fromStored =<< getLocalHead + cmdUpdateIdentity :: Command cmdUpdateIdentity = void $ do term <- asks ciTerminal @@ -704,8 +796,11 @@ cmdAttachAccept = attachAccept =<< getSelectedPeer cmdAttachReject :: Command cmdAttachReject = attachReject =<< getSelectedPeer -watchChatroomsForCli :: (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState) -> MVar [ CommandContext ] -> Maybe Int -> IO WatchedHead -watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do +watchChatroomsForCli + :: Bool -> (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState) + -> MVar CommandContext -> MVar ( Maybe ContextWatchOptions, [ CommandContext ] ) + -> Maybe Int -> IO WatchedHead +watchChatroomsForCli tui eprint h chatroomSetVar contextVar contextOptsVar autoSubscribe = do subscribedNumVar <- newEmptyMVar let ctxUpdate updateType (idx :: Int) rstate = \case @@ -744,28 +839,44 @@ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do Just diff -> do modifyMVar_ chatroomSetVar $ return . const set + modifyMVar_ contextOptsVar $ \case + ( watch, clist ) + | watch == Just WatchChatrooms || not tui + -> do + let upd c = \case + AddedChatroom rstate -> ctxUpdate "NEW" 1 rstate c + RemovedChatroom rstate -> ctxUpdate "DEL" 1 rstate c + UpdatedChatroom _ rstate + | any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate) + -> do + ctxUpdate "UPD" 1 rstate c + | otherwise -> return c + ( watch, ) <$> foldM upd clist diff + cur -> return cur + forM_ diff $ \case AddedChatroom rstate -> do - modifyMVar_ contextVar $ ctxUpdate "NEW" 1 rstate modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then (+ 1) else id RemovedChatroom rstate -> do - modifyMVar_ contextVar $ ctxUpdate "DEL" 1 rstate modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then subtract 1 else id UpdatedChatroom oldroom rstate -> do - when (any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate)) $ do - modifyMVar_ contextVar $ ctxUpdate "UPD" 1 rstate when (any (not . null . rsdMessages . fromStored) (roomStateData rstate)) $ do - tzone <- getCurrentTimeZone - forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do - eprint $ concat $ - [ maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg - , formatTime defaultTimeLocale " [%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg - , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg - , if cmsgLeave msg then " left" else "" - , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg - ] + withMVar contextVar $ \ctx -> do + isSelected <- case ctx of + SelectedChatroom rstate' -> return $ isSameChatroom rstate' rstate + SelectedConversation conv -> return $ isChatroomStateConversation rstate conv + _ -> return False + when (not tui || isSelected) $ do + tzone <- getCurrentTimeZone + forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do + eprint $ concat $ + [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg + , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg + , if cmsgLeave msg then " left" else "" + , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg + ] modifyMVar_ subscribedNumVar $ return . (if roomStateSubscribe rstate then (+ 1) else id) . (if roomStateSubscribe oldroom then subtract 1 else id) @@ -777,9 +888,11 @@ ensureWatchedChatrooms = do eprint <- asks ciPrint h <- gets csHead chatroomSetVar <- asks ciChatroomSetVar - contextVar <- asks ciContextOptionsVar + contextVar <- asks ciContextVar + contextOptsVar <- asks ciContextOptionsVar autoSubscribe <- asks $ optChatroomAutoSubscribe . ciOptions - watched <- liftIO $ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe + tui <- asks $ hasTerminalUI . ciTerminal + watched <- liftIO $ watchChatroomsForCli tui eprint h chatroomSetVar contextVar contextOptsVar autoSubscribe modify $ \s -> s { csWatchChatrooms = Just watched } Just _ -> return () @@ -789,7 +902,7 @@ cmdChatrooms = do chatroomSetVar <- asks ciChatroomSetVar chatroomList <- filter (not . roomStateDeleted) . fromSetBy (comparing roomStateData) <$> liftIO (readMVar chatroomSetVar) set <- asks ciSetContextOptions - set $ map SelectedChatroom chatroomList + set WatchChatrooms $ map SelectedChatroom chatroomList forM_ (zip [1..] chatroomList) $ \(i :: Int, rstate) -> do cmdPutStrLn $ "[" ++ show i ++ "] " ++ maybe "<unnamed>" T.unpack (roomName =<< roomStateRoom rstate) @@ -803,6 +916,8 @@ cmdChatroomCreatePublic = do getInputLine term $ KeepPrompt . maybe T.empty T.pack ensureWatchedChatrooms + contextOptsVar <- asks ciContextOptionsVar + liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchChatrooms) void $ createChatroom (if T.null name then Nothing else Just name) Nothing @@ -815,7 +930,7 @@ cmdContacts = do let contacts = fromSetBy (comparing contactName) $ lookupSharedValue $ lsShared $ headObject ehead verbose = "-v" `elem` args set <- asks ciSetContextOptions - set $ map SelectedContact contacts + set WatchContacts $ map SelectedContact contacts forM_ (zip [1..] contacts) $ \(i :: Int, c) -> do cmdPutStrLn $ T.unpack $ T.concat [ "[", T.pack (show i), "] ", contactName c @@ -841,19 +956,36 @@ cmdConversations :: Command cmdConversations = do conversations <- lookupConversations set <- asks ciSetContextOptions - set $ map SelectedConversation conversations + set WatchConversations $ map SelectedConversation conversations forM_ (zip [1..] conversations) $ \(i :: Int, conv) -> do cmdPutStrLn $ "[" ++ show i ++ "] " ++ T.unpack (conversationName conv) +cmdNew :: Command +cmdNew = do + conversations <- mapMaybe checkNew <$> lookupConversations + set <- asks ciSetContextOptions + set WatchConversations $ map (SelectedConversation . fst) conversations + tzone <- liftIO $ getCurrentTimeZone + forM_ (zip [1..] conversations) $ \(i :: Int, ( conv, msg )) -> do + cmdPutStrLn $ "[" ++ show i ++ "] " ++ T.unpack (conversationName conv) ++ " " ++ formatMessage tzone msg + where + checkNew conv + | (msg : _) <- conversationHistory conv + , messageUnread msg + = Just ( conv, msg ) + checkNew _ = Nothing + + cmdDetails :: Command cmdDetails = do getSelectedOrManualContext >>= \case SelectedPeer peer -> do + paddr <- getPeerAddress peer cmdPutStrLn $ unlines [ "Network peer:" - , " " <> show (peerAddress peer) + , " " <> show paddr ] - peerIdentity peer >>= \case + getPeerIdentity peer >>= \case PeerIdentityUnknown _ -> do cmdPutStrLn $ "unknown identity" PeerIdentityRef wref _ -> do @@ -908,19 +1040,6 @@ cmdDetails = do , map (BC.unpack . showRefDigest . refDigest . storedRef) $ idExtDataF cpid ] -cmdDiscoveryInit :: Command -cmdDiscoveryInit = void $ do - server <- asks ciServer - - (hostname, port) <- (words <$> asks ciLine) >>= return . \case - hostname:p:_ -> (hostname, p) - [hostname] -> (hostname, show discoveryPort) - [] -> ("discovery.erebosprotocol.net", show discoveryPort) - addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port) - peer <- liftIO $ serverPeer server (addrAddress addr) - sendToPeer peer $ DiscoverySelf [ T.pack "ICE" ] Nothing - modify $ \s -> s { csIcePeer = Just peer } - cmdDiscovery :: Command cmdDiscovery = void $ do server <- asks ciServer @@ -929,80 +1048,6 @@ cmdDiscovery = void $ do Nothing -> throwOtherError "failed to parse ref" Just dgst -> discoverySearch server dgst -#ifdef ENABLE_ICE_SUPPORT - -cmdIceCreate :: Command -cmdIceCreate = do - let getRole = \case - 'm':_ -> PjIceSessRoleControlling - 's':_ -> PjIceSessRoleControlled - _ -> PjIceSessRoleUnknown - - ( role, stun, turn ) <- asks (words . ciLine) >>= \case - [] -> return ( PjIceSessRoleControlling, Nothing, Nothing ) - [ role ] -> return - ( getRole role, Nothing, Nothing ) - [ role, server ] -> return - ( getRole role - , Just ( T.pack server, 0 ) - , Just ( T.pack server, 0 ) - ) - [ role, server, port ] -> return - ( getRole role - , Just ( T.pack server, read port ) - , Just ( T.pack server, read port ) - ) - [ role, stunServer, stunPort, turnServer, turnPort ] -> return - ( getRole role - , Just ( T.pack stunServer, read stunPort ) - , Just ( T.pack turnServer, read turnPort ) - ) - _ -> throwOtherError "invalid parameters" - - eprint <- asks ciPrint - Just cfg <- liftIO $ iceCreateConfig stun turn - sess <- liftIO $ iceCreateSession cfg role $ eprint <=< iceShow - modify $ \s -> s { csIceSessions = sess : csIceSessions s } - -cmdIceDestroy :: Command -cmdIceDestroy = do - s:ss <- gets csIceSessions - modify $ \st -> st { csIceSessions = ss } - liftIO $ iceDestroy s - -cmdIceShow :: Command -cmdIceShow = do - sess <- gets csIceSessions - eprint <- asks ciPrint - liftIO $ forM_ (zip [1::Int ..] sess) $ \(i, s) -> do - eprint $ "[" ++ show i ++ "]" - eprint =<< iceShow s - -cmdIceConnect :: Command -cmdIceConnect = do - s:_ <- gets csIceSessions - server <- asks ciServer - term <- asks ciTerminal - let loadInfo = - getInputLine term (KeepPrompt . maybe BC.empty BC.pack) >>= \case - line | BC.null line -> return [] - | otherwise -> (line :) <$> loadInfo - Right remote <- liftIO $ do - st <- memoryStorage - pst <- derivePartialStorage st - setPrompt term "" - rbytes <- (BL.fromStrict . BC.unlines) <$> loadInfo - copyRef st =<< storeRawBytes pst (BL.fromChunks [ BC.pack "rec ", BC.pack (show (BL.length rbytes)), BC.singleton '\n' ] `BL.append` rbytes) - liftIO $ iceConnect s (load remote) $ void $ serverPeerIce server s - -cmdIceSend :: Command -cmdIceSend = void $ do - s:_ <- gets csIceSessions - server <- asks ciServer - liftIO $ serverPeerIce server s - -#endif - cmdQuit :: Command cmdQuit = modify $ \s -> s { csQuit = True } diff --git a/main/State.hs b/main/State.hs index 150178e..5d66ba9 100644 --- a/main/State.hs +++ b/main/State.hs @@ -1,15 +1,17 @@ module State ( loadLocalStateHead, + createLocalStateHead, updateSharedIdentity, interactiveIdentityUpdate, ) where +import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class import Data.Foldable -import Data.Maybe import Data.Proxy +import Data.Text (Text) import Data.Text qualified as T import Erebos.Error @@ -22,34 +24,67 @@ import Erebos.Storage import Terminal -loadLocalStateHead :: MonadIO m => Terminal -> Storage -> m (Head LocalState) -loadLocalStateHead term st = loadHeads st >>= \case - (h:_) -> return h - [] -> liftIO $ do - setPrompt term "Name: " - name <- getInputLine term $ KeepPrompt . maybe T.empty T.pack +loadLocalStateHead + :: (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m) + => Terminal -> m (Head LocalState) +loadLocalStateHead term = getStorage >>= loadHeads >>= \case + (h : _) -> return h + [] -> do + name <- liftIO $ do + setPrompt term "Name: " + getInputLine term $ KeepPrompt . maybe T.empty T.pack - setPrompt term "Device: " - devName <- getInputLine term $ KeepPrompt . maybe T.empty T.pack + devName <- liftIO $ do + setPrompt term "Device: " + getInputLine term $ KeepPrompt . maybe T.empty T.pack - owner <- if - | T.null name -> return Nothing - | otherwise -> Just <$> createIdentity st (Just name) Nothing + ( owner, shared ) <- if + | T.null name -> do + return ( Nothing, [] ) + | otherwise -> do + owner <- createIdentity (Just name) Nothing + shared <- mstore SharedState + { ssPrev = [] + , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy + , ssValue = [ storedRef $ idExtData owner ] + } + return ( Just owner, [ shared ] ) - identity <- createIdentity st (if T.null devName then Nothing else Just devName) owner + identity <- createIdentity (if T.null devName then Nothing else Just devName) owner - shared <- wrappedStore st $ SharedState - { ssPrev = [] - , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy - , ssValue = [ storedRef $ idExtData $ fromMaybe identity owner ] - } + st <- getStorage storeHead st $ LocalState { lsPrev = Nothing , lsIdentity = idExtData identity - , lsShared = [ shared ] + , lsShared = shared , lsOther = [] } +createLocalStateHead + :: (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m) + => [ Maybe Text ] -> m (Head LocalState) +createLocalStateHead [] = throwOtherError "createLocalStateHead: empty name list" +createLocalStateHead ( ownerName : names ) = do + owner <- createIdentity ownerName Nothing + identity <- foldM createSingleIdentity owner names + shared <- case names of + [] -> return [] + _ : _ -> do + fmap (: []) $ mstore SharedState + { ssPrev = [] + , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy + , ssValue = [ storedRef $ idExtData owner ] + } + st <- getStorage + storeHead st $ LocalState + { lsPrev = Nothing + , lsIdentity = idExtData identity + , lsShared = shared + , lsOther = [] + } + where + createSingleIdentity owner name = createIdentity name (Just owner) + updateSharedIdentity :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Terminal -> m () updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case diff --git a/main/Terminal.hs b/main/Terminal.hs index 150bd8c..b8b953f 100644 --- a/main/Terminal.hs +++ b/main/Terminal.hs @@ -44,14 +44,20 @@ data Terminal = Terminal , termShowPrompt :: TVar Bool , termInput :: TVar ( String, String ) , termBottomLines :: TVar [ String ] + , termHistory :: TVar [ String ] + , termHistoryPos :: TVar Int + , termHistoryStash :: TVar ( String, String ) } data TerminalLine = TerminalLine { tlTerminal :: Terminal + , tlLineCount :: Int } data Input = InputChar Char + | InputMoveUp + | InputMoveDown | InputMoveRight | InputMoveLeft | InputMoveEnd @@ -84,6 +90,9 @@ initTerminal termCompletionFunc = do termShowPrompt <- newTVarIO False termInput <- newTVarIO ( "", "" ) termBottomLines <- newTVarIO [] + termHistory <- newTVarIO [] + termHistoryPos <- newTVarIO 0 + termHistoryStash <- newTVarIO ( "", "" ) return Terminal {..} bracketSet :: IO a -> (a -> IO b) -> a -> IO c -> IO c @@ -112,6 +121,8 @@ getInput = do '\ESC' -> do esc <- readEsc case parseEsc esc of + Just ( 'A' , [] ) -> return InputMoveUp + Just ( 'B' , [] ) -> return InputMoveDown Just ( 'C' , [] ) -> return InputMoveRight Just ( 'D' , [] ) -> return InputMoveLeft _ -> return (InputEscape esc) @@ -119,6 +130,8 @@ getInput = do '\DEL' -> return InputBackspace '\NAK' -> return InputClear '\ETB' -> return InputBackWord + '\DLE' -> return InputMoveUp + '\SO' -> return InputMoveDown '\SOH' -> return InputMoveStart '\ENQ' -> return InputMoveEnd '\EOT' -> return InputEnd @@ -136,19 +149,33 @@ getInput = do getInputLine :: Terminal -> (Maybe String -> InputHandling a) -> IO a getInputLine term@Terminal {..} handleResult = do - withMVar termLock $ \_ -> do - prompt <- atomically $ do - writeTVar termShowPrompt True - readTVar termPrompt - putStr $ prompt <> "\ESC[K" - drawBottomLines term - hFlush stdout - (handleResult <$> go) >>= \case + when termAnsi $ do + withMVar termLock $ \_ -> do + prompt <- atomically $ do + writeTVar termShowPrompt True + readTVar termPrompt + putStr $ prompt <> "\ESC[K" + drawBottomLines term + hFlush stdout + + mbLine <- go + forM_ mbLine $ \line -> do + let addLine xs + | null line = xs + | (x : _) <- xs, x == line = xs + | otherwise = line : xs + atomically $ do + writeTVar termHistory . addLine =<< readTVar termHistory + writeTVar termHistoryPos 0 + + case handleResult mbLine of KeepPrompt x -> do - termPutStr term "\n\ESC[J" + when termAnsi $ do + termPutStr term "\n\ESC[J" return x ErasePrompt x -> do - termPutStr term "\r\ESC[J" + when termAnsi $ do + termPutStr term "\r\ESC[J" return x where go = getInput >>= \case @@ -156,11 +183,12 @@ getInputLine term@Terminal {..} handleResult = do atomically $ do ( pre, post ) <- readTVar termInput writeTVar termInput ( "", "" ) - writeTVar termShowPrompt False - writeTVar termBottomLines [] + when termAnsi $ do + writeTVar termShowPrompt False + writeTVar termBottomLines [] return $ Just $ pre ++ post - InputChar '\t' -> do + InputChar '\t' | termAnsi -> do options <- withMVar termLock $ const $ do ( pre, post ) <- atomically $ readTVar termInput let updatePrompt pre' = do @@ -179,9 +207,11 @@ getInputLine term@Terminal {..} handleResult = do ( unused, completions@(c : cs) ) -> do let commonPrefixes' x y = fmap (\( common, _, _ ) -> common) $ T.commonPrefixes x y case foldl' (\mbcommon cur -> commonPrefixes' cur =<< mbcommon) (Just $ replacement c) (fmap replacement cs) of - Just common -> updatePrompt $ T.unpack unused ++ T.unpack common - Nothing -> return () - return $ map replacement completions + Just common | T.unpack common /= pre -> do + updatePrompt $ T.unpack unused ++ T.unpack common + return [] + _ -> do + return $ map replacement completions ( _, [] ) -> do return [] @@ -196,6 +226,37 @@ getInputLine term@Terminal {..} handleResult = do InputChar _ -> go + InputMoveUp -> withInput $ \prepost -> do + hist <- readTVar termHistory + pos <- readTVar termHistoryPos + case drop pos hist of + ( h : _ ) -> do + when (pos == 0) $ do + writeTVar termHistoryStash prepost + writeTVar termHistoryPos (pos + 1) + writeTVar termInput ( h, "" ) + ("\r\ESC[K" <>) <$> getCurrentPromptLine term + [] -> do + return "" + + InputMoveDown -> withInput $ \_ -> do + readTVar termHistoryPos >>= \case + 0 -> do + return "" + 1 -> do + writeTVar termHistoryPos 0 + writeTVar termInput =<< readTVar termHistoryStash + ("\r\ESC[K" <>) <$> getCurrentPromptLine term + pos -> do + writeTVar termHistoryPos (pos - 1) + hist <- readTVar termHistory + case drop (pos - 2) hist of + ( h : _ ) -> do + writeTVar termInput ( h, "" ) + ("\r\ESC[K" <>) <$> getCurrentPromptLine term + [] -> do + return "" + InputMoveRight -> withInput $ \case ( pre, c : post ) -> do writeTVar termInput ( pre ++ [ c ], post ) @@ -241,7 +302,7 @@ getInputLine term@Terminal {..} handleResult = do withInput f = do withMVar termLock $ const $ do str <- atomically $ f =<< readTVar termInput - when (not $ null str) $ do + when (termAnsi && not (null str)) $ do putStr str hFlush stdout go @@ -254,6 +315,8 @@ getCurrentPromptLine Terminal {..} = do return $ prompt <> pre <> "\ESC[s" <> post <> "\ESC[u" setPrompt :: Terminal -> String -> IO () +setPrompt Terminal { termAnsi = False } _ = do + return () setPrompt term@Terminal {..} prompt = do withMVar termLock $ \_ -> do join $ atomically $ do @@ -269,17 +332,26 @@ setPrompt term@Terminal {..} prompt = do printLine :: Terminal -> String -> IO TerminalLine printLine tlTerminal@Terminal {..} str = do withMVar termLock $ \_ -> do - promptLine <- atomically $ do - readTVar termShowPrompt >>= \case - True -> getCurrentPromptLine tlTerminal - False -> return "" - putStr $ "\r\ESC[K" <> str <> "\n\ESC[K" <> promptLine - drawBottomLines tlTerminal + let strLines = lines str + tlLineCount = length strLines + if termAnsi + then do + promptLine <- atomically $ do + readTVar termShowPrompt >>= \case + True -> getCurrentPromptLine tlTerminal + False -> return "" + putStr $ "\r\ESC[K" <> unlines strLines <> "\ESC[K" <> promptLine + drawBottomLines tlTerminal + else do + putStr $ unlines strLines + hFlush stdout return TerminalLine {..} printBottomLines :: Terminal -> String -> IO () +printBottomLines Terminal { termAnsi = False } _ = do + return () printBottomLines term@Terminal {..} str = do case lines str of [] -> clearBottomLines term @@ -290,6 +362,8 @@ printBottomLines term@Terminal {..} str = do hFlush stdout clearBottomLines :: Terminal -> IO () +clearBottomLines Terminal { termAnsi = False } = do + return () clearBottomLines Terminal {..} = do withMVar termLock $ \_ -> do atomically (readTVar termBottomLines) >>= \case diff --git a/main/Test.hs b/main/Test.hs index c563291..da49257 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -15,10 +15,12 @@ import Control.Monad.State import Crypto.Random import Data.Bool +import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.ByteString.Lazy.Char8 qualified as BL +import Data.Char import Data.Foldable import Data.Ord import Data.Text (Text) @@ -39,6 +41,7 @@ import Erebos.Contact import Erebos.DirectMessage import Erebos.Discovery import Erebos.Identity +import Erebos.Invite import Erebos.Network import Erebos.Object import Erebos.Pairing @@ -117,9 +120,9 @@ runTestTool st = do getLineMb :: MonadIO m => m (Maybe Text) getLineMb = liftIO $ catchIOError (Just <$> T.getLine) (\e -> if isEOFError e then return Nothing else ioError e) -getLines :: MonadIO m => m [Text] -getLines = getLineMb >>= \case - Just line | not (T.null line) -> (line:) <$> getLines +getLines :: MonadIO m => Text -> m [ Text ] +getLines eof = getLineMb >>= \case + Just line | line /= eof -> (line :) <$> getLines eof _ -> return [] getHead :: CommandM (Head LocalState) @@ -128,6 +131,26 @@ getHead = do modify $ \s -> s { tsHead = Just h } return h +showHex :: ByteString -> ByteString +showHex = B.concat . map showHexByte . B.unpack + where showHexChar x | x < 10 = x + o '0' + | otherwise = x + o 'a' - 10 + showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ] + o = fromIntegral . ord + +readHex :: ByteString -> Maybe ByteString +readHex = return . B.concat <=< readHex' + where readHex' bs | B.null bs = Just [] + readHex' bs = do (bx, bs') <- B.uncons bs + (by, bs'') <- B.uncons bs' + x <- hexDigit bx + y <- hexDigit by + (B.singleton (x * 16 + y) :) <$> readHex' bs'' + hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0' + | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10 + | otherwise = Nothing + o = fromIntegral . ord + type Output = MVar () @@ -227,14 +250,33 @@ directMessageAttributes out = DirectMessageAttributes { dmOwnerMismatch = afterCommit $ outLine out "dm-owner-mismatch" } -dmReceivedWatcher :: Output -> Stored DirectMessage -> IO () -dmReceivedWatcher out smsg = do - let msg = fromStored smsg - outLine out $ unwords - [ "dm-received" - , "from", maybe "<unnamed>" T.unpack $ idName $ msgFrom msg - , "text", T.unpack $ msgText msg - ] +discoveryAttributes :: DiscoveryAttributes +discoveryAttributes = (defaultServiceAttributes Proxy) + { discoveryProvideTunnel = \_ _ -> False + } + +inviteAttributes :: Output -> InviteServiceAttributes +inviteAttributes out = (defaultServiceAttributes Proxy) + { inviteHookAccepted = \token -> do + pid <- asks svcPeerIdentity + afterCommit $ outLine out $ "invite-accepted " <> BC.unpack (showHex token) <> " " <> (BC.unpack $ showRef $ storedRef $ idExtData pid) + , inviteHookReplyContact = \token _ -> do + afterCommit $ outLine out $ "invite-accept-done " <> BC.unpack (showHex token) <> " contact" + , inviteHookReplyInvalid = \token -> do + afterCommit $ outLine out $ "invite-accept-done " <> BC.unpack (showHex token) <> " invalid" + } + +dmThreadWatcher :: ComposedIdentity -> Output -> DirectMessageThread -> DirectMessageThread -> IO () +dmThreadWatcher self out prev cur = do + forM_ (reverse $ dmThreadToListSinceUnread prev cur) $ \( msg, new ) -> do + outLine out $ unwords + [ if sameIdentity self (msgFrom msg) + then "dm-sent" + else "dm-received" + , "from", maybe "<unnamed>" T.unpack $ idName $ msgFrom msg + , "new", if new then "yes" else "no" + , "text", T.unpack $ msgText msg + ] newtype CommandM a = CommandM (ReaderT TestInput (StateT TestState (ExceptT ErebosError IO)) a) @@ -258,63 +300,72 @@ instance MonadHead LocalState CommandM where type Command = CommandM () -commands :: [(Text, Command)] -commands = map (T.pack *** id) - [ ("store", cmdStore) - , ("load", cmdLoad) - , ("stored-generation", cmdStoredGeneration) - , ("stored-roots", cmdStoredRoots) - , ("stored-set-add", cmdStoredSetAdd) - , ("stored-set-list", cmdStoredSetList) - , ("head-create", cmdHeadCreate) - , ("head-replace", cmdHeadReplace) - , ("head-watch", cmdHeadWatch) - , ("head-unwatch", cmdHeadUnwatch) - , ("create-identity", cmdCreateIdentity) - , ("identity-info", cmdIdentityInfo) - , ("start-server", cmdStartServer) - , ("stop-server", cmdStopServer) - , ("peer-add", cmdPeerAdd) - , ("peer-drop", cmdPeerDrop) - , ("peer-list", cmdPeerList) - , ("test-message-send", cmdTestMessageSend) - , ("test-stream-open", cmdTestStreamOpen) - , ("test-stream-close", cmdTestStreamClose) - , ("test-stream-send", cmdTestStreamSend) - , ("local-state-get", cmdLocalStateGet) - , ("local-state-replace", cmdLocalStateReplace) - , ("local-state-wait", cmdLocalStateWait) - , ("shared-state-get", cmdSharedStateGet) - , ("shared-state-wait", cmdSharedStateWait) - , ("watch-local-identity", cmdWatchLocalIdentity) - , ("watch-shared-identity", cmdWatchSharedIdentity) - , ("update-local-identity", cmdUpdateLocalIdentity) - , ("update-shared-identity", cmdUpdateSharedIdentity) - , ("attach-to", cmdAttachTo) - , ("attach-accept", cmdAttachAccept) - , ("attach-reject", cmdAttachReject) - , ("contact-request", cmdContactRequest) - , ("contact-accept", cmdContactAccept) - , ("contact-reject", cmdContactReject) - , ("contact-list", cmdContactList) - , ("contact-set-name", cmdContactSetName) - , ("dm-send-peer", cmdDmSendPeer) - , ("dm-send-contact", cmdDmSendContact) - , ("dm-list-peer", cmdDmListPeer) - , ("dm-list-contact", cmdDmListContact) - , ("chatroom-create", cmdChatroomCreate) - , ("chatroom-delete", cmdChatroomDelete) - , ("chatroom-list-local", cmdChatroomListLocal) - , ("chatroom-watch-local", cmdChatroomWatchLocal) - , ("chatroom-set-name", cmdChatroomSetName) - , ("chatroom-subscribe", cmdChatroomSubscribe) - , ("chatroom-unsubscribe", cmdChatroomUnsubscribe) - , ("chatroom-members", cmdChatroomMembers) - , ("chatroom-join", cmdChatroomJoin) - , ("chatroom-join-as", cmdChatroomJoinAs) - , ("chatroom-leave", cmdChatroomLeave) - , ("chatroom-message-send", cmdChatroomMessageSend) - , ("discovery-connect", cmdDiscoveryConnect) +commands :: [ ( Text, Command ) ] +commands = + [ ( "store", cmdStore ) + , ( "store-raw", cmdStoreRaw ) + , ( "load", cmdLoad ) + , ( "load-type", cmdLoadType ) + , ( "stored-generation", cmdStoredGeneration ) + , ( "stored-roots", cmdStoredRoots ) + , ( "stored-set-add", cmdStoredSetAdd ) + , ( "stored-set-list", cmdStoredSetList ) + , ( "stored-difference", cmdStoredDifference ) + , ( "head-create", cmdHeadCreate ) + , ( "head-replace", cmdHeadReplace ) + , ( "head-watch", cmdHeadWatch ) + , ( "head-unwatch", cmdHeadUnwatch ) + , ( "create-identity", cmdCreateIdentity ) + , ( "identity-info", cmdIdentityInfo ) + , ( "start-server", cmdStartServer ) + , ( "stop-server", cmdStopServer ) + , ( "peer-add", cmdPeerAdd ) + , ( "peer-drop", cmdPeerDrop ) + , ( "peer-list", cmdPeerList ) + , ( "test-message-send", cmdTestMessageSend ) + , ( "test-stream-open", cmdTestStreamOpen ) + , ( "test-stream-close", cmdTestStreamClose ) + , ( "test-stream-send", cmdTestStreamSend ) + , ( "local-state-get", cmdLocalStateGet ) + , ( "local-state-replace", cmdLocalStateReplace ) + , ( "local-state-wait", cmdLocalStateWait ) + , ( "shared-state-get", cmdSharedStateGet ) + , ( "shared-state-wait", cmdSharedStateWait ) + , ( "watch-local-identity", cmdWatchLocalIdentity ) + , ( "watch-shared-identity", cmdWatchSharedIdentity ) + , ( "update-local-identity", cmdUpdateLocalIdentity ) + , ( "update-shared-identity", cmdUpdateSharedIdentity ) + , ( "attach-to", cmdAttachTo ) + , ( "attach-accept", cmdAttachAccept ) + , ( "attach-reject", cmdAttachReject ) + , ( "contact-request", cmdContactRequest ) + , ( "contact-accept", cmdContactAccept ) + , ( "contact-reject", cmdContactReject ) + , ( "contact-list", cmdContactList ) + , ( "contact-set-name", cmdContactSetName ) + , ( "dm-send-peer", cmdDmSendPeer ) + , ( "dm-send-contact", cmdDmSendContact ) + , ( "dm-send-identity", cmdDmSendIdentity ) + , ( "dm-list-peer", cmdDmListPeer ) + , ( "dm-list-contact", cmdDmListContact ) + , ( "dm-list-identity", cmdDmListIdentity ) + , ( "dm-mark-seen", cmdDmMarkSeen ) + , ( "chatroom-create", cmdChatroomCreate ) + , ( "chatroom-delete", cmdChatroomDelete ) + , ( "chatroom-list-local", cmdChatroomListLocal ) + , ( "chatroom-watch-local", cmdChatroomWatchLocal ) + , ( "chatroom-set-name", cmdChatroomSetName ) + , ( "chatroom-subscribe", cmdChatroomSubscribe ) + , ( "chatroom-unsubscribe", cmdChatroomUnsubscribe ) + , ( "chatroom-members", cmdChatroomMembers ) + , ( "chatroom-join", cmdChatroomJoin ) + , ( "chatroom-join-as", cmdChatroomJoinAs ) + , ( "chatroom-leave", cmdChatroomLeave ) + , ( "chatroom-message-send", cmdChatroomMessageSend ) + , ( "discovery-connect", cmdDiscoveryConnect ) + , ( "discovery-tunnel", cmdDiscoveryTunnel ) + , ( "invite-contact-create", cmdInviteContactCreate ) + , ( "invite-accept", cmdInviteAccept ) ] cmdStore :: Command @@ -322,7 +373,7 @@ cmdStore = do st <- asks tiStorage pst <- liftIO $ derivePartialStorage st [otype] <- asks tiParams - ls <- getLines + ls <- getLines T.empty let cnt = encodeUtf8 $ T.unlines ls full = BL.fromChunks @@ -335,6 +386,18 @@ cmdStore = do Right ref -> cmdOut $ "store-done " ++ show (refDigest ref) Left _ -> cmdOut $ "store-failed" +cmdStoreRaw :: Command +cmdStoreRaw = do + st <- asks tiStorage + pst <- liftIO $ derivePartialStorage st + [ eof ] <- asks tiParams + ls <- getLines eof + + let full = BL.fromStrict $ BC.init $ encodeUtf8 $ T.unlines ls + liftIO (copyRef st =<< storeRawBytes pst full) >>= \case + Right ref -> cmdOut $ "store-done " ++ show (refDigest ref) + Left _ -> cmdOut $ "store-failed" + cmdLoad :: Command cmdLoad = do st <- asks tiStorage @@ -347,6 +410,20 @@ cmdLoad = do cmdOut $ "load-line " <> T.unpack (decodeUtf8 $ BL.toStrict line) cmdOut "load-done" +cmdLoadType :: Command +cmdLoadType = do + st <- asks tiStorage + [ tref ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tref + let obj = load @Object ref + let otype = case obj of + Blob {} -> "blob" + Rec {} -> "rec" + OnDemand {} -> "ondemand" + ZeroObject {} -> "zero" + UnknownObject utype _ -> "unknown " <> decodeUtf8 utype + cmdOut $ "load-type " <> T.unpack otype + cmdStoredGeneration :: Command cmdStoredGeneration = do st <- asks tiStorage @@ -368,7 +445,7 @@ cmdStoredSetAdd = do [Just iref, Just sref] -> return (wrappedLoad iref, loadSet @[Stored Object] sref) [Just iref] -> return (wrappedLoad iref, emptySet) _ -> fail "unexpected parameters" - set' <- storeSetAdd st [item] set + set' <- storeSetAdd [ item ] set cmdOut $ "stored-set-add" ++ concatMap ((' ':) . show . refDigest . storedRef) (toComponents set') cmdStoredSetList :: Command @@ -381,6 +458,19 @@ cmdStoredSetList = do cmdOut $ "stored-set-item" ++ concatMap ((' ':) . show . refDigest . storedRef) item cmdOut $ "stored-set-done" +cmdStoredDifference :: Command +cmdStoredDifference = do + st <- asks tiStorage + ( trefs1, "|" : trefs2 ) <- span (/= "|") <$> asks tiParams + + let loadObjs = mapM (maybe (fail "invalid ref") (return . wrappedLoad @Object) <=< liftIO . readRef st . encodeUtf8) + objs1 <- loadObjs trefs1 + objs2 <- loadObjs trefs2 + + forM_ (storedDifference objs1 objs2) $ \item -> do + cmdOut $ "stored-difference-item " ++ (show $ refDigest $ storedRef item) + cmdOut $ "stored-difference-done" + cmdHeadCreate :: Command cmdHeadCreate = do [ ttid, tref ] <- asks tiParams @@ -435,7 +525,8 @@ cmdHeadUnwatch = do initTestHead :: Head LocalState -> Command initTestHead h = do - _ <- liftIO . watchReceivedMessages h . dmReceivedWatcher =<< asks tiOutput + let self = finalOwner $ headLocalIdentity h + _ <- liftIO . watchDirectMessageThreads h . dmThreadWatcher self =<< asks tiOutput modify $ \s -> s { tsHead = Just h } loadTestHead :: CommandM (Head LocalState) @@ -458,13 +549,13 @@ cmdCreateIdentity = do st <- asks tiStorage names <- asks tiParams - h <- liftIO $ do + h <- do Just identity <- if null names - then Just <$> createIdentity st Nothing Nothing - else foldrM (\n o -> Just <$> createIdentity st (Just n) o) Nothing names + then Just <$> createIdentity Nothing Nothing + else foldrM (\n o -> Just <$> createIdentity (Just n) o) Nothing names shared <- case names of - _:_:_ -> (:[]) <$> makeSharedStateUpdate st (Just $ finalOwner identity) [] + _:_:_ -> (: []) <$> makeSharedStateUpdate (Just $ finalOwner identity) [] _ -> return [] storeHead st $ LocalState @@ -497,21 +588,32 @@ cmdStartServer = do let parseParams = \case (name : value : rest) - | name == "services" -> T.splitOn "," value + | name == "services" -> second ( map splitServiceParams (T.splitOn "," value) ++ ) (parseParams rest) + (name : rest) + | name == "test-log" -> first (\o -> o { serverTestLog = True }) (parseParams rest) | otherwise -> parseParams rest - _ -> [] - serviceNames <- parseParams <$> asks tiParams + _ -> ( defaultServerOptions { serverErrorPrefix = "server-error-message " }, [] ) + + splitServiceParams svc = + case T.splitOn ":" svc of + name : params -> ( name, params ) + _ -> ( svc, [] ) + + ( serverOptions, serviceNames ) <- parseParams <$> asks tiParams h <- getOrLoadHead rsPeers <- liftIO $ newMVar (1, []) services <- forM serviceNames $ \case - "attach" -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" - "chatroom" -> return $ someService @ChatroomService Proxy - "contact" -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" - "discovery" -> return $ someService @DiscoveryService Proxy - "dm" -> return $ someServiceAttr $ directMessageAttributes out - "sync" -> return $ someService @SyncService Proxy - "test" -> return $ someServiceAttr $ (defaultServiceAttributes Proxy) + ( "attach", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach" + ( "chatroom", _ ) -> return $ someService @ChatroomService Proxy + ( "contact", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact" + ( "discovery", params ) -> return $ someServiceAttr $ discoveryAttributes + { discoveryProvideTunnel = \_ _ -> "tunnel" `elem` params + } + ( "dm", _ ) -> return $ someServiceAttr $ directMessageAttributes out + ( "invite", _ ) -> return $ someServiceAttr $ inviteAttributes out + ( "sync", _ ) -> return $ someService @SyncService Proxy + ( "test", _ ) -> return $ someServiceAttr $ (defaultServiceAttributes Proxy) { testMessageReceived = \obj otype len sref -> do liftIO $ do void $ store (headStorage h) obj @@ -530,17 +632,22 @@ cmdStartServer = do outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] go } - sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" + ( sname, _ ) -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" - rsServer <- liftIO $ startServer defaultServerOptions h (B.hPutStr stderr . (`BC.snoc` '\n') . BC.pack) services + let logPrint str = do BC.hPutStrLn stdout (BC.pack str) + hFlush stdout + rsServer <- liftIO $ startServer serverOptions h logPrint services rsPeerThread <- liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange rsServer let printPeer TestPeer {..} = do - params <- peerIdentity tpPeer >>= return . \case - PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) - _ -> [ "addr", show (peerAddress tpPeer) ] + params <- getPeerIdentity tpPeer >>= \case + PeerIdentityFull pid -> do + return $ ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) + _ -> do + paddr <- getPeerAddress tpPeer + return $ [ "addr", show paddr ] outLine out $ unwords $ [ "peer", show tpIndex ] ++ params update ( tpIndex, [] ) = do @@ -591,10 +698,11 @@ cmdPeerList = do tpeers <- liftIO $ readMVar rsPeers forM_ peers $ \peer -> do Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers - mbpid <- peerIdentity peer + mbpid <- getPeerIdentity peer + paddr <- getPeerAddress peer cmdOut $ unwords $ concat [ [ "peer-list-item", show (tpIndex tp) ] - , [ "addr", show (peerAddress peer) ] + , [ "addr", show paddr ] , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) _ -> [] ] @@ -807,7 +915,7 @@ cmdContactSetName = do cmdDmSendPeer :: Command cmdDmSendPeer = do [spidx, msg] <- asks tiParams - PeerIdentityFull to <- peerIdentity =<< getPeer spidx + PeerIdentityFull to <- getPeerIdentity =<< getPeer spidx void $ sendDirectMessage to msg cmdDmSendContact :: Command @@ -816,13 +924,22 @@ cmdDmSendContact = do Just to <- contactIdentity <$> getContact cid void $ sendDirectMessage to msg +cmdDmSendIdentity :: Command +cmdDmSendIdentity = do + st <- asks tiStorage + [ tid, msg ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just to <- return $ validateExtendedIdentity $ wrappedLoad ref + void $ sendDirectMessage to msg + dmList :: Foldable f => Identity f -> Command dmList peer = do - threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead + threads <- dmThreadList . lookupSharedValue . lsShared . headObject <$> getHead case find (sameIdentity peer . msgPeer) threads of Just thread -> do - forM_ (reverse $ threadToList thread) $ \DirectMessage {..} -> cmdOut $ "dm-list-item" + forM_ (reverse $ dmThreadToListUnread thread) $ \( DirectMessage {..}, new ) -> cmdOut $ "dm-list-item" <> " from " <> (maybe "<unnamed>" T.unpack $ idName msgFrom) + <> " new " <> (if new then "yes" else "no") <> " text " <> (T.unpack msgText) Nothing -> return () cmdOut "dm-list-done" @@ -830,7 +947,7 @@ dmList peer = do cmdDmListPeer :: Command cmdDmListPeer = do [spidx] <- asks tiParams - PeerIdentityFull to <- peerIdentity =<< getPeer spidx + PeerIdentityFull to <- getPeerIdentity =<< getPeer spidx dmList to cmdDmListContact :: Command @@ -839,6 +956,23 @@ cmdDmListContact = do Just to <- contactIdentity <$> getContact cid dmList to +cmdDmListIdentity :: Command +cmdDmListIdentity = do + st <- asks tiStorage + [ tid ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just pid <- return $ validateExtendedIdentity $ wrappedLoad ref + dmList pid + +cmdDmMarkSeen :: Command +cmdDmMarkSeen = do + st <- asks tiStorage + [ tid ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just pid <- return $ validateExtendedIdentity $ wrappedLoad ref + dmMarkAsSeen pid + cmdOut $ unwords [ "dm-mark-seen-done", T.unpack tid ] + cmdChatroomCreate :: Command cmdChatroomCreate = do [name] <- asks tiParams @@ -938,8 +1072,7 @@ cmdChatroomJoin = do cmdChatroomJoinAs :: Command cmdChatroomJoinAs = do [ cid, name ] <- asks tiParams - st <- asks tiStorage - identity <- liftIO $ createIdentity st (Just name) Nothing + identity <- createIdentity (Just name) Nothing joinChatroomAsByStateData identity =<< getChatroomStateData cid cmdOut $ unwords [ "chatroom-join-as-done", T.unpack cid ] @@ -961,3 +1094,24 @@ cmdDiscoveryConnect = do Just dgst <- return $ readRefDigest $ encodeUtf8 tref Just RunningServer {..} <- gets tsServer discoverySearch rsServer dgst + +cmdDiscoveryTunnel :: Command +cmdDiscoveryTunnel = do + [ tvia, ttarget ] <- asks tiParams + via <- getPeer tvia + Just target <- return $ readRefDigest $ encodeUtf8 ttarget + liftIO $ discoverySetupTunnel via target + +cmdInviteContactCreate :: Command +cmdInviteContactCreate = do + [ name ] <- asks tiParams + Just token <- inviteToken <$> createSingleContactInvite name + cmdOut $ unwords [ "invite-contact-create-done", BC.unpack (showHex token) ] + +cmdInviteAccept :: Command +cmdInviteAccept = do + [ tokenText, idref ] <- asks tiParams + Just token <- return $ readHex $ encodeUtf8 tokenText + Just from <- return $ readRefDigest $ encodeUtf8 idref + Just RunningServer {..} <- gets tsServer + acceptInvite rsServer from token diff --git a/main/WebSocket.hs b/main/WebSocket.hs index fbdd65f..7a957e2 100644 --- a/main/WebSocket.hs +++ b/main/WebSocket.hs @@ -1,4 +1,5 @@ module WebSocket ( + WebSocketAddress(..), startWebsocketServer, ) where @@ -26,8 +27,10 @@ instance Show WebSocketAddress where show (WebSocketAddress _ _) = "websocket" instance PeerAddressType WebSocketAddress where - sendBytesToAddress (WebSocketAddress _ conn) msg = do - WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg + sendBytesToAddress (WebSocketAddress _ conn) msg = do + WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg + connectionToAddressClosed (WebSocketAddress _ conn) = do + WS.sendClose conn BL.empty startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO () startWebsocketServer server addr port logd = do diff --git a/src/Erebos/Attach.hs b/src/Erebos/Attach.hs index fad6197..b7c642f 100644 --- a/src/Erebos/Attach.hs +++ b/src/Erebos/Attach.hs @@ -59,7 +59,7 @@ instance PairingResult AttachIdentity where liftIO $ mapM_ storeKey $ catMaybes [ keyFromData sec pub | sec <- keys, pub <- pkeys ] identity' <- mergeIdentity $ updateIdentity [ lsIdentity $ fromStored slocal ] identity - shared <- makeSharedStateUpdate st (Just owner) (lsShared $ fromStored slocal) + shared <- makeSharedStateUpdate (Just owner) (lsShared $ fromStored slocal) mstore (fromStored slocal) { lsIdentity = idExtData identity' , lsShared = [ shared ] diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index 74456ff..5a86b23 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -17,6 +17,7 @@ module Erebos.Chatroom ( joinChatroomAs, joinChatroomAsByStateData, leaveChatroom, leaveChatroomByStateData, getMessagesSinceState, + isSameChatroom, ChatroomSetChange(..), watchChatrooms, @@ -49,6 +50,7 @@ import Data.Set qualified as S import Data.Text (Text) import Data.Time +import Erebos.Conversation.Class import Erebos.Identity import Erebos.PubKey import Erebos.Service @@ -60,6 +62,15 @@ import Erebos.Storage.Merge import Erebos.Util +instance ConversationType ChatroomState ChatMessage where + convMessageFrom = cmsgFrom + convMessageTime = cmsgTime + convMessageText = cmsgText + + convMessageListSince mbSince cstate = + map (, False) $ threadToListSince (maybe [] roomStateMessageData mbSince) (roomStateMessageData cstate) + + data ChatroomData = ChatroomData { rdPrev :: [Stored (Signed ChatroomData)] , rdName :: Maybe Text @@ -294,8 +305,7 @@ createChatroom rdName rdDescription = do } updateLocalState $ updateSharedState $ \rooms -> do - st <- getStorage - (, cstate) <$> storeSetAdd st cstate rooms + (, cstate) <$> storeSetAdd cstate rooms findAndUpdateChatroomState :: (MonadStorage m, MonadHead LocalState m) @@ -309,8 +319,7 @@ findAndUpdateChatroomState f = do upd <- act if roomStateData orig /= roomStateData upd then do - st <- getStorage - roomSet' <- storeSetAdd st upd roomSet + roomSet' <- storeSetAdd upd roomSet return (roomSet', Just upd) else do return (roomSet, Just upd) @@ -422,6 +431,11 @@ leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupDa getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage] getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur) +isSameChatroom :: ChatroomState -> ChatroomState -> Bool +isSameChatroom rstate rstate' = + let roots = filterAncestors . concatMap storedRoots . roomStateData + in intersectsSorted (roots rstate) (roots rstate') + data ChatroomSetChange = AddedChatroom ChatroomState | RemovedChatroom ChatroomState diff --git a/src/Erebos/Contact.hs b/src/Erebos/Contact.hs index 88e6c44..78a504a 100644 --- a/src/Erebos/Contact.hs +++ b/src/Erebos/Contact.hs @@ -4,6 +4,8 @@ module Erebos.Contact ( contactCustomName, contactName, + ContactData(..), + contactSetName, ContactService, @@ -83,13 +85,12 @@ contactName c = fromJust $ msum contactSetName :: MonadHead LocalState m => Contact -> Text -> Set Contact -> m (Set Contact) contactSetName contact name set = do - st <- getStorage - cdata <- wrappedStore st ContactData + cdata <- mstore ContactData { cdPrev = toComponents contact , cdIdentity = [] , cdName = Just name } - storeSetAdd st (mergeSorted @Contact [cdata]) set + storeSetAdd (mergeSorted @Contact [cdata]) set type ContactService = PairingService ContactAccepted @@ -166,10 +167,9 @@ contactReject = pairingReject @ContactAccepted Proxy finalizeContact :: MonadHead LocalState m => UnifiedIdentity -> m () finalizeContact identity = updateLocalState_ $ updateSharedState_ $ \contacts -> do - st <- getStorage - cdata <- wrappedStore st ContactData + cdata <- mstore ContactData { cdPrev = [] , cdIdentity = idExtDataF $ finalOwner identity , cdName = Nothing } - storeSetAdd st (mergeSorted @Contact [cdata]) contacts + storeSetAdd (mergeSorted @Contact [cdata]) contacts diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index dee6faa..2c6f967 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -7,9 +7,11 @@ module Erebos.Conversation ( formatMessage, Conversation, + isSameConversation, directMessageConversation, chatroomConversation, chatroomConversationByStateData, + isChatroomStateConversation, reloadConversation, lookupConversations, @@ -31,47 +33,60 @@ import Data.Time.Format import Data.Time.LocalTime import Erebos.Chatroom +import Erebos.Conversation.Class import Erebos.DirectMessage import Erebos.Identity import Erebos.State import Erebos.Storable -data Message = DirectMessageMessage DirectMessage Bool - | ChatroomMessage ChatMessage Bool +data Message = forall conv msg. ConversationType conv msg => Message msg Bool + +withMessage :: (forall conv msg. ConversationType conv msg => msg -> a) -> Message -> a +withMessage f (Message msg _) = f msg messageFrom :: Message -> ComposedIdentity -messageFrom (DirectMessageMessage msg _) = msgFrom msg -messageFrom (ChatroomMessage msg _) = cmsgFrom msg +messageFrom = withMessage convMessageFrom messageTime :: Message -> ZonedTime -messageTime (DirectMessageMessage msg _) = msgTime msg -messageTime (ChatroomMessage msg _) = cmsgTime msg +messageTime = withMessage convMessageTime messageText :: Message -> Maybe Text -messageText (DirectMessageMessage msg _) = Just $ msgText msg -messageText (ChatroomMessage msg _) = cmsgText msg +messageText = withMessage convMessageText messageUnread :: Message -> Bool -messageUnread (DirectMessageMessage _ unread) = unread -messageUnread (ChatroomMessage _ unread) = unread +messageUnread (Message _ unread) = unread formatMessage :: TimeZone -> Message -> String formatMessage tzone msg = concat - [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ messageTime msg + [ if messageUnread msg then "\ESC[93m" else "" + , formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ messageTime msg , maybe "<unnamed>" T.unpack $ idName $ messageFrom msg , maybe "" ((": "<>) . T.unpack) $ messageText msg + , if messageUnread msg then "\ESC[0m" else "" ] -data Conversation = DirectMessageConversation DirectMessageThread - | ChatroomConversation ChatroomState +data Conversation + = DirectMessageConversation DirectMessageThread + | ChatroomConversation ChatroomState + +withConversation :: (forall conv msg. ConversationType conv msg => conv -> a) -> Conversation -> a +withConversation f (DirectMessageConversation conv) = f conv +withConversation f (ChatroomConversation conv) = f conv + +isSameConversation :: Conversation -> Conversation -> Bool +isSameConversation (DirectMessageConversation t) (DirectMessageConversation t') + = sameIdentity (msgPeer t) (msgPeer t') +isSameConversation (ChatroomConversation rstate) (ChatroomConversation rstate') = isSameChatroom rstate rstate' +isSameConversation _ _ = False directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation directMessageConversation peer = do - (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case + createOrUpdateDirectMessagePeer peer + (find (sameIdentity peer . msgPeer) . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case Just thread -> return $ DirectMessageConversation thread - Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] + Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] [] chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) @@ -79,13 +94,17 @@ chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateD chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation) chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata +isChatroomStateConversation :: ChatroomState -> Conversation -> Bool +isChatroomStateConversation rstate (ChatroomConversation rstate') = isSameChatroom rstate rstate' +isChatroomStateConversation _ _ = False + reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread) reloadConversation cur@(ChatroomConversation rstate) = fromMaybe cur <$> chatroomConversation rstate -lookupConversations :: MonadHead LocalState m => m [Conversation] -lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead +lookupConversations :: MonadHead LocalState m => m [ Conversation ] +lookupConversations = map DirectMessageConversation . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead conversationName :: Conversation -> Text @@ -96,14 +115,13 @@ conversationPeer :: Conversation -> Maybe ComposedIdentity conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread conversationPeer (ChatroomConversation _) = Nothing -conversationHistory :: Conversation -> [Message] -conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread -conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate +conversationHistory :: Conversation -> [ Message ] +conversationHistory = withConversation $ map (uncurry Message) . convMessageListSince Nothing -sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m (Maybe Message) -sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False -sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing +sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m () +sendMessage (DirectMessageConversation thread) text = sendDirectMessage (msgPeer thread) text +sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text deleteConversation :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> m () deleteConversation (DirectMessageConversation _) = throwOtherError "deleting direct message conversation is not supported" diff --git a/src/Erebos/Conversation/Class.hs b/src/Erebos/Conversation/Class.hs new file mode 100644 index 0000000..6a28651 --- /dev/null +++ b/src/Erebos/Conversation/Class.hs @@ -0,0 +1,16 @@ +module Erebos.Conversation.Class ( + ConversationType(..), +) where + +import Data.Text (Text) +import Data.Time.LocalTime +import Data.Typeable + +import Erebos.Identity + + +class (Typeable conv, Typeable msg) => ConversationType conv msg | conv -> msg, msg -> conv where + convMessageFrom :: msg -> ComposedIdentity + convMessageTime :: msg -> ZonedTime + convMessageText :: msg -> Maybe Text + convMessageListSince :: Maybe conv -> conv -> [ ( msg, Bool ) ] diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs index 05da865..dd10d35 100644 --- a/src/Erebos/DirectMessage.hs +++ b/src/Erebos/DirectMessage.hs @@ -1,43 +1,62 @@ module Erebos.DirectMessage ( DirectMessage(..), sendDirectMessage, + dmMarkAsSeen, + updateDirectMessagePeer, + createOrUpdateDirectMessagePeer, DirectMessageAttributes(..), defaultDirectMessageAttributes, DirectMessageThreads, - toThreadList, + dmThreadList, DirectMessageThread(..), - threadToList, - messageThreadView, + dmThreadToList, dmThreadToListSince, dmThreadToListUnread, dmThreadToListSinceUnread, + dmThreadView, - watchReceivedMessages, + watchDirectMessageThreads, formatDirectMessage, ) where +import Control.Concurrent.MVar import Control.Monad +import Control.Monad.Except import Control.Monad.Reader import Data.List import Data.Ord -import qualified Data.Set as S +import Data.Set (Set) +import Data.Set qualified as S import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T import Data.Time.Format import Data.Time.LocalTime +import Erebos.Conversation.Class +import Erebos.Discovery import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service import Erebos.State import Erebos.Storable import Erebos.Storage.Head import Erebos.Storage.Merge + +instance ConversationType DirectMessageThread DirectMessage where + convMessageFrom = msgFrom + convMessageTime = msgTime + convMessageText = Just . msgText + + convMessageListSince mbSince thread = + threadToListHelper (msgSeen thread) (maybe S.empty (S.fromAscList . msgHead) mbSince) (msgHead thread) + + data DirectMessage = DirectMessage { msgFrom :: ComposedIdentity - , msgPrev :: [Stored DirectMessage] + , msgPrev :: [ Stored DirectMessage ] , msgTime :: ZonedTime , msgText :: Text } @@ -74,7 +93,6 @@ instance Service DirectMessage where let msg = fromStored smsg powner <- asks $ finalOwner . svcPeerIdentity erb <- svcGetLocal - st <- getStorage let DirectMessageThreads prev _ = lookupSharedValue $ lsShared $ fromStored erb sent = findMsgProperty powner msSent prev received = findMsgProperty powner msReceived prev @@ -83,7 +101,7 @@ instance Service DirectMessage where filterAncestors sent == filterAncestors (smsg : sent) then do when (received' /= received) $ do - next <- wrappedStore st $ MessageState + next <- mstore MessageState { msPrev = prev , msPeer = powner , msReady = [] @@ -91,37 +109,43 @@ instance Service DirectMessage where , msReceived = received' , msSeen = [] } - let threads = DirectMessageThreads [next] (messageThreadView [next]) - shared <- makeSharedStateUpdate st threads (lsShared $ fromStored erb) - svcSetLocal =<< wrappedStore st (fromStored erb) { lsShared = [shared] } + let threads = DirectMessageThreads [ next ] (dmThreadView [ next ]) + shared <- makeSharedStateUpdate threads (lsShared $ fromStored erb) + svcSetLocal =<< mstore (fromStored erb) { lsShared = [ shared ] } when (powner `sameIdentity` msgFrom msg) $ do replyStoredRef smsg else join $ asks $ dmOwnerMismatch . svcAttributes - serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal + serviceNewPeer = do + syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal - serviceStorageWatchers _ = (:[]) $ - SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + serviceUpdatedPeer = do + updateDirectMessagePeer . finalOwner =<< asks svcPeerIdentity + + serviceStorageWatchers _ = + [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer + , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers + ] data MessageState = MessageState - { msPrev :: [Stored MessageState] + { msPrev :: [ Stored MessageState ] , msPeer :: ComposedIdentity - , msReady :: [Stored DirectMessage] - , msSent :: [Stored DirectMessage] - , msReceived :: [Stored DirectMessage] - , msSeen :: [Stored DirectMessage] + , msReady :: [ Stored DirectMessage ] + , msSent :: [ Stored DirectMessage ] + , msReceived :: [ Stored DirectMessage ] + , msSeen :: [ Stored DirectMessage ] } -data DirectMessageThreads = DirectMessageThreads [Stored MessageState] [DirectMessageThread] +data DirectMessageThreads = DirectMessageThreads [ Stored MessageState ] [ DirectMessageThread ] instance Eq DirectMessageThreads where DirectMessageThreads mss _ == DirectMessageThreads mss' _ = mss == mss' -toThreadList :: DirectMessageThreads -> [DirectMessageThread] -toThreadList (DirectMessageThreads _ threads) = threads +dmThreadList :: DirectMessageThreads -> [ DirectMessageThread ] +dmThreadList (DirectMessageThreads _ threads) = threads instance Storable MessageState where store' MessageState {..} = storeRec $ do @@ -143,13 +167,13 @@ instance Storable MessageState where instance Mergeable DirectMessageThreads where type Component DirectMessageThreads = MessageState - mergeSorted mss = DirectMessageThreads mss (messageThreadView mss) + mergeSorted mss = DirectMessageThreads mss (dmThreadView mss) toComponents (DirectMessageThreads mss _) = mss instance SharedType DirectMessageThreads where sharedTypeID _ = mkSharedTypeID "ee793681-5976-466a-b0f0-4e1907d3fade" -findMsgProperty :: Foldable m => Identity m -> (MessageState -> [a]) -> [Stored MessageState] -> [a] +findMsgProperty :: Foldable m => Identity m -> (MessageState -> [ a ]) -> [ Stored MessageState ] -> [ a ] findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do guard $ msPeer x `sameIdentity` pid guard $ not $ null $ sel x @@ -157,11 +181,11 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m) - => Identity f -> Text -> m (Stored DirectMessage) -sendDirectMessage pid text = updateLocalState $ \ls -> do + => Identity f -> Text -> m () +sendDirectMessage pid text = updateLocalState_ $ \ls -> do let self = localIdentity $ fromStored ls powner = finalOwner pid - flip updateSharedState ls $ \(DirectMessageThreads prev _) -> do + flip updateSharedState_ ls $ \(DirectMessageThreads prev _) -> do let ready = findMsgProperty powner msReady prev received = findMsgProperty powner msReceived prev @@ -175,12 +199,69 @@ sendDirectMessage pid text = updateLocalState $ \ls -> do next <- mstore MessageState { msPrev = prev , msPeer = powner - , msReady = [smsg] + , msReady = [ smsg ] , msSent = [] , msReceived = [] , msSeen = [] } - return (DirectMessageThreads [next] (messageThreadView [next]), smsg) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + +dmMarkAsSeen + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +dmMarkAsSeen pid = do + updateLocalState_ $ updateSharedState_ $ \(DirectMessageThreads prev _) -> do + let powner = finalOwner pid + received = findMsgProperty powner msReceived prev + next <- mstore MessageState + { msPrev = prev + , msPeer = powner + , msReady = [] + , msSent = [] + , msReceived = [] + , msSeen = received + } + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + +updateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +updateDirectMessagePeer = createOrUpdateDirectMessagePeer' False + +createOrUpdateDirectMessagePeer + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Identity f -> m () +createOrUpdateDirectMessagePeer = createOrUpdateDirectMessagePeer' True + +createOrUpdateDirectMessagePeer' + :: (Foldable f, Applicative f, MonadHead LocalState m) + => Bool -> Identity f -> m () +createOrUpdateDirectMessagePeer' create pid = do + let powner = finalOwner pid + updateLocalState_ $ updateSharedState_ $ \old@(DirectMessageThreads prev threads) -> do + let updatePeerThread = do + next <- mstore MessageState + { msPrev = prev + , msPeer = powner + , msReady = [] + , msSent = [] + , msReceived = [] + , msSeen = [] + } + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) + case find (sameIdentity powner . msgPeer) threads of + Nothing + | create + -> updatePeerThread + + Just thread + | oldPeer <- msgPeer thread + , newPeer <- updateIdentity (idExtDataF powner) oldPeer + , oldPeer /= newPeer + -> updatePeerThread + + _ -> return old + syncDirectMessageToPeer :: DirectMessageThreads -> ServiceHandler DirectMessage () syncDirectMessageToPeer (DirectMessageThreads mss _) = do @@ -205,28 +286,47 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do , msReceived = [] , msSeen = [] } - return $ DirectMessageThreads [next] (messageThreadView [next]) + return $ DirectMessageThreads [ next ] (dmThreadView [ next ]) else do return unchanged +findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO () +findMissingPeers server threads = do + forM_ (dmThreadList threads) $ \thread -> do + when (msgHead thread /= msgReceived thread) $ do + mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread + data DirectMessageThread = DirectMessageThread { msgPeer :: ComposedIdentity - , msgHead :: [Stored DirectMessage] - , msgSent :: [Stored DirectMessage] - , msgSeen :: [Stored DirectMessage] + , msgHead :: [ Stored DirectMessage ] + , msgSent :: [ Stored DirectMessage ] + , msgSeen :: [ Stored DirectMessage ] + , msgReceived :: [ Stored DirectMessage ] } -threadToList :: DirectMessageThread -> [DirectMessage] -threadToList thread = helper S.empty $ msgHead thread - where helper seen msgs - | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = - fromStored msg : helper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg)) - | otherwise = [] - cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) +dmThreadToList :: DirectMessageThread -> [ DirectMessage ] +dmThreadToList thread = map fst $ threadToListHelper (msgSeen thread) S.empty $ msgHead thread + +dmThreadToListSince :: DirectMessageThread -> DirectMessageThread -> [ DirectMessage ] +dmThreadToListSince since thread = map fst $ threadToListHelper (msgSeen thread) (S.fromAscList $ msgHead since) (msgHead thread) + +dmThreadToListUnread :: DirectMessageThread -> [ ( DirectMessage, Bool ) ] +dmThreadToListUnread thread = threadToListHelper (msgSeen thread) S.empty $ msgHead thread + +dmThreadToListSinceUnread :: DirectMessageThread -> DirectMessageThread -> [ ( DirectMessage, Bool ) ] +dmThreadToListSinceUnread since thread = threadToListHelper (msgSeen thread) (S.fromAscList $ msgHead since) (msgHead thread) + +threadToListHelper :: [ Stored DirectMessage ] -> Set (Stored DirectMessage) -> [ Stored DirectMessage ] -> [ ( DirectMessage, Bool ) ] +threadToListHelper seen used msgs + | msg : msgs' <- filter (`S.notMember` used) $ reverse $ sortBy (comparing cmpView) msgs = + ( fromStored msg, not $ any (msg `precedesOrEquals`) seen ) : threadToListHelper seen (S.insert msg used) (msgs' ++ msgPrev (fromStored msg)) + | otherwise = [] + where + cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg) -messageThreadView :: [Stored MessageState] -> [DirectMessageThread] -messageThreadView = helper [] +dmThreadView :: [ Stored MessageState ] -> [ DirectMessageThread ] +dmThreadView = helper [] where helper used ms' = case filterAncestors ms' of mss@(sms : rest) | any (sameIdentity $ msPeer $ fromStored sms) used -> @@ -236,7 +336,7 @@ messageThreadView = helper [] in messageThreadFor peer mss : helper (peer : used) (msPrev (fromStored sms) ++ rest) _ -> [] -messageThreadFor :: ComposedIdentity -> [Stored MessageState] -> DirectMessageThread +messageThreadFor :: ComposedIdentity -> [ Stored MessageState ] -> DirectMessageThread messageThreadFor peer mss = let ready = findMsgProperty peer msReady mss sent = findMsgProperty peer msSent mss @@ -248,15 +348,28 @@ messageThreadFor peer mss = , msgHead = filterAncestors $ ready ++ received , msgSent = filterAncestors $ sent ++ received , msgSeen = filterAncestors $ ready ++ seen + , msgReceived = filterAncestors $ received } -watchReceivedMessages :: Head LocalState -> (Stored DirectMessage -> IO ()) -> IO WatchedHead -watchReceivedMessages h f = do - let self = finalOwner $ localIdentity $ headObject h +watchDirectMessageThreads :: Head LocalState -> (DirectMessageThread -> DirectMessageThread -> IO ()) -> IO WatchedHead +watchDirectMessageThreads h f = do + prevVar <- newMVar Nothing watchHeadWith h (lookupSharedValue . lsShared . headObject) $ \(DirectMessageThreads sms _) -> do - forM_ (map fromStored sms) $ \ms -> do - mapM_ f $ filter (not . sameIdentity self . msgFrom . fromStored) $ msReceived ms + modifyMVar_ prevVar $ \case + Just prev -> do + let addPeer (p : ps) p' + | p `sameIdentity` p' = p : ps + | otherwise = p : addPeer ps p' + addPeer [] p' = [ p' ] + + let peers = foldl' addPeer [] $ map (msPeer . fromStored) $ storedDifference prev sms + forM_ peers $ \peer -> do + f (messageThreadFor peer prev) (messageThreadFor peer sms) + return (Just sms) + + Nothing -> do + return (Just sms) formatDirectMessage :: TimeZone -> DirectMessage -> String formatDirectMessage tzone msg = concat diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 2fb0ffe..5590e4c 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -1,4 +1,5 @@ {-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} module Erebos.Discovery ( DiscoveryService(..), @@ -6,6 +7,7 @@ module Erebos.Discovery ( DiscoveryConnection(..), discoverySearch, + discoverySetupTunnel, ) where import Control.Concurrent @@ -13,7 +15,6 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader -import Data.IP qualified as IP import Data.List import Data.Map.Strict (Map) import Data.Map.Strict qualified as M @@ -25,15 +26,17 @@ import Data.Text (Text) import Data.Text qualified as T import Data.Word -import Network.Socket +import Text.Read #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity import Erebos.Network +import Erebos.Network.Address import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable @@ -45,18 +48,25 @@ type IceRemoteInfo = Stored Object data DiscoveryService - = DiscoverySelf [ Text ] (Maybe Int) - | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) + = DiscoverySelf [ DiscoveryAddress ] (Maybe Int) + | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) | DiscoverySearch (Either Ref RefDigest) - | DiscoveryResult (Either Ref RefDigest) [ Text ] + | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ] | DiscoveryConnectionRequest DiscoveryConnection | DiscoveryConnectionResponse DiscoveryConnection +data DiscoveryAddress + = DiscoveryIP InetAddress PortNumber + | DiscoveryICE + | DiscoveryTunnel + | DiscoveryOther Text + data DiscoveryAttributes = DiscoveryAttributes { discoveryStunPort :: Maybe Word16 , discoveryStunServer :: Maybe Text , discoveryTurnPort :: Maybe Word16 , discoveryTurnServer :: Maybe Text + , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool } defaultDiscoveryAttributes :: DiscoveryAttributes @@ -65,12 +75,14 @@ defaultDiscoveryAttributes = DiscoveryAttributes , discoveryStunServer = Nothing , discoveryTurnPort = Nothing , discoveryTurnServer = Nothing + , discoveryProvideTunnel = \_ _ -> False } data DiscoveryConnection = DiscoveryConnection { dconnSource :: Either Ref RefDigest , dconnTarget :: Either Ref RefDigest , dconnAddress :: Maybe Text + , dconnTunnel :: Bool , dconnIceInfo :: Maybe IceRemoteInfo } @@ -78,6 +90,7 @@ emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConn emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} where dconnAddress = Nothing + dconnTunnel = False dconnIceInfo = Nothing instance Storable DiscoveryService where @@ -101,11 +114,12 @@ instance Storable DiscoveryService where DiscoveryConnectionResponse conn -> storeConnection "response" conn where - storeConnection ctype DiscoveryConnection {..} = do + storeConnection (ctype :: Text) DiscoveryConnection {..} = do storeText "connection" $ ctype either (storeRawRef "source") (storeRawWeak "source") dconnSource either (storeRawRef "target") (storeRawWeak "target") dconnTarget storeMbText "address" dconnAddress + when dconnTunnel $ storeEmpty "tunnel" storeMbRef "ice-info" dconnIceInfo load' = loadRec $ msum @@ -138,7 +152,7 @@ instance Storable DiscoveryService where , loadConnection "response" DiscoveryConnectionResponse ] where - loadConnection ctype ctor = do + loadConnection (ctype :: Text) ctor = do ctype' <- loadText "connection" guard $ ctype == ctype' dconnSource <- msum @@ -150,13 +164,37 @@ instance Storable DiscoveryService where , Right <$> loadRawWeak "target" ] dconnAddress <- loadMbText "address" + dconnTunnel <- isJust <$> loadMbEmpty "tunnel" dconnIceInfo <- loadMbRef "ice-info" return $ ctor DiscoveryConnection {..} +instance StorableText DiscoveryAddress where + toText = \case + DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ] + DiscoveryICE -> "ICE" + DiscoveryTunnel -> "tunnel" + DiscoveryOther str -> str + + fromText str = return $ if + | [ addrStr, portStr ] <- T.words str + , Just addr <- readMaybe $ T.unpack addrStr + , Just port <- readMaybe $ T.unpack portStr + -> DiscoveryIP addr port + + | "ice" <- T.toLower str + -> DiscoveryICE + + | "tunnel" <- str + -> DiscoveryTunnel + + | otherwise + -> DiscoveryOther str + + data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer - , dpAddress :: [ Text ] + , dpAddress :: [ DiscoveryAddress ] , dpIceSession :: Maybe IceSession } @@ -169,7 +207,11 @@ emptyPeer = DiscoveryPeer } data DiscoveryPeerState = DiscoveryPeerState - { dpsStunServer :: Maybe ( Text, Word16 ) + { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ] + -- ( original target, our write stream ) + , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ] + -- ( original source, ( from source, to target )) + , dpsStunServer :: Maybe ( Text, Word16 ) , dpsTurnServer :: Maybe ( Text, Word16 ) , dpsIceConfig :: Maybe IceConfig } @@ -187,7 +229,9 @@ instance Service DiscoveryService where type ServiceState DiscoveryService = DiscoveryPeerState emptyServiceState _ = DiscoveryPeerState - { dpsStunServer = Nothing + { dpsOurTunnelRequests = [] + , dpsRelayedTunnelRequests = [] + , dpsStunServer = Nothing , dpsTurnServer = Nothing , dpsIceConfig = Nothing } @@ -202,26 +246,22 @@ instance Service DiscoveryService where DiscoverySelf addrs priority -> do pid <- asks svcPeerIdentity peer <- asks svcPeer + paddrs <- getPeerAddresses peer + let insertHelper new old | dpPriority new > dpPriority old = new | otherwise = old - matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if - | addr == T.pack "ICE" -> do - return $ Just addr - | [ ipaddr, port ] <- words (T.unpack addr) - , DatagramAddress paddr <- peerAddress peer -> do - saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - return $ if paddr == addrAddress saddr - then Just addr - else Nothing - - | otherwise -> return Nothing + let matchedAddrs = flip filter addrs $ \case + DiscoveryICE -> True + DiscoveryIP ipaddr port -> + DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs + _ -> False forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do let dp = DiscoveryPeer { dpPriority = fromMaybe 0 priority , dpPeer = Just peer - , dpAddress = addrs + , dpAddress = matchedAddrs , dpIceSession = Nothing } svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s } @@ -233,14 +273,8 @@ instance Service DiscoveryService where (discoveryTurnPort attrs) DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do - paddr <- asks (peerAddress . svcPeer) >>= return . \case - (DatagramAddress saddr) -> case IP.fromSockAddr saddr of - Just (IP.IPv6 ipv6, _) - | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 - -> Just $ T.pack $ show (IP.toIPv4w ipv4) - Just (addr, _) - -> Just $ T.pack $ show addr - _ -> Nothing + paddr <- asks svcPeerAddress >>= return . \case + (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr _ -> Nothing let toIceServer Nothing Nothing = Nothing @@ -255,10 +289,17 @@ instance Service DiscoveryService where DiscoverySearch edgst -> do dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal - replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer + peer <- asks svcPeer + paddr <- asks svcPeerAddress + attrs <- asks svcAttributes + let offerTunnel + | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ]) + | otherwise = id + replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer - DiscoveryResult edgst [] -> do - svcPrint $ "Discovery: " ++ show (either refDigest id edgst) ++ " not found" + DiscoveryResult _ [] -> do + -- not found + return () DiscoveryResult edgst addrs -> do let dgst = either refDigest id edgst @@ -269,56 +310,82 @@ instance Service DiscoveryService where discoveryPeer <- asks svcPeer let runAsService = runPeerService @DiscoveryService discoveryPeer - forM_ addrs $ \addr -> if - | addr == T.pack "ICE" - -> do -#ifdef ENABLE_ICE_SUPPORT - getIceConfig >>= \case - Just config -> void $ liftIO $ forkIO $ do - ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do - rinfo <- iceRemoteInfo ice - - -- Try to promote weak ref to normal one for older peers: - edgst' <- case edgst of - Left r -> return (Left r) - Right d -> refFromDigest st d >>= \case - Just r -> return (Left r) - Nothing -> return (Right d) - - res <- runExceptT $ sendToPeer discoveryPeer $ - DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - + let tryAddresses = \case + DiscoveryIP ipaddr port : _ -> do + void $ liftIO $ forkIO $ do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- serverPeer server saddr runAsService $ do - let upd dp = dp { dpIceSession = Just ice } + let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } - Nothing -> do - return () + DiscoveryICE : rest -> do +#ifdef ENABLE_ICE_SUPPORT + getIceConfig >>= \case + Just config -> do + void $ liftIO $ forkIO $ do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do + rinfo <- iceRemoteInfo ice + + -- Try to promote weak ref to normal one for older peers: + edgst' <- case edgst of + Left r -> return (Left r) + Right d -> refFromDigest st d >>= \case + Just r -> return (Left r) + Nothing -> return (Right d) + + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo } + case res of + Right _ -> return () + Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err + + runAsService $ do + let upd dp = dp { dpIceSession = Just ice } + svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + + Nothing -> do #endif - return () - - | [ ipaddr, port ] <- words (T.unpack addr) -> do - void $ liftIO $ forkIO $ do - saddr <- head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- serverPeer server (addrAddress saddr) - runAsService $ do - let upd dp = dp { dpPeer = Just peer } - svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s } + tryAddresses rest - | otherwise -> do - svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + DiscoveryTunnel : _ -> do + discoverySetupTunnelResponse dgst + + addr : rest -> do + svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr) + tryAddresses rest + + [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst + + tryAddresses addrs DiscoveryConnectionRequest conn -> do self <- svcSelf + attrs <- asks svcAttributes let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if either refDigest id (dconnTarget conn) `elem` identityDigests self then if + -- request for us, create ICE sesssion or tunnel + | dconnTunnel conn -> do + receivedStreams >>= \case + (tunnelReader : _) -> do + tunnelWriter <- openStream + replyPacket $ DiscoveryConnectionResponse rconn + { dconnTunnel = True + } + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + server <- asks svcServer + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + + [] -> do + svcPrint $ "Discovery: missing stream on tunnel request (endpoint)" + #ifdef ENABLE_ICE_SUPPORT - -- request for us, create ICE sesssion | Just prinfo <- dconnIceInfo conn -> do server <- asks svcServer peer <- asks svcPeer @@ -338,31 +405,72 @@ instance Service DiscoveryService where svcPrint $ "Discovery: unsupported connection request" else do - -- request to some of our peers, relay - mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal - case mbdp of + -- request to some of our peers, relay + peer <- asks svcPeer + paddr <- asks svcPeerAddress + mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal + streams <- receivedStreams + case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn Just dp - | Just dpeer <- dpPeer dp -> do - sendToPeer dpeer $ DiscoveryConnectionRequest conn + | Just dpeer <- dpPeer dp -> if + | dconnTunnel conn -> if + | not (discoveryProvideTunnel attrs peer paddr) -> do + replyPacket $ DiscoveryConnectionResponse rconn + | fromSource : _ <- streams -> do + void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do + toTarget <- openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest conn + | otherwise -> do + svcPrint $ "Discovery: missing stream on tunnel request (relay)" + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionRequest conn | otherwise -> svcPrint $ "Discovery: failed to relay connection request" DiscoveryConnectionResponse conn -> do self <- svcSelf + dps <- svcGet dpeers <- dgsPeers <$> svcGetGlobal + if either refDigest id (dconnSource conn) `elem` identityDigests self - then do + then do -- response to our request, try to connect to the peer server <- asks svcServer - if | Just addr <- dconnAddress conn - , [ipaddr, port] <- words (T.unpack addr) -> do - saddr <- liftIO $ head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- liftIO $ serverPeer server (addrAddress saddr) + if + | Just addr <- dconnAddress conn + , [ addrStr, portStr ] <- words (T.unpack addr) + , Just ipaddr <- readMaybe addrStr + , Just port <- readMaybe portStr + -> do + let saddr = inetToSockAddr ( ipaddr, port ) + peer <- liftIO $ serverPeer server saddr let upd dp = dp { dpPeer = Just peer } svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s } + | dconnTunnel conn + , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + receivedStreams >>= \case + tunnelReader : _ -> do + tunnelVia <- asks svcPeer + tunnelIdentity <- asks svcPeerIdentity + void $ liftIO $ forkIO $ do + tunnelStreamNumber <- getStreamWriterNumber tunnelWriter + let addr = TunnelAddress {..} + void $ serverPeerCustom server addr + receiveFromTunnel server addr + [] -> do + svcPrint $ "Discovery: missing stream in tunnel response" + liftIO $ closeStream tunnelWriter + + | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps) + -> do + svcPrint $ "Discovery: tunnel request failed" + liftIO $ closeStream tunnelWriter + #ifdef ENABLE_ICE_SUPPORT | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp @@ -371,24 +479,49 @@ instance Service DiscoveryService where #endif | otherwise -> svcPrint $ "Discovery: connection request failed" - else do - -- response to relayed request - case M.lookup (either refDigest id $ dconnSource conn) dpeers of - Just dp | Just dpeer <- dpPeer dp -> do + else do + -- response to relayed request + streams <- receivedStreams + svcModify $ \s -> s { dpsRelayedTunnelRequests = + filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) } + + case M.lookup (either refDigest id $ dconnSource conn) dpeers of + Just dp | Just dpeer <- dpPeer dp -> if + -- successful tunnel request + | dconnTunnel conn + , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + , fromTarget : _ <- streams + -> liftIO $ do + toSourceVar <- newEmptyMVar + void $ forkIO $ runPeerService @DiscoveryService dpeer $ do + liftIO . putMVar toSourceVar =<< openStream + svcModify $ \s -> s { dpsRelayedTunnelRequests = + ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s } + replyPacket $ DiscoveryConnectionResponse conn + void $ forkIO $ do + relayStream fromSource toTarget + void $ forkIO $ do + toSource <- readMVar toSourceVar + relayStream fromTarget toSource + + -- failed tunnel request + | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps) + -> do + liftIO $ closeStream toTarget sendToPeer dpeer $ DiscoveryConnectionResponse conn - _ -> svcPrint $ "Discovery: failed to relay connection response" + + | otherwise -> do + sendToPeer dpeer $ DiscoveryConnectionResponse conn + _ -> svcPrint $ "Discovery: failed to relay connection response" serviceNewPeer = do server <- asks svcServer peer <- asks svcPeer - let addrToText saddr = do - ( addr, port ) <- IP.fromSockAddr saddr - Just $ T.pack $ show addr <> " " <> show port addrs <- concat <$> sequence - [ catMaybes . map addrToText <$> liftIO (getServerAddresses server) + [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server) #ifdef ENABLE_ICE_SUPPORT - , return [ T.pack "ICE" ] + , return [ DiscoveryICE ] #endif ] @@ -437,7 +570,7 @@ discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> R discoverySearch server dgst = do peers <- liftIO $ getCurrentPeerList server match <- forM peers $ \peer -> do - peerIdentity peer >>= \case + getPeerIdentity peer >>= \case PeerIdentityFull pid -> do return $ dgst `elem` identityDigests pid _ -> return False @@ -447,3 +580,70 @@ discoverySearch server dgst = do } forM_ peers $ \peer -> do sendToPeer peer $ DiscoverySearch $ Right dgst + + +data TunnelAddress = TunnelAddress + { tunnelVia :: Peer + , tunnelIdentity :: UnifiedIdentity + , tunnelStreamNumber :: Int + , tunnelReader :: StreamReader + , tunnelWriter :: StreamWriter + } + +instance Eq TunnelAddress where + x == y = (==) + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Ord TunnelAddress where + compare x y = compare + (idData (tunnelIdentity x), tunnelStreamNumber x) + (idData (tunnelIdentity y), tunnelStreamNumber y) + +instance Show TunnelAddress where + show tunnel = concat + [ "tunnel@" + , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel + , "/" <> show (tunnelStreamNumber tunnel) + ] + +instance PeerAddressType TunnelAddress where + sendBytesToAddress TunnelAddress {..} bytes = do + writeStream tunnelWriter bytes + + connectionToAddressClosed TunnelAddress {..} = do + closeStream tunnelWriter + +relayStream :: StreamReader -> StreamWriter -> IO () +relayStream r w = do + p <- readStreamPacket r + writeStreamPacket w p + case p of + StreamClosed {} -> return () + _ -> relayStream r w + +receiveFromTunnel :: Server -> TunnelAddress -> IO () +receiveFromTunnel server taddr = do + p <- readStreamPacket (tunnelReader taddr) + case p of + StreamData {..} -> do + receivedFromCustomAddress server taddr stpData + receiveFromTunnel server taddr + StreamClosed {} -> do + return () + + +discoverySetupTunnel :: Peer -> RefDigest -> IO () +discoverySetupTunnel via target = do + runPeerService via $ do + discoverySetupTunnelResponse target + +discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService () +discoverySetupTunnelResponse target = do + self <- refDigest . storedRef . idData <$> svcSelf + stream <- openStream + svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s } + replyPacket $ DiscoveryConnectionRequest + (emptyConnection (Right self) (Right target)) + { dconnTunnel = True + } diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index dceeb2c..a3dd9bc 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -16,7 +16,7 @@ module Erebos.ICE ( iceConnect, iceSend, - iceSetChan, + serverPeerIce, ) where import Control.Arrow @@ -32,7 +32,6 @@ import Data.Text (Text) import Data.Text qualified as T import Data.Text.Encoding qualified as T import Data.Text.Read qualified as T -import Data.Void import Data.Word import Foreign.C.String @@ -43,7 +42,7 @@ import Foreign.Marshal.Array import Foreign.Ptr import Foreign.StablePtr -import Erebos.Flow +import Erebos.Network import Erebos.Object import Erebos.Storable import Erebos.Storage @@ -53,7 +52,7 @@ import Erebos.Storage data IceSession = IceSession { isStrans :: PjIceStrans , _isConfig :: IceConfig - , isChan :: MVar (Either [ByteString] (Flow Void ByteString)) + , isChan :: MVar (Either [ ByteString ] (ByteString -> IO ())) } instance Eq IceSession where @@ -65,6 +64,10 @@ instance Ord IceSession where instance Show IceSession where show _ = "<ICE>" +instance PeerAddressType IceSession where + sendBytesToAddress = iceSend + connectionToAddressClosed = iceDestroy + data IceRemoteInfo = IceRemoteInfo { iriUsernameFrament :: Text @@ -126,9 +129,9 @@ instance StorableText IceCandidate where data PjIceStransCfg newtype IceConfig = IceConfig (ForeignPtr PjIceStransCfg) -foreign import ccall unsafe "pjproject.h &ice_cfg_free" +foreign import ccall unsafe "pjproject.h &erebos_ice_cfg_free" ice_cfg_free :: FunPtr (Ptr PjIceStransCfg -> IO ()) -foreign import ccall unsafe "pjproject.h ice_cfg_create" +foreign import ccall unsafe "pjproject.h erebos_ice_cfg_create" ice_cfg_create :: CString -> Word16 -> CString -> Word16 -> IO (Ptr PjIceStransCfg) iceCreateConfig :: Maybe ( Text, Word16 ) -> Maybe ( Text, Word16 ) -> IO (Maybe IceConfig) @@ -140,7 +143,7 @@ iceCreateConfig stun turn = then return Nothing else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg -foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread" +foreign import ccall unsafe "pjproject.h erebos_ice_cfg_stop_thread" ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO () iceStopThread :: IceConfig -> IO () @@ -158,13 +161,13 @@ iceCreateSession icfg@(IceConfig fcfg) role cb = do forkIO $ cb sess sess <- IceSession <$> (withForeignPtr fcfg $ \cfg -> - {#call ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) + {#call erebos_ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) ) <*> pure icfg <*> (newMVar $ Left []) return $ sess -{#fun ice_destroy as ^ { isStrans `IceSession' } -> `()' #} +{#fun erebos_ice_destroy as iceDestroy { isStrans `IceSession' } -> `()' #} iceRemoteInfo :: IceSession -> IO IceRemoteInfo iceRemoteInfo sess = do @@ -179,7 +182,7 @@ iceRemoteInfo sess = do let cptrs = take maxcand $ iterate (`plusPtr` maxlen) bytes pokeArray carr $ take maxcand cptrs - ncand <- {#call ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand) + ncand <- {#call erebos_ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand) if ncand < 0 then fail "failed to generate ICE remote info" else IceRemoteInfo <$> (T.pack <$> peekCString ufrag) @@ -196,13 +199,13 @@ iceShow sess = do iceConnect :: IceSession -> IceRemoteInfo -> (IO ()) -> IO () iceConnect sess remote cb = do cbptr <- newStablePtr $ cb - ice_connect sess cbptr + erebos_ice_connect sess cbptr (iriUsernameFrament remote) (iriPassword remote) (iriDefaultCandidate remote) (iriCandidates remote) -{#fun ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())', +{#fun erebos_ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())', withText* `Text', withText* `Text', withText* `Text', withTextArray* `[Text]'& } -> `()' #} withText :: Text -> (Ptr CChar -> IO a) -> IO a @@ -218,19 +221,19 @@ withTextArray tsAll f = helper tsAll [] withByteStringLen :: Num n => ByteString -> ((Ptr CChar, n) -> IO a) -> IO a withByteStringLen t f = unsafeUseAsCStringLen t (f . (id *** fromIntegral)) -{#fun ice_send as ^ { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #} +{#fun erebos_ice_send as iceSend { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #} foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb :: StablePtr (IO ()) -> IO () ice_call_cb = join . deRefStablePtr -iceSetChan :: IceSession -> Flow Void ByteString -> IO () -iceSetChan sess chan = do +iceSetServer :: IceSession -> Server -> IO () +iceSetServer sess server = do modifyMVar_ (isChan sess) $ \orig -> do case orig of - Left buf -> mapM_ (writeFlowIO chan) $ reverse buf + Left buf -> mapM_ (receivedFromCustomAddress server sess) $ reverse buf Right _ -> return () - return $ Right chan + return $ Right $ receivedFromCustomAddress server sess foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO () @@ -238,5 +241,12 @@ ice_rx_data sptr buf len = do sess <- deRefStablePtr sptr bs <- packCStringLen (buf, len) modifyMVar_ (isChan sess) $ \case - mc@(Right chan) -> writeFlowIO chan bs >> return mc - Left bss -> return $ Left (bs:bss) + mc@(Right sendToServer) -> sendToServer bs >> return mc + Left bss -> return $ Left (bs : bss) + + +serverPeerIce :: Server -> IceSession -> IO Peer +serverPeerIce server ice = do + peer <- serverPeerCustom server ice + iceSetServer ice server + return peer diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index e9446fe..8d91eac 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -78,7 +78,7 @@ static void cb_on_ice_complete(pj_ice_strans * strans, { if (status != PJ_SUCCESS) { ice_perror("cb_on_ice_complete", status); - ice_destroy(strans); + erebos_ice_destroy(strans); return; } @@ -139,7 +139,7 @@ exit: pthread_mutex_unlock(&mutex); } -struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ) { ice_init(); @@ -189,11 +189,11 @@ struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_ return ecfg; fail: - ice_cfg_free( ecfg ); + erebos_ice_cfg_free( ecfg ); return NULL; } -void ice_cfg_free( struct erebos_ice_cfg * ecfg ) +void erebos_ice_cfg_free( struct erebos_ice_cfg * ecfg ) { if( ! ecfg ) return; @@ -216,14 +216,14 @@ void ice_cfg_free( struct erebos_ice_cfg * ecfg ) free( ecfg ); } -void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) +void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg ) { if( ! ecfg ) return; ecfg->exit = true; } -pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, +pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ) { ice_init(); @@ -249,7 +249,7 @@ pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role return res; } -void ice_destroy(pj_ice_strans * strans) +void erebos_ice_destroy(pj_ice_strans * strans) { struct user_data * udata = pj_ice_strans_get_user_data(strans); if (udata->sptr) @@ -264,7 +264,7 @@ void ice_destroy(pj_ice_strans * strans) pj_ice_strans_destroy(strans); } -ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, +ssize_t erebos_ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, char * def, char * candidates[], size_t maxlen, size_t maxcand) { int n; @@ -318,7 +318,7 @@ ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, return cand_cnt; } -void ice_connect(pj_ice_strans * strans, HsStablePtr cb, +void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb, const char * ufrag, const char * pass, const char * defcand, const char * tcandidates[], size_t ncand) { @@ -409,7 +409,7 @@ void ice_connect(pj_ice_strans * strans, HsStablePtr cb, } } -void ice_send(pj_ice_strans * strans, const char * data, size_t len) +void erebos_ice_send(pj_ice_strans * strans, const char * data, size_t len) { if (!pj_ice_strans_sess_is_complete(strans)) { fprintf(stderr, "ICE: negotiation has not been started or is in progress\n"); diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index c31e227..7a1b96d 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,18 +3,18 @@ #include <pjnath.h> #include <HsFFI.h> -struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, +struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port, const char * turn_server, uint16_t turn_port ); -void ice_cfg_free( struct erebos_ice_cfg * cfg ); -void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); +void erebos_ice_cfg_free( struct erebos_ice_cfg * cfg ); +void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * cfg ); -pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, +pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb ); -void ice_destroy(pj_ice_strans * strans); +void erebos_ice_destroy(pj_ice_strans * strans); -ssize_t ice_encode_session(pj_ice_strans *, char * ufrag, char * pass, +ssize_t erebos_ice_encode_session(pj_ice_strans *, char * ufrag, char * pass, char * def, char * candidates[], size_t maxlen, size_t maxcand); -void ice_connect(pj_ice_strans * strans, HsStablePtr cb, +void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb, const char * ufrag, const char * pass, const char * defcand, const char * candidates[], size_t ncand); -void ice_send(pj_ice_strans *, const char * data, size_t len); +void erebos_ice_send(pj_ice_strans *, const char * data, size_t len); diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs index a3f17b5..491df6e 100644 --- a/src/Erebos/Identity.hs +++ b/src/Erebos/Identity.hs @@ -214,29 +214,33 @@ isExtension x = case fromSigned x of BaseIdentityData {} -> False _ -> True -createIdentity :: Storage -> Maybe Text -> Maybe UnifiedIdentity -> IO UnifiedIdentity -createIdentity st name owner = do - (secret, public) <- generateKeys st - (_secretMsg, publicMsg) <- generateKeys st - - let signOwner :: Signed a -> ReaderT Storage IO (Signed a) +createIdentity + :: forall m e. (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m) + => Maybe Text -> Maybe UnifiedIdentity -> m UnifiedIdentity +createIdentity name owner = do + st <- getStorage + ( secret, public ) <- liftIO $ generateKeys st + ( _secretMsg, publicMsg ) <- liftIO $ generateKeys st + + let signOwner :: Signed a -> m (Signed a) signOwner idd | Just o <- owner = do - Just ownerSecret <- loadKeyMb (iddKeyIdentity $ fromSigned $ idData o) + ownerSecret <- maybe (throwOtherError "failed to load private key") return =<< + loadKeyMb (iddKeyIdentity $ fromSigned $ idData o) signAdd ownerSecret idd | otherwise = return idd - Just identity <- flip runReaderT st $ do - baseData <- mstore =<< signOwner =<< sign secret =<< - mstore (emptyIdentityData public) - { iddOwner = idData <$> owner - , iddKeyMessage = Just publicMsg - } - let extOwner = do - odata <- idExtData <$> owner - guard $ isExtension odata - return odata - + baseData <- mstore =<< signOwner =<< sign secret =<< + mstore (emptyIdentityData public) + { iddOwner = idData <$> owner + , iddKeyMessage = Just publicMsg + } + let extOwner = do + odata <- idExtData <$> owner + guard $ isExtension odata + return odata + + maybe (throwOtherError "created invalid identity") return =<< do validateExtendedIdentityF . I.Identity <$> if isJust name || isJust extOwner then mstore =<< signOwner =<< sign secret =<< @@ -245,7 +249,6 @@ createIdentity st name owner = do , ideOwner = extOwner } else return $ baseToExtended baseData - return identity validateIdentity :: Stored (Signed IdentityData) -> Maybe UnifiedIdentity validateIdentity = validateIdentityF . I.Identity @@ -388,13 +391,13 @@ sameIdentity x y = intersectsSorted (roots x) (roots y) roots idt = uniq $ sort $ concatMap storedRoots $ toList $ idDataF idt -unfoldOwners :: (Foldable m) => Identity m -> [ComposedIdentity] +unfoldOwners :: Foldable m => Identity m -> [ComposedIdentity] unfoldOwners = unfoldr (fmap (\i -> (i, idOwner i))) . Just . toComposedIdentity -finalOwner :: (Foldable m, Applicative m) => Identity m -> ComposedIdentity +finalOwner :: Foldable m => Identity m -> ComposedIdentity finalOwner = last . unfoldOwners -displayIdentity :: (Foldable m, Applicative m) => Identity m -> Text +displayIdentity :: Foldable m => Identity m -> Text displayIdentity identity = T.concat [ T.intercalate (T.pack " / ") $ map (fromMaybe (T.pack "<unnamed>") . idName) owners ] diff --git a/src/Erebos/Invite.hs b/src/Erebos/Invite.hs new file mode 100644 index 0000000..f860fbc --- /dev/null +++ b/src/Erebos/Invite.hs @@ -0,0 +1,213 @@ +module Erebos.Invite ( + Invite(..), + InviteData(..), + InviteService, + InviteServiceAttributes(..), + + createSingleContactInvite, + acceptInvite, +) where + +import Control.Arrow +import Control.Monad +import Control.Monad.Except +import Control.Monad.IO.Class +import Control.Monad.Reader + +import Crypto.Random + +import Data.ByteString (ByteString) +import Data.ByteString.Char8 qualified as BC +import Data.Foldable +import Data.Ord +import Data.Text (Text) + +import Erebos.Contact +import Erebos.Identity +import Erebos.Network +import Erebos.Object +import Erebos.PubKey +import Erebos.Service +import Erebos.Set +import Erebos.State +import Erebos.Storable +import Erebos.Storage.Merge +import Erebos.Util + + +data Invite = Invite + { inviteData :: [ Stored InviteData ] + , inviteToken :: Maybe ByteString + , inviteAccepted :: [ Stored (Signed ExtendedIdentityData) ] + , inviteContact :: Maybe Text + } + +data InviteData = InviteData + { invdPrev :: [ Stored InviteData ] + , invdToken :: Maybe ByteString + , invdAccepted :: Maybe (Stored (Signed ExtendedIdentityData)) + , invdContact :: Maybe Text + } + +instance Storable InviteData where + store' x = storeRec $ do + mapM_ (storeRef "PREV") $ invdPrev x + mapM_ (storeBinary "token") $ invdToken x + mapM_ (storeRef "accepted") $ invdAccepted x + mapM_ (storeText "contact") $ invdContact x + + load' = loadRec $ InviteData + <$> loadRefs "PREV" + <*> loadMbBinary "token" + <*> loadMbRef "accepted" + <*> loadMbText "contact" + + +instance Mergeable Invite where + type Component Invite = InviteData + + mergeSorted invdata = Invite + { inviteData = invdata + , inviteToken = findPropertyFirst invdToken invdata + , inviteAccepted = findProperty invdAccepted invdata + , inviteContact = findPropertyFirst invdContact invdata + } + + toComponents = inviteData + +instance SharedType (Set Invite) where + sharedTypeID _ = mkSharedTypeID "78da787a-9380-432e-a51d-532a30d27b3d" + + +createSingleContactInvite :: MonadHead LocalState m => Text -> m Invite +createSingleContactInvite name = do + token <- liftIO $ getRandomBytes 32 + invite <- mergeSorted @Invite . (: []) <$> mstore InviteData + { invdPrev = [] + , invdToken = Just token + , invdAccepted = Nothing + , invdContact = Just name + } + updateLocalState_ $ updateSharedState_ $ \invites -> do + storeSetAdd invite invites + return invite + +identityOwnerDigests :: Foldable f => Identity f -> [ RefDigest ] +identityOwnerDigests pid = map (refDigest . storedRef) $ concatMap toList $ toList $ generations $ idExtDataF $ finalOwner pid + +acceptInvite :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> ByteString -> m () +acceptInvite server from token = do + let matchPeer peer = do + getPeerIdentity peer >>= \case + PeerIdentityFull pid -> do + return $ from `elem` identityOwnerDigests pid + _ -> return False + liftIO (findPeer server matchPeer) >>= \case + Just peer -> runPeerService @InviteService peer $ do + svcModify (token :) + replyPacket $ AcceptInvite token + Nothing -> do + throwOtherError "peer not found" + + +data InviteService + = AcceptInvite ByteString + | InvalidInvite ByteString + | ContactInvite ByteString (Maybe Text) + | UnknownInvitePacket + +data InviteServiceAttributes = InviteServiceAttributes + { inviteHookAccepted :: ByteString -> ServiceHandler InviteService () + , inviteHookReplyContact :: ByteString -> Maybe Text -> ServiceHandler InviteService () + , inviteHookReplyInvalid :: ByteString -> ServiceHandler InviteService () + } + +defaultInviteServiceAttributes :: InviteServiceAttributes +defaultInviteServiceAttributes = InviteServiceAttributes + { inviteHookAccepted = \_ -> return () + , inviteHookReplyContact = \_ _ -> return () + , inviteHookReplyInvalid = \_ -> return () + } + +instance Storable InviteService where + store' x = storeRec $ case x of + AcceptInvite token -> storeBinary "accept" token + InvalidInvite token -> storeBinary "invalid" token + ContactInvite token mbName -> do + storeBinary "valid" token + maybe (storeEmpty "contact") (storeText "contact") mbName + UnknownInvitePacket -> return () + + load' = loadRec $ msum + [ AcceptInvite <$> loadBinary "accept" + , InvalidInvite <$> loadBinary "invalid" + , ContactInvite <$> loadBinary "valid" <*> msum + [ return Nothing <* loadEmpty "contact" + , Just <$> loadText "contact" + ] + , return UnknownInvitePacket + ] + +instance Service InviteService where + serviceID _ = mkServiceID "70bff715-6856-43a0-8c58-007a06a26eb1" + + type ServiceState InviteService = [ ByteString ] -- accepted invites, waiting for reply + emptyServiceState _ = [] + + type ServiceAttributes InviteService = InviteServiceAttributes + defaultServiceAttributes _ = defaultInviteServiceAttributes + + serviceHandler = fromStored >>> \case + AcceptInvite token -> do + asks (inviteHookAccepted . svcAttributes) >>= ($ token) + invites <- fromSetBy (comparing inviteToken) . lookupSharedValue . lsShared . fromStored <$> getLocalHead + case find ((Just token ==) . inviteToken) invites of + Just invite + | Just name <- inviteContact invite + , [] <- inviteAccepted invite + -> do + identity <- asks svcPeerIdentity + cdata <- mstore ContactData + { cdPrev = [] + , cdIdentity = idExtDataF $ finalOwner identity + , cdName = Just name + } + invdata <- mstore InviteData + { invdPrev = inviteData invite + , invdToken = Nothing + , invdAccepted = Just (idExtData identity) + , invdContact = Nothing + } + updateLocalState_ $ updateSharedState_ $ storeSetAdd (mergeSorted @Contact [ cdata ]) + updateLocalState_ $ updateSharedState_ $ storeSetAdd (mergeSorted @Invite [ invdata ]) + replyPacket $ ContactInvite token Nothing + + | otherwise -> do + replyPacket $ InvalidInvite token + + Nothing -> do + replyPacket $ InvalidInvite token + + InvalidInvite token -> do + asks (inviteHookReplyInvalid . svcAttributes) >>= ($ token) + svcModify $ filter (/= token) + svcPrint $ "Invite " <> BC.unpack (showHex token) <> " rejected as invalid" + + ContactInvite token mbName -> do + asks (inviteHookReplyContact . svcAttributes) >>= ($ mbName) . ($ token) + waitingTokens <- svcGet + if token `elem` waitingTokens + then do + svcSet $ filter (/= token) waitingTokens + identity <- asks svcPeerIdentity + cdata <- mstore ContactData + { cdPrev = [] + , cdIdentity = idExtDataF $ finalOwner identity + , cdName = Nothing + } + updateLocalState_ $ updateSharedState_ $ storeSetAdd (mergeSorted @Contact [ cdata ]) + else do + svcPrint $ "Received unexpected invite response for " <> BC.unpack (showHex token) + + UnknownInvitePacket -> do + svcPrint $ "Received unknown invite packet" diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index b341974..6265bbf 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE CPP #-} - module Erebos.Network ( Server, startServer, @@ -10,8 +8,8 @@ module Erebos.Network ( ServerOptions(..), serverIdentity, defaultServerOptions, Peer, peerServer, peerStorage, - PeerAddress(..), peerAddress, - PeerIdentity(..), peerIdentity, + PeerAddress(..), getPeerAddress, getPeerAddresses, + PeerIdentity(..), getPeerIdentity, WaitingRef, wrDigest, Service(..), @@ -20,9 +18,7 @@ module Erebos.Network ( serverPeer, serverPeerCustom, -#ifdef ENABLE_ICE_SUPPORT - serverPeerIce, -#endif + findPeer, dropPeer, isPeerDropped, sendToPeer, sendManyToPeer, @@ -66,10 +62,8 @@ import Network.Socket hiding (ControlMessage) import Network.Socket.ByteString qualified as S import Erebos.Error -#ifdef ENABLE_ICE_SUPPORT -import Erebos.ICE -#endif import Erebos.Identity +import Erebos.Network.Address import Erebos.Network.Channel import Erebos.Network.Protocol import Erebos.Object.Internal @@ -121,12 +115,16 @@ getNextPeerChange = atomically . readTChan . serverChanPeer data ServerOptions = ServerOptions { serverPort :: PortNumber , serverLocalDiscovery :: Bool + , serverErrorPrefix :: String + , serverTestLog :: Bool } defaultServerOptions :: ServerOptions defaultServerOptions = ServerOptions { serverPort = discoveryPort , serverLocalDiscovery = True + , serverErrorPrefix = "" + , serverTestLog = False } @@ -141,6 +139,14 @@ data Peer = Peer , peerWaitingRefs :: TMVar [WaitingRef] } +-- | Get current main address of the peer (used to send new packets). +getPeerAddress :: MonadIO m => Peer -> m PeerAddress +getPeerAddress = liftIO . return . peerAddress + +-- | Get all known addresses of given peer. +getPeerAddresses :: MonadIO m => Peer -> m [ PeerAddress ] +getPeerAddresses = fmap (: []) . getPeerAddress + peerServer :: Peer -> Server peerServer = peerServer_ @@ -166,36 +172,24 @@ instance Eq Peer where class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where sendBytesToAddress :: addr -> ByteString -> IO () + connectionToAddressClosed :: addr -> IO () data PeerAddress = forall addr. PeerAddressType addr => CustomPeerAddress addr | DatagramAddress SockAddr -#ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession -#endif instance Show PeerAddress where show (CustomPeerAddress addr) = show addr - show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of - Just (IP.IPv6 ipv6, port) - | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 - -> [show (IP.toIPv4w ipv4), show port] - Just (addr, port) - -> [show addr, show port] - _ -> [show saddr] - -#ifdef ENABLE_ICE_SUPPORT - show (PeerIceSession ice) = show ice -#endif + show (DatagramAddress saddr) = + case inetFromSockAddr saddr of + Just ( addr, port ) -> unwords [ show addr, show port ] + _ -> show saddr instance Eq PeerAddress where CustomPeerAddress addr == CustomPeerAddress addr' | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' -#ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice == PeerIceSession ice' = ice == ice' -#endif _ == _ = False instance Ord PeerAddress where @@ -206,20 +200,16 @@ instance Ord PeerAddress where compare _ (CustomPeerAddress _ ) = GT compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' -#ifdef ENABLE_ICE_SUPPORT - compare (DatagramAddress _ ) _ = LT - compare _ (DatagramAddress _ ) = GT - compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' -#endif +data PeerIdentity + = PeerIdentityUnknown (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ]) + | PeerIdentityRef WaitingRef (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ]) + | PeerIdentityFull UnifiedIdentity -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) - | PeerIdentityFull UnifiedIdentity - -peerIdentity :: MonadIO m => Peer -> m PeerIdentity -peerIdentity = liftIO . atomically . readTVar . peerIdentityVar +-- | Get currently known identity of the given peer +getPeerIdentity :: MonadIO m => Peer -> m PeerIdentity +getPeerIdentity = liftIO . atomically . readTVar . peerIdentityVar data PeerState @@ -277,7 +267,16 @@ startServer serverOptions serverOrigHead logd' serverServices = do let logd = writeTQueue serverErrorLog forkServerThread server $ forever $ do - logd' =<< atomically (readTQueue serverErrorLog) + logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog) + + logt <- if + | serverTestLog serverOptions -> do + serverTestLog <- newTQueueIO + forkServerThread server $ forever $ do + logd' =<< atomically (readTQueue serverTestLog) + return $ writeTQueue serverTestLog + | otherwise -> do + return $ \_ -> return () forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do @@ -327,13 +326,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -345,9 +349,6 @@ startServer serverOptions serverOrigHead logd' serverServices = do case paddr of CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr -#ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg -#endif forkServerThread server $ forever $ do readFlowIO serverControlFlow >>= \case @@ -389,9 +390,13 @@ startServer serverOptions serverOrigHead logd' serverServices = do prefs <- forM objs $ storeObject $ peerInStorage peer identity <- readMVar serverIdentity_ let svcs = map someServiceID serverServices - handlePacket identity secure peer chanSvc svcs header prefs + handlePacket paddr identity secure peer chanSvc svcs header prefs peerLoop Nothing -> do + case paddr of + DatagramAddress _ -> return () + CustomPeerAddress caddr -> connectionToAddressClosed caddr + dropPeer peer atomically $ writeTChan serverChanPeer peer peerLoop @@ -399,7 +404,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do ReceivedAnnounce addr _ -> do void $ serverPeer' server addr - erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow + erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow forkServerThread server $ withSocketsDo $ do let hints = defaultHints @@ -411,9 +416,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do bracket (open addr) close loop forkServerThread server $ forever $ do - ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc + ( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server @@ -560,10 +565,10 @@ appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys appendDistinct x [] = [x] -handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ] +handlePacket :: PeerAddress -> UnifiedIdentity -> Bool + -> Peer -> TQueue ( Peer, PeerAddress, ServiceID, Ref, [ RawStreamReader ] ) -> [ ServiceID ] -> TransportHeader -> [ PartialRef ] -> IO () -handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +handlePacket paddr identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer let sidentity = idData identity @@ -699,7 +704,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = then do streamReaders <- mapM acceptStream $ lookupNewStreams headers void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders ) + liftIO $ atomically $ writeTQueue chanSvc ( peer, paddr, svc, ref, streamReaders ) else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" @@ -779,7 +784,7 @@ finalizedChannel peer@Peer {..} ch self = do -- Notify services about new peer readTVar peerIdentityVar >>= \case - PeerIdentityFull _ -> notifyServicesOfPeer peer + PeerIdentityFull _ -> notifyServicesOfPeer True peer _ -> return () @@ -805,7 +810,7 @@ handleIdentityAnnounce self peer ref = liftIO $ atomically $ do PeerIdentityFull pid | idData pid `precedes` wrappedLoad ref -> validateAndUpdate (idUpdates pid) $ \_ -> do - notifyServicesOfPeer peer + notifyServicesOfPeer False peer _ -> return () @@ -817,15 +822,18 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do -> do writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid' writeTChan (serverChanPeer $ peerServer peer) peer - when (idData pid /= idData pid') $ notifyServicesOfPeer peer + when (pid /= pid') $ do + notifyServicesOfPeer False peer | otherwise -> return () -notifyServicesOfPeer :: Peer -> STM () -notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do +notifyServicesOfPeer :: Bool -> Peer -> STM () +notifyServicesOfPeer new peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do + paddr <- getPeerAddress peer forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] paddr peer $ + if new then serviceNewPeer else serviceUpdatedPeer receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () @@ -853,15 +861,6 @@ serverPeer server paddr = do serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) -#ifdef ENABLE_ICE_SUPPORT -serverPeerIce :: Server -> IceSession -> IO Peer -serverPeerIce server@Server {..} ice = do - let paddr = PeerIceSession ice - peer <- serverPeer' server paddr - iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath - return peer -#endif - serverPeer' :: Server -> PeerAddress -> IO Peer serverPeer' server paddr = do (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do @@ -874,6 +873,13 @@ serverPeer' server paddr = do writeFlow (serverControlFlow server) (RequestConnection paddr) return peer +findPeer :: Server -> (Peer -> IO Bool) -> IO (Maybe Peer) +findPeer server test = withMVar (serverPeers server) (helper . M.elems) + where + helper (p : ps) = test p >>= \case True -> return (Just p) + False -> helper ps + helper [] = return Nothing + dropPeer :: MonadIO m => Peer -> m () dropPeer peer = liftIO $ do modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do @@ -983,10 +989,12 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing [] +runPeerService peer handler = do + paddr <- getPeerAddress peer + runPeerServiceOn Nothing [] paddr peer handler -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice newStreams peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> PeerAddress -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams paddr peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -1008,6 +1016,7 @@ runPeerServiceOn mbservice newStreams peer handler = liftIO $ do let inp = ServiceInput { svcAttributes = attr , svcPeer = peer + , svcPeerAddress = paddr , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd @@ -1027,7 +1036,7 @@ runPeerServiceOn mbservice newStreams peer handler = liftIO $ do putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global _ -> do - atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer) + atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show paddr _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" @@ -1054,30 +1063,11 @@ modifyServiceGlobalState server proxy f = do throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" -foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) -foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) -foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) +foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () -data InetAddress = InetAddress { fromInetAddress :: IP.IP } - -instance F.Storable InetAddress where - sizeOf _ = sizeOf (undefined :: CInt) + 16 - alignment _ = 8 - - peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case - AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) - AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) - _ -> fail "InetAddress: unknown family" - - poke ptr (InetAddress addr) = case addr of - IP.IPv4 ip -> do - pokeByteOff ptr 0 (packFamily AF_INET) - pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) - IP.IPv6 ip -> do - pokeByteOff ptr 0 (packFamily AF_INET6) - pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) - joinMulticast :: Socket -> IO [ Word32 ] joinMulticast sock = withFdSocket sock $ \fd -> @@ -1104,7 +1094,7 @@ getServerAddresses Server {..} = do count <- fromIntegral <$> peek pcount res <- peekArray count ptr cFree ptr - return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res + return $ map (inetToSockAddr . (, serverPort serverOptions )) res getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do diff --git a/src/Erebos/Network.hs-boot b/src/Erebos/Network.hs-boot index af77581..17a5275 100644 --- a/src/Erebos/Network.hs-boot +++ b/src/Erebos/Network.hs-boot @@ -4,5 +4,6 @@ import Erebos.Object.Internal data Server data Peer +data PeerAddress peerStorage :: Peer -> Storage diff --git a/src/Erebos/Network/Address.hs b/src/Erebos/Network/Address.hs new file mode 100644 index 0000000..63f6af1 --- /dev/null +++ b/src/Erebos/Network/Address.hs @@ -0,0 +1,65 @@ +module Erebos.Network.Address ( + InetAddress(..), + inetFromSockAddr, + inetToSockAddr, + + SockAddr, PortNumber, +) where + +import Data.Bifunctor +import Data.IP qualified as IP +import Data.Word + +import Foreign.C.Types +import Foreign.Marshal.Array +import Foreign.Ptr +import Foreign.Storable as F + +import Network.Socket + +import Text.Read + + +newtype InetAddress = InetAddress { fromInetAddress :: IP.IP } + deriving (Eq, Ord) + +instance Show InetAddress where + show (InetAddress ipaddr) + | IP.IPv6 ipv6 <- ipaddr + , ( 0, 0, 0xffff, ipv4 ) <- IP.fromIPv6w ipv6 + = show (IP.toIPv4w ipv4) + + | otherwise + = show ipaddr + +instance Read InetAddress where + readPrec = do + readPrec >>= return . InetAddress . \case + IP.IPv4 ipv4 -> IP.IPv6 $ IP.toIPv6w ( 0, 0, 0xffff, IP.fromIPv4w ipv4 ) + ipaddr -> ipaddr + + readListPrec = readListPrecDefault + +instance F.Storable InetAddress where + sizeOf _ = sizeOf (undefined :: CInt) + 16 + alignment _ = 8 + + peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case + AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) + AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) + _ -> fail "InetAddress: unknown family" + + poke ptr (InetAddress addr) = case addr of + IP.IPv4 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET) + pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) + IP.IPv6 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET6) + pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) + + +inetFromSockAddr :: SockAddr -> Maybe ( InetAddress, PortNumber ) +inetFromSockAddr saddr = first InetAddress <$> IP.fromSockAddr saddr + +inetToSockAddr :: ( InetAddress, PortNumber ) -> SockAddr +inetToSockAddr = IP.toSockAddr . first fromInetAddress diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index 025f52c..f67e296 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -213,6 +213,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () + , gTestLog :: String -> STM () , gStorage :: PartialStorage , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec @@ -249,6 +250,12 @@ instance Eq (Connection addr) where connAddress :: Connection addr -> addr connAddress = cAddress +showConnAddress :: forall addr. Connection addr -> String +showConnAddress Connection {..} = helper cGlobalState cAddress + where + helper :: GlobalState addr -> addr -> String + helper GlobalState {} = show + connData :: Connection addr -> Flow (Maybe (Bool, TransportPacket PartialObject)) (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) @@ -273,6 +280,7 @@ connClose conn@Connection {..} = do connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) connAddWriteStream conn@Connection {..} = do + let GlobalState {..} = cGlobalState outStreams <- readTVar cOutStreams let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) doInsert n (s@(n', _) : rest) | n == n' = @@ -289,14 +297,16 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' + lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams') return ( StreamOpen streamNumber , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) - , go cGlobalState streamNumber stream + , go streamNumber stream ) where - go gs@GlobalState {..} streamNumber stream = do + go streamNumber stream = do + let GlobalState {..} = cGlobalState (reserved, msg) <- atomically $ do readTVar (sState stream) >>= \case StreamRunning -> return () @@ -309,6 +319,8 @@ connAddWriteStream conn@Connection {..} = do return (stpData, True, return ()) StreamClosed {} -> do atomically $ do + gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber + atomically $ do -- wait for ack on all sent stream data waits <- readTVar (sWaitingForAck stream) when (waits > 0) retry @@ -352,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do sendBytes conn mbReserved' bs Nothing -> return () - when cont $ go gs streamNumber stream + when cont $ go streamNumber stream connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader connAddReadStream Connection {..} streamNumber = do @@ -411,8 +423,10 @@ streamAccepted Connection {..} snum = atomically $ do Nothing -> return () streamClosed :: Connection addr -> Word8 -> IO () -streamClosed Connection {..} snum = atomically $ do - modifyTVar' cOutStreams $ filter ((snum /=) . fst) +streamClosed conn@Connection {..} snum = atomically $ do + streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams + writeTVar cOutStreams streams + gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case @@ -494,10 +508,11 @@ data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) => UnifiedIdentity -> (String -> STM ()) + -> (String -> STM ()) -> SymFlow (addr, ByteString) -> Flow (ControlRequest addr) (ControlMessage addr) -> IO () -erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do +erebosNetworkProtocol initialIdentity gLog gTestLog gDataFlow gControlFlow = do gIdentity <- newTVarIO (initialIdentity, []) gConnections <- newTVarIO [] gNextUp <- newEmptyTMVarIO @@ -561,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cOutStreams <- newTVar [] let conn = Connection {..} + gTestLog $ "net-conn-new " <> show cAddress writeTVar gConnections (conn : conns) return conn @@ -917,7 +933,10 @@ processOutgoing gs@GlobalState {..} = do , rsOnAck = rsOnAck rs >> onAck }) <$> mbReserved sendBytes conn mbReserved' bs - Nothing -> return () + Nothing -> do + when (isJust mbReserved) $ do + atomically $ do + modifyTVar' cReservedPackets (subtract 1) let waitUntil :: TimeSpec -> TimeSpec -> STM () waitUntil now till = do diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c index ff4382a..8139b5e 100644 --- a/src/Erebos/Network/ifaddrs.c +++ b/src/Erebos/Network/ifaddrs.c @@ -22,7 +22,7 @@ #define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1" -uint32_t * join_multicast(int fd, size_t * count) +uint32_t * erebos_join_multicast(int fd, size_t * count) { size_t capacity = 16; *count = 0; @@ -117,7 +117,7 @@ static bool copy_local_address( struct InetAddress * dst, const struct sockaddr #ifndef _WIN32 -struct InetAddress * local_addresses( size_t * count ) +struct InetAddress * erebos_local_addresses( size_t * count ) { struct ifaddrs * addrs; if( getifaddrs( &addrs ) < 0 ) @@ -153,7 +153,7 @@ struct InetAddress * local_addresses( size_t * count ) return ret; } -uint32_t * broadcast_addresses(void) +uint32_t * erebos_broadcast_addresses(void) { struct ifaddrs * addrs; if (getifaddrs(&addrs) < 0) @@ -196,7 +196,7 @@ uint32_t * broadcast_addresses(void) #pragma comment(lib, "ws2_32.lib") -struct InetAddress * local_addresses( size_t * count ) +struct InetAddress * erebos_local_addresses( size_t * count ) { * count = 0; struct InetAddress * ret = NULL; @@ -237,7 +237,7 @@ cleanup: return ret; } -uint32_t * broadcast_addresses(void) +uint32_t * erebos_broadcast_addresses(void) { uint32_t * ret = NULL; SOCKET wsock = INVALID_SOCKET; diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h index 2ee45a7..2b3c014 100644 --- a/src/Erebos/Network/ifaddrs.h +++ b/src/Erebos/Network/ifaddrs.h @@ -13,6 +13,6 @@ struct InetAddress uint8_t addr[16]; } __attribute__((packed)); -uint32_t * join_multicast(int fd, size_t * count); -struct InetAddress * local_addresses( size_t * count ); -uint32_t * broadcast_addresses(void); +uint32_t * erebos_join_multicast(int fd, size_t * count); +struct InetAddress * erebos_local_addresses( size_t * count ); +uint32_t * erebos_broadcast_addresses(void); diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs index 97ca7a3..fe00579 100644 --- a/src/Erebos/Object/Internal.hs +++ b/src/Erebos/Object/Internal.hs @@ -55,26 +55,27 @@ import Control.Monad.Writer import Crypto.Hash import Data.Bifunctor +import Data.ByteArray qualified as BA import Data.ByteString (ByteString) -import qualified Data.ByteArray as BA -import qualified Data.ByteString as B -import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL -import qualified Data.ByteString.Lazy.Char8 as BLC +import Data.ByteString qualified as B +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL +import Data.ByteString.Lazy.Char8 qualified as BLC import Data.Char import Data.Function import Data.Maybe import Data.Ratio import Data.Set (Set) -import qualified Data.Set as S +import Data.Set qualified as S import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T import Data.Text.Encoding import Data.Text.Encoding.Error import Data.Time.Calendar import Data.Time.Clock import Data.Time.Format import Data.Time.LocalTime +import Data.Word import System.IO.Unsafe @@ -129,6 +130,7 @@ copyRecItem' st = \case copyObject' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (c (Object' c')) copyObject' _ (Blob bs) = return $ return $ Blob bs copyObject' st (Rec rs) = fmap Rec . sequence <$> mapM (\( n, item ) -> fmap ( n, ) <$> copyRecItem' st item) rs +copyObject' _ (OnDemand size dgst) = return $ return $ OnDemand size dgst copyObject' _ ZeroObject = return $ return ZeroObject copyObject' _ (UnknownObject otype content) = return $ return $ UnknownObject otype content @@ -150,7 +152,8 @@ partialRefFromDigest st dgst = Ref st dgst data Object' c = Blob ByteString - | Rec [(ByteString, RecItem' c)] + | Rec [ ( ByteString, RecItem' c ) ] + | OnDemand Word64 RefDigest | ZeroObject | UnknownObject ByteString ByteString deriving (Show) @@ -176,8 +179,12 @@ type RecItem = RecItem' Complete serializeObject :: Object' c -> BL.ByteString serializeObject = \case Blob cnt -> BL.fromChunks [BC.pack "blob ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt] - Rec rec -> let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec - in BL.fromChunks [BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n'] `BL.append` cnt + Rec rec -> + let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec + in BL.fromChunks [ BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n' ] `BL.append` cnt + OnDemand size dgst -> + let cnt = BC.unlines [ BC.pack (show size), showRefDigest dgst ] + in BL.fromChunks [ BC.pack "ondemand ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt ] ZeroObject -> BL.empty UnknownObject otype cnt -> BL.fromChunks [ otype, BC.singleton ' ', BC.pack (show $ B.length cnt), BC.singleton '\n', cnt ] @@ -236,46 +243,72 @@ unsafeDeserializeObject st bytes = (line, rest) | Just (otype, len) <- splitObjPrefix line -> do let (content, next) = first BL.toStrict $ BL.splitAt (fromIntegral len) $ BL.drop 1 rest guard $ B.length content == len - (,next) <$> case otype of - _ | otype == BC.pack "blob" -> return $ Blob content - | otype == BC.pack "rec" -> maybe (throwOtherError $ "malformed record item ") - (return . Rec) $ sequence $ map parseRecLine $ mergeCont [] $ BC.lines content - | otherwise -> return $ UnknownObject otype content + (, next) <$> if + | otype == BC.pack "blob" + -> return $ Blob content + | otype == BC.pack "rec" + , Just ritems <- parseRecordBody st content + -> return $ Rec ritems + | otype == BC.pack "ondemand" + , Just ondemand <- parseOnDemand st content + -> return ondemand + | otherwise + -> return $ UnknownObject otype content _ -> throwOtherError $ "malformed object" - where splitObjPrefix line = do - [otype, tlen] <- return $ BLC.words line - (len, rest) <- BLC.readInt tlen - guard $ BL.null rest - return (BL.toStrict otype, len) - - mergeCont cs (a:b:rest) | Just ('\t', b') <- BC.uncons b = mergeCont (b':BC.pack "\n":cs) (a:rest) - mergeCont cs (a:rest) = B.concat (a : reverse cs) : mergeCont [] rest - mergeCont _ [] = [] - - parseRecLine line = do - colon <- BC.elemIndex ':' line - space <- BC.elemIndex ' ' line - guard $ colon < space - let name = B.take colon line - itype = B.take (space-colon-1) $ B.drop (colon+1) line - content = B.drop (space+1) line - - let val = fromMaybe (RecUnknown itype content) $ - case BC.unpack itype of - "e" -> do guard $ B.null content - return RecEmpty - "i" -> do (num, rest) <- BC.readInteger content - guard $ B.null rest - return $ RecInt num - "n" -> RecNum <$> parseRatio content - "t" -> return $ RecText $ decodeUtf8With lenientDecode content - "b" -> RecBinary <$> readHex content - "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content) - "u" -> RecUUID <$> U.fromASCIIBytes content - "r" -> RecRef . Ref st <$> readRefDigest content - "w" -> RecWeak <$> readRefDigest content - _ -> Nothing - return (name, val) + where + splitObjPrefix line = do + [ otype, tlen ] <- return $ BLC.words line + ( len, rest ) <- BLC.readInt tlen + guard $ BL.null rest + return ( BL.toStrict otype, len ) + +parseRecordBody :: Storage' c -> ByteString -> Maybe [ ( ByteString, RecItem' c ) ] +parseRecordBody _ body | B.null body = Just [] +parseRecordBody st body = do + colon <- BC.elemIndex ':' body + space <- BC.elemIndex ' ' $ B.drop (colon + 1) body + let name = B.take colon body + itype = B.take space $ B.drop (colon + 1) body + ( content, remainingBody ) <- parseTabEscapedLines $ B.drop (space + colon + 2) body + + let val = fromMaybe (RecUnknown itype content) $ + case BC.unpack itype of + "e" -> do guard $ B.null content + return RecEmpty + "i" -> do ( num, rest ) <- BC.readInteger content + guard $ B.null rest + return $ RecInt num + "n" -> RecNum <$> parseRatio content + "t" -> return $ RecText $ decodeUtf8With lenientDecode content + "b" -> RecBinary <$> readHex content + "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content) + "u" -> RecUUID <$> U.fromASCIIBytes content + "r" -> RecRef . Ref st <$> readRefDigest content + "w" -> RecWeak <$> readRefDigest content + _ -> Nothing + (( name, val ) :) <$> parseRecordBody st remainingBody + +-- Split given ByteString on the first newline not preceded by tab; replace +-- "\t\n" in the first part with "\n". +parseTabEscapedLines :: ByteString -> Maybe ( ByteString, ByteString ) +parseTabEscapedLines = parseLines [] + where + parseLines linesReversed cur = do + newline <- BC.elemIndex '\n' cur + case BC.indexMaybe cur (newline + 1) of + Just '\t' -> parseLines (B.take (newline + 1) cur : linesReversed) (B.drop (newline + 2) cur) + _ -> Just ( BC.concat $ reverse $ B.take newline cur : linesReversed, B.drop (newline + 1) cur ) + +parseOnDemand :: Storage' c -> ByteString -> Maybe (Object' c) +parseOnDemand _ body = do + newline1 <- BC.elemIndex '\n' body + newline2 <- BC.elemIndex '\n' $ B.drop (newline1 + 1) body + guard (newline1 + newline2 + 2 == B.length body) + ( size, sizeRest ) <- BC.readWord64 (B.take newline1 body) + guard (B.null sizeRest) + dgst <- readRefDigest $ B.take newline2 $ B.drop (newline1 + 1) body + return $ OnDemand size dgst + deserializeObject :: PartialStorage -> BL.ByteString -> Except ErebosError (PartialObject, BL.ByteString) deserializeObject = unsafeDeserializeObject @@ -332,10 +365,12 @@ class Storable a where class Storable a => ZeroStorable a where fromZero :: Storage -> a -data Store = StoreBlob ByteString - | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]]) - | StoreZero - | StoreUnknown ByteString ByteString +data Store + = StoreBlob ByteString + | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]]) + | StoreOnDemand Word64 RefDigest + | StoreZero + | StoreUnknown ByteString ByteString evalStore :: StorageCompleteness c => Storage' c -> Store -> IO (Ref' c) evalStore st = unsafeStoreObject st <=< evalStoreObject st @@ -343,6 +378,7 @@ evalStore st = unsafeStoreObject st <=< evalStoreObject st evalStoreObject :: StorageCompleteness c => Storage' c -> Store -> IO (Object' c) evalStoreObject _ (StoreBlob x) = return $ Blob x evalStoreObject s (StoreRec f) = Rec . concat <$> sequence (f s) +evalStoreObject _ (StoreOnDemand size dgst) = return $ OnDemand size dgst evalStoreObject _ StoreZero = return ZeroObject evalStoreObject _ (StoreUnknown otype content) = return $ UnknownObject otype content @@ -379,6 +415,7 @@ instance Storable Object where store' (Rec xs) = StoreRec $ \st -> return $ do Rec xs' <- copyObject st (Rec xs) return xs' + store' (OnDemand size dgst) = StoreOnDemand size dgst store' ZeroObject = StoreZero store' (UnknownObject otype content) = StoreUnknown otype content @@ -703,8 +740,6 @@ loadRawWeaks name = mapMaybe p <$> loadRecItems -type Stored a = Stored' Complete a - instance Storable a => Storable (Stored a) where store st = copyRef st . storedRef store' (Stored _ x) = store' x @@ -714,10 +749,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where fromZero st = Stored (zeroRef st) $ fromZero st fromStored :: Stored a -> a -fromStored (Stored _ x) = x +fromStored = storedObject' storedRef :: Stored a -> Ref -storedRef (Stored ref _) = ref +storedRef = storedRef' wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) wrappedStore st x = do ref <- liftIO $ store st x @@ -726,9 +761,8 @@ wrappedStore st x = do ref <- liftIO $ store st x wrappedLoad :: Storable a => Ref -> Stored a wrappedLoad ref = Stored ref (load ref) -copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => - Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a)) -copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref' +copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a) +copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref' -- |Passed function needs to preserve the object representation to be safe unsafeMapStored :: (a -> b) -> Stored a -> Stored b diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs index e3ebf2b..d1fdc79 100644 --- a/src/Erebos/Pairing.hs +++ b/src/Erebos/Pairing.hs @@ -209,7 +209,7 @@ pairingRequest :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e pairingRequest _ peer = do self <- liftIO $ serverIdentity $ peerServer peer nonce <- liftIO $ getRandomBytes 32 - pid <- peerIdentity peer >>= \case + pid <- getPeerIdentity peer >>= \case PeerIdentityFull pid -> return pid _ -> throwOtherError "incomplete peer identity" sendToPeerWith @(PairingService a) peer $ \case diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index fefc503..303f9db 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -51,6 +51,9 @@ class ( serviceNewPeer :: ServiceHandler s () serviceNewPeer = return () + serviceUpdatedPeer :: ServiceHandler s () + serviceUpdatedPeer = return () + type ServiceAttributes s = attr | attr -> s type ServiceAttributes s = Proxy s defaultServiceAttributes :: proxy s -> ServiceAttributes s @@ -104,7 +107,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p) -data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) +data SomeStorageWatcher s + = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ()) + | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT ErebosError IO ()) mkServiceID :: String -> ServiceID @@ -113,6 +118,7 @@ mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString data ServiceInput s = ServiceInput { svcAttributes :: ServiceAttributes s , svcPeer :: Peer + , svcPeerAddress :: PeerAddress , svcPeerIdentity :: UnifiedIdentity , svcServer :: Server , svcPrintOp :: String -> IO () diff --git a/src/Erebos/Set.hs b/src/Erebos/Set.hs index 270c0ba..7453be4 100644 --- a/src/Erebos/Set.hs +++ b/src/Erebos/Set.hs @@ -10,7 +10,6 @@ module Erebos.Set ( ) where import Control.Arrow -import Control.Monad.IO.Class import Data.Function import Data.List @@ -53,14 +52,14 @@ emptySet = Set [] loadSet :: Mergeable a => Ref -> Set a loadSet = mergeSorted . (:[]) . wrappedLoad -storeSetAdd :: (Mergeable a, MonadIO m) => Storage -> a -> Set a -> m (Set a) -storeSetAdd st x (Set prev) = Set . (:[]) <$> wrappedStore st SetItem +storeSetAdd :: (Mergeable a, MonadStorage m) => a -> Set a -> m (Set a) +storeSetAdd x (Set prev) = Set . (: []) <$> mstore SetItem { siPrev = prev , siItem = toComponents x } -storeSetAddComponent :: (Mergeable a, MonadStorage m, MonadIO m) => Stored (Component a) -> Set a -> m (Set a) -storeSetAddComponent component (Set prev) = Set . (:[]) <$> mstore SetItem +storeSetAddComponent :: (Mergeable a, MonadStorage m) => Stored (Component a) -> Set a -> m (Set a) +storeSetAddComponent component (Set prev) = Set . (: []) <$> mstore SetItem { siPrev = prev , siItem = [ component ] } diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs index 076a8c0..06e5c54 100644 --- a/src/Erebos/State.hs +++ b/src/Erebos/State.hs @@ -6,6 +6,7 @@ module Erebos.State ( MonadStorage(..), MonadHead(..), updateLocalHead_, + LocalHeadT(..), updateLocalState, updateLocalState_, updateSharedState, updateSharedState_, @@ -17,9 +18,11 @@ module Erebos.State ( mergeSharedIdentity, ) where +import Control.Monad import Control.Monad.Except import Control.Monad.Reader +import Data.Bifunctor import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.Typeable @@ -66,7 +69,7 @@ instance Storable LocalState where lsPrev <- loadMbRawWeak "PREV" lsIdentity <- loadRef "id" lsShared <- loadRefs "shared" - lsOther <- filter ((`notElem` [ BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems + lsOther <- filter ((`notElem` [ BC.pack "PREV", BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems return LocalState {..} instance HeadType LocalState where @@ -101,6 +104,35 @@ instance (HeadType a, MonadIO m) => MonadHead a (ReaderT (Head a) m) where snd <$> updateHead h f +newtype LocalHeadT h m a = LocalHeadT { runLocalHeadT :: Storage -> Stored h -> m ( a, Stored h ) } + +instance Functor m => Functor (LocalHeadT h m) where + fmap f (LocalHeadT act) = LocalHeadT $ \st h -> first f <$> act st h + +instance Monad m => Applicative (LocalHeadT h m) where + pure x = LocalHeadT $ \_ h -> pure ( x, h ) + (<*>) = ap + +instance Monad m => Monad (LocalHeadT h m) where + return = pure + LocalHeadT act >>= f = LocalHeadT $ \st h -> do + ( x, h' ) <- act st h + let (LocalHeadT act') = f x + act' st h' + +instance MonadIO m => MonadIO (LocalHeadT h m) where + liftIO act = LocalHeadT $ \_ h -> ( , h ) <$> liftIO act + +instance MonadIO m => MonadStorage (LocalHeadT h m) where + getStorage = LocalHeadT $ \st h -> return ( st, h ) + +instance (HeadType h, MonadIO m) => MonadHead h (LocalHeadT h m) where + updateLocalHead f = LocalHeadT $ \st h -> do + let LocalHeadT act = f h + ( ( h', x ), _ ) <- act st h + return ( x, h' ) + + localIdentity :: LocalState -> UnifiedIdentity localIdentity ls = maybe (error "failed to verify local identity") (updateOwners $ maybe [] idExtDataF $ lookupSharedValue $ lsShared ls) @@ -128,12 +160,11 @@ updateSharedState :: forall a b m. (SharedType a, MonadHead LocalState m) => (a updateSharedState f = \ls -> do let shared = lsShared $ fromStored ls val = lookupSharedValue shared - st <- getStorage (val', x) <- f val (,x) <$> if toComponents val' == toComponents val then return ls - else do shared' <- makeSharedStateUpdate st val' shared - wrappedStore st (fromStored ls) { lsShared = [shared'] } + else do shared' <- makeSharedStateUpdate val' shared + mstore (fromStored ls) { lsShared = [shared'] } lookupSharedValue :: forall a. SharedType a => [Stored SharedState] -> a lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap (ssValue . fromStored) . filterAncestors . helper @@ -141,8 +172,8 @@ lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap | otherwise = helper $ ssPrev (fromStored x) ++ xs helper [] = [] -makeSharedStateUpdate :: forall a m. MonadIO m => SharedType a => Storage -> a -> [Stored SharedState] -> m (Stored SharedState) -makeSharedStateUpdate st val prev = liftIO $ wrappedStore st SharedState +makeSharedStateUpdate :: forall a m. (SharedType a, MonadStorage m) => a -> [ Stored SharedState ] -> m (Stored SharedState) +makeSharedStateUpdate val prev = mstore SharedState { ssPrev = prev , ssType = Just $ sharedTypeID @a Proxy , ssValue = storedRef <$> toComponents val diff --git a/src/Erebos/Storable.hs b/src/Erebos/Storable.hs index caaf525..5ccb180 100644 --- a/src/Erebos/Storable.hs +++ b/src/Erebos/Storable.hs @@ -11,6 +11,7 @@ defined here as well. module Erebos.Storable ( Storable(..), ZeroStorable(..), StorableText(..), StorableDate(..), StorableUUID(..), + StorageCompleteness(..), Store, StoreRec, storeBlob, storeRec, storeZero, diff --git a/src/Erebos/Storage/Backend.hs b/src/Erebos/Storage/Backend.hs index 620d423..59097b6 100644 --- a/src/Erebos/Storage/Backend.hs +++ b/src/Erebos/Storage/Backend.hs @@ -9,12 +9,15 @@ module Erebos.Storage.Backend ( Complete, Partial, Storage, PartialStorage, newStorage, + refDigestBytes, WatchID, startWatchID, nextWatchID, ) where import Control.Concurrent.MVar +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) import Data.HashTable.IO qualified as HT import Erebos.Object.Internal @@ -26,3 +29,7 @@ newStorage stBackend = do stRefGeneration <- newMVar =<< HT.new stRefRoots <- newMVar =<< HT.new return Storage {..} + + +refDigestBytes :: RefDigest -> ByteString +refDigestBytes = BA.convert diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs index 3239fe0..285902d 100644 --- a/src/Erebos/Storage/Head.hs +++ b/src/Erebos/Storage/Head.hs @@ -113,7 +113,7 @@ loadHeadRaw st@Storage {..} tid hid = do -- | Reload the given head from storage, returning `Head' with updated object, -- or `Nothing' if there is no longer head with the particular ID in storage. reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) -reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid +reloadHead (Head hid val) = loadHead (storedStorage val) hid -- | Store a new `Head' of type 'a' in the storage. storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) @@ -232,8 +232,8 @@ watchHeadWith -> (Head a -> b) -- ^ Selector function -> (b -> IO ()) -- ^ Callback -> IO WatchedHead -- ^ Watched head handle -watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do - watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb +watchHeadWith (Head hid val) sel cb = do + watchHeadRaw (storedStorage val) (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb -- | Watch the given head using raw IDs and a selector from `Ref'. watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs index 303beb3..db211bb 100644 --- a/src/Erebos/Storage/Internal.hs +++ b/src/Erebos/Storage/Internal.hs @@ -20,7 +20,7 @@ module Erebos.Storage.Internal ( Generation(..), HeadID(..), HeadTypeID(..), - Stored'(..), storedStorage, + Stored(..), storedStorage, ) where import Control.Arrow @@ -37,6 +37,7 @@ import Data.ByteArray qualified as BA import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL +import Data.Function import Data.HashTable.IO qualified as HT import Data.Hashable import Data.Kind @@ -239,17 +240,20 @@ newtype HeadID = HeadID UUID newtype HeadTypeID = HeadTypeID UUID deriving (Eq, Ord) -data Stored' c a = Stored (Ref' c) a +data Stored a = Stored + { storedRef' :: Ref + , storedObject' :: a + } deriving (Show) -instance Eq (Stored' c a) where - Stored r1 _ == Stored r2 _ = refDigest r1 == refDigest r2 +instance Eq (Stored a) where + (==) = (==) `on` (refDigest . storedRef') -instance Ord (Stored' c a) where - compare (Stored r1 _) (Stored r2 _) = compare (refDigest r1) (refDigest r2) +instance Ord (Stored a) where + compare = compare `on` (refDigest . storedRef') -storedStorage :: Stored' c a -> Storage' c -storedStorage (Stored (Ref st _) _) = st +storedStorage :: Stored a -> Storage +storedStorage = refStorage . storedRef' type Complete = Identity diff --git a/src/Erebos/Storage/Memory.hs b/src/Erebos/Storage/Memory.hs index 677e8c5..26bb181 100644 --- a/src/Erebos/Storage/Memory.hs +++ b/src/Erebos/Storage/Memory.hs @@ -4,7 +4,8 @@ module Erebos.Storage.Memory ( derivePartialStorage, ) where -import Control.Concurrent.MVar +import Control.Concurrent +import Control.Monad import Data.ByteArray (ScrubbedBytes) import Data.ByteString.Lazy qualified as BL @@ -62,14 +63,19 @@ instance (StorageCompleteness c, Typeable p) => StorageBackend (MemoryStorage p backendReplaceHead StorageMemory {..} tid hid expected new = do res <- modifyMVar memHeads $ \hs -> do - ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers - return $ case partition ((==(tid, hid)) . fst) hs of - ( [] , _ ) -> ( hs, Left Nothing ) + case partition ((==(tid, hid)) . fst) hs of + ( [] , _ ) -> return ( hs, Left Nothing ) (( _, dgst ) : _, hs' ) - | dgst == expected -> ((( tid, hid ), new ) : hs', Right ( new, ws )) - | otherwise -> ( hs, Left $ Just dgst ) + | dgst == expected -> do + ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers + return ((( tid, hid ), new ) : hs', Right ( new, ws )) + | otherwise -> do + return ( hs, Left $ Just dgst ) case res of - Right ( dgst, ws ) -> mapM_ ($ dgst) ws >> return (Right dgst) + Right ( dgst, ws ) -> do + void $ forkIO $ do + mapM_ ($ dgst) ws + return (Right dgst) Left x -> return $ Left x backendWatchHead StorageMemory {..} tid hid cb = modifyMVar memWatchers $ return . watchListAdd tid hid cb diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs index 41725af..8221e91 100644 --- a/src/Erebos/Storage/Merge.hs +++ b/src/Erebos/Storage/Merge.hs @@ -7,7 +7,7 @@ module Erebos.Storage.Merge ( compareGeneration, generationMax, storedGeneration, - generations, + generations, generationsBy, ancestors, precedes, precedesOrEquals, @@ -17,6 +17,8 @@ module Erebos.Storage.Merge ( findProperty, findPropertyFirst, + + storedDifference, ) where import Control.Concurrent.MVar @@ -25,6 +27,8 @@ import Data.ByteString.Char8 qualified as BC import Data.HashTable.IO qualified as HT import Data.Kind import Data.List +import Data.List.NonEmpty (NonEmpty) +import Data.List.NonEmpty qualified as NE import Data.Maybe import Data.Set (Set) import Data.Set qualified as S @@ -52,7 +56,7 @@ merge xs = mergeSorted $ filterAncestors xs storeMerge :: (Mergeable a, Storable a) => [Stored (Component a)] -> IO (Stored a) storeMerge [] = error "merge: empty list" -storeMerge xs@(Stored ref _ : _) = wrappedStore (refStorage ref) $ mergeSorted $ filterAncestors xs +storeMerge xs@(x : _) = wrappedStore (storedStorage x) $ mergeSorted $ filterAncestors xs previous :: Storable a => Stored a -> [Stored a] previous (Stored ref _) = case load ref of @@ -100,16 +104,24 @@ storedGeneration x = -- |Returns list of sets starting with the set of given objects and -- intcrementally adding parents. -generations :: Storable a => [Stored a] -> [Set (Stored a)] -generations = unfoldr gen . (,S.empty) - where gen (hs, cur) = case filter (`S.notMember` cur) hs of - [] -> Nothing - added -> let next = foldr S.insert cur added - in Just (next, (previous =<< added, next)) +generations :: Storable a => [ Stored a ] -> NonEmpty (Set (Stored a)) +generations = generationsBy previous + +-- |Returns list of sets starting with the set of given objects and +-- intcrementally adding parents, with the first parameter being +-- a function to get all the parents of given object. +generationsBy :: Ord a => (a -> [ a ]) -> [ a ] -> NonEmpty (Set a) +generationsBy parents xs = NE.unfoldr gen ( xs, S.fromList xs ) + where + gen ( hs, cur ) = ( cur, ) $ + case filter (`S.notMember` cur) (parents =<< hs) of + [] -> Nothing + added -> let next = foldr S.insert cur added + in Just ( added, next ) -- |Returns set containing all given objects and their ancestors ancestors :: Storable a => [Stored a] -> Set (Stored a) -ancestors = last . (S.empty:) . generations +ancestors = NE.last . generations precedes :: Storable a => Stored a -> Stored a -> Bool precedes x y = not $ x `elem` filterAncestors [x, y] @@ -162,3 +174,18 @@ findPropertyFirst sel = fmap (fromJust . sel . fromStored) . listToMaybe . filte findPropHeads :: forall a b. Storable a => (a -> Maybe b) -> Stored a -> [Stored a] findPropHeads sel sobj | Just _ <- sel $ fromStored sobj = [sobj] | otherwise = findPropHeads sel =<< previous sobj + + +-- | Compute symmetrict difference between two stored histories. In other +-- words, return all 'Stored a' objects reachable (via 'previous') from first +-- given set, but not from the second; and vice versa. +storedDifference :: Storable a => [ Stored a ] -> [ Stored a ] -> [ Stored a ] +storedDifference xs' ys' = + let xs = filterAncestors xs' + ys = filterAncestors ys' + + filteredPrevious blocked zs = filterAncestors (previous zs ++ blocked) `diffSorted` blocked + xg = S.toAscList $ NE.last $ generationsBy (filteredPrevious ys) $ filterAncestors (xs ++ ys) `diffSorted` ys + yg = S.toAscList $ NE.last $ generationsBy (filteredPrevious xs) $ filterAncestors (ys ++ xs) `diffSorted` xs + + in xg `mergeUniq` yg diff --git a/src/Erebos/Sync.hs b/src/Erebos/Sync.hs index d837a14..5f5fdec 100644 --- a/src/Erebos/Sync.hs +++ b/src/Erebos/Sync.hs @@ -31,6 +31,7 @@ instance Service SyncService where else return ls serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal + serviceUpdatedPeer = serviceNewPeer serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer instance Storable SyncService where diff --git a/src/Erebos/Util.hs b/src/Erebos/Util.hs index 0381c3e..0d53e98 100644 --- a/src/Erebos/Util.hs +++ b/src/Erebos/Util.hs @@ -22,15 +22,16 @@ mergeBy cmp (x : xs) (y : ys) = case cmp x y of mergeBy _ xs [] = xs mergeBy _ [] ys = ys -mergeUniqBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a] -mergeUniqBy cmp (x : xs) (y : ys) = case cmp x y of - LT -> x : mergeBy cmp xs (y : ys) - EQ -> x : mergeBy cmp xs ys - GT -> y : mergeBy cmp (x : xs) ys +mergeUniqBy :: (a -> a -> Ordering) -> [ a ] -> [ a ] -> [ a ] +mergeUniqBy cmp (x : xs) (y : ys) = + case cmp x y of + LT -> x : mergeUniqBy cmp xs (y : ys) + EQ -> x : mergeUniqBy cmp xs ys + GT -> y : mergeUniqBy cmp (x : xs) ys mergeUniqBy _ xs [] = xs mergeUniqBy _ [] ys = ys -mergeUniq :: Ord a => [a] -> [a] -> [a] +mergeUniq :: Ord a => [ a ] -> [ a ] -> [ a ] mergeUniq = mergeUniqBy compare diffSorted :: Ord a => [a] -> [a] -> [a] diff --git a/test/attach.test b/test/attach.et index afbdd0e..afbdd0e 100644 --- a/test/attach.test +++ b/test/attach.et diff --git a/test/chatroom.test b/test/chatroom.et index 54f9b2a..54f9b2a 100644 --- a/test/chatroom.test +++ b/test/chatroom.et diff --git a/test/common.et b/test/common.et new file mode 100644 index 0000000..89941f0 --- /dev/null +++ b/test/common.et @@ -0,0 +1,3 @@ +module common + +export def refpat = /blake2#[0-9a-f]*/ diff --git a/test/contact.test b/test/contact.et index 978f8a6..978f8a6 100644 --- a/test/contact.test +++ b/test/contact.et diff --git a/test/discovery.et b/test/discovery.et new file mode 100644 index 0000000..e80a755 --- /dev/null +++ b/test/discovery.et @@ -0,0 +1,218 @@ +module discovery + +def refpat = /blake2#[0-9a-f]*/ + +test ManualDiscovery: + let services = "discovery" + + subnet sd + subnet s1 + subnet s2 + + spawn as pd on sd + spawn as p1 on s1 + spawn as p2 on s2 + send "create-identity Discovery" to pd + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + expect /create-identity-done ref ($refpat).*/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + expect /create-identity-done ref $refpat.*/ from p2 + expect /create-identity-done ref $refpat.*/ from pd + + # Test discovery using owner and device identities: + for id in [ p1obase, p1base ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + for p in [ p1, p2 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p.node.ip} 29665/ + /peer [12] id .*/ + + send "discovery-connect $id" to p2 + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p + + # Test delayed discovery with new peer + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services" to p + + with p1: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p1.node.ip} 29665/ + /peer [12] id Device1 Owner1/ + + send "discovery-connect $id" to p2 + + with p2: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p2.node.ip} 29665/ + /peer [12] id Device2 Owner2/ + + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + expect from p2: + /peer [0-9]+ addr ${p1.node.ip} 29665/ + /peer [0-9]+ id Device1 Owner1/ + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p + + +test DiscoveryTunnel: + let services = "discovery:tunnel" + + subnet sd + subnet s1 + subnet s2 + + spawn as pd on sd + spawn as p1 on s1 + spawn as p2 on s2 + + for n in [ p1.node, p2.node ]: + shell on n: + nft add table inet filter + nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }' + nft add rule inet filter input 'ct state { established, related } accept' + + send "create-identity Discovery" to pd + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + expect /create-identity-done ref ($refpat).*/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + expect /create-identity-done ref $refpat.*/ from p2 + expect /create-identity-done ref $refpat.*/ from pd + + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services test-log" to p + + for p in [ p1, p2 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p.node.ip} 29665/ + /peer [12] id .*/ + + send "discovery-tunnel 1 $id" to p2 + + expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p2 + expect /net-ostream-open ${p1.node.ip} 29665 1 1/ from pd + expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p1 + expect /net-ostream-open ${p2.node.ip} 29665 1 1/ from pd + + expect from p1: + /peer 2 addr tunnel@.*/ + /peer 2 id Device2 Owner2/ + expect from p2: + /peer 2 addr tunnel@.*/ + /peer 2 id Device1 Owner1/ + + send "peer-drop 2" to p1 + send "peer-drop 2" to p2 + + expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p2 + expect /net-ostream-close-ack ${p1.node.ip} 29665 1 0/ from pd + expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p1 + expect /net-ostream-close-ack ${p2.node.ip} 29665 1 0/ from pd + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p + + +test DiscoveryTunnelRefused: + let services = "discovery" + + subnet sd + subnet s1 + subnet s2 + + spawn as pd on sd + spawn as p1 on s1 + spawn as p2 on s2 + + for n in [ p1.node, p2.node ]: + shell on n: + nft add table inet filter + nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }' + nft add rule inet filter input 'ct state { established, related } accept' + + send "create-identity Discovery" to pd + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + + expect /create-identity-done ref ($refpat).*/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + expect /create-identity-done ref $refpat.*/ from p2 + expect /create-identity-done ref $refpat.*/ from pd + + for id in [ p1obase ]: + for p in [ pd, p1, p2 ]: + send "start-server services $services test-log" to p + + for p in [ p1, p2 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [12] addr ${p.node.ip} 29665/ + /peer [12] id .*/ + + send "discovery-tunnel 1 $id" to p2 + expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p2 + expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p2 + + for p in [ pd, p1, p2 ]: + send "stop-server" to p + for p in [ pd, p1, p2 ]: + expect /stop-server-done/ from p diff --git a/test/discovery.test b/test/discovery.test deleted file mode 100644 index 3be6275..0000000 --- a/test/discovery.test +++ /dev/null @@ -1,91 +0,0 @@ -module discovery - -test ManualDiscovery: - let services = "discovery" - let refpat = /blake2#[0-9a-f]*/ - - subnet sd - subnet s1 - subnet s2 - - spawn as pd on sd - spawn as p1 on s1 - spawn as p2 on s2 - send "create-identity Discovery" to pd - send "create-identity Device1 Owner1" to p1 - send "create-identity Device2 Owner2" to p2 - - expect /create-identity-done ref ($refpat).*/ from p1 capture p1id - send "identity-info $p1id" to p1 - expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner - send "identity-info $p1owner" to p1 - expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase - - expect /create-identity-done ref $refpat.*/ from p2 - expect /create-identity-done ref $refpat.*/ from pd - - # Test discovery using owner and device identities: - for id in [ p1obase, p1base ]: - for p in [ pd, p1, p2 ]: - send "start-server services $services" to p - - for p in [ p1, p2 ]: - with p: - send "peer-add ${pd.node.ip}" - expect: - /peer 1 addr ${pd.node.ip} 29665/ - /peer 1 id Discovery/ - expect from pd: - /peer [12] addr ${p.node.ip} 29665/ - /peer [12] id .*/ - - send "discovery-connect $id" to p2 - - expect from p1: - /peer [0-9]+ addr ${p2.node.ip} 29665/ - /peer [0-9]+ id Device2 Owner2/ - expect from p2: - /peer [0-9]+ addr ${p1.node.ip} 29665/ - /peer [0-9]+ id Device1 Owner1/ - - for p in [ pd, p1, p2 ]: - send "stop-server" to p - for p in [ pd, p1, p2 ]: - expect /stop-server-done/ from p - - # Test delayed discovery with new peer - for id in [ p1obase ]: - for p in [ pd, p1, p2 ]: - send "start-server services $services" to p - - with p1: - send "peer-add ${pd.node.ip}" - expect: - /peer 1 addr ${pd.node.ip} 29665/ - /peer 1 id Discovery/ - expect from pd: - /peer [12] addr ${p1.node.ip} 29665/ - /peer [12] id Device1 Owner1/ - - send "discovery-connect $id" to p2 - - with p2: - send "peer-add ${pd.node.ip}" - expect: - /peer 1 addr ${pd.node.ip} 29665/ - /peer 1 id Discovery/ - expect from pd: - /peer [12] addr ${p2.node.ip} 29665/ - /peer [12] id Device2 Owner2/ - - expect from p1: - /peer [0-9]+ addr ${p2.node.ip} 29665/ - /peer [0-9]+ id Device2 Owner2/ - expect from p2: - /peer [0-9]+ addr ${p1.node.ip} 29665/ - /peer [0-9]+ id Device1 Owner1/ - - for p in [ pd, p1, p2 ]: - send "stop-server" to p - for p in [ pd, p1, p2 ]: - expect /stop-server-done/ from p diff --git a/test/graph.et b/test/graph.et new file mode 100644 index 0000000..38ec3c4 --- /dev/null +++ b/test/graph.et @@ -0,0 +1,111 @@ +module graph + +test StoredDifference: + spawn as p1 + with p1: + # ref names: r<level>_<num> + + send: + "store rec" + "num:i 1" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r1_1 + + send: + "store rec" + "PREV:r $r1_1" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r2_1 + + send "stored-difference $r2_1 |" + expect /stored-difference-item $r1_1/ + expect /stored-difference-item $r2_1/ + local: + expect /stored-difference-(.*)/ capture done + guard (done == "done") + + send: + "store rec" + "PREV:r $r2_1" + "num:i 1" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r3_1 + + send "stored-difference $r1_1 | $r3_1" + expect /stored-difference-item $r2_1/ + expect /stored-difference-item $r3_1/ + local: + expect /stored-difference-(.*)/ capture done + guard (done == "done") + + send: + "store rec" + "PREV:r $r2_1" + "num:i 2" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r3_2 + + send: + "store rec" + "PREV:r $r3_1" + "num:i 1" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r4_1 + + send: + "store rec" + "PREV:r $r3_2" + "num:i 2" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r4_2 + + send "stored-difference $r4_1 | $r4_2" + expect /stored-difference-item $r3_1/ + expect /stored-difference-item $r3_2/ + expect /stored-difference-item $r4_1/ + expect /stored-difference-item $r4_2/ + local: + expect /stored-difference-(.*)/ capture done + guard (done == "done") + + + send: + "store rec" + "PREV:r $r2_1" + "num:i 3" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r3_3 + + send: + "store rec" + "PREV:r $r3_2" + "PREV:r $r3_3" + "num:i 3" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r4_3 + + send: + "store rec" + "PREV:r $r3_3" + "num:i 4" + "" + expect /store-done (blake2#[0-9a-f]*)/ capture r4_4 + + send "stored-difference $r4_1 $r4_2 | $r4_3 $r4_4" + expect /stored-difference-item $r3_1/ + expect /stored-difference-item $r3_3/ + expect /stored-difference-item $r4_1/ + expect /stored-difference-item $r4_2/ + expect /stored-difference-item $r4_3/ + expect /stored-difference-item $r4_4/ + local: + expect /stored-difference-(.*)/ capture done + guard (done == "done") + + send "stored-difference $r1_1 $r2_1 $r3_2 $r3_3 | $r4_1 $r4_3" + expect /stored-difference-item $r3_1/ + expect /stored-difference-item $r4_1/ + expect /stored-difference-item $r4_3/ + local: + expect /stored-difference-(.*)/ capture done + guard (done == "done") diff --git a/test/invite.et b/test/invite.et new file mode 100644 index 0000000..bf1a45a --- /dev/null +++ b/test/invite.et @@ -0,0 +1,73 @@ +module invite + +import common + +test InviteContact: + let services = "contact,invite" + + spawn as p1 + spawn as p2 + + send "create-identity Device1 Owner1" to p1 + expect /create-identity-done ref ($refpat)/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + send "identity-info $p1owner" to p1 + expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase + + send "create-identity Device2 Owner2" to p2 + expect /create-identity-done ref ($refpat)/ from p2 capture p2id + + send "start-server services $services" to p1 + send "start-server services $services" to p2 + + expect from p1: + /peer ([0-9]+) addr ${p2.node.ip} 29665/ capture peer1_2 + /peer $peer1_2 id Device2 Owner2/ + + expect from p2: + /peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1 + /peer $peer2_1 id Device1 Owner1/ + + send "invite-contact-create Contact2" to p1 + expect from p1 /invite-contact-create-done ([^ ]+)/ capture token + + with p2: + send "invite-accept 00 $p1obase" + expect /invite-accept-done 00 invalid/ + + send "contact-list" + expect: + /contact-list-(.*)/ capture done + guard (done == "done") + + with p2: + send "invite-accept $token $p1obase" + expect /invite-accept-done $token contact/ + + send "contact-list" + expect: + /contact-list-item [a-z0-9#]+ Owner1 Owner1/ + /contact-list-(.*)/ capture done + guard (done == "done") + + with p2: + send "invite-accept $token $p1obase" + expect /invite-accept-done $token invalid/ + + send "contact-list" + expect: + /contact-list-item [a-z0-9#]+ Owner1 Owner1/ + /contact-list-(.*)/ capture done + guard (done == "done") + + with p1: + expect /invite-accepted 00 $p2id/ + expect /invite-accepted $token $p2id/ + expect /invite-accepted $token $p2id/ + + send "contact-list" + expect: + /contact-list-item [a-z0-9#]+ Contact2 Owner2/ + /contact-list-(.*)/ capture done + guard (done == "done") diff --git a/test/message.et b/test/message.et new file mode 100644 index 0000000..c4b61e3 --- /dev/null +++ b/test/message.et @@ -0,0 +1,340 @@ +module message + +import common + +test DirectMessage: + let services = "contact,dm" + + spawn as p1 + spawn as p2 + send "create-identity Device1 Owner1" to p1 + send "create-identity Device2 Owner2" to p2 + send "start-server services $services" to p1 + send "start-server services $services" to p2 + + expect from p1: + /peer ([0-9]+) addr ${p2.node.ip} 29665/ capture peer1_2 + /peer $peer1_2 id Device2 Owner2/ + + expect from p2: + /peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1 + /peer $peer2_1 id Device1 Owner1/ + + with p1: + send "dm-list-peer $peer1_2" + expect /dm-list-done/ + + # Send messages to peers + + for i in [1..2]: + send "dm-send-peer $peer1_2 hello$i" to p1 + expect /dm-sent from Owner1 new no text hello$i/ from p1 + expect /dm-received from Owner1 new yes text hello$i/ from p2 + + for i in [1..2]: + send "dm-send-peer $peer2_1 hi$i" to p2 + expect /dm-sent from Owner2 new no text hi$i/ from p2 + expect /dm-received from Owner2 new yes text hi$i/ from p1 + + for i in [3..4]: + send "dm-send-peer $peer1_2 hello$i" to p1 + expect /dm-sent from Owner1 new no text hello$i/ from p1 + expect /dm-received from Owner1 new yes text hello$i/ from p2 + send "dm-send-peer $peer2_1 hi$i" to p2 + expect /dm-sent from Owner2 new no text hi$i/ from p2 + expect /dm-received from Owner2 new yes text hi$i/ from p1 + + # Create contacts + + local: + send "contact-request $peer1_2" to p1 + expect /contact-request $peer2_1 ([0-9]*)/ from p2 capture code2 + expect /contact-response $peer1_2 ([0-9]*)/ from p1 capture code1 + guard (code1 == code2) + + send "contact-accept $peer1_2" to p1 + send "contact-accept $peer2_1" to p2 + expect /contact-request-done $peer2_1/ from p2 + expect /contact-response-done $peer1_2/ from p1 + + send "contact-list" to p1 + expect from p1: + /contact-list-item ([a-z0-9#]+) Owner2 Owner2/ capture c1_2 + /contact-list-(.*)/ capture done1_1 + + send "contact-list" to p2 + expect from p2: + /contact-list-item ([a-z0-9#]+) Owner1 Owner1/ capture c2_1 + /contact-list-(.*)/ capture done1_2 + + # Send messages to contacts + + for i in [1..2]: + send "dm-send-contact $c1_2 hello_c_$i" to p1 + expect /dm-sent from Owner1 new no text hello_c_$i/ from p1 + expect /dm-received from Owner1 new yes text hello_c_$i/ from p2 + + for i in [1..2]: + send "dm-send-contact $c2_1 hi_c_$i" to p2 + expect /dm-sent from Owner2 new no text hi_c_$i/ from p2 + expect /dm-received from Owner2 new yes text hi_c_$i/ from p1 + + for i in [3..4]: + send "dm-send-contact $c1_2 hello_c_$i" to p1 + expect /dm-sent from Owner1 new no text hello_c_$i/ from p1 + expect /dm-received from Owner1 new yes text hello_c_$i/ from p2 + send "dm-send-contact $c2_1 hi_c_$i" to p2 + expect /dm-sent from Owner2 new no text hi_c_$i/ from p2 + expect /dm-received from Owner2 new yes text hi_c_$i/ from p1 + + send "dm-list-contact $c1_2" to p1 + send "dm-list-contact $c2_1" to p2 + for p in [p1, p2]: + with p: + for i in [1..4]: + expect /dm-list-item from Owner1 new [a-z]+ text hello_c_$i/ + expect /dm-list-item from Owner2 new [a-z]+ text hi_c_$i/ + for i in [1..4]: + expect /dm-list-item from Owner1 new [a-z]+ text hello$i/ + expect /dm-list-item from Owner2 new [a-z]+ text hi$i/ + expect /dm-list-(.*)/ capture done + guard (done == "done") + + # Reload message history + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + for p in [p1, p2]: + with p: + send "start-server services $services" + + with p1: + send "contact-list" + expect: + /contact-list-item $c1_2 Owner2 Owner2/ + /contact-list-(.*)/ capture done + guard (done == "done") + + send "dm-list-contact $c1_2" to p1 + send "dm-list-contact $c2_1" to p2 + for p in [p1, p2]: + with p: + for i in [1..4]: + expect /dm-list-item from Owner1 new [a-z]+ text hello_c_$i/ + expect /dm-list-item from Owner2 new [a-z]+ text hi_c_$i/ + for i in [1..4]: + expect /dm-list-item from Owner1 new [a-z]+ text hello$i/ + expect /dm-list-item from Owner2 new [a-z]+ text hi$i/ + expect /dm-list-(.*)/ capture done + guard (done == "done") + + # Send message while offline + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + send "start-server services $services" to p2 + + send "dm-send-contact $c1_2 while_offline" to p1 + expect /dm-sent from Owner1 new no text while_offline/ from p1 + send "start-server services $services" to p1 + + expect /dm-received from Owner1 new yes text while_offline/ from p2 + + for p in [p1, p2]: + with p: + send "stop-server" + for p in [p1, p2]: + with p: + expect /stop-server-done/ + send "start-server services $services" to p1 + + send "dm-send-contact $c1_2 while_peer_offline" to p1 + expect /dm-sent from Owner1 new no text while_peer_offline/ from p1 + send "start-server services $services" to p2 + + expect /dm-received from Owner1 new yes text while_peer_offline/ from p2 + + +test DirectMessageDiscovery: + let services = "dm,discovery" + + subnet sd + subnet s1 + subnet s2 + subnet s3 + subnet s4 + + spawn on sd as pd + spawn on s1 as p1 + spawn on s2 as p2 + spawn on s3 as p3 + spawn on s4 as p4 + + send "create-identity Discovery" to pd + + send "create-identity Device1 Owner1" to p1 + expect /create-identity-done ref ($refpat)/ from p1 capture p1_id + send "identity-info $p1_id" to p1 + expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner + + send "create-identity Device2 Owner2" to p2 + expect /create-identity-done ref ($refpat)/ from p2 capture p2_id + send "identity-info $p2_id" to p2 + expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner + send "identity-info $p2_owner" to p2 + expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase + + send "create-identity Device3 Owner3" to p3 + expect /create-identity-done ref ($refpat)/ from p3 capture p3_id + send "identity-info $p3_id" to p3 + expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner + + send "create-identity Device4 Owner4" to p4 + expect /create-identity-done ref ($refpat)/ from p4 capture p4_id + send "identity-info $p4_id" to p4 + expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner + + + for p in [ p1, p2, p3, p4 ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4 ]: + with p1: + send "peer-add ${p.node.ip}" + expect: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + expect from p: + /peer 1 addr ${p1.node.ip} 29665/ + /peer 1 id Device1 Owner1/ + + # Make sure p1 has other identities in storage: + for i in [ 1 .. 3 ]: + send "dm-send-peer $i init1" to p1 + for p in [ p2, p3, p4 ]: + expect /dm-received from Owner1 new yes text init1/ from p + send "dm-send-identity $p1_owner init2" to p + expect /dm-received from Owner. new yes text init2/ from p1 + + # Restart servers to remove peers: + for p in [ p1, p2, p3, p4 ]: + with p: + send "stop-server" + for p in [ p1, p2, p3, p4 ]: + with p: + expect /stop-server-done/ + + # Prepare message before peers connect to discovery + send "dm-send-identity $p4_owner hello_to_p4" to p1 + + for p in [ p1, p2, p3, p4, pd ]: + with p: + send "start-server services $services" + + for p in [ p2, p3, p4, p1 ]: + with p: + send "peer-add ${pd.node.ip}" + expect: + /peer 1 addr ${pd.node.ip} 29665/ + /peer 1 id Discovery/ + expect from pd: + /peer [0-9]+ addr ${p.node.ip} 29665/ + /peer [0-9]+ id Device. Owner./ + + multiply_timeout by 2.0 + + # Connect via discovery manually, then send message + send "discovery-connect $p2_obase" to p1 + expect from p1: + /peer [0-9]+ addr ${p2.node.ip} 29665/ + /peer [0-9]+ id Device2 Owner2/ + send "dm-send-identity $p2_owner hello_to_p2" to p1 + expect /dm-received from Owner1 new yes text hello_to_p2/ from p2 + + # Send message, expect automatic discovery + send "dm-send-identity $p3_owner hello_to_p3" to p1 + expect /dm-received from Owner1 new yes text hello_to_p3/ from p3 + + # Verify the first message + expect /dm-received from Owner1 new yes text hello_to_p4/ from p4 + + for p in [ p1, p2, p3, p4, pd ]: + send "stop-server" to p + for p in [ p1, p2, p3, p4, pd ]: + expect /stop-server-done/ from p + + +test DirectMessageSeen: + let services = "dm,attach,sync" + + spawn as p1 + spawn as p2 + + send "create-identity Device1 Owner1" to p1 + expect /create-identity-done ref ($refpat)/ from p1 capture p1id + send "identity-info $p1id" to p1 + expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner + + send "create-identity Device2 Owner2" to p2 + expect /create-identity-done ref ($refpat)/ from p2 capture p2id + send "identity-info $p2id" to p2 + expect /identity-info ref $p2id base ($refpat) owner ($refpat).*/ from p2 capture p2base, p2owner + + send "start-server services $services" to p1 + send "start-server services $services" to p2 + + expect from p1: + /peer ([0-9]+) addr ${p2.node.ip} 29665/ capture peer1_2 + /peer $peer1_2 id Device2 Owner2/ + + expect from p2: + /peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1 + /peer $peer2_1 id Device1 Owner1/ + + for i in [ 1 .. 2 ]: + send "dm-send-peer $peer1_2 msg_a_$i" to p1 + expect /dm-sent from Owner1 new no text msg_a_$i/ from p1 + expect /dm-received from Owner1 new yes text msg_a_$i/ from p2 + + for i in [ 1 .. 2 ]: + send "dm-send-peer $peer2_1 msg_b_$i" to p2 + expect /dm-sent from Owner2 new no text msg_b_$i/ from p2 + expect /dm-received from Owner2 new yes text msg_b_$i/ from p1 + + send "dm-list-identity $p2owner" to p1 + send "dm-list-identity $p1owner" to p2 + for i in [ 1 .. 2 ]: + expect /dm-list-item from Owner1 new no text msg_a_$i/ from p1 + expect /dm-list-item from Owner1 new no text msg_a_$i/ from p2 + for i in [ 1 .. 2 ]: + expect /dm-list-item from Owner2 new yes text msg_b_$i/ from p1 + expect /dm-list-item from Owner2 new no text msg_b_$i/ from p2 + for p in [ p1, p2 ]: + expect /dm-list-(.*)/ from p capture done + guard (done == "done") + + send "dm-mark-seen $p2owner" to p1 + expect /dm-mark-seen-done $p2owner/ from p1 + send "dm-mark-seen $p1owner" to p2 + expect /dm-mark-seen-done $p1owner/ from p2 + + send "dm-list-identity $p2owner" to p1 + send "dm-list-identity $p1owner" to p2 + for i in [1..2]: + expect /dm-list-item from Owner1 new no text msg_a_$i/ from p1 + expect /dm-list-item from Owner1 new no text msg_a_$i/ from p2 + for i in [1..2]: + expect /dm-list-item from Owner2 new no text msg_b_$i/ from p1 + expect /dm-list-item from Owner2 new no text msg_b_$i/ from p2 + for p in [ p1, p2 ]: + expect /dm-list-(.*)/ from p capture done + guard (done == "done") diff --git a/test/message.test b/test/message.test deleted file mode 100644 index c0e251b..0000000 --- a/test/message.test +++ /dev/null @@ -1,151 +0,0 @@ -test DirectMessage: - let services = "contact,dm" - - spawn as p1 - spawn as p2 - send "create-identity Device1 Owner1" to p1 - send "create-identity Device2 Owner2" to p2 - send "start-server services $services" to p1 - send "start-server services $services" to p2 - - expect from p1: - /peer ([0-9]+) addr ${p2.node.ip} 29665/ capture peer1_2 - /peer $peer1_2 id Device2 Owner2/ - - expect from p2: - /peer ([0-9]+) addr ${p1.node.ip} 29665/ capture peer2_1 - /peer $peer2_1 id Device1 Owner1/ - - with p1: - send "dm-list-peer $peer1_2" - expect /dm-list-done/ - - # Send messages to peers - - for i in [1..2]: - send "dm-send-peer $peer1_2 hello$i" to p1 - expect /dm-received from Owner1 text hello$i/ from p2 - - for i in [1..2]: - send "dm-send-peer $peer2_1 hi$i" to p2 - expect /dm-received from Owner2 text hi$i/ from p1 - - for i in [3..4]: - send "dm-send-peer $peer1_2 hello$i" to p1 - expect /dm-received from Owner1 text hello$i/ from p2 - send "dm-send-peer $peer2_1 hi$i" to p2 - expect /dm-received from Owner2 text hi$i/ from p1 - - # Create contacts - - local: - send "contact-request $peer1_2" to p1 - expect /contact-request $peer2_1 ([0-9]*)/ from p2 capture code2 - expect /contact-response $peer1_2 ([0-9]*)/ from p1 capture code1 - guard (code1 == code2) - - send "contact-accept $peer1_2" to p1 - send "contact-accept $peer2_1" to p2 - expect /contact-request-done $peer2_1/ from p2 - expect /contact-response-done $peer1_2/ from p1 - - send "contact-list" to p1 - expect from p1: - /contact-list-item ([a-z0-9#]+) Owner2 Owner2/ capture c1_2 - /contact-list-(.*)/ capture done1_1 - - send "contact-list" to p2 - expect from p2: - /contact-list-item ([a-z0-9#]+) Owner1 Owner1/ capture c2_1 - /contact-list-(.*)/ capture done1_2 - - # Send messages to contacts - - for i in [1..2]: - send "dm-send-contact $c1_2 hello_c_$i" to p1 - expect /dm-received from Owner1 text hello_c_$i/ from p2 - - for i in [1..2]: - send "dm-send-contact $c2_1 hi_c_$i" to p2 - expect /dm-received from Owner2 text hi_c_$i/ from p1 - - for i in [3..4]: - send "dm-send-contact $c1_2 hello_c_$i" to p1 - expect /dm-received from Owner1 text hello_c_$i/ from p2 - send "dm-send-contact $c2_1 hi_c_$i" to p2 - expect /dm-received from Owner2 text hi_c_$i/ from p1 - - send "dm-list-contact $c1_2" to p1 - send "dm-list-contact $c2_1" to p2 - for p in [p1, p2]: - with p: - for i in [1..4]: - expect /dm-list-item from Owner1 text hello_c_$i/ - expect /dm-list-item from Owner2 text hi_c_$i/ - for i in [1..4]: - expect /dm-list-item from Owner1 text hello$i/ - expect /dm-list-item from Owner2 text hi$i/ - expect /dm-list-(.*)/ capture done - guard (done == "done") - - # Reload message history - - for p in [p1, p2]: - with p: - send "stop-server" - for p in [p1, p2]: - with p: - expect /stop-server-done/ - for p in [p1, p2]: - with p: - send "start-server services $services" - - with p1: - send "contact-list" - expect: - /contact-list-item $c1_2 Owner2 Owner2/ - /contact-list-(.*)/ capture done - guard (done == "done") - - send "dm-list-contact $c1_2" to p1 - send "dm-list-contact $c2_1" to p2 - for p in [p1, p2]: - with p: - for i in [1..4]: - expect /dm-list-item from Owner1 text hello_c_$i/ - expect /dm-list-item from Owner2 text hi_c_$i/ - for i in [1..4]: - expect /dm-list-item from Owner1 text hello$i/ - expect /dm-list-item from Owner2 text hi$i/ - expect /dm-list-(.*)/ capture done - guard (done == "done") - - # Send message while offline - - for p in [p1, p2]: - with p: - send "stop-server" - for p in [p1, p2]: - with p: - expect /stop-server-done/ - send "start-server services $services" to p2 - - send "dm-send-contact $c1_2 while_offline" to p1 - send "start-server services $services" to p1 - - expect /dm-received from Owner1 text while_offline/ from p2 - - for p in [p1, p2]: - with p: - send "stop-server" - for p in [p1, p2]: - with p: - expect /stop-server-done/ - send "start-server services $services" to p1 - - send "dm-send-contact $c1_2 while_peer_offline" to p1 - # TODO: sync from p1 on peer p2 discovery not ensured without addition wait - #wait - send "start-server services $services" to p2 - - expect /dm-received from Owner1 text while_peer_offline/ from p2 diff --git a/test/network.test b/test/network.et index 0f49a1e..a670f35 100644 --- a/test/network.test +++ b/test/network.et @@ -189,8 +189,8 @@ test ServiceStreams: spawn as p2 send "create-identity Device1" to p1 send "create-identity Device2" to p2 - send "start-server services $services" to p1 - send "start-server services $services" to p2 + send "start-server services $services test-log" to p1 + send "start-server services $services test-log" to p2 expect from p1: /peer 1 addr ${p2.node.ip} 29665/ /peer 1 id Device2/ @@ -202,6 +202,8 @@ test ServiceStreams: expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1 expect /test-stream-open-from 1 $stream1/ from p2 + expect /net-ostream-open ${p2.node.ip} 29665 1 1/ from p1 + send "test-stream-send 1 $stream1 hello" to p1 expect /test-stream-send-done 1 $stream1/ from p1 expect /test-stream-received 1 $stream1 0 hello/ from p2 @@ -210,12 +212,18 @@ test ServiceStreams: expect /test-stream-close-done 1 $stream1/ from p1 expect /test-stream-closed-from 1 $stream1 1/ from p2 + expect /net-ostream-close-send ${p2.node.ip} 29665 1/ from p1 + expect /net-ostream-close-ack ${p2.node.ip} 29665 1 0/ from p1 + send "test-stream-open 1 8" to p2 expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1 let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ] with p2: + expect /net-ostream-open ${p1.node.ip} 29665 . 8/ + flush matching /net-ostream-open ${p1.node.ip} 29665.*/ + for i in [ 1..20 ]: for s in streams2: send "test-stream-send 1 $s hello$i" @@ -226,6 +234,9 @@ test ServiceStreams: send "test-stream-close 1 $s" for s in streams2: expect /test-stream-close-done 1 $s/ + + expect /net-ostream-close-ack ${p1.node.ip} 29665 . 0/ + flush matching /net-ostream-close-[a-z]* ${p1.node.ip} 29665.*/ with p1: for i in [ 1..20 ]: for s in streams2: diff --git a/test/storage.test b/test/storage.et index 2230eac..845971b 100644 --- a/test/storage.test +++ b/test/storage.et @@ -1,3 +1,5 @@ +import common + test Storage: spawn as p1 @@ -432,8 +434,6 @@ test SharedStateWatcher: test LocalStateKeepUnknown: - let refpat = /blake2#[0-9a-f]*/ - spawn as p with p: send "create-identity Device" @@ -480,8 +480,6 @@ test LocalStateKeepUnknown: test UnknownObjectType: - let refpat = /blake2#[0-9a-f]*/ - spawn as p spawn as p2 on p.node @@ -502,8 +500,6 @@ test UnknownObjectType: test UnknownRecordItemType: - let refpat = /blake2#[0-9a-f]*/ - spawn as p spawn as p2 on p.node @@ -521,3 +517,109 @@ test UnknownRecordItemType: local: expect /load-(.*)/ capture done guard (done == "done") + + +test ObjectFormat: + spawn as p + with p: + # Empty blob + local: + send "store-raw EOF" + send "blob 0\n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "blob") + + # Small blob + local: + send "store-raw EOF" + send "blob 2\nab" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "blob") + + let empty_rec_ref = "blake2#6027623e8817cd2d214cc754caaa71f50190a1e5feeb9d9107c8aeabb189fbb2" + + # Empty record + local: + send "store-raw EOF" + send "rec 0\n" + send "EOF" + expect /store-done ($refpat)/ capture r + guard (r == empty_rec_ref) + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "rec") + + # Small record + local: + send "store-raw EOF" + send "rec 28\nnum:n 1\ntext:t abc\nempty:e \n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "rec") + + # Record with multiline items + local: + send "store-raw EOF" + send "rec 34\nfirst:t abc\n\tdef\nsecond:t \tx\n\ty\tz\n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "rec") + + # Record with unknown type + local: + send "store-raw EOF" + send "rec 14\nnum:UNKNOWN 1\n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "rec") + + # Invalid records + for content in [ "rec 6\nnum 1\n", "rec 6\nnum:n\n", "rec 7\nnum:n 1" ]: + send "store-raw EOF" + send "$content" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "unknown rec") + + # Empty unknown + local: + send "store-raw EOF" + send "test-unknown 0\n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "unknown test-unknown") + + # Ondemand object + local: + send "store-raw EOF" + send "ondemand 74\n6\n$empty_rec_ref\n" + send "EOF" + expect /store-done ($refpat)/ capture r + + send "load-type $r" + expect /load-type (.*)/ capture type + guard (type == "ondemand") diff --git a/test/sync.test b/test/sync.et index d465b11..d465b11 100644 --- a/test/sync.test +++ b/test/sync.et |