summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md14
-rw-r--r--README.md109
-rw-r--r--erebos.cabal17
-rw-r--r--main/Main.hs277
-rw-r--r--main/Test.hs41
-rw-r--r--src/Erebos/Chatroom.hs216
-rw-r--r--src/Erebos/Conversation.hs36
-rw-r--r--src/Erebos/Identity.hs33
-rw-r--r--src/Erebos/Network.hs120
-rw-r--r--src/Erebos/Network/Protocol.hs150
-rw-r--r--src/Erebos/Network/ifaddrs.c132
-rw-r--r--src/Erebos/Network/ifaddrs.h2
-rw-r--r--src/Erebos/Storage/Internal.hs6
-rw-r--r--src/Erebos/Storage/Key.hs7
-rw-r--r--src/Erebos/Storage/Merge.hs7
-rw-r--r--src/windows/Erebos/Storage/Platform.hs13
-rw-r--r--test/chatroom.test326
-rw-r--r--test/network.test56
18 files changed, 1344 insertions, 218 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc1ea1d..de69a6e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,19 @@
# Revision history for erebos
+## 0.1.6 -- 2024-08-12
+
+* Chatroom members list and join/leave commands
+* Fix sending multiple data responses in a stream
+* Added `--storage`/`--memory-storage` command-line options
+* Compatibility with GHC up to 9.10
+* Local discovery with IPv6
+
+## 0.1.5 -- 2024-07-16
+
+* Public chatrooms for multiple participants
+* Send keep-alive packets on idle connection
+* Windows support
+
## 0.1.4 -- 2024-06-11
* Added `/conversations` command to list and select conversations
diff --git a/README.md b/README.md
index 82bd329..9535aab 100644
--- a/README.md
+++ b/README.md
@@ -60,7 +60,8 @@ Add public peer:
[1] PEER UPD discovery1.erebosprotocol.net [37.221.243.57 29665]
```
-Select the peer and send it a message, the public server just responds with automatic echo message:
+Select the peer and send it a message, the public server just responds with
+automatic echo message:
```
> /1
discovery1.erebosprotocol.net> hello
@@ -68,30 +69,68 @@ discovery1.erebosprotocol.net> hello
[18:55] discovery1.erebosprotocol.net: Echo: hello
```
+List chatrooms known to the peers:
+```
+> /chatrooms
+[1] Test chatroom
+[2] Second test chatroom
+```
+
+Enter a chatroom and send a message there:
+```
+> /1
+Test chatroom> Hi
+Test chatroom [19:03] Some Name: Hi
+```
+
### Messaging
`/peers`
-List peers with direct network connection. Peers are discovered automatically
-on local network or can be manually added.
+: List peers with direct network connection. Peers are discovered automatically
+ on local network or can be manually added.
`/contacts`
-List known contacts (see below).
+: List known contacts (see below).
`/conversations`
-List started conversations with contacts or other peers.
+: List started conversations with contacts or other peers.
`/<number>`
-Select conversation, contact or peer `<number>` based on the last
-`/conversations`, `/contacts` or `/peers` output list.
+: Select conversation, contact or peer `<number>` based on the last
+ `/conversations`, `/contacts` or `/peers` output list.
`<message>`
-Send `<message>` to selected conversation.
+: Send `<message>` to selected conversation.
`/history`
-Show message history of the selected conversation.
+: Show message history of the selected conversation.
`/details`
-Show information about the selected conversations, contact or peer.
+: Show information about the selected conversations, contact or peer.
+
+### Chatrooms
+
+Currently only public unmoderated chatrooms are supported, which means that any
+network peer is allowed to read and post to the chatroom. Individual messages
+are signed, so message author can not be forged.
+
+`/chatrooms`
+: List known chatrooms.
+
+`/chatroom-create-public [<name>]`
+: Create public unmoderated chatroom. Room name can be passed as command
+ argument or entered interactively.
+
+`/members`
+: List members of the chatroom – usesers who sent any message or joined via the
+`join` command.
+
+`/join`
+: Join chatroom without sending text message.
+
+`/leave`
+: Leave the chatroom. User will no longer be listed as a member and erebos tool
+ will no longer collect message of this chatroom.
### Add contacts
@@ -101,21 +140,22 @@ contacts to contact list (similar to bluetooth device pairing). Before adding
new contact, list peers using `/peers` command and select one with `/<number>`.
`/contacts`
-List already added contacts.
+: List already added contacts.
`/contact-add`
-Add selected peer as contact. Six-digit verification code will be computed
-based on peer keys, which will be displayed on both devices and needs to be
-checked that both numbers are same. After that it needs to be confirmed using
-`/contact-accept` to finish the process.
+: Add selected peer as contact. Six-digit verification code will be computed
+ based on peer keys, which will be displayed on both devices and needs to be
+ checked that both numbers are same. After that it needs to be confirmed using
+ `/contact-accept` to finish the process.
`/contact-accept`
-Confirm that displayed verification codes are same on both devices and add the
-selected peer as contact. The side, which did not initiate the contact adding
-process, needs to select the corresponding peer with `/<number>` command first.
+: Confirm that displayed verification codes are same on both devices and add
+ the selected peer as contact. The side, which did not initiate the contact
+ adding process, needs to select the corresponding peer with `/<number>`
+ command first.
`/contact-reject`
-Reject contact request or verification code of selected peer.
+: Reject contact request or verification code of selected peer.
### Attach other devices
@@ -134,37 +174,38 @@ Before attaching device, list peers using `/peers` command and select the
target device with `/<number>`.
`/attach`
-Attach current device to the selected peer. After the process completes the
-owner of the selected peer will become owner of this device as well. Six-digit
-verification code will be displayed on both devices and the user needs to check
-that both are the same before confirmation using the `/attach-accept` command.
+: Attach current device to the selected peer. After the process completes the
+ owner of the selected peer will become owner of this device as well.
+ Six-digit verification code will be displayed on both devices and the user
+ needs to check that both are the same before confirmation using the
+ `/attach-accept` command.
`/attach-accept`
-Confirm that displayed verification codes are same on both devices and complete
-the attachment process (or wait for the confirmation on the peer device). The
-side, which did not initiate the attachment process, needs to select the
-corresponding peer with `/<number>` command first.
+: Confirm that displayed verification codes are same on both devices and
+ complete the attachment process (or wait for the confirmation on the peer
+ device). The side, which did not initiate the attachment process, needs to
+ select the corresponding peer with `/<number>` command first.
`/attach-reject`
-Reject device attachment request or verification code of selected peer.
+: Reject device attachment request or verification code of selected peer.
### Other
`/peer-add <host> [<port>]`
-Manually add network peer with given hostname or IP address.
+: Manually add network peer with given hostname or IP address.
`/peer-add-public`
-Add known public network peer(s).
+: Add known public network peer(s).
`/peer-drop`
-Drop the currently selected peer. Afterwards, the connection can be
-re-established by either side.
+: Drop the currently selected peer. Afterwards, the connection can be
+ re-established by either side.
`/update-identity`
-Interactively update current identity information
+: Interactively update current identity information
`/quit`
-Quit the erebos tool.
+: Quit the erebos tool.
Storage
diff --git a/erebos.cabal b/erebos.cabal
index 472811f..2629048 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -1,7 +1,7 @@
Cabal-Version: 3.0
Name: erebos
-Version: 0.1.4
+Version: 0.1.6
Synopsis: Decentralized messaging and synchronization
Description:
Library and simple CLI interface implementing the Erebos identity
@@ -54,7 +54,7 @@ common common
-Wno-error=unused-imports
build-depends:
- base >=4.13 && <4.20,
+ base ^>= { 4.15, 4.16, 4.17, 4.18, 4.19, 4.20 },
default-extensions:
DefaultSignatures
@@ -116,7 +116,6 @@ library
Erebos.Storage.Internal
other-modules:
Erebos.Flow
- Erebos.Storage.List
Erebos.Storage.Platform
Erebos.Util
@@ -144,7 +143,7 @@ library
bytestring >=0.10 && <0.13,
clock >=0.8 && < 0.9,
containers >= 0.6 && <0.8,
- cryptonite >=0.25 && <0.31,
+ crypton ^>= { 1.0 },
deepseq >= 1.4 && <1.6,
directory >= 1.3 && <1.4,
filepath >=1.4 && <1.6,
@@ -161,7 +160,11 @@ library
uuid >=1.3 && <1.4,
zlib >=0.6 && <0.8
- if true
+ if os(windows)
+ hs-source-dirs: src/windows
+ build-depends:
+ Win32 ^>= { 2.14 },
+ else
hs-source-dirs: src/unix
build-depends:
unix ^>= { 2.7, 2.8 },
@@ -184,14 +187,14 @@ executable erebos
build-depends:
bytestring,
- cryptonite,
+ crypton,
directory,
erebos,
haskeline >=0.7 && <0.9,
mtl,
network,
process >=1.6 && <1.7,
- template-haskell >=2.17 && <2.22,
+ template-haskell ^>= { 2.17, 2.18, 2.19, 2.20, 2.21, 2.22 },
text,
time,
transformers >= 0.5 && <0.7,
diff --git a/main/Main.hs b/main/Main.hs
index 0eb414c..94c0418 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -24,6 +24,7 @@ import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
import Data.Text.IO qualified as T
+import Data.Time.Format
import Data.Time.LocalTime
import Data.Typeable
@@ -37,6 +38,7 @@ import System.IO
import Erebos.Attach
import Erebos.Contact
+import Erebos.Chatroom
import Erebos.Conversation
#ifdef ENABLE_ICE_SUPPORT
import Erebos.Discovery
@@ -59,11 +61,17 @@ import Version
data Options = Options
{ optServer :: ServerOptions
, optServices :: [ServiceOption]
+ , optStorage :: StorageOption
+ , optChatroomAutoSubscribe :: Maybe Int
, optDmBotEcho :: Maybe Text
, optShowHelp :: Bool
, optShowVersion :: Bool
}
+data StorageOption = DefaultStorage
+ | FilesystemStorage FilePath
+ | MemoryStorage
+
data ServiceOption = ServiceOption
{ soptName :: String
, soptService :: SomeService
@@ -75,6 +83,8 @@ defaultOptions :: Options
defaultOptions = Options
{ optServer = defaultServerOptions
, optServices = availableServices
+ , optStorage = DefaultStorage
+ , optChatroomAutoSubscribe = Nothing
, optDmBotEcho = Nothing
, optShowHelp = False
, optShowVersion = False
@@ -86,6 +96,8 @@ availableServices =
True "attach (to) other devices"
, ServiceOption "sync" (someService @SyncService Proxy)
True "synchronization with attached devices"
+ , ServiceOption "chatroom" (someService @ChatroomService Proxy)
+ True "chatrooms with multiple participants"
, ServiceOption "contact" (someService @ContactService Proxy)
True "create contacts with network peers"
, ServiceOption "dm" (someService @DirectMessage Proxy)
@@ -104,6 +116,15 @@ options =
, Option ['s'] ["silent"]
(NoArg (so $ \opts -> opts { serverLocalDiscovery = False }))
"do not send announce packets for local discovery"
+ , Option [] [ "storage" ]
+ (ReqArg (\path -> \opts -> opts { optStorage = FilesystemStorage path }) "<path>")
+ "use storage in <path>"
+ , Option [] [ "memory-storage" ]
+ (NoArg (\opts -> opts { optStorage = MemoryStorage }))
+ "use memory storage"
+ , Option [] ["chatroom-auto-subscribe"]
+ (ReqArg (\count -> \opts -> opts { optChatroomAutoSubscribe = Just (read count) }) "<count>")
+ "automatically subscribe for up to <count> chatrooms"
, Option [] ["dm-bot-echo"]
(ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
"automatically reply to direct messages with the same text prefixed with <prefix>"
@@ -133,8 +154,20 @@ servicesOptions = concatMap helper $ "all" : map soptName availableServices
main :: IO ()
main = do
- st <- liftIO $ openStorage . fromMaybe "./.erebos" =<< lookupEnv "EREBOS_DIR"
- getArgs >>= \case
+ (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case
+ (o, args, []) -> do
+ return (foldl (flip id) defaultOptions o, args)
+ (_, _, errs) -> do
+ progName <- getProgName
+ hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information."
+ exitFailure
+
+ st <- liftIO $ case optStorage opts of
+ DefaultStorage -> openStorage . fromMaybe "./.erebos" =<< lookupEnv "EREBOS_DIR"
+ FilesystemStorage path -> openStorage path
+ MemoryStorage -> memoryStorage
+
+ case args of
["cat-file", sref] -> do
readRef st (BC.pack sref) >>= \case
Nothing -> error "ref does not exist"
@@ -150,7 +183,7 @@ main = do
forM_ (signedSignature signed) $ \sig -> do
putStr $ "SIG "
BC.putStrLn $ showRef $ storedRef $ sigKey $ fromStored sig
- "identity" -> case validateIdentityF (wrappedLoad <$> refs) of
+ "identity" -> case validateExtendedIdentityF (wrappedLoad <$> refs) of
Just identity -> do
let disp :: Identity m -> IO ()
disp idt = do
@@ -160,7 +193,7 @@ main = do
case idOwner idt of
Nothing -> return ()
Just owner -> do
- mapM_ (putStrLn . ("OWNER " ++) . BC.unpack . showRefDigest . refDigest . storedRef) $ idDataF owner
+ mapM_ (putStrLn . ("OWNER " ++) . BC.unpack . showRefDigest . refDigest . storedRef) $ idExtDataF owner
disp owner
disp identity
Nothing -> putStrLn $ "Identity verification failed"
@@ -184,32 +217,30 @@ main = do
["test"] -> runTestTool st
- args -> case getOpt Permute (options ++ servicesOptions) args of
- (o, [], []) -> do
- let opts = foldl (flip id) defaultOptions o
- header = "Usage: erebos [OPTION...]"
- serviceDesc ServiceOption {..} = padService (" " <> soptName) <> soptDescription
-
- padTo n str = str <> replicate (n - length str) ' '
- padOpt = padTo 37
- padService = padTo 16
-
- if | optShowHelp opts -> putStr $ usageInfo header options <> unlines
- (
- [ padOpt " --enable-<service>" <> "enable network service <service>"
- , padOpt " --disable-<service>" <> "disable network service <service>"
- , padOpt " --enable-all" <> "enable all network services"
- , padOpt " --disable-all" <> "disable all network services"
- , ""
- , "Available network services:"
- ] ++ map serviceDesc availableServices
- )
- | optShowVersion opts -> putStrLn versionLine
- | otherwise -> interactiveLoop st opts
- (_, _, errs) -> do
- progName <- getProgName
- hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information."
- exitFailure
+ [] -> do
+ let header = "Usage: erebos [OPTION...]"
+ serviceDesc ServiceOption {..} = padService (" " <> soptName) <> soptDescription
+
+ padTo n str = str <> replicate (n - length str) ' '
+ padOpt = padTo 37
+ padService = padTo 16
+
+ if | optShowHelp opts -> putStr $ usageInfo header options <> unlines
+ (
+ [ padOpt " --enable-<service>" <> "enable network service <service>"
+ , padOpt " --disable-<service>" <> "disable network service <service>"
+ , padOpt " --enable-all" <> "enable all network services"
+ , padOpt " --disable-all" <> "disable all network services"
+ , ""
+ , "Available network services:"
+ ] ++ map serviceDesc availableServices
+ )
+ | optShowVersion opts -> putStrLn versionLine
+ | otherwise -> interactiveLoop st opts
+
+ (cmdname : _) -> do
+ hPutStrLn stderr $ "Unknown command `" <> cmdname <> "'"
+ exitFailure
inputSettings :: Settings IO
@@ -222,8 +253,10 @@ interactiveLoop st opts = runInputT inputSettings $ do
tui <- haveTerminalUI
extPrint <- getExternalPrint
- let extPrintLn str = extPrint $ case reverse str of ('\n':_) -> str
- _ -> str ++ "\n";
+ let extPrintLn str = do
+ let str' = case reverse str of ('\n':_) -> str
+ _ -> str ++ "\n";
+ extPrint $! str' -- evaluate str before calling extPrint to avoid blinking
let getInputLinesTui eprompt = do
prompt <- case eprompt of
@@ -235,6 +268,7 @@ interactiveLoop st opts = runInputT inputSettings $ do
PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">"
PeerIdentityUnknown _ -> "<unknown>"
SelectedContact contact -> return $ T.unpack $ contactName contact
+ SelectedChatroom rstate -> return $ T.unpack $ fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate
SelectedConversation conv -> return $ T.unpack $ conversationName conv
return $ pname ++ "> "
Right prompt -> return prompt
@@ -281,13 +315,20 @@ interactiveLoop st opts = runInputT inputSettings $ do
Right reply -> extPrintLn $ formatDirectMessage tzone $ fromStored reply
Left err -> extPrintLn $ "Failed to send dm echo: " <> err
+ peers <- liftIO $ newMVar []
+ contextOptions <- liftIO $ newMVar []
+ chatroomSetVar <- liftIO $ newEmptyMVar
+
+ let autoSubscribe = optChatroomAutoSubscribe opts
+ chatroomList = fromSetBy (comparing roomStateData) . lookupSharedValue . lsShared . headObject $ erebosHead
+ watched <- if isJust autoSubscribe || any roomStateSubscribe chatroomList
+ then fmap Just $ liftIO $ watchChatroomsForCli extPrintLn erebosHead chatroomSetVar contextOptions autoSubscribe
+ else return Nothing
+
server <- liftIO $ do
startServer (optServer opts) erebosHead extPrintLn $
map soptService $ filter soptEnabled $ optServices opts
- peers <- liftIO $ newMVar []
- contextOptions <- liftIO $ newMVar []
-
void $ liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange server
peerIdentity peer >>= \case
@@ -320,11 +361,14 @@ interactiveLoop st opts = runInputT inputSettings $ do
{ ciServer = server
, ciLine = line
, ciPrint = extPrintLn
+ , ciOptions = opts
, ciPeers = liftIO $ modifyMVar peers $ \ps -> do
ps' <- filterM (fmap not . isPeerDropped . fst) ps
return (ps', ps')
, ciContextOptions = liftIO $ readMVar contextOptions
, ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs
+ , ciContextOptionsVar = contextOptions
+ , ciChatroomSetVar = chatroomSetVar
}
case res of
Right cstate'
@@ -343,6 +387,7 @@ interactiveLoop st opts = runInputT inputSettings $ do
, csIceSessions = []
#endif
, csIcePeer = Nothing
+ , csWatchChatrooms = watched
, csQuit = False
}
@@ -351,9 +396,12 @@ data CommandInput = CommandInput
{ ciServer :: Server
, ciLine :: String
, ciPrint :: String -> IO ()
+ , ciOptions :: Options
, ciPeers :: CommandM [(Peer, String)]
, ciContextOptions :: CommandM [CommandContext]
, ciSetContextOptions :: [CommandContext] -> Command
+ , ciContextOptionsVar :: MVar [ CommandContext ]
+ , ciChatroomSetVar :: MVar (Set ChatroomState)
}
data CommandState = CommandState
@@ -363,12 +411,14 @@ data CommandState = CommandState
, csIceSessions :: [IceSession]
#endif
, csIcePeer :: Maybe Peer
+ , csWatchChatrooms :: Maybe WatchedHead
, csQuit :: Bool
}
data CommandContext = NoContext
| SelectedPeer Peer
| SelectedContact Contact
+ | SelectedChatroom ChatroomState
| SelectedConversation Conversation
newtype CommandM a = CommandM (ReaderT CommandInput (StateT CommandState (ExceptT String IO)) a)
@@ -402,6 +452,11 @@ getSelectedPeer = gets csContext >>= \case
SelectedPeer peer -> return peer
_ -> throwError "no peer selected"
+getSelectedChatroom :: CommandM ChatroomState
+getSelectedChatroom = gets csContext >>= \case
+ SelectedChatroom rstate -> return rstate
+ _ -> throwError "no chatroom selected"
+
getSelectedConversation :: CommandM Conversation
getSelectedConversation = gets csContext >>= \case
SelectedPeer peer -> peerIdentity peer >>= \case
@@ -410,6 +465,10 @@ getSelectedConversation = gets csContext >>= \case
SelectedContact contact -> case contactIdentity contact of
Just cid -> directMessageConversation cid
Nothing -> throwError "contact without erebos identity"
+ SelectedChatroom rstate ->
+ chatroomConversation rstate >>= \case
+ Just conv -> return conv
+ Nothing -> throwError "invalid chatroom"
SelectedConversation conv -> reloadConversation conv
_ -> throwError "no contact, peer or conversation selected"
@@ -425,6 +484,8 @@ commands =
, ("attach", cmdAttach)
, ("attach-accept", cmdAttachAccept)
, ("attach-reject", cmdAttachReject)
+ , ("chatrooms", cmdChatrooms)
+ , ("chatroom-create-public", cmdChatroomCreatePublic)
, ("contacts", cmdContacts)
, ("contact-add", cmdContactAdd)
, ("contact-accept", cmdContactAccept)
@@ -440,6 +501,9 @@ commands =
, ("ice-connect", cmdIceConnect)
, ("ice-send", cmdIceSend)
#endif
+ , ("join", cmdJoin)
+ , ("leave", cmdLeave)
+ , ("members", cmdMembers)
, ("select", cmdSelectContext)
, ("quit", cmdQuit)
]
@@ -492,20 +556,41 @@ showPeer pidentity paddr =
PeerIdentityFull pid -> T.unpack $ displayIdentity pid
in name ++ " [" ++ show paddr ++ "]"
+cmdJoin :: Command
+cmdJoin = joinChatroom =<< getSelectedChatroom
+
+cmdLeave :: Command
+cmdLeave = leaveChatroom =<< getSelectedChatroom
+
+cmdMembers :: Command
+cmdMembers = do
+ Just room <- findChatroomByStateData . head . roomStateData =<< getSelectedChatroom
+ forM_ (chatroomMembers room) $ \x -> do
+ liftIO $ putStrLn $ maybe "<unnamed>" T.unpack $ idName x
+
+
cmdSelectContext :: Command
cmdSelectContext = do
n <- read <$> asks ciLine
join (asks ciContextOptions) >>= \ctxs -> if
- | n > 0, (ctx : _) <- drop (n - 1) ctxs -> modify $ \s -> s { csContext = ctx }
+ | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do
+ modify $ \s -> s { csContext = ctx }
+ case ctx of
+ SelectedChatroom rstate -> do
+ when (not (roomStateSubscribe rstate)) $ do
+ chatroomSetSubscribe (head $ roomStateData rstate) True
+ _ -> return ()
| otherwise -> throwError "invalid index"
cmdSend :: Command
cmdSend = void $ do
text <- asks ciLine
conv <- getSelectedConversation
- msg <- sendMessage conv $ T.pack text
- tzone <- liftIO $ getCurrentTimeZone
- liftIO $ putStrLn $ formatMessage tzone msg
+ sendMessage conv (T.pack text) >>= \case
+ Just msg -> do
+ tzone <- liftIO $ getCurrentTimeZone
+ liftIO $ putStrLn $ formatMessage tzone msg
+ Nothing -> return ()
cmdHistory :: Command
cmdHistory = void $ do
@@ -530,6 +615,110 @@ cmdAttachAccept = attachAccept =<< getSelectedPeer
cmdAttachReject :: Command
cmdAttachReject = attachReject =<< getSelectedPeer
+watchChatroomsForCli :: (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState) -> MVar [ CommandContext ] -> Maybe Int -> IO WatchedHead
+watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do
+ subscribedNumVar <- newEmptyMVar
+
+ let ctxUpdate updateType (idx :: Int) rstate = \case
+ SelectedChatroom rstate' : rest
+ | currentRoots <- filterAncestors (concatMap storedRoots $ roomStateData rstate)
+ , any ((`intersectsSorted` currentRoots) . storedRoots) $ roomStateData rstate'
+ -> do
+ eprint $ "[" <> show idx <> "] CHATROOM " <> updateType <> " " <> name
+ return (SelectedChatroom rstate : rest)
+ selected : rest
+ -> do
+ (selected : ) <$> ctxUpdate updateType (idx + 1) rstate rest
+ []
+ -> do
+ eprint $ "[" <> show idx <> "] CHATROOM " <> updateType <> " " <> name
+ return [ SelectedChatroom rstate ]
+ where
+ name = maybe "<unnamed>" T.unpack $ roomName =<< roomStateRoom rstate
+
+ watchChatrooms h $ \set -> \case
+ Nothing -> do
+ let chatroomList = fromSetBy (comparing roomStateData) set
+ (subscribed, notSubscribed) = partition roomStateSubscribe chatroomList
+ subscribedNum = length subscribed
+
+ putMVar chatroomSetVar set
+ putMVar subscribedNumVar subscribedNum
+
+ case autoSubscribe of
+ Nothing -> return ()
+ Just num -> do
+ forM_ (take (num - subscribedNum) notSubscribed) $ \rstate -> do
+ (runExceptT $ flip runReaderT h $ chatroomSetSubscribe (head $ roomStateData rstate) True) >>= \case
+ Right () -> return ()
+ Left err -> eprint err
+
+ Just diff -> do
+ modifyMVar_ chatroomSetVar $ return . const set
+ forM_ diff $ \case
+ AddedChatroom rstate -> do
+ modifyMVar_ contextVar $ ctxUpdate "NEW" 1 rstate
+ modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then (+ 1) else id
+
+ RemovedChatroom rstate -> do
+ modifyMVar_ contextVar $ ctxUpdate "DEL" 1 rstate
+ modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then subtract 1 else id
+
+ UpdatedChatroom oldroom rstate -> do
+ when (any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate)) $ do
+ modifyMVar_ contextVar $ ctxUpdate "UPD" 1 rstate
+ when (any (not . null . rsdMessages . fromStored) (roomStateData rstate)) $ do
+ tzone <- getCurrentTimeZone
+ forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do
+ eprint $ concat $
+ [ maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg
+ , formatTime defaultTimeLocale " [%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg
+ , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg
+ , if cmsgLeave msg then " left" else ""
+ , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg
+ ]
+ modifyMVar_ subscribedNumVar $ return
+ . (if roomStateSubscribe rstate then (+ 1) else id)
+ . (if roomStateSubscribe oldroom then subtract 1 else id)
+
+ensureWatchedChatrooms :: Command
+ensureWatchedChatrooms = do
+ gets csWatchChatrooms >>= \case
+ Nothing -> do
+ eprint <- asks ciPrint
+ h <- gets csHead
+ chatroomSetVar <- asks ciChatroomSetVar
+ contextVar <- asks ciContextOptionsVar
+ autoSubscribe <- asks $ optChatroomAutoSubscribe . ciOptions
+ watched <- liftIO $ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe
+ modify $ \s -> s { csWatchChatrooms = Just watched }
+ Just _ -> return ()
+
+cmdChatrooms :: Command
+cmdChatrooms = do
+ ensureWatchedChatrooms
+ chatroomSetVar <- asks ciChatroomSetVar
+ chatroomList <- fromSetBy (comparing roomStateData) <$> liftIO (readMVar chatroomSetVar)
+ set <- asks ciSetContextOptions
+ set $ map SelectedChatroom chatroomList
+ forM_ (zip [1..] chatroomList) $ \(i :: Int, rstate) -> do
+ liftIO $ putStrLn $ "[" ++ show i ++ "] " ++ maybe "<unnamed>" T.unpack (roomName =<< roomStateRoom rstate)
+
+cmdChatroomCreatePublic :: Command
+cmdChatroomCreatePublic = do
+ name <- asks ciLine >>= \case
+ line | not (null line) -> return $ T.pack line
+ _ -> liftIO $ do
+ T.putStr $ T.pack "Name: "
+ hFlush stdout
+ T.getLine
+
+ ensureWatchedChatrooms
+ void $ createChatroom
+ (if T.null name then Nothing else Just name)
+ Nothing
+
+
cmdContacts :: Command
cmdContacts = do
args <- words <$> asks ciLine
@@ -586,6 +775,9 @@ cmdDetails = do
SelectedContact contact -> do
printContactDetails contact
+ SelectedChatroom rstate -> do
+ liftIO $ putStrLn $ "Chatroom: " <> (T.unpack $ fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate)
+
SelectedConversation conv -> do
case conversationPeer conv of
Just pid -> printContactOrIdentityDetails pid
@@ -703,3 +895,10 @@ cmdIceSend = void $ do
cmdQuit :: Command
cmdQuit = modify $ \s -> s { csQuit = True }
+
+
+intersectsSorted :: Ord a => [a] -> [a] -> Bool
+intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys)
+ | x > y = intersectsSorted (x:xs) ys
+ | otherwise = True
+intersectsSorted _ _ = False
diff --git a/main/Test.hs b/main/Test.hs
index d5737c2..c6448b8 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -97,7 +97,7 @@ runTestTool st = do
Nothing -> return ()
runExceptT (evalStateT testLoop initTestState) >>= \case
- Left x -> hPutStrLn stderr x
+ Left x -> B.hPutStr stderr $ (`BC.snoc` '\n') $ BC.pack x
Right () -> return ()
getLineMb :: MonadIO m => m (Maybe Text)
@@ -121,7 +121,7 @@ outLine :: Output -> String -> IO ()
outLine mvar line = do
evaluate $ foldl' (flip seq) () line
withMVar mvar $ \() -> do
- putStrLn line
+ B.putStr $ (`BC.snoc` '\n') $ BC.pack line
hFlush stdout
cmdOut :: String -> Command
@@ -283,6 +283,9 @@ commands = map (T.pack *** id)
, ("chatroom-set-name", cmdChatroomSetName)
, ("chatroom-subscribe", cmdChatroomSubscribe)
, ("chatroom-unsubscribe", cmdChatroomUnsubscribe)
+ , ("chatroom-members", cmdChatroomMembers)
+ , ("chatroom-join", cmdChatroomJoin)
+ , ("chatroom-leave", cmdChatroomLeave)
, ("chatroom-message-send", cmdChatroomMessageSend)
]
@@ -428,7 +431,7 @@ cmdStartServer = do
h <- getOrLoadHead
rsPeers <- liftIO $ newMVar (1, [])
- rsServer <- liftIO $ startServer defaultServerOptions h (hPutStrLn stderr)
+ rsServer <- liftIO $ startServer defaultServerOptions h (B.hPutStr stderr . (`BC.snoc` '\n') . BC.pack)
[ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach"
, someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact"
, someServiceAttr $ directMessageAttributes out
@@ -501,11 +504,11 @@ cmdPeerList = do
cmdTestMessageSend :: Command
cmdTestMessageSend = do
- [spidx, tref] <- asks tiParams
+ spidx : trefs <- asks tiParams
st <- asks tiStorage
- Just ref <- liftIO $ readRef st (encodeUtf8 tref)
+ Just refs <- liftIO $ fmap sequence $ mapM (readRef st . encodeUtf8) trefs
peer <- getPeer spidx
- sendToPeer peer $ TestMessage $ wrappedLoad ref
+ sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
cmdSharedStateGet :: Command
@@ -726,11 +729,13 @@ cmdChatroomWatchLocal = do
, [ "new" ], map (show . refDigest . storedRef) (roomStateData room)
]
when (any (not . null . rsdMessages . fromStored) (roomStateData room)) $ do
- forM_ (getMessagesSinceState room oldroom) $ \msg -> do
+ forM_ (reverse $ getMessagesSinceState room oldroom) $ \msg -> do
outLine out $ unwords $ concat
[ [ "chatroom-message-new" ]
, [ show . refDigest . storedRef . head . filterAncestors . concatMap storedRoots . toComponents $ room ]
+ , [ "room", maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg ]
, [ "from", maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg ]
+ , if cmsgLeave msg then [ "leave" ] else []
, maybe [] (("text":) . (:[]) . T.unpack) $ cmsgText msg
]
@@ -753,8 +758,28 @@ cmdChatroomUnsubscribe = do
to <- getChatroomStateData cid
void $ chatroomSetSubscribe to False
+cmdChatroomMembers :: Command
+cmdChatroomMembers = do
+ [ cid ] <- asks tiParams
+ Just chatroom <- findChatroomByStateData =<< getChatroomStateData cid
+ forM_ (chatroomMembers chatroom) $ \user -> do
+ cmdOut $ unwords [ "chatroom-members-item", maybe "<unnamed>" T.unpack $ idName user ]
+ cmdOut "chatroom-members-done"
+
+cmdChatroomJoin :: Command
+cmdChatroomJoin = do
+ [ cid ] <- asks tiParams
+ joinChatroomByStateData =<< getChatroomStateData cid
+ cmdOut "chatroom-join-done"
+
+cmdChatroomLeave :: Command
+cmdChatroomLeave = do
+ [ cid ] <- asks tiParams
+ leaveChatroomByStateData =<< getChatroomStateData cid
+ cmdOut "chatroom-leave-done"
+
cmdChatroomMessageSend :: Command
cmdChatroomMessageSend = do
[cid, msg] <- asks tiParams
to <- getChatroomStateData cid
- void $ chatroomMessageByStateData to msg
+ void $ sendChatroomMessageByStateData to msg
diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs
index 673c59f..c8b5805 100644
--- a/src/Erebos/Chatroom.hs
+++ b/src/Erebos/Chatroom.hs
@@ -11,14 +11,20 @@ module Erebos.Chatroom (
findChatroomByRoomData,
findChatroomByStateData,
chatroomSetSubscribe,
+ chatroomMembers,
+ joinChatroom, joinChatroomByStateData,
+ leaveChatroom, leaveChatroomByStateData,
getMessagesSinceState,
ChatroomSetChange(..),
watchChatrooms,
- ChatMessage, cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave,
+ ChatMessage,
+ cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave,
+ cmsgRoom, cmsgRoomData,
ChatMessageData(..),
- chatroomMessageByStateData,
+ sendChatroomMessage,
+ sendChatroomMessageByStateData,
ChatroomService(..),
) where
@@ -29,6 +35,9 @@ import Control.Monad.Except
import Control.Monad.IO.Class
import Data.Bool
+import Data.Either
+import Data.Foldable
+import Data.Function
import Data.IORef
import Data.List
import Data.Maybe
@@ -111,6 +120,11 @@ data ChatMessage = ChatMessage
{ cmsgData :: Stored (Signed ChatMessageData)
}
+validateSingleMessage :: Stored (Signed ChatMessageData) -> Maybe ChatMessage
+validateSingleMessage sdata = do
+ guard $ fromStored sdata `isSignedBy` idKeyMessage (mdFrom (fromSigned sdata))
+ return $ ChatMessage sdata
+
cmsgFrom :: ChatMessage -> ComposedIdentity
cmsgFrom = mdFrom . fromSigned . cmsgData
@@ -126,6 +140,12 @@ cmsgText = mdText . fromSigned . cmsgData
cmsgLeave :: ChatMessage -> Bool
cmsgLeave = mdLeave . fromSigned . cmsgData
+cmsgRoom :: ChatMessage -> Maybe Chatroom
+cmsgRoom = either (const Nothing) Just . runExcept . validateChatroom . cmsgRoomData
+
+cmsgRoomData :: ChatMessage -> [ Stored (Signed ChatroomData) ]
+cmsgRoomData = concat . findProperty ((\case [] -> Nothing; xs -> Just xs) . mdRoom . fromStored . signedData) . (: []) . cmsgData
+
instance Storable ChatMessageData where
store' ChatMessageData {..} = storeRec $ do
mapM_ (storeRef "SPREV") mdPrev
@@ -146,37 +166,42 @@ instance Storable ChatMessageData where
mdLeave <- isJust <$> loadMbEmpty "leave"
return ChatMessageData {..}
-threadToList :: [Stored (Signed ChatMessageData)] -> [ChatMessage]
-threadToList thread = helper S.empty $ thread
+threadToListSince :: [ Stored (Signed ChatMessageData) ] -> [ Stored (Signed ChatMessageData) ] -> [ ChatMessage ]
+threadToListSince since thread = helper (S.fromList since) thread
where
helper :: S.Set (Stored (Signed ChatMessageData)) -> [Stored (Signed ChatMessageData)] -> [ChatMessage]
helper seen msgs
| msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs =
- messageFromData msg : helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg))
+ maybe id (:) (validateSingleMessage msg) $
+ helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg))
| otherwise = []
cmpView msg = (zonedTimeToUTC $ mdTime $ fromSigned msg, msg)
- messageFromData :: Stored (Signed ChatMessageData) -> ChatMessage
- messageFromData sdata = ChatMessage { cmsgData = sdata }
+sendChatroomMessage
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => ChatroomState -> Text -> m ()
+sendChatroomMessage rstate msg = sendChatroomMessageByStateData (head $ roomStateData rstate) msg
-chatroomMessageByStateData
+sendChatroomMessageByStateData
:: (MonadStorage m, MonadHead LocalState m, MonadError String m)
=> Stored ChatroomStateData -> Text -> m ()
-chatroomMessageByStateData lookupData msg = void $ findAndUpdateChatroomState $ \cstate -> do
+sendChatroomMessageByStateData lookupData msg = sendRawChatroomMessageByStateData lookupData Nothing (Just msg) False
+
+sendRawChatroomMessageByStateData
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => Stored ChatroomStateData -> Maybe (Stored (Signed ChatMessageData)) -> Maybe Text -> Bool -> m ()
+sendRawChatroomMessageByStateData lookupData mdReplyTo mdText mdLeave = void $ findAndUpdateChatroomState $ \cstate -> do
guard $ any (lookupData `precedesOrEquals`) $ roomStateData cstate
Just $ do
- self <- finalOwner . localIdentity . fromStored <$> getLocalHead
- secret <- loadKey $ idKeyMessage self
- time <- liftIO getZonedTime
- mdata <- mstore =<< sign secret =<< mstore ChatMessageData
- { mdPrev = roomStateMessageData cstate
- , mdRoom = []
- , mdFrom = self
- , mdReplyTo = Nothing
- , mdTime = time
- , mdText = Just msg
- , mdLeave = False
- }
+ mdFrom <- finalOwner . localIdentity . fromStored <$> getLocalHead
+ secret <- loadKey $ idKeyMessage mdFrom
+ mdTime <- liftIO getZonedTime
+ let mdPrev = roomStateMessageData cstate
+ mdRoom = if null (roomStateMessageData cstate)
+ then maybe [] roomData (roomStateRoom cstate)
+ else []
+
+ mdata <- mstore =<< sign secret =<< mstore ChatMessageData {..}
mergeSorted . (:[]) <$> mstore ChatroomStateData
{ rsdPrev = roomStateData cstate
, rsdRoom = []
@@ -224,7 +249,7 @@ instance Mergeable ChatroomState where
ChatroomStateData {..} | null rsdMessages -> Nothing
| otherwise -> Just rsdMessages
roomStateSubscribe = fromMaybe False $ findPropertyFirst rsdSubscribe roomStateData
- roomStateMessages = threadToList $ concatMap (rsdMessages . fromStored) roomStateData
+ roomStateMessages = threadToListSince [] $ concatMap (rsdMessages . fromStored) roomStateData
in ChatroomState {..}
toComponents = roomStateData
@@ -321,11 +346,38 @@ chatroomSetSubscribe lookupData subscribe = void $ findAndUpdateChatroomState $
, rsdMessages = []
}
+chatroomMembers :: ChatroomState -> [ ComposedIdentity ]
+chatroomMembers ChatroomState {..} =
+ map (mdFrom . fromSigned . head) $
+ filter (any $ not . mdLeave . fromSigned) $ -- keep only users that hasn't left
+ map (filterAncestors . map snd) $ -- gather message data per each identity and filter ancestors
+ groupBy ((==) `on` fst) $ -- group on identity root
+ sortBy (comparing fst) $ -- sort by first root of identity data
+ map (\x -> ( head . filterAncestors . concatMap storedRoots . idDataF . mdFrom . fromSigned $ x, x )) $
+ toList $ ancestors $ roomStateMessageData
+
+joinChatroom
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => ChatroomState -> m ()
+joinChatroom rstate = joinChatroomByStateData (head $ roomStateData rstate)
+
+joinChatroomByStateData
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => Stored ChatroomStateData -> m ()
+joinChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing False
+
+leaveChatroom
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => ChatroomState -> m ()
+leaveChatroom rstate = leaveChatroomByStateData (head $ roomStateData rstate)
+
+leaveChatroomByStateData
+ :: (MonadStorage m, MonadHead LocalState m, MonadError String m)
+ => Stored ChatroomStateData -> m ()
+leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing True
+
getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage]
-getMessagesSinceState cur old = takeWhile notOld (roomStateMessages cur)
- where
- notOld msg = cmsgData msg `notElem` roomStateMessageData old
- -- TODO: parallel message threads
+getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur)
data ChatroomSetChange = AddedChatroom ChatroomState
@@ -365,13 +417,18 @@ makeChatroomDiff [] ys = map (AddedChatroom . snd) ys
data ChatroomService = ChatroomService
{ chatRoomQuery :: Bool
, chatRoomInfo :: [Stored (Signed ChatroomData)]
+ , chatRoomSubscribe :: [Stored (Signed ChatroomData)]
+ , chatRoomUnsubscribe :: [Stored (Signed ChatroomData)]
, chatRoomMessage :: [Stored (Signed ChatMessageData)]
}
+ deriving (Eq)
emptyPacket :: ChatroomService
emptyPacket = ChatroomService
{ chatRoomQuery = False
, chatRoomInfo = []
+ , chatRoomSubscribe = []
+ , chatRoomUnsubscribe = []
, chatRoomMessage = []
}
@@ -379,17 +436,22 @@ instance Storable ChatroomService where
store' ChatroomService {..} = storeRec $ do
when chatRoomQuery $ storeEmpty "room-query"
forM_ chatRoomInfo $ storeRef "room-info"
+ forM_ chatRoomSubscribe $ storeRef "room-subscribe"
+ forM_ chatRoomUnsubscribe $ storeRef "room-unsubscribe"
forM_ chatRoomMessage $ storeRef "room-message"
load' = loadRec $ do
chatRoomQuery <- isJust <$> loadMbEmpty "room-query"
chatRoomInfo <- loadRefs "room-info"
+ chatRoomSubscribe <- loadRefs "room-subscribe"
+ chatRoomUnsubscribe <- loadRefs "room-unsubscribe"
chatRoomMessage <- loadRefs "room-message"
return ChatroomService {..}
data PeerState = PeerState
{ psSendRoomUpdates :: Bool
, psLastList :: [(Stored ChatroomStateData, ChatroomState)]
+ , psSubscribedTo :: [ Stored (Signed ChatroomData) ] -- least root for each room
}
instance Service ChatroomService where
@@ -399,12 +461,18 @@ instance Service ChatroomService where
emptyServiceState _ = PeerState
{ psSendRoomUpdates = False
, psLastList = []
+ , psSubscribedTo = []
}
serviceHandler spacket = do
let ChatroomService {..} = fromStored spacket
+
+ previouslyUpdated <- psSendRoomUpdates <$> svcGet
svcModify $ \s -> s { psSendRoomUpdates = True }
+ when (not previouslyUpdated) $ do
+ syncChatroomsToPeer . lookupSharedValue . lsShared . fromStored =<< getLocalHead
+
when chatRoomQuery $ do
rooms <- listChatrooms
replyPacket emptyPacket
@@ -420,7 +488,7 @@ instance Service ChatroomService where
maybe [] roomData . roomStateRoom
let prev = concatMap roomStateData $ filter isCurrentRoom rooms
- prevRoom = concatMap (rsdRoom . fromStored) prev
+ prevRoom = filterAncestors $ concat $ findProperty ((\case [] -> Nothing; xs -> Just xs) . rsdRoom) prev
room = filterAncestors $ (roomInfo : ) prevRoom
-- update local state only if we got roomInfo not present there
@@ -436,6 +504,51 @@ instance Service ChatroomService where
else return set
foldM upd roomSet chatRoomInfo
+ forM_ chatRoomSubscribe $ \subscribeData -> do
+ mbRoomState <- findChatroomByRoomData subscribeData
+ forM_ mbRoomState $ \roomState ->
+ forM (roomStateRoom roomState) $ \room -> do
+ let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room
+ svcModify $ \ps -> ps { psSubscribedTo = leastRoot : psSubscribedTo ps }
+ replyPacket emptyPacket
+ { chatRoomMessage = roomStateMessageData roomState
+ }
+
+ forM_ chatRoomUnsubscribe $ \unsubscribeData -> do
+ mbRoomState <- findChatroomByRoomData unsubscribeData
+ forM_ (mbRoomState >>= roomStateRoom) $ \room -> do
+ let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room
+ svcModify $ \ps -> ps { psSubscribedTo = filter (/= leastRoot) (psSubscribedTo ps) }
+
+ when (not (null chatRoomMessage)) $ do
+ updateLocalHead_ $ updateSharedState_ $ \roomSet -> do
+ let rooms = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet
+ upd set (msgData :: Stored (Signed ChatMessageData))
+ | Just msg <- validateSingleMessage msgData = do
+ let roomInfo = cmsgRoomData msg
+ currentRoots = filterAncestors $ concatMap storedRoots roomInfo
+ isCurrentRoom = any ((`intersectsSorted` currentRoots) . storedRoots) .
+ maybe [] roomData . roomStateRoom
+
+ let prevData = concatMap roomStateData $ filter isCurrentRoom rooms
+ prev = mergeSorted prevData
+ prevMessages = roomStateMessageData prev
+ messages = filterAncestors $ msgData : prevMessages
+
+ -- update local state only if subscribed and we got some new messages
+ if roomStateSubscribe prev && messages /= prevMessages
+ then do
+ sdata <- mstore ChatroomStateData
+ { rsdPrev = prevData
+ , rsdRoom = []
+ , rsdSubscribe = Nothing
+ , rsdMessages = messages
+ }
+ storeSetAddComponent sdata set
+ else return set
+ | otherwise = return set
+ foldM upd roomSet chatRoomMessage
+
serviceNewPeer = do
replyPacket emptyPacket { chatRoomQuery = True }
@@ -447,11 +560,50 @@ syncChatroomsToPeer set = do
ps@PeerState {..} <- svcGet
when psSendRoomUpdates $ do
let curList = chatroomSetToList set
- updates <- fmap (concat . catMaybes) $
- forM (makeChatroomDiff psLastList curList) $ return . \case
+ diff = makeChatroomDiff psLastList curList
+
+ roomUpdates <- fmap (concat . catMaybes) $
+ forM diff $ return . \case
AddedChatroom room -> roomData <$> roomStateRoom room
RemovedChatroom {} -> Nothing
- UpdatedChatroom _ room -> roomData <$> roomStateRoom room
- when (not $ null updates) $ do
- replyPacket $ emptyPacket { chatRoomInfo = updates }
+ UpdatedChatroom oldroom room
+ | roomStateData oldroom /= roomStateData room -> roomData <$> roomStateRoom room
+ | otherwise -> Nothing
+
+ (subscribe, unsubscribe) <- fmap (partitionEithers . concat . catMaybes) $
+ forM diff $ return . \case
+ AddedChatroom room
+ | roomStateSubscribe room
+ -> map Left . roomData <$> roomStateRoom room
+ RemovedChatroom oldroom
+ | roomStateSubscribe oldroom
+ -> map Right . roomData <$> roomStateRoom oldroom
+ UpdatedChatroom oldroom room
+ | roomStateSubscribe oldroom /= roomStateSubscribe room
+ -> map (if roomStateSubscribe room then Left else Right) . roomData <$> roomStateRoom room
+ _ -> Nothing
+
+ messages <- fmap concat $ do
+ let leastRootFor = head . filterAncestors . concatMap storedRoots . roomData
+ forM diff $ return . \case
+ AddedChatroom rstate
+ | Just room <- roomStateRoom rstate
+ , leastRootFor room `elem` psSubscribedTo
+ -> roomStateMessageData rstate
+ UpdatedChatroom oldstate rstate
+ | Just room <- roomStateRoom rstate
+ , leastRootFor room `elem` psSubscribedTo
+ , roomStateMessageData oldstate /= roomStateMessageData rstate
+ -> roomStateMessageData rstate
+ _ -> []
+
+ let packet = emptyPacket
+ { chatRoomInfo = roomUpdates
+ , chatRoomSubscribe = subscribe
+ , chatRoomUnsubscribe = unsubscribe
+ , chatRoomMessage = messages
+ }
+
+ when (packet /= emptyPacket) $ do
+ replyPacket packet
svcSet $ ps { psLastList = curList }
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index 94d2399..63475bd 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -1,12 +1,15 @@
module Erebos.Conversation (
Message,
messageFrom,
+ messageTime,
messageText,
messageUnread,
formatMessage,
Conversation,
directMessageConversation,
+ chatroomConversation,
+ chatroomConversationByStateData,
reloadConversation,
lookupConversations,
@@ -23,30 +26,45 @@ import Data.List
import Data.Maybe
import Data.Text (Text)
import Data.Text qualified as T
+import Data.Time.Format
import Data.Time.LocalTime
import Erebos.Identity
+import Erebos.Chatroom
import Erebos.Message hiding (formatMessage)
import Erebos.State
import Erebos.Storage
data Message = DirectMessageMessage DirectMessage Bool
+ | ChatroomMessage ChatMessage Bool
messageFrom :: Message -> ComposedIdentity
messageFrom (DirectMessageMessage msg _) = msgFrom msg
+messageFrom (ChatroomMessage msg _) = cmsgFrom msg
+
+messageTime :: Message -> ZonedTime
+messageTime (DirectMessageMessage msg _) = msgTime msg
+messageTime (ChatroomMessage msg _) = cmsgTime msg
messageText :: Message -> Maybe Text
messageText (DirectMessageMessage msg _) = Just $ msgText msg
+messageText (ChatroomMessage msg _) = cmsgText msg
messageUnread :: Message -> Bool
messageUnread (DirectMessageMessage _ unread) = unread
+messageUnread (ChatroomMessage _ unread) = unread
formatMessage :: TimeZone -> Message -> String
-formatMessage tzone (DirectMessageMessage msg _) = formatDirectMessage tzone msg
+formatMessage tzone msg = concat
+ [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ messageTime msg
+ , maybe "<unnamed>" T.unpack $ idName $ messageFrom msg
+ , maybe "" ((": "<>) . T.unpack) $ messageText msg
+ ]
data Conversation = DirectMessageConversation DirectMessageThread
+ | ChatroomConversation ChatroomState
directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation
directMessageConversation peer = do
@@ -54,8 +72,16 @@ directMessageConversation peer = do
Just thread -> return $ DirectMessageConversation thread
Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
+chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
+
+chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation)
+chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata
+
reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation
reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread)
+reloadConversation cur@(ChatroomConversation rstate) =
+ fromMaybe cur <$> chatroomConversation rstate
lookupConversations :: MonadHead LocalState m => m [Conversation]
lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead
@@ -63,13 +89,17 @@ lookupConversations = map DirectMessageConversation . toThreadList . lookupShare
conversationName :: Conversation -> Text
conversationName (DirectMessageConversation thread) = fromMaybe (T.pack "<unnamed>") $ idName $ msgPeer thread
+conversationName (ChatroomConversation rstate) = fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate
conversationPeer :: Conversation -> Maybe ComposedIdentity
conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread
+conversationPeer (ChatroomConversation _) = Nothing
conversationHistory :: Conversation -> [Message]
conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread
+conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate
-sendMessage :: (MonadHead LocalState m, MonadError String m) => Conversation -> Text -> m Message
-sendMessage (DirectMessageConversation thread) text = DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False
+sendMessage :: (MonadHead LocalState m, MonadError String m) => Conversation -> Text -> m (Maybe Message)
+sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False
+sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing
diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs
index 8761fde..f2094f6 100644
--- a/src/Erebos/Identity.hs
+++ b/src/Erebos/Identity.hs
@@ -35,7 +35,6 @@ import Data.Foldable
import Data.Function
import Data.List
import Data.Maybe
-import Data.Ord
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
@@ -304,25 +303,18 @@ verifySignatures sidd = do
throwError "signature verification failed"
lookupProperty :: forall a m. Foldable m => (ExtendedIdentityData -> Maybe a) -> m (Stored (Signed ExtendedIdentityData)) -> Maybe a
-lookupProperty sel topHeads = findResult filteredLayers
- where findPropHeads :: Stored (Signed ExtendedIdentityData) -> [(Stored (Signed ExtendedIdentityData), a)]
- findPropHeads sobj | Just x <- sel $ fromSigned sobj = [(sobj, x)]
- | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj)
+lookupProperty sel topHeads = findResult propHeads
+ where
+ findPropHeads :: Stored (Signed ExtendedIdentityData) -> [ Stored (Signed ExtendedIdentityData) ]
+ findPropHeads sobj | Just _ <- sel $ fromSigned sobj = [ sobj ]
+ | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj)
- propHeads :: [(Stored (Signed ExtendedIdentityData), a)]
- propHeads = findPropHeads =<< toList topHeads
+ propHeads :: [ Stored (Signed ExtendedIdentityData) ]
+ propHeads = filterAncestors $ findPropHeads =<< toList topHeads
- historyLayers :: [Set (Stored (Signed ExtendedIdentityData))]
- historyLayers = generations $ map fst propHeads
-
- filteredLayers :: [[(Stored (Signed ExtendedIdentityData), a)]]
- filteredLayers = scanl (\cur obsolete -> filter ((`S.notMember` obsolete) . fst) cur) propHeads historyLayers
-
- findResult ([(_, x)] : _) = Just x
- findResult ([] : _) = Nothing
- findResult [] = Nothing
- findResult [xs] = Just $ snd $ minimumBy (comparing fst) xs
- findResult (_:rest) = findResult rest
+ findResult :: [ Stored (Signed ExtendedIdentityData) ] -> Maybe a
+ findResult [] = Nothing
+ findResult xs = sel $ fromSigned $ minimum xs
mergeIdentity :: (MonadStorage m, MonadError String m, MonadIO m) => Identity f -> m UnifiedIdentity
mergeIdentity idt | Just idt' <- toUnifiedIdentity idt = return idt'
@@ -385,8 +377,9 @@ updateOwners updates orig@Identity { idOwner_ = Just owner, idUpdates_ = cupdate
updateOwners _ orig@Identity { idOwner_ = Nothing } = orig
sameIdentity :: (Foldable m, Foldable m') => Identity m -> Identity m' -> Bool
-sameIdentity x y = not $ S.null $ S.intersection (refset x) (refset y)
- where refset idt = foldr S.insert (ancestors $ toList $ idDataF idt) (idDataF idt)
+sameIdentity x y = intersectsSorted (roots x) (roots y)
+ where
+ roots idt = uniq $ sort $ concatMap storedRoots $ toList $ idDataF idt
unfoldOwners :: (Foldable m) => Identity m -> [ComposedIdentity]
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 41b6279..2064d1c 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -19,7 +19,9 @@ module Erebos.Network (
#endif
dropPeer,
isPeerDropped,
- sendToPeer, sendToPeerStored, sendToPeerWith,
+ sendToPeer, sendManyToPeer,
+ sendToPeerStored, sendManyToPeerStored,
+ sendToPeerWith,
runPeerService,
discoveryPort,
@@ -52,6 +54,9 @@ import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
import qualified Network.Socket.ByteString as S
+import Foreign.C.Types
+import Foreign.Marshal.Alloc
+
import Erebos.Channel
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
@@ -69,6 +74,9 @@ import Erebos.Storage.Merge
discoveryPort :: PortNumber
discoveryPort = 29665
+discoveryMulticastGroup :: HostAddress6
+discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1
+
announceIntervalSeconds :: Int
announceIntervalSeconds = 60
@@ -247,8 +255,6 @@ startServer opt serverOrigHead logd' serverServices = do
either (atomically . logd) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
- broadcastAddreses <- getBroadcastAddresses discoveryPort
-
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
putMVar serverSocket sock
@@ -259,9 +265,14 @@ startServer opt serverOrigHead logd' serverServices = do
return sock
loop sock = do
- when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
- atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses
- threadDelay $ announceIntervalSeconds * 1000 * 1000
+ when (serverLocalDiscovery opt) $ forkServerThread server $ do
+ announceAddreses <- fmap concat $ sequence $
+ [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock
+ , getBroadcastAddresses discoveryPort
+ ]
+ forever $ do
+ atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses
+ threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
st <- derivePartialStorage serverStorage
@@ -301,10 +312,11 @@ startServer opt serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- case paddr of
- DatagramAddress addr -> void $ S.sendTo sock msg addr
+ handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ case paddr of
+ DatagramAddress addr -> void $ S.sendTo sock msg addr
#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice -> iceSend ice msg
+ PeerIceSession ice -> iceSend ice msg
#endif
forkServerThread server $ forever $ do
@@ -421,12 +433,18 @@ instance MonadFail PacketHandler where
runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler secure peer@Peer {..} act = do
let logd = writeTQueue $ serverErrorLog peerServer_
- runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case
+ runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case
Left err -> do
logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
Right ph -> do
when (not $ null $ phHead ph) $ do
- let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ body <- case phBodyStream ph of
+ Nothing -> return $ phBody ph
+ Just stream -> do
+ writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph
+ return []
+ let packet = TransportPacket (TransportHeader $ phHead ph) body
secreq = case (secure, phPlaintextReply ph) of
(True, _) -> EncryptedOnly
(False, False) -> PlaintextAllowed
@@ -450,6 +468,7 @@ data PacketHandlerState = PacketHandlerState
, phHead :: [TransportHeaderItem]
, phAckedBy :: [TransportHeaderItem]
, phBody :: [Ref]
+ , phBodyStream :: Maybe RawStreamWriter
, phPlaintextReply :: Bool
}
@@ -462,6 +481,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+sendBodyAsStream :: PacketHandler ()
+sendBodyAsStream = do
+ gets phBodyStream >>= \case
+ Nothing -> do
+ stream <- openStream
+ modify $ \ph -> ph { phBodyStream = Just stream }
+ Just _ -> return ()
+
keepPlaintextReply :: PacketHandler ()
keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
@@ -517,8 +544,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
liftSTM $ finalizedChannel peer ch identity
_ -> return ()
- Rejected dgst -> do
- logd $ "rejected by peer: " ++ show dgst
+ Rejected dgst
+ | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers
+ , peerRequest < dgst
+ -> return () -- Our request was rejected due to lower priority
+
+ | otherwise -> logd $ "rejected by peer: " ++ show dgst
DataRequest dgst
| secure || dgst `elem` plaintextRefs -> do
@@ -532,15 +563,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
-- otherwise lost the channel, so keep the reply plaintext as well.
when (not secure) keepPlaintextReply
- let bytes = lazyLoadBytes mref
-- TODO: MTU
- if (secure && BL.length bytes > 500)
- then do
- stream <- openStream
- liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
- writeByteStringToStream stream bytes
- else do
- addBody $ mref
+ when (secure && BL.length (lazyLoadBytes mref) > 500)
+ sendBodyAsStream
+
+ addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show dgst
addHeader $ Rejected dgst
@@ -593,9 +620,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelCookieWait {} -> return ()
ChannelCookieReceived {} -> process
ChannelCookieConfirmed {} -> process
- ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
- | otherwise -> reject
- ChannelPeerRequest {} -> process
+ ChannelOurRequest our
+ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> do
+ -- Reject peer channel request with lower priority
+ addHeader $ TrChannelRequest $ refDigest $ storedRef our
+ reject
+ ChannelPeerRequest prev
+ | dgst == wrDigest prev -> addHeader $ Acknowledged dgst
+ | otherwise -> process
ChannelOurAccept {} -> reject
ChannelEstablished {} -> process
ChannelClosed {} -> return ()
@@ -647,12 +680,14 @@ setupChannel identity peer upid = do
[ TrChannelRequest reqref
, AnnounceSelf $ refDigest $ storedRef $ idData identity
]
+ let sendChannelRequest = do
+ sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
+ TransportPacket (TransportHeader hitems) [storedRef req]
+ setPeerChannel peer $ ChannelOurRequest req
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelCookieConfirmed -> do
- sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
- TransportPacket (TransportHeader hitems) [storedRef req]
- setPeerChannel peer $ ChannelOurRequest req
+ ChannelCookieReceived -> sendChannelRequest
+ ChannelCookieConfirmed -> sendChannelRequest
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -806,10 +841,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
_ -> return False
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
-sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
+sendToPeer peer = sendManyToPeer peer . (: [])
+
+sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m ()
+sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True)
sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
-sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True]
+sendToPeerStored peer = sendManyToPeerStored peer . (: [])
+
+sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
+sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
@@ -912,9 +953,19 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO ()
+joinMulticast :: Socket -> IO [ Word32 ]
+joinMulticast sock =
+ withFdSocket sock $ \fd ->
+ alloca $ \pcount -> do
+ ptr <- cJoinMulticast fd pcount
+ count <- fromIntegral <$> peek pcount
+ forM [ 0 .. count - 1 ] $ \i ->
+ peekElemOff ptr i
+
getBroadcastAddresses :: PortNumber -> IO [SockAddr]
getBroadcastAddresses port = do
ptr <- cBroadcastAddresses
@@ -922,6 +973,9 @@ getBroadcastAddresses port = do
w <- peekElemOff ptr i
if w == 0 then return []
else (SockAddrInet port w:) <$> parse (i + 1)
- addrs <- parse 0
- cFree ptr
- return addrs
+ if ptr == nullPtr
+ then return []
+ else do
+ addrs <- parse 0
+ cFree ptr
+ return addrs
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index a009ad1..cfbaea3 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -40,7 +40,17 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans
+import Crypto.Cipher.ChaChaPoly1305 qualified as C
+import Crypto.MAC.Poly1305 qualified as C (Auth(..), authTag)
+import Crypto.Error
+import Crypto.Random
+
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put
import Data.Bits
+import Data.ByteArray (Bytes, ScrubbedBytes)
+import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC
@@ -51,7 +61,6 @@ import Data.Maybe
import Data.Text (Text)
import Data.Text qualified as T
import Data.Void
-import Data.Word
import System.Clock
@@ -68,6 +77,9 @@ protocolVersion = T.pack "0.1"
protocolVersions :: [Text]
protocolVersions = [protocolVersion]
+keepAliveInternal :: TimeSpec
+keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int)
+
data TransportPacket a = TransportPacket TransportHeader [a]
@@ -101,6 +113,35 @@ data SecurityRequirement = PlaintextOnly
| EncryptedOnly
deriving (Eq, Ord)
+data ParsedCookie = ParsedCookie
+ { cookieNonce :: C.Nonce
+ , cookieValidity :: Word32
+ , cookieContent :: ByteString
+ , cookieMac :: C.Auth
+ }
+
+instance Eq ParsedCookie where
+ (==) = (==) `on` (\c -> ( BA.convert (cookieNonce c) :: ByteString, cookieValidity c, cookieContent c, cookieMac c ))
+
+instance Show ParsedCookie where
+ show ParsedCookie {..} = show (nonce, cookieValidity, cookieContent, mac)
+ where C.Auth mac = cookieMac
+ nonce = BA.convert cookieNonce :: ByteString
+
+instance Binary ParsedCookie where
+ put ParsedCookie {..} = do
+ putByteString $ BA.convert cookieNonce
+ putWord32be cookieValidity
+ putByteString $ BA.convert cookieMac
+ putByteString cookieContent
+
+ get = do
+ Just cookieNonce <- maybeCryptoError . C.nonce12 <$> getByteString 12
+ cookieValidity <- getWord32be
+ Just cookieMac <- maybeCryptoError . C.authTag <$> getByteString 16
+ cookieContent <- BL.toStrict <$> getRemainingLazyByteString
+ return ParsedCookie {..}
+
isHeaderItemAcknowledged :: TransportHeaderItem -> Bool
isHeaderItemAcknowledged = \case
Acknowledged {} -> False
@@ -165,9 +206,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
, gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
, gLog :: String -> STM ()
, gStorage :: PartialStorage
+ , gStartTime :: TimeSpec
, gNowVar :: TVar TimeSpec
, gNextTimeout :: TVar TimeSpec
, gInitConfig :: Ref
+ , gCookieKey :: ScrubbedBytes
+ , gCookieStartTime :: Word32
}
data Connection addr = Connection
@@ -186,6 +230,7 @@ data Connection addr = Connection
, cReservedPackets :: TVar Int
, cSentPackets :: TVar [SentPacket]
, cToAcknowledge :: TVar [Integer]
+ , cNextKeepAlive :: TVar (Maybe TimeSpec)
, cInStreams :: TVar [(Word8, Stream)]
, cOutStreams :: TVar [(Word8, Stream)]
}
@@ -440,15 +485,18 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
mStorage <- memoryStorage
gStorage <- derivePartialStorage mStorage
- startTime <- getTime MonotonicRaw
- gNowVar <- newTVarIO startTime
- gNextTimeout <- newTVarIO startTime
+ gStartTime <- getTime Monotonic
+ gNowVar <- newTVarIO gStartTime
+ gNextTimeout <- newTVarIO gStartTime
gInitConfig <- store mStorage $ (Rec [] :: Object)
+ gCookieKey <- getRandomBytes 32
+ gCookieStartTime <- runGet getWord32host . BL.pack . BA.unpack @ScrubbedBytes <$> getRandomBytes 4
+
let gs = GlobalState {..}
let signalTimeouts = forever $ do
- now <- getTime MonotonicRaw
+ now <- getTime Monotonic
next <- atomically $ do
writeTVar gNowVar now
readTVar gNextTimeout
@@ -487,6 +535,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
cReservedPackets <- newTVar 0
cSentPackets <- newTVar []
cToAcknowledge <- newTVar []
+ cNextKeepAlive <- newTVar Nothing
cInStreams <- newTVar []
cOutStreams <- newTVar []
let conn = Connection {..}
@@ -548,6 +597,7 @@ processIncoming gs@GlobalState {..} = do
Nothing -> throwError "empty packet"
+ now <- getTime Monotonic
runExceptT parse >>= \case
Right (Left (secure, objs, mbcounter))
| hobj:content <- objs
@@ -562,6 +612,7 @@ processIncoming gs@GlobalState {..} = do
case mbup of
Just up -> putTMVar gNextUp (conn, (secure, up))
Nothing -> return ()
+ updateKeepAlive conn now
processAcknowledgements gs conn items
ioAfter
Nothing -> return ()
@@ -571,8 +622,9 @@ processIncoming gs@GlobalState {..} = do
gLog $ show objs
Right (Right (snum, seq8, content, counter))
- | Just Connection {..} <- mbconn
+ | Just conn@Connection {..} <- mbconn
-> atomically $ do
+ updateKeepAlive conn now
(lookup snum <$> readTVar cInStreams) >>= \case
Nothing ->
gLog $ "unexpected stream number " ++ show snum
@@ -694,11 +746,38 @@ generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader,
_ -> return Nothing
createCookie :: GlobalState addr -> addr -> IO Cookie
-createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)
+createCookie GlobalState {..} addr = do
+ (nonceBytes :: Bytes) <- getRandomBytes 12
+ validUntil <- (fromNanoSecs (60 * 10^(9 :: Int)) +) <$> getTime Monotonic
+ let validSecondsFromStart = fromIntegral $ toNanoSecs (validUntil - gStartTime) `div` (10^(9 :: Int))
+ cookieValidity = validSecondsFromStart - gCookieStartTime
+ plainContent = BC.pack (show addr)
+ throwCryptoErrorIO $ do
+ cookieNonce <- C.nonce12 nonceBytes
+ st1 <- C.initialize gCookieKey cookieNonce
+ let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1
+ (cookieContent, st3) = C.encrypt plainContent st2
+ cookieMac = C.finalize st3
+ return $ Cookie $ BL.toStrict $ encode $ ParsedCookie {..}
verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool
-verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie
-
+verifyCookie GlobalState {..} addr (Cookie cookie) = do
+ ctime <- getTime Monotonic
+ return $ fromMaybe False $ do
+ ( _, _, ParsedCookie {..} ) <- either (const Nothing) Just $ decodeOrFail $ BL.fromStrict cookie
+ maybeCryptoError $ do
+ st1 <- C.initialize gCookieKey cookieNonce
+ let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1
+ (plainContent, st3) = C.decrypt cookieContent st2
+ mac = C.finalize st3
+
+ validSecondsFromStart = fromIntegral $ cookieValidity + gCookieStartTime
+ validUntil = gStartTime + fromNanoSecs (validSecondsFromStart * (10^(9 :: Int)))
+ return $ and
+ [ mac == cookieMac
+ , ctime <= validUntil
+ , show addr == BC.unpack plainContent
+ ]
reservePacket :: Connection addr -> STM ReservedToSend
reservePacket conn@Connection {..} = do
@@ -713,9 +792,9 @@ reservePacket conn@Connection {..} = do
return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
-resendBytes Connection {..} reserved sp = do
+resendBytes conn@Connection {..} reserved sp = do
let GlobalState {..} = cGlobalState
- now <- getTime MonotonicRaw
+ now <- getTime Monotonic
atomically $ do
when (isJust reserved) $ do
modifyTVar' cReservedPackets (subtract 1)
@@ -726,6 +805,7 @@ resendBytes Connection {..} reserved sp = do
, spRetryCount = spRetryCount sp + 1
}
writeFlow gDataFlow (cAddress, spData sp)
+ updateKeepAlive conn now
sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()
sendBytes conn reserved bs = resendBytes conn reserved
@@ -738,6 +818,12 @@ sendBytes conn reserved bs = resendBytes conn reserved
, spData = bs
}
+updateKeepAlive :: Connection addr -> TimeSpec -> STM ()
+updateKeepAlive Connection {..} now = do
+ let next = now + keepAliveInternal
+ writeTVar cNextKeepAlive $ Just next
+
+
processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
processOutgoing gs@GlobalState {..} = do
@@ -777,11 +863,12 @@ processOutgoing gs@GlobalState {..} = do
let onAck = sequence_ $ map (streamAccepted conn) $
catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
- let mkPlain extraHeaders =
- let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
- in BL.concat $
- (serializeObject $ transportToObject gStorage header)
- : map lazyLoadBytes content
+ let mkPlain extraHeaders
+ | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems =
+ BL.concat $
+ (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems)
+ : map lazyLoadBytes content
+ | otherwise = BL.empty
let usePlaintext = do
plain <- mkPlain <$> generateCookieHeaders conn channel
@@ -811,6 +898,13 @@ processOutgoing gs@GlobalState {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
+ let waitUntil :: TimeSpec -> TimeSpec -> STM ()
+ waitUntil now till = do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || till < nextTimeout
+ then writeTVar gNextTimeout till
+ else retry
+
let retransmitPacket :: Connection addr -> STM (IO ())
retransmitPacket conn@Connection {..} = do
now <- readTVar gNowVar
@@ -819,11 +913,8 @@ processOutgoing gs@GlobalState {..} = do
_ -> retry
let nextTry = spTime sp + fromNanoSecs 1000000000
if | now < nextTry -> do
- nextTimeout <- readTVar gNextTimeout
- if nextTimeout <= now || nextTry < nextTimeout
- then do writeTVar gNextTimeout nextTry
- return $ return ()
- else retry
+ waitUntil now nextTry
+ return $ return ()
| spRetryCount sp < 2 -> do
reserved <- reservePacket conn
writeTVar cSentPackets rest
@@ -863,11 +954,28 @@ processOutgoing gs@GlobalState {..} = do
writeTVar gIdentity (nid, cur : past)
return $ return ()
+ let sendKeepAlive :: Connection addr -> STM (IO ())
+ sendKeepAlive Connection {..} = do
+ readTVar cNextKeepAlive >>= \case
+ Nothing -> retry
+ Just next -> do
+ now <- readTVar gNowVar
+ if next <= now
+ then do
+ writeTVar cNextKeepAlive Nothing
+ identity <- fst <$> readTVar gIdentity
+ let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
+ writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], [])
+ else do
+ waitUntil now next
+ return $ return ()
+
conns <- readTVar gConnections
msum $ concat $
[ map retransmitPacket conns
, map sendNextPacket conns
, [ handleControlRequests ]
+ , map sendKeepAlive conns
]
processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ())
diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c
index 37c3e00..70685bc 100644
--- a/src/Erebos/Network/ifaddrs.c
+++ b/src/Erebos/Network/ifaddrs.c
@@ -1,11 +1,89 @@
#include "ifaddrs.h"
+#include <errno.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifndef _WIN32
#include <arpa/inet.h>
-#include <ifaddrs.h>
#include <net/if.h>
-#include <stdlib.h>
-#include <sys/types.h>
+#include <ifaddrs.h>
#include <endian.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#else
+#include <winsock2.h>
+#include <ws2ipdef.h>
+#include <ws2tcpip.h>
+#endif
+
+#define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1"
+
+uint32_t * join_multicast(int fd, size_t * count)
+{
+ size_t capacity = 16;
+ *count = 0;
+ uint32_t * interfaces = malloc(sizeof(uint32_t) * capacity);
+
+#ifdef _WIN32
+ interfaces[0] = 0;
+ *count = 1;
+#else
+ struct ifaddrs * addrs;
+ if (getifaddrs(&addrs) < 0)
+ return 0;
+
+ for (struct ifaddrs * ifa = addrs; ifa; ifa = ifa->ifa_next) {
+ if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET6 &&
+ !(ifa->ifa_flags & IFF_LOOPBACK)) {
+ int idx = if_nametoindex(ifa->ifa_name);
+
+ bool seen = false;
+ for (size_t i = 0; i < *count; i++) {
+ if (interfaces[i] == idx) {
+ seen = true;
+ break;
+ }
+ }
+ if (seen)
+ continue;
+
+ if (*count + 1 >= capacity) {
+ capacity *= 2;
+ uint32_t * nret = realloc(interfaces, sizeof(uint32_t) * capacity);
+ if (nret) {
+ interfaces = nret;
+ } else {
+ free(interfaces);
+ *count = 0;
+ return NULL;
+ }
+ }
+
+ interfaces[*count] = idx;
+ (*count)++;
+ }
+ }
+
+ freeifaddrs(addrs);
+#endif
+
+ for (size_t i = 0; i < *count; i++) {
+ struct ipv6_mreq group;
+ group.ipv6mr_interface = interfaces[i];
+ inet_pton(AF_INET6, DISCOVERY_MULTICAST_GROUP, &group.ipv6mr_multiaddr);
+ int ret = setsockopt(fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
+ (const void *) &group, sizeof(group));
+ if (ret < 0)
+ fprintf(stderr, "IPV6_ADD_MEMBERSHIP failed: %s\n", strerror(errno));
+ }
+
+ return interfaces;
+}
+
+#ifndef _WIN32
uint32_t * broadcast_addresses(void)
{
@@ -39,3 +117,51 @@ uint32_t * broadcast_addresses(void)
ret[count] = 0;
return ret;
}
+
+#else // _WIN32
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
+#pragma comment(lib, "ws2_32.lib")
+
+uint32_t * broadcast_addresses(void)
+{
+ uint32_t * ret = NULL;
+ SOCKET wsock = INVALID_SOCKET;
+
+ struct WSAData wsaData;
+ if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
+ return NULL;
+
+ wsock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0);
+ if (wsock == INVALID_SOCKET)
+ goto cleanup;
+
+ INTERFACE_INFO InterfaceList[32];
+ unsigned long nBytesReturned;
+
+ if (WSAIoctl(wsock, SIO_GET_INTERFACE_LIST, 0, 0,
+ InterfaceList, sizeof(InterfaceList),
+ &nBytesReturned, 0, 0) == SOCKET_ERROR)
+ goto cleanup;
+
+ int numInterfaces = nBytesReturned / sizeof(INTERFACE_INFO);
+
+ size_t capacity = 16, count = 0;
+ ret = malloc(sizeof(uint32_t) * capacity);
+
+ for (int i = 0; i < numInterfaces && count < capacity - 1; i++)
+ if (InterfaceList[i].iiFlags & IFF_BROADCAST)
+ ret[count++] = InterfaceList[i].iiBroadcastAddress.AddressIn.sin_addr.s_addr;
+
+ ret[count] = 0;
+cleanup:
+ if (wsock != INVALID_SOCKET)
+ closesocket(wsock);
+ WSACleanup();
+
+ return ret;
+}
+
+#endif
diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h
index 06d26ec..8852ec6 100644
--- a/src/Erebos/Network/ifaddrs.h
+++ b/src/Erebos/Network/ifaddrs.h
@@ -1,3 +1,5 @@
+#include <stddef.h>
#include <stdint.h>
+uint32_t * join_multicast(int fd, size_t * count);
uint32_t * broadcast_addresses(void);
diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs
index d419a5e..8b794d8 100644
--- a/src/Erebos/Storage/Internal.hs
+++ b/src/Erebos/Storage/Internal.hs
@@ -241,7 +241,7 @@ writeFileOnce file content = bracket (openLockFile locked)
doesFileExist file >>= \case
True -> removeFile locked
False -> do BL.hPut h content
- hFlush h
+ hClose h
renameFile locked file
where locked = file ++ ".lock"
@@ -254,13 +254,13 @@ writeFileChecked file prev content = bracket (openLockFile locked)
removeFile locked
return $ Left $ Just current
(Nothing, False) -> do B.hPut h content
- hFlush h
+ hClose h
renameFile locked file
return $ Right ()
(Just expected, True) -> do
current <- B.readFile file
if current == expected then do B.hPut h content
- hFlush h
+ hClose h
renameFile locked file
return $ return ()
else do removeFile locked
diff --git a/src/Erebos/Storage/Key.hs b/src/Erebos/Storage/Key.hs
index b6afc20..5da79e3 100644
--- a/src/Erebos/Storage/Key.hs
+++ b/src/Erebos/Storage/Key.hs
@@ -80,6 +80,7 @@ moveKeys from to = liftIO $ do
return M.empty
(StorageMemory { memKeys = fromKeys }, StorageMemory { memKeys = toKeys }) -> do
- modifyMVar_ fromKeys $ \fkeys -> do
- modifyMVar_ toKeys $ return . M.union fkeys
- return M.empty
+ when (fromKeys /= toKeys) $ do
+ modifyMVar_ fromKeys $ \fkeys -> do
+ modifyMVar_ toKeys $ return . M.union fkeys
+ return M.empty
diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs
index 9d9db13..a3b0fd7 100644
--- a/src/Erebos/Storage/Merge.hs
+++ b/src/Erebos/Storage/Merge.hs
@@ -97,13 +97,16 @@ storedGeneration x =
doLookup x
+-- |Returns list of sets starting with the set of given objects and
+-- intcrementally adding parents.
generations :: Storable a => [Stored a] -> [Set (Stored a)]
generations = unfoldr gen . (,S.empty)
- where gen (hs, cur) = case filter (`S.notMember` cur) $ previous =<< hs of
+ where gen (hs, cur) = case filter (`S.notMember` cur) hs of
[] -> Nothing
added -> let next = foldr S.insert cur added
- in Just (next, (added, next))
+ in Just (next, (previous =<< added, next))
+-- |Returns set containing all given objects and their ancestors
ancestors :: Storable a => [Stored a] -> Set (Stored a)
ancestors = last . (S.empty:) . generations
diff --git a/src/windows/Erebos/Storage/Platform.hs b/src/windows/Erebos/Storage/Platform.hs
new file mode 100644
index 0000000..76c940b
--- /dev/null
+++ b/src/windows/Erebos/Storage/Platform.hs
@@ -0,0 +1,13 @@
+module Erebos.Storage.Platform (
+ createFileExclusive,
+) where
+
+import Data.Bits
+
+import System.IO
+import System.Win32.File
+import System.Win32.Types
+
+createFileExclusive :: FilePath -> IO Handle
+createFileExclusive path = do
+ hANDLEToHandle =<< createFile path gENERIC_WRITE (fILE_SHARE_READ .|. fILE_SHARE_DELETE) Nothing cREATE_NEW fILE_ATTRIBUTE_NORMAL Nothing
diff --git a/test/chatroom.test b/test/chatroom.test
index 9be5665..93de1ff 100644
--- a/test/chatroom.test
+++ b/test/chatroom.test
@@ -98,25 +98,331 @@ test ChatroomSetup:
test ChatroomMessages:
spawn as p1
+ spawn as p2
send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
- for p in [ p1 ]:
+ for p in [ p1, p2 ]:
with p:
send "chatroom-watch-local"
send "start-server"
- send "chatroom-create room" to p1
- expect /chatroom-create-done ([a-z0-9#]+) room.*/ from p1 capture room
+ send "chatroom-create first_room" to p1
+ expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1
- for p in [ p1 ]:
- with p:
- expect /chatroom-watched-added [a-z0-9#]+ room sub [a-z]+/
+ expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2
- send "chatroom-message-send $room message1" to p1
- expect /chatroom-message-new $room from Owner1 text message1/ from p1
+ send "chatroom-message-send $room1_p1 message1" to p1
+ expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message1/ from p1
- send "chatroom-message-send $room message2" to p1
+ send "chatroom-message-send $room1_p1 message2" to p1
local:
- expect /chatroom-message-new $room from Owner1 text (.*)/ from p1 capture msg
+ expect /chatroom-message-new $room1_p1 room first_room from Owner1 text (.*)/ from p1 capture msg
+ guard (msg == "message2")
+
+ # Subscribe to chatroom
+
+ send "chatroom-subscribe $room1_p2" to p2
+ expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/ from p2
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner1 text (.*)/ capture msg
+ guard (msg == "message1")
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner1 text (.*)/ capture msg
guard (msg == "message2")
+
+ send "chatroom-message-send $room1_p2 message3" to p2
+ for p in [ p1, p2 ]:
+ with p:
+ expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner2 text message3/
+
+ send "chatroom-message-send $room1_p1 message4" to p1
+ for p in [ p1, p2 ]:
+ with p:
+ expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner1 text message4/
+
+ # Multiple rooms
+
+ send "chatroom-create second_room" to p1
+ expect /chatroom-create-done ([a-z0-9#]+) second_room.*/ from p1 capture room2_p1
+
+ send "chatroom-create third_room" to p2
+ expect /chatroom-create-done ([a-z0-9#]+) third_room.*/ from p2 capture room3_p2
+
+ expect /chatroom-watched-added $room2_p1 second_room sub true/ from p1
+ expect /chatroom-watched-added $room3_p2 third_room sub true/ from p2
+ expect /chatroom-watched-added ([a-z0-9#]+) second_room sub false/ from p2 capture room2_p2
+ expect /chatroom-watched-added ([a-z0-9#]+) third_room sub false/ from p1 capture room3_p1
+
+ spawn as p3
+ send "create-identity Device3 Owner3" to p3
+ send "chatroom-watch-local" to p3
+ send "start-server" to p3
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p3 capture room1_p3
+ expect /chatroom-watched-added ([a-z0-9#]+) second_room sub false/ from p3 capture room2_p3
+ expect /chatroom-watched-added ([a-z0-9#]+) third_room sub false/ from p3 capture room3_p3
+
+ with p3:
+ for room in [ room1_p3, room2_p3, room3_p3 ]:
+ send "chatroom-subscribe $room"
+ expect /chatroom-watched-updated $room [a-z_]+ sub true .*/
+ for i in [1..4]:
+ expect /chatroom-message-new $room1_p3 room first_room from Owner. text (.*)/ capture message
+ guard (message == "message$i")
+
+ with p2:
+ send "chatroom-message-send $room2_p2 msg_r2_1"
+ send "chatroom-message-send $room2_p2 msg_r2_2"
+ send "chatroom-message-send $room2_p2 msg_r2_3"
+ expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_1/
+ expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_2/
+ expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_3/
+
+ send "chatroom-message-send $room3_p2 msg_r3_1"
+ send "chatroom-message-send $room3_p2 msg_r3_2"
+ send "chatroom-message-send $room3_p2 msg_r3_3"
+ expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_1/
+ expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_2/
+ expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_3/
+
+ with p1:
+ local:
+ expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message
+ guard (room == "second_room")
+ guard (message == "msg_r2_1")
+ local:
+ expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message
+ guard (room == "second_room")
+ guard (message == "msg_r2_2")
+ local:
+ expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message
+ guard (room == "second_room")
+ guard (message == "msg_r2_3")
+
+ with p3:
+ expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_1/
+ expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_2/
+ expect /chatroom-message-new $room2_p3 room second_room from Owner2 text msg_r2_3/
+ expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_1/
+ expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_2/
+ expect /chatroom-message-new $room3_p3 room third_room from Owner2 text msg_r3_3/
+
+ # Unsubscribe
+
+ send "chatroom-unsubscribe $room1_p1" to p1
+ expect /chatroom-watched-updated $room1_p1 [a-z_]+ sub false .*/ from p1
+ send "chatroom-unsubscribe $room1_p3" to p3
+ expect /chatroom-watched-updated $room1_p3 [a-z_]+ sub false .*/ from p3
+ send "chatroom-unsubscribe $room2_p3" to p3
+ expect /chatroom-watched-updated $room2_p3 [a-z_]+ sub false .*/ from p3
+
+ with p2:
+ send "chatroom-message-send $room1_p2 msg_r1_4"
+ expect /chatroom-message-new $room1_p2 room first_room from Owner2 text msg_r1_4/
+
+ send "chatroom-message-send $room2_p2 msg_r2_4"
+ expect /chatroom-message-new $room2_p2 room second_room from Owner2 text msg_r2_4/
+
+ send "chatroom-message-send $room3_p2 msg_r3_4"
+ expect /chatroom-message-new $room3_p2 room third_room from Owner2 text msg_r3_4/
+
+ with p1:
+ local:
+ expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message
+ guard (room == "second_room")
+ guard (message == "msg_r2_4")
+
+ with p3:
+ local:
+ expect /chatroom-message-new [a-z0-9#]+ room ([a-z_]+) from Owner2 text ([a-z0-9_]+)/ capture room, message
+ guard (room == "third_room")
+ guard (message == "msg_r3_4")
+
+
+test ChatroomSubscribedBeforeStart:
+ spawn as p1
+ spawn as p2
+
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "chatroom-watch-local"
+ send "start-server"
+
+ send "chatroom-create first_room" to p1
+ expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1
+
+ expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2
+
+ with p2:
+ send "chatroom-subscribe $room1_p2"
+ expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/
+
+ for p in [p1, p2]:
+ with p:
+ send "stop-server"
+ for p in [p1, p2]:
+ with p:
+ expect /stop-server-done/
+ for p in [p1, p2]:
+ with p:
+ send "start-server"
+
+ send "chatroom-message-send $room1_p1 message1" to p1
+ expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message1/ from p1
+ expect /chatroom-message-new $room1_p2 room first_room from Owner1 text message1/ from p2
+
+ send "chatroom-message-send $room1_p2 message2" to p2
+ expect /chatroom-message-new $room1_p1 room first_room from Owner2 text message2/ from p1
+ expect /chatroom-message-new $room1_p2 room first_room from Owner2 text message2/ from p2
+
+
+test ParallelThreads:
+ spawn as p1
+ spawn as p2
+
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "chatroom-watch-local"
+ send "start-server"
+
+ send "chatroom-create first_room" to p1
+ expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1
+
+ expect /chatroom-watched-added [a-z0-9#]+ first_room sub true/ from p1
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2
+
+ with p2:
+ send "chatroom-subscribe $room1_p2"
+ expect /chatroom-watched-updated [a-z0-9#]+ first_room sub true .*/
+
+ for p in [p1, p2]:
+ with p:
+ send "stop-server"
+ for p in [p1, p2]:
+ with p:
+ expect /stop-server-done/
+
+ send "chatroom-message-send $room1_p1 message1A" to p1
+ send "chatroom-message-send $room1_p1 message1B" to p1
+ send "chatroom-message-send $room1_p2 message2A" to p2
+ send "chatroom-message-send $room1_p2 message2B" to p2
+ with p1:
+ expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "1A")
+ with p1:
+ expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "1B")
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "2A")
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "2B")
+
+ for p in [p1, p2]:
+ with p:
+ send "start-server"
+
+ with p1:
+ expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "2A")
+ with p1:
+ expect /chatroom-message-new $room1_p1 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "2B")
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "1A")
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner. text message(..)/ capture msg
+ guard (msg == "1B")
+
+
+test ChatroomMembers:
+ spawn as p1
+ spawn as p2
+ spawn as p3
+
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+ send "create-identity Device3 Owner3" to p3
+
+ for p in [ p1, p2, p3 ]:
+ with p:
+ send "chatroom-watch-local"
+ send "start-server"
+
+ send "chatroom-create first_room" to p1
+ expect /chatroom-create-done ([a-z0-9#]+) first_room.*/ from p1 capture room1_p1
+
+ expect /chatroom-watched-added $room1_p1 first_room sub true/ from p1
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p2 capture room1_p2
+ expect /chatroom-watched-added ([a-z0-9#]+) first_room sub false/ from p3 capture room1_p3
+
+ local:
+ send "chatroom-members $room1_p1" to p1
+ expect /chatroom-members-([a-z]+)/ from p1 capture done
+ guard (done == "done")
+ local:
+ send "chatroom-members $room1_p2" to p2
+ expect /chatroom-members-([a-z]+)/ from p2 capture done
+ guard (done == "done")
+
+ send "chatroom-message-send $room1_p1 message1" to p1
+ send "chatroom-message-send $room1_p1 message2" to p1
+ send "chatroom-join $room1_p2" to p2
+ send "chatroom-message-send $room1_p2 message3" to p2
+ send "chatroom-join $room1_p3" to p3
+
+ with p1:
+ expect /chatroom-message-new $room1_p1 room first_room from Owner1 text message2/
+ expect /chatroom-message-new $room1_p1 room first_room from Owner2 text message3/
+ expect /chatroom-message-new $room1_p1 room first_room from Owner3/
+ with p2:
+ expect /chatroom-message-new $room1_p2 room first_room from Owner1 text message2/
+ expect /chatroom-message-new $room1_p2 room first_room from Owner2 text message3/
+ expect /chatroom-message-new $room1_p2 room first_room from Owner3/
+ with p3:
+ expect /chatroom-message-new $room1_p3 room first_room from Owner1 text message2/
+ expect /chatroom-message-new $room1_p3 room first_room from Owner2 text message3/
+ expect /chatroom-message-new $room1_p3 room first_room from Owner3/
+
+ local:
+ send "chatroom-members $room1_p1" to p1
+ expect /chatroom-members-item Owner1/ from p1
+ expect /chatroom-members-item Owner2/ from p1
+ expect /chatroom-members-item Owner3/ from p1
+ expect /chatroom-members-([a-z]+)/ from p1 capture done
+ guard (done == "done")
+ local:
+ send "chatroom-members $room1_p2" to p2
+ expect /chatroom-members-item Owner1/ from p2
+ expect /chatroom-members-item Owner2/ from p2
+ expect /chatroom-members-item Owner3/ from p2
+ expect /chatroom-members-([a-z]+)/ from p2 capture done
+ guard (done == "done")
+
+ send "chatroom-leave $room1_p1" to p1
+ send "chatroom-leave $room1_p3" to p3
+
+ for p in [ p1, p2, p3 ]:
+ with p:
+ expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner1 leave/
+ expect /chatroom-message-new [a-z0-9#]+ room first_room from Owner3 leave/
+
+ send "chatroom-members $room1_p1" to p1
+ send "chatroom-members $room1_p2" to p2
+ send "chatroom-members $room1_p3" to p3
+ for p in [ p1, p2, p3 ]:
+ with p:
+ expect /chatroom-members-item Owner2/
+ expect /chatroom-members-([a-z]+)/ capture done
+ guard (done == "done")
diff --git a/test/network.test b/test/network.test
index 9540bf6..efd508f 100644
--- a/test/network.test
+++ b/test/network.test
@@ -178,6 +178,62 @@ test ManyStreams:
expect /test-message-received blob 100[2-4] $ref/ from p2
+test MultipleServiceRefs:
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server" to p1
+ send "start-server" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ let kbytes = 2
+
+ with p1:
+ send "store blob"
+ send "A"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_a
+
+ # Create blobs with (kbytes * 1000) bytes each
+
+ send "store blob"
+ send "B"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_b
+
+ send "store blob"
+ send "C"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_c
+
+ send "store blob"
+ send "D"
+ for j in [1 .. kbytes * 10]:
+ # 100 bytes each line
+ send "123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789 123456789"
+ send ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture ref_d
+
+ send "test-message-send 1 $ref_a $ref_b $ref_c $ref_d"
+ expect /test-message-send done/
+ expect /test-message-received blob [0-9]+ $ref_a/ from p2
+ expect /test-message-received blob [0-9]+ $ref_b/ from p2
+ expect /test-message-received blob [0-9]+ $ref_c/ from p2
+ expect /test-message-received blob [0-9]+ $ref_d/ from p2
+
+
test Reconnection:
spawn as p1
with p1: