summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md23
-rw-r--r--erebos.cabal32
-rw-r--r--main/Main.hs63
-rw-r--r--main/State.hs17
-rw-r--r--main/Terminal.hs5
-rw-r--r--main/Test.hs120
-rw-r--r--main/Test/Service.hs21
-rw-r--r--main/WebSocket.hs45
-rw-r--r--src/Erebos/Conversation.hs2
-rw-r--r--src/Erebos/DirectMessage.hs23
-rw-r--r--src/Erebos/Discovery.hs329
-rw-r--r--src/Erebos/Flow.hs37
-rw-r--r--src/Erebos/ICE.chs7
-rw-r--r--src/Erebos/ICE/pjproject.c97
-rw-r--r--src/Erebos/ICE/pjproject.h7
-rw-r--r--src/Erebos/Network.hs192
-rw-r--r--src/Erebos/Network/Protocol.hs45
-rw-r--r--src/Erebos/Object.hs5
-rw-r--r--src/Erebos/Object/Internal.hs21
-rw-r--r--src/Erebos/Pairing.hs27
-rw-r--r--src/Erebos/Service.hs21
-rw-r--r--src/Erebos/Service/Stream.hs74
-rw-r--r--src/Erebos/State.hs4
-rw-r--r--src/Erebos/Storage/Disk.hs2
-rw-r--r--src/Erebos/Storage/Head.hs9
-rw-r--r--src/Erebos/Storage/Internal.hs76
-rw-r--r--src/Erebos/Storage/Merge.hs2
-rw-r--r--src/Erebos/UUID.hs24
-rw-r--r--src/Erebos/Util.hs30
-rw-r--r--test/common.test3
-rw-r--r--test/discovery.test60
-rw-r--r--test/message.test114
-rw-r--r--test/network.test52
33 files changed, 1172 insertions, 417 deletions
diff --git a/README.md b/README.md
index 07cee2a..8957dd7 100644
--- a/README.md
+++ b/README.md
@@ -102,11 +102,13 @@ Test chatroom [19:03] Some Name: Hi
`<message>`
: Send `<message>` to selected conversation.
-`/history`
-: Show message history of the selected conversation.
+`/history [<number>]`
+: Show message history of the selected conversation, or the one identified by
+ `<number>` if given.
-`/details`
-: Show information about the selected conversations, contact or peer.
+`/details [<number>]`
+: Show information about the selected conversations, contact or peer; or the
+ one identified by `<number>` if given.
### Chatrooms
@@ -137,12 +139,13 @@ are signed, so message author can not be forged.
: Leave the chatroom. User will no longer be listed as a member and erebos tool
will no longer collect message of this chatroom.
-`/delete`
-: Delete the chatroom; this action is only synchronized with devices belonging
-to the current user and does not affect the chatroom state for others. Due to
-the storage design, the chatroom data will not be purged from the local state
-history, but the chatroom will no longer be listed as available and no futher
-updates for this chatroom will be collected or shared with other peers.
+`/delete [<number>]`
+: Delete the chatroom (currently selected one, or the one identified by
+ `<number>`); this action is only synchronized with devices belonging to the
+ current user and does not affect the chatroom state for others. Due to the
+ storage design, the chatroom data will not be purged from the local state
+ history, but the chatroom will no longer be listed as available and no futher
+ updates for this chatroom will be collected or shared with other peers.
### Add contacts
diff --git a/erebos.cabal b/erebos.cabal
index 7f96d80..f001a24 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -39,6 +39,10 @@ Flag ci
default: False
manual: True
+Flag cryptonite
+ description: Use deprecated 'cryptonite' package
+ default: False
+
source-repository head
type: git
location: https://code.erebosprotocol.net/erebos
@@ -104,12 +108,11 @@ library
Erebos.Error
Erebos.Identity
Erebos.Network
- Erebos.Network.Channel
- Erebos.Network.Protocol
Erebos.Object
Erebos.Pairing
Erebos.PubKey
Erebos.Service
+ Erebos.Service.Stream
Erebos.Set
Erebos.State
Erebos.Storable
@@ -122,11 +125,14 @@ library
other-modules:
Erebos.Flow
+ Erebos.Network.Channel
+ Erebos.Network.Protocol
Erebos.Object.Internal
Erebos.Storage.Disk
Erebos.Storage.Internal
Erebos.Storage.Memory
Erebos.Storage.Platform
+ Erebos.UUID
Erebos.Util
c-sources:
@@ -154,7 +160,6 @@ library
bytestring >=0.10 && <0.13,
clock >=0.8 && < 0.9,
containers ^>= { 0.6, 0.7, 0.8 },
- crypton ^>= { 0.34, 1.0 },
deepseq >= 1.4 && <1.6,
directory >= 1.3 && <1.4,
filepath >=1.4 && <1.6,
@@ -168,9 +173,16 @@ library
stm >=2.5 && <2.6,
text >= 1.2 && <2.2,
time ^>= { 1.8, 1.9, 1.10, 1.11, 1.12, 1.13, 1.14 },
- uuid >=1.3 && <1.4,
+ uuid-types ^>= { 1.0.4 },
zlib >=0.6 && <0.8
+ if !flag(cryptonite)
+ build-depends:
+ crypton ^>= { 0.34, 1.0 },
+ else
+ build-depends:
+ cryptonite >=0.25 && <0.31,
+
if os(windows)
hs-source-dirs: src/windows
build-depends:
@@ -195,13 +207,13 @@ executable erebos
Test.Service
Version
Version.Git
+ WebSocket
autogen-modules:
Paths_erebos
build-depends:
ansi-terminal ^>= { 0.11, 1.0, 1.1 },
bytestring,
- crypton,
directory,
erebos,
mtl,
@@ -212,4 +224,12 @@ executable erebos
text,
time,
transformers >= 0.5 && <0.7,
- uuid,
+ uuid-types,
+ websockets ^>= { 0.12.7, 0.13 },
+
+ if !flag(cryptonite)
+ build-depends:
+ crypton,
+ else
+ build-depends:
+ cryptonite,
diff --git a/main/Main.hs b/main/Main.hs
index 3f78db1..26f4b12 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -61,6 +61,7 @@ import State
import Terminal
import Test
import Version
+import WebSocket
data Options = Options
{ optServer :: ServerOptions
@@ -68,6 +69,7 @@ data Options = Options
, optStorage :: StorageOption
, optChatroomAutoSubscribe :: Maybe Int
, optDmBotEcho :: Maybe Text
+ , optWebSocketServer :: Maybe Int
, optShowHelp :: Bool
, optShowVersion :: Bool
}
@@ -90,6 +92,7 @@ defaultOptions = Options
, optStorage = DefaultStorage
, optChatroomAutoSubscribe = Nothing
, optDmBotEcho = Nothing
+ , optWebSocketServer = Nothing
, optShowHelp = False
, optShowVersion = False
}
@@ -144,6 +147,9 @@ options =
, Option [] ["dm-bot-echo"]
(ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
"automatically reply to direct messages with the same text prefixed with <prefix>"
+ , Option [] [ "websocket-server" ]
+ (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>")
+ "start WebSocket server on given port"
, Option ['h'] ["help"]
(NoArg $ \opts -> opts { optShowHelp = True })
"show this help and exit"
@@ -362,6 +368,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
startServer (optServer opts) erebosHead extPrintLn $
map soptService $ filter soptEnabled $ optServices opts
+ case optWebSocketServer opts of
+ Just port -> startWebsocketServer server "::" port extPrintLn
+ Nothing -> return ()
+
void $ liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange server
peerIdentity peer >>= \case
@@ -493,7 +503,10 @@ getSelectedChatroom = gets csContext >>= \case
_ -> throwOtherError "no chatroom selected"
getSelectedConversation :: CommandM Conversation
-getSelectedConversation = gets csContext >>= \case
+getSelectedConversation = gets csContext >>= getConversationFromContext
+
+getConversationFromContext :: CommandContext -> CommandM Conversation
+getConversationFromContext = \case
SelectedPeer peer -> peerIdentity peer >>= \case
PeerIdentityFull pid -> directMessageConversation $ finalOwner pid
_ -> throwOtherError "incomplete peer identity"
@@ -507,6 +520,13 @@ getSelectedConversation = gets csContext >>= \case
SelectedConversation conv -> reloadConversation conv
_ -> throwOtherError "no contact, peer or conversation selected"
+getSelectedOrManualContext :: CommandM CommandContext
+getSelectedOrManualContext = do
+ asks ciLine >>= \case
+ "" -> gets csContext
+ str | all isDigit str -> getContextByIndex (read str)
+ _ -> throwOtherError "invalid index"
+
commands :: [(String, Command)]
commands =
[ ("history", cmdHistory)
@@ -628,19 +648,22 @@ cmdMembers = do
forM_ (chatroomMembers room) $ \x -> do
cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x
+getContextByIndex :: Int -> CommandM CommandContext
+getContextByIndex n = do
+ join (asks ciContextOptions) >>= \ctxs -> if
+ | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx
+ | otherwise -> throwOtherError "invalid index"
cmdSelectContext :: Command
cmdSelectContext = do
n <- read <$> asks ciLine
- join (asks ciContextOptions) >>= \ctxs -> if
- | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do
- modify $ \s -> s { csContext = ctx }
- case ctx of
- SelectedChatroom rstate -> do
- when (not (roomStateSubscribe rstate)) $ do
- chatroomSetSubscribe (head $ roomStateData rstate) True
- _ -> return ()
- | otherwise -> throwOtherError "invalid index"
+ ctx <- getContextByIndex n
+ modify $ \s -> s { csContext = ctx }
+ case ctx of
+ SelectedChatroom rstate -> do
+ when (not (roomStateSubscribe rstate)) $ do
+ chatroomSetSubscribe (head $ roomStateData rstate) True
+ _ -> return ()
cmdSend :: Command
cmdSend = void $ do
@@ -654,12 +677,12 @@ cmdSend = void $ do
cmdDelete :: Command
cmdDelete = void $ do
- deleteConversation =<< getSelectedConversation
+ deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext
modify $ \s -> s { csContext = NoContext }
cmdHistory :: Command
cmdHistory = void $ do
- conv <- getSelectedConversation
+ conv <- getConversationFromContext =<< getSelectedOrManualContext
case conversationHistory conv of
thread@(_:_) -> do
tzone <- liftIO $ getCurrentTimeZone
@@ -824,7 +847,7 @@ cmdConversations = do
cmdDetails :: Command
cmdDetails = do
- gets csContext >>= \case
+ getSelectedOrManualContext >>= \case
SelectedPeer peer -> do
cmdPutStrLn $ unlines
[ "Network peer:"
@@ -900,17 +923,11 @@ cmdDiscoveryInit = void $ do
cmdDiscovery :: Command
cmdDiscovery = void $ do
- Just peer <- gets csIcePeer
- st <- getStorage
+ server <- asks ciServer
sref <- asks ciLine
- eprint <- asks ciPrint
- liftIO $ readRef st (BC.pack sref) >>= \case
- Nothing -> error "ref does not exist"
- Just ref -> do
- res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref
- case res of
- Right _ -> return ()
- Left err -> eprint err
+ case readRefDigest (BC.pack sref) of
+ Nothing -> throwOtherError "failed to parse ref"
+ Just dgst -> discoverySearch server dgst
#ifdef ENABLE_ICE_SUPPORT
diff --git a/main/State.hs b/main/State.hs
index d357844..150178e 100644
--- a/main/State.hs
+++ b/main/State.hs
@@ -58,9 +58,8 @@ updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case
Nothing -> throwOtherError "no existing shared identity"
interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => Terminal -> Identity f -> m UnifiedIdentity
-interactiveIdentityUpdate term identity = do
- let public = idKeyIdentity identity
-
+interactiveIdentityUpdate term fidentity = do
+ identity <- mergeIdentity fidentity
name <- liftIO $ do
setPrompt term $ T.unpack $ T.concat $ concat
[ [ T.pack "Name" ]
@@ -71,11 +70,11 @@ interactiveIdentityUpdate term identity = do
]
getInputLine term $ KeepPrompt . maybe T.empty T.pack
- if | T.null name -> mergeIdentity identity
+ if | T.null name -> return identity
| otherwise -> do
- secret <- loadKey public
- maybe (throwOtherError "created invalid identity") return . validateIdentity =<<
- mstore =<< sign secret =<< mstore (emptyIdentityData public)
- { iddPrev = toList $ idDataF identity
- , iddName = Just name
+ secret <- loadKey $ idKeyIdentity identity
+ maybe (throwOtherError "created invalid identity") return . validateExtendedIdentity =<<
+ mstore =<< sign secret =<< mstore . ExtendedIdentityData =<< return (emptyIdentityExtension $ idData identity)
+ { idePrev = toList $ idExtDataF identity
+ , ideName = Just name
}
diff --git a/main/Terminal.hs b/main/Terminal.hs
index 5dc3612..150bd8c 100644
--- a/main/Terminal.hs
+++ b/main/Terminal.hs
@@ -31,8 +31,9 @@ import Data.List
import Data.Text (Text)
import Data.Text qualified as T
-import System.IO
import System.Console.ANSI
+import System.IO
+import System.IO.Error
data Terminal = Terminal
@@ -107,7 +108,7 @@ termPutStr Terminal {..} str = do
getInput :: IO Input
getInput = do
- getChar >>= \case
+ handleJust (guard . isEOFError) (\() -> return InputEnd) $ getChar >>= \case
'\ESC' -> do
esc <- readEsc
case parseEsc esc of
diff --git a/main/Test.hs b/main/Test.hs
index e54285a..fa8501e 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -26,7 +26,7 @@ import Data.Text qualified as T
import Data.Text.Encoding
import Data.Text.IO qualified as T
import Data.Typeable
-import Data.UUID qualified as U
+import Data.UUID.Types qualified as U
import Network.Socket
@@ -44,6 +44,7 @@ import Erebos.Object
import Erebos.Pairing
import Erebos.PubKey
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Set
import Erebos.State
import Erebos.Storable
@@ -66,10 +67,17 @@ data TestState = TestState
data RunningServer = RunningServer
{ rsServer :: Server
- , rsPeers :: MVar (Int, [(Int, Peer)])
+ , rsPeers :: MVar ( Int, [ TestPeer ] )
, rsPeerThread :: ThreadId
}
+data TestPeer = TestPeer
+ { tpIndex :: Int
+ , tpPeer :: Peer
+ , tpStreamReaders :: MVar [ (Int, StreamReader ) ]
+ , tpStreamWriters :: MVar [ (Int, StreamWriter ) ]
+ }
+
initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
@@ -137,17 +145,20 @@ cmdOut line = do
getPeer :: Text -> CommandM Peer
-getPeer spidx = do
+getPeer spidx = tpPeer <$> getTestPeer spidx
+
+getTestPeer :: Text -> CommandM TestPeer
+getTestPeer spidx = do
Just RunningServer {..} <- gets tsServer
- Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)
+ Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers)
return peer
-getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int
+getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int
getPeerIndex pmvar = do
peer <- asks svcPeer
- maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar)
+ maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar)
-pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a
+pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a
pairingAttributes _ out peers prefix = PairingAttributes
{ pairingHookRequest = return ()
@@ -267,6 +278,9 @@ commands = map (T.pack *** id)
, ("peer-drop", cmdPeerDrop)
, ("peer-list", cmdPeerList)
, ("test-message-send", cmdTestMessageSend)
+ , ("test-stream-open", cmdTestStreamOpen)
+ , ("test-stream-close", cmdTestStreamClose)
+ , ("test-stream-send", cmdTestStreamSend)
, ("local-state-get", cmdLocalStateGet)
, ("local-state-replace", cmdLocalStateReplace)
, ("local-state-wait", cmdLocalStateWait)
@@ -286,6 +300,7 @@ commands = map (T.pack *** id)
, ("contact-set-name", cmdContactSetName)
, ("dm-send-peer", cmdDmSendPeer)
, ("dm-send-contact", cmdDmSendContact)
+ , ("dm-send-identity", cmdDmSendIdentity)
, ("dm-list-peer", cmdDmListPeer)
, ("dm-list-contact", cmdDmListContact)
, ("chatroom-create", cmdChatroomCreate)
@@ -501,7 +516,20 @@ cmdStartServer = do
{ testMessageReceived = \obj otype len sref -> do
liftIO $ do
void $ store (headStorage h) obj
- outLine out $ unwords ["test-message-received", otype, len, sref]
+ outLine out $ unwords [ "test-message-received", otype, len, sref ]
+ , testStreamsReceived = \streams -> do
+ pidx <- getPeerIndex rsPeers
+ liftIO $ do
+ nums <- mapM getStreamReaderNumber streams
+ outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums
+ forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do
+ let go = readStreamPacket stream >>= \case
+ StreamData seqNum bytes -> do
+ outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ]
+ go
+ StreamClosed seqNum -> do
+ outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ]
+ go
}
sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
@@ -510,15 +538,23 @@ cmdStartServer = do
rsPeerThread <- liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange rsServer
- let printPeer (idx, p) = do
- params <- peerIdentity p >>= return . \case
+ let printPeer TestPeer {..} = do
+ params <- peerIdentity tpPeer >>= return . \case
PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
- _ -> [ "addr", show (peerAddress p) ]
- outLine out $ unwords $ [ "peer", show idx ] ++ params
+ _ -> [ "addr", show (peerAddress tpPeer) ]
+ outLine out $ unwords $ [ "peer", show tpIndex ] ++ params
- update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)])
- update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur
- | otherwise = fmap (p:) <$> update (nid, ps)
+ update ( tpIndex, [] ) = do
+ tpPeer <- return peer
+ tpStreamReaders <- newMVar []
+ tpStreamWriters <- newMVar []
+ let tp = TestPeer {..}
+ printPeer tp
+ return ( tpIndex + 1, [ tp ] )
+
+ update cur@( nid, p : ps )
+ | tpPeer p == peer = printPeer p >> return cur
+ | otherwise = fmap (p :) <$> update ( nid, ps )
modifyMVar_ rsPeers update
@@ -555,10 +591,10 @@ cmdPeerList = do
peers <- liftIO $ getCurrentPeerList rsServer
tpeers <- liftIO $ readMVar rsPeers
forM_ peers $ \peer -> do
- Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers
+ Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers
mbpid <- peerIdentity peer
cmdOut $ unwords $ concat
- [ [ "peer-list-item", show n ]
+ [ [ "peer-list-item", show (tpIndex tp) ]
, [ "addr", show (peerAddress peer) ]
, case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
_ -> []
@@ -575,6 +611,40 @@ cmdTestMessageSend = do
sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
+cmdTestStreamOpen :: Command
+cmdTestStreamOpen = do
+ spidx : rest <- asks tiParams
+ tp <- getTestPeer spidx
+ count <- case rest of
+ [] -> return 1
+ tcount : _ -> return $ read $ T.unpack tcount
+
+ out <- asks tiOutput
+ runPeerService (tpPeer tp) $ do
+ streams <- openTestStreams count
+ afterCommit $ do
+ nums <- mapM getStreamWriterNumber streams
+ modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams)
+ outLine out $ unwords $ "test-stream-open-done"
+ : T.unpack spidx
+ : map show nums
+
+cmdTestStreamClose :: Command
+cmdTestStreamClose = do
+ [ spidx, sid ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ closeStream stream
+ cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ]
+
+cmdTestStreamSend :: Command
+cmdTestStreamSend = do
+ [ spidx, sid, content ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ writeStream stream $ encodeUtf8 content
+ cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ]
+
cmdLocalStateGet :: Command
cmdLocalStateGet = do
h <- getHead
@@ -747,6 +817,14 @@ cmdDmSendContact = do
Just to <- contactIdentity <$> getContact cid
void $ sendDirectMessage to msg
+cmdDmSendIdentity :: Command
+cmdDmSendIdentity = do
+ st <- asks tiStorage
+ [ tid, msg ] <- asks tiParams
+ Just ref <- liftIO $ readRef st $ encodeUtf8 tid
+ Just to <- return $ validateExtendedIdentity $ wrappedLoad ref
+ void $ sendDirectMessage to msg
+
dmList :: Foldable f => Identity f -> Command
dmList peer = do
threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead
@@ -888,11 +966,7 @@ cmdChatroomMessageSend = do
cmdDiscoveryConnect :: Command
cmdDiscoveryConnect = do
- st <- asks tiStorage
[ tref ] <- asks tiParams
- Just ref <- liftIO $ readRef st $ encodeUtf8 tref
-
+ Just dgst <- return $ readRefDigest $ encodeUtf8 tref
Just RunningServer {..} <- gets tsServer
- peers <- liftIO $ getCurrentPeerList rsServer
- forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ discoverySearch rsServer dgst
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index 8c58dee..c0be07d 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -1,8 +1,11 @@
module Test.Service (
TestMessage(..),
TestMessageAttributes(..),
+
+ openTestStreams,
) where
+import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as BL
@@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
data TestMessage = TestMessage (Stored Object)
data TestMessageAttributes = TestMessageAttributes
{ testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage ()
+ , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage ()
}
instance Storable TestMessage where
@@ -26,7 +31,10 @@ instance Service TestMessage where
serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8"
type ServiceAttributes TestMessage = TestMessageAttributes
- defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ())
+ defaultServiceAttributes _ = TestMessageAttributes
+ { testMessageReceived = \_ _ _ _ -> return ()
+ , testStreamsReceived = \_ -> return ()
+ }
serviceHandler smsg = do
let TestMessage sobj = fromStored smsg
@@ -36,3 +44,14 @@ instance Service TestMessage where
cb <- asks $ testMessageReceived . svcAttributes
cb obj otype len (show $ refDigest $ storedRef sobj)
_ -> return ()
+
+ streams <- receivedStreams
+ when (not $ null streams) $ do
+ cb <- asks $ testStreamsReceived . svcAttributes
+ cb streams
+
+
+openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ]
+openTestStreams count = do
+ replyPacket . TestMessage =<< mstore (Rec [])
+ replicateM count openStream
diff --git a/main/WebSocket.hs b/main/WebSocket.hs
new file mode 100644
index 0000000..fbdd65f
--- /dev/null
+++ b/main/WebSocket.hs
@@ -0,0 +1,45 @@
+module WebSocket (
+ startWebsocketServer,
+) where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+
+import Data.ByteString.Lazy qualified as BL
+import Data.Unique
+
+import Erebos.Network
+
+import Network.WebSockets qualified as WS
+
+
+data WebSocketAddress = WebSocketAddress Unique WS.Connection
+
+instance Eq WebSocketAddress where
+ WebSocketAddress u _ == WebSocketAddress u' _ = u == u'
+
+instance Ord WebSocketAddress where
+ compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u'
+
+instance Show WebSocketAddress where
+ show (WebSocketAddress _ _) = "websocket"
+
+instance PeerAddressType WebSocketAddress where
+ sendBytesToAddress (WebSocketAddress _ conn) msg = do
+ WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg
+
+startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO ()
+startWebsocketServer server addr port logd = do
+ void $ forkIO $ do
+ WS.runServer addr port $ \pending -> do
+ conn <- WS.acceptRequest pending
+ u <- newUnique
+ let paddr = WebSocketAddress u conn
+ void $ serverPeerCustom server paddr
+ handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do
+ WS.withPingThread conn 30 (return ()) $ do
+ forever $ do
+ WS.receiveDataMessage conn >>= \case
+ WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg
+ WS.Text {} -> logd $ "unexpected websocket text message"
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index dee6faa..187fddd 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con
directMessageConversation peer = do
(find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
Just thread -> return $ DirectMessageConversation thread
- Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+ Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] []
chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs
index 05da865..dc6724c 100644
--- a/src/Erebos/DirectMessage.hs
+++ b/src/Erebos/DirectMessage.hs
@@ -17,6 +17,7 @@ module Erebos.DirectMessage (
) where
import Control.Monad
+import Control.Monad.Except
import Control.Monad.Reader
import Data.List
@@ -27,8 +28,10 @@ import qualified Data.Text as T
import Data.Time.Format
import Data.Time.LocalTime
+import Erebos.Discovery
import Erebos.Identity
import Erebos.Network
+import Erebos.Object
import Erebos.Service
import Erebos.State
import Erebos.Storable
@@ -102,8 +105,10 @@ instance Service DirectMessage where
serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
- serviceStorageWatchers _ = (:[]) $
- SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ serviceStorageWatchers _ =
+ [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers
+ ]
data MessageState = MessageState
@@ -209,12 +214,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do
else do
return unchanged
+findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO ()
+findMissingPeers server threads = do
+ forM_ (toThreadList threads) $ \thread -> do
+ when (msgHead thread /= msgReceived thread) $ do
+ mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread
+
data DirectMessageThread = DirectMessageThread
{ msgPeer :: ComposedIdentity
- , msgHead :: [Stored DirectMessage]
- , msgSent :: [Stored DirectMessage]
- , msgSeen :: [Stored DirectMessage]
+ , msgHead :: [ Stored DirectMessage ]
+ , msgSent :: [ Stored DirectMessage ]
+ , msgSeen :: [ Stored DirectMessage ]
+ , msgReceived :: [ Stored DirectMessage ]
}
threadToList :: DirectMessageThread -> [DirectMessage]
@@ -248,6 +260,7 @@ messageThreadFor peer mss =
, msgHead = filterAncestors $ ready ++ received
, msgSent = filterAncestors $ sent ++ received
, msgSeen = filterAncestors $ ready ++ seen
+ , msgReceived = filterAncestors $ received
}
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 48500d7..2fb0ffe 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -3,7 +3,9 @@
module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
- DiscoveryConnection(..)
+ DiscoveryConnection(..),
+
+ discoverySearch,
) where
import Control.Concurrent
@@ -12,9 +14,13 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
@@ -31,11 +37,18 @@ import Erebos.Service
import Erebos.Storable
+#ifndef ENABLE_ICE_SUPPORT
+type IceConfig = ()
+type IceSession = ()
+type IceRemoteInfo = Stored Object
+#endif
+
+
data DiscoveryService
= DiscoverySelf [ Text ] (Maybe Int)
| DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
- | DiscoverySearch Ref
- | DiscoveryResult Ref [ Text ]
+ | DiscoverySearch (Either Ref RefDigest)
+ | DiscoveryResult (Either Ref RefDigest) [ Text ]
| DiscoveryConnectionRequest DiscoveryConnection
| DiscoveryConnectionResponse DiscoveryConnection
@@ -55,17 +68,13 @@ defaultDiscoveryAttributes = DiscoveryAttributes
}
data DiscoveryConnection = DiscoveryConnection
- { dconnSource :: Ref
- , dconnTarget :: Ref
+ { dconnSource :: Either Ref RefDigest
+ , dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
-#ifdef ENABLE_ICE_SUPPORT
, dconnIceInfo :: Maybe IceRemoteInfo
-#else
- , dconnIceInfo :: Maybe (Stored Object)
-#endif
}
-emptyConnection :: Ref -> Ref -> DiscoveryConnection
+emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
@@ -84,19 +93,20 @@ instance Storable DiscoveryService where
storeMbInt "stun-port" stunPort
storeMbText "turn-server" turnServer
storeMbInt "turn-port" turnPort
- DiscoverySearch ref -> storeRawRef "search" ref
- DiscoveryResult ref addr -> do
- storeRawRef "result" ref
+ DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst
+ DiscoveryResult edgst addr -> do
+ either (storeRawRef "result") (storeRawWeak "result") edgst
mapM_ (storeText "address") addr
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
- where storeConnection ctype conn = do
- storeText "connection" $ ctype
- storeRawRef "source" $ dconnSource conn
- storeRawRef "target" $ dconnTarget conn
- storeMbText "address" $ dconnAddress conn
- storeMbRef "ice-info" $ dconnIceInfo conn
+ where
+ storeConnection ctype DiscoveryConnection {..} = do
+ storeText "connection" $ ctype
+ either (storeRawRef "source") (storeRawWeak "source") dconnSource
+ either (storeRawRef "target") (storeRawWeak "target") dconnTarget
+ storeMbText "address" dconnAddress
+ storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
[ do
@@ -114,29 +124,59 @@ instance Storable DiscoveryService where
<*> loadMbInt "stun-port"
<*> loadMbText "turn-server"
<*> loadMbInt "turn-port"
- , DiscoverySearch <$> loadRawRef "search"
+ , DiscoverySearch <$> msum
+ [ Left <$> loadRawRef "search"
+ , Right <$> loadRawWeak "search"
+ ]
, DiscoveryResult
- <$> loadRawRef "result"
+ <$> msum
+ [ Left <$> loadRawRef "result"
+ , Right <$> loadRawWeak "result"
+ ]
<*> loadTexts "address"
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
]
- where loadConnection ctype ctor = do
- ctype' <- loadText "connection"
- guard $ ctype == ctype'
- return . ctor =<< DiscoveryConnection
- <$> loadRawRef "source"
- <*> loadRawRef "target"
- <*> loadMbText "address"
- <*> loadMbRef "ice-info"
+ where
+ loadConnection ctype ctor = do
+ ctype' <- loadText "connection"
+ guard $ ctype == ctype'
+ dconnSource <- msum
+ [ Left <$> loadRawRef "source"
+ , Right <$> loadRawWeak "source"
+ ]
+ dconnTarget <- msum
+ [ Left <$> loadRawRef "target"
+ , Right <$> loadRawWeak "target"
+ ]
+ dconnAddress <- loadMbText "address"
+ dconnIceInfo <- loadMbRef "ice-info"
+ return $ ctor DiscoveryConnection {..}
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
, dpAddress :: [ Text ]
-#ifdef ENABLE_ICE_SUPPORT
, dpIceSession :: Maybe IceSession
-#endif
+ }
+
+emptyPeer :: DiscoveryPeer
+emptyPeer = DiscoveryPeer
+ { dpPriority = 0
+ , dpPeer = Nothing
+ , dpAddress = []
+ , dpIceSession = Nothing
+ }
+
+data DiscoveryPeerState = DiscoveryPeerState
+ { dpsStunServer :: Maybe ( Text, Word16 )
+ , dpsTurnServer :: Maybe ( Text, Word16 )
+ , dpsIceConfig :: Maybe IceConfig
+ }
+
+data DiscoveryGlobalState = DiscoveryGlobalState
+ { dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -145,13 +185,18 @@ instance Service DiscoveryService where
type ServiceAttributes DiscoveryService = DiscoveryAttributes
defaultServiceAttributes _ = defaultDiscoveryAttributes
-#ifdef ENABLE_ICE_SUPPORT
- type ServiceState DiscoveryService = Maybe IceConfig
- emptyServiceState _ = Nothing
-#endif
+ type ServiceState DiscoveryService = DiscoveryPeerState
+ emptyServiceState _ = DiscoveryPeerState
+ { dpsStunServer = Nothing
+ , dpsTurnServer = Nothing
+ , dpsIceConfig = Nothing
+ }
- type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer
- emptyServiceGlobalState _ = M.empty
+ type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
+ emptyServiceGlobalState _ = DiscoveryGlobalState
+ { dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
+ }
serviceHandler msg = case fromStored msg of
DiscoverySelf addrs priority -> do
@@ -172,15 +217,14 @@ instance Service DiscoveryService where
| otherwise -> return Nothing
- forM_ (idDataF =<< unfoldOwners pid) $ \s ->
- svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer
- { dpPriority = fromMaybe 0 priority
- , dpPeer = Just peer
- , dpAddress = addrs
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
+ let dp = DiscoveryPeer
+ { dpPriority = fromMaybe 0 priority
+ , dpPeer = Just peer
+ , dpAddress = addrs
+ , dpIceSession = Nothing
+ }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
attrs <- asks svcAttributes
replyPacket $ DiscoveryAcknowledged matchedAddrs
(discoveryStunServer attrs)
@@ -189,7 +233,6 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
-#ifdef ENABLE_ICE_SUPPORT
paddr <- asks (peerAddress . svcPeer) >>= return . \case
(DatagramAddress saddr) -> case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, _)
@@ -205,100 +248,98 @@ instance Service DiscoveryService where
toIceServer (Just server) Nothing = Just ( server, 0 )
toIceServer (Just server) (Just port) = Just ( server, port )
- cfg <- liftIO $ iceCreateConfig
- (toIceServer stunServer stunPort)
- (toIceServer turnServer turnPort)
- svcSet cfg
-#endif
- return ()
+ svcModify $ \s -> s
+ { dpsStunServer = toIceServer stunServer stunPort
+ , dpsTurnServer = toIceServer turnServer turnPort
+ }
- DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal
- replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
+ DiscoverySearch edgst -> do
+ dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal
+ replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer
- DiscoveryResult ref [] -> do
- svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found"
+ DiscoveryResult edgst [] -> do
+ svcPrint $ "Discovery: " ++ show (either refDigest id edgst) ++ " not found"
- DiscoveryResult ref addrs -> do
+ DiscoveryResult edgst addrs -> do
+ let dgst = either refDigest id edgst
-- TODO: check if we really requested that
server <- asks svcServer
+ st <- getStorage
self <- svcSelf
- mbIceConfig <- svcGet
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if
+ forM_ addrs $ \addr -> if
| addr == T.pack "ICE"
-#ifdef ENABLE_ICE_SUPPORT
- , Just config <- mbIceConfig
- -> do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Nothing
- , dpAddress = []
- , dpIceSession = Just ice
- }
-#else
-> do
- return ()
+#ifdef ENABLE_ICE_SUPPORT
+ getIceConfig >>= \case
+ Just config -> void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ -- Try to promote weak ref to normal one for older peers:
+ edgst' <- case edgst of
+ Left r -> return (Left r)
+ Right d -> refFromDigest st d >>= \case
+ Just r -> return (Left r)
+ Nothing -> return (Right d)
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
+ return ()
#endif
+ return ()
| [ ipaddr, port ] <- words (T.unpack addr) -> do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Just peer
- , dpAddress = []
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ void $ liftIO $ forkIO $ do
+ saddr <- head <$>
+ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
+ peer <- serverPeer server (addrAddress saddr)
+ runAsService $ do
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
| otherwise -> do
- runAsService $ do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
DiscoveryConnectionRequest conn -> do
self <- svcSelf
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ if either refDigest id (dconnTarget conn) `elem` identityDigests self
+ then if
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
+ -- request for us, create ICE sesssion
+ | Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
- svcGet >>= \case
+ getIceConfig >>= \case
Just config -> do
liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do
rinfo <- iceRemoteInfo ice
res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo }
case res of
- Right _ -> do
- case dconnIceInfo conn of
- Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice
- Nothing -> putStrLn $ "Discovery: connection request without ICE remote info"
+ Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice
Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err
Nothing -> do
- svcPrint $ "Discovery: ICE request from peer without ICE configuration"
-#else
- return ()
+ return ()
#endif
- else do
+ | otherwise -> do
+ svcPrint $ "Discovery: unsupported connection request"
+
+ else do
-- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal
+ mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
@@ -308,32 +349,31 @@ instance Service DiscoveryService where
DiscoveryConnectionResponse conn -> do
self <- svcSelf
- dpeers <- svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
+ dpeers <- dgsPeers <$> svcGetGlobal
+ if either refDigest id (dconnSource conn) `elem` identityDigests self
then do
-- response to our request, try to connect to the peer
-#ifdef ENABLE_ICE_SUPPORT
server <- asks svcServer
if | Just addr <- dconnAddress conn
, [ipaddr, port] <- words (T.unpack addr) -> do
saddr <- liftIO $ head <$>
getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
peer <- liftIO $ serverPeer server (addrAddress saddr)
- svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $
- DiscoveryPeer 0 (Just peer) [] Nothing
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
- | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
+#ifdef ENABLE_ICE_SUPPORT
+ | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
+#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
-#else
- return ()
-#endif
else do
-- response to relayed request
- case M.lookup (refDigest $ dconnSource conn) dpeers of
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
Just dp | Just dpeer <- dpPeer dp -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
_ -> svcPrint $ "Discovery: failed to relay connection response"
@@ -352,5 +392,58 @@ instance Service DiscoveryService where
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ sendToPeer peer $ DiscoverySearch (Right dgst)
+
+#ifdef ENABLE_ICE_SUPPORT
+ serviceStopServer _ _ _ pstates = do
+ forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do
+ mapM_ iceStopThread dpsIceConfig
+#endif
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig)
+getIceConfig = do
+ dpsIceConfig <$> svcGet >>= \case
+ Just cfg -> return $ Just cfg
+ Nothing -> do
+#ifdef ENABLE_ICE_SUPPORT
+ stun <- dpsStunServer <$> svcGet
+ turn <- dpsTurnServer <$> svcGet
+ liftIO (iceCreateConfig stun turn) >>= \case
+ Just cfg -> do
+ svcModify $ \s -> s { dpsIceConfig = Just cfg }
+ return $ Just cfg
+ Nothing -> do
+ svcPrint $ "Discovery: failed to create ICE config"
+ return Nothing
+#else
+ return Nothing
+#endif
+
+
+discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m ()
+discoverySearch server dgst = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ peerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ dgst `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch $ Right dgst
diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs
index ba2607a..1e1a521 100644
--- a/src/Erebos/Flow.hs
+++ b/src/Erebos/Flow.hs
@@ -11,54 +11,53 @@ module Erebos.Flow (
import Control.Concurrent.STM
-data Flow r w = Flow (TMVar [r]) (TMVar [w])
- | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+data Flow r w
+ = Flow (TBQueue r) (TBQueue w)
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
type SymFlow a = Flow a a
newFlow :: STM (Flow a b, Flow b a)
newFlow = do
- x <- newEmptyTMVar
- y <- newEmptyTMVar
+ x <- newTBQueue 16
+ y <- newTBQueue 16
return (Flow x y, Flow y x)
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = atomically newFlow
readFlow :: Flow r w -> STM r
-readFlow (Flow rvar _) = takeTMVar rvar >>= \case
- (x:[]) -> return x
- (x:xs) -> putTMVar rvar xs >> return x
- [] -> error "Flow: empty list"
+readFlow (Flow rvar _) = readTBQueue rvar
readFlow (MappedFlow f _ up) = f <$> readFlow up
tryReadFlow :: Flow r w -> STM (Maybe r)
-tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case
- Just (x:[]) -> return (Just x)
- Just (x:xs) -> putTMVar rvar xs >> return (Just x)
- Just [] -> error "Flow: empty list"
- Nothing -> return Nothing
+tryReadFlow (Flow rvar _) = tryReadTBQueue rvar
tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up
canReadFlow :: Flow r w -> STM Bool
-canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar
+canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar
canReadFlow (MappedFlow _ _ up) = canReadFlow up
writeFlow :: Flow r w -> w -> STM ()
-writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (Flow _ wvar) = writeTBQueue wvar
writeFlow (MappedFlow _ f up) = writeFlow up . f
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk _ [] = return ()
-writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs
writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
tryWriteFlow :: Flow r w -> w -> STM Bool
-tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[])
-tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f
+tryWriteFlow (Flow _ wvar) x = do
+ isFullTBQueue wvar >>= \case
+ True -> return False
+ False -> do
+ writeTBQueue wvar x
+ return True
+tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x
canWriteFlow :: Flow r w -> STM Bool
-canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar
+canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar
canWriteFlow (MappedFlow _ _ up) = canWriteFlow up
readFlowIO :: Flow r w -> IO r
diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs
index 2c6f500..dceeb2c 100644
--- a/src/Erebos/ICE.chs
+++ b/src/Erebos/ICE.chs
@@ -8,6 +8,7 @@ module Erebos.ICE (
IceRemoteInfo,
iceCreateConfig,
+ iceStopThread,
iceCreateSession,
iceDestroy,
iceRemoteInfo,
@@ -139,6 +140,12 @@ iceCreateConfig stun turn =
then return Nothing
else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg
+foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread"
+ ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO ()
+
+iceStopThread :: IceConfig -> IO ()
+iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread
+
{#pointer *pj_ice_strans as ^ #}
iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession
diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c
index e79fb9d..e9446fe 100644
--- a/src/Erebos/ICE/pjproject.c
+++ b/src/Erebos/ICE/pjproject.c
@@ -1,6 +1,7 @@
#include "pjproject.h"
#include "Erebos/ICE_stub.h"
+#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -15,6 +16,13 @@ static struct
pj_sockaddr def_addr;
} ice;
+struct erebos_ice_cfg
+{
+ pj_ice_strans_cfg cfg;
+ pj_thread_t * thread;
+ atomic_bool exit;
+};
+
struct user_data
{
pj_ice_sess_role role;
@@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status)
fprintf(stderr, "ICE: %s: %s\n", msg, err);
}
-static int ice_worker_thread(void * vcfg)
+static int ice_worker_thread( void * vcfg )
{
- pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg;
+ struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg );
- while (true) {
+ while( ! ecfg->exit ){
pj_time_val max_timeout = { 0, 0 };
pj_time_val timeout = { 0, 0 };
max_timeout.msec = 500;
- pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout);
+ pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout );
pj_assert(timeout.sec >= 0 && timeout.msec >= 0);
if (timeout.msec >= 1000)
@@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg)
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
- int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout);
+ int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout );
if (c < 0)
pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
}
@@ -131,80 +139,91 @@ exit:
pthread_mutex_unlock(&mutex);
}
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port )
{
ice_init();
- pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) );
- pj_ice_strans_cfg_default( cfg );
+ struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) );
+ pj_ice_strans_cfg_default( &ecfg->cfg );
+ ecfg->exit = false;
+ ecfg->thread = NULL;
- cfg->stun_cfg.pf = &ice.cp.factory;
+ ecfg->cfg.stun_cfg.pf = &ice.cp.factory;
if( pj_timer_heap_create( ice.pool, 100,
- &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){
+ &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){
fprintf( stderr, "pj_timer_heap_create failed\n" );
goto fail;
}
- if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){
+ if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){
fprintf( stderr, "pj_ioqueue_create failed\n" );
goto fail;
}
- pj_thread_t * thread;
if( pj_thread_create( ice.pool, NULL, &ice_worker_thread,
- cfg, 0, 0, &thread ) != PJ_SUCCESS ){
+ ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){
fprintf( stderr, "pj_thread_create failed\n" );
goto fail;
}
- cfg->af = pj_AF_INET();
- cfg->opt.aggressive = PJ_TRUE;
+ ecfg->cfg.af = pj_AF_INET();
+ ecfg->cfg.opt.aggressive = PJ_TRUE;
if( stun_server ){
- cfg->stun.server.ptr = malloc( strlen( stun_server ));
- pj_strcpy2( &cfg->stun.server, stun_server );
+ ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server ));
+ pj_strcpy2( &ecfg->cfg.stun.server, stun_server );
if( stun_port )
- cfg->stun.port = stun_port;
+ ecfg->cfg.stun.port = stun_port;
}
if( turn_server ){
- cfg->turn.server.ptr = malloc( strlen( turn_server ));
- pj_strcpy2( &cfg->turn.server, turn_server );
+ ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server ));
+ pj_strcpy2( &ecfg->cfg.turn.server, turn_server );
if( turn_port )
- cfg->turn.port = turn_port;
- cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
- cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
- cfg->turn.conn_type = PJ_TURN_TP_UDP;
+ ecfg->cfg.turn.port = turn_port;
+ ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
+ ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+ ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP;
}
- return cfg;
+ return ecfg;
fail:
- ice_cfg_free( cfg );
+ ice_cfg_free( ecfg );
return NULL;
}
-void ice_cfg_free( pj_ice_strans_cfg * cfg )
+void ice_cfg_free( struct erebos_ice_cfg * ecfg )
{
- if( ! cfg )
+ if( ! ecfg )
return;
- if( cfg->turn.server.ptr )
- free( cfg->turn.server.ptr );
+ ecfg->exit = true;
+ pj_thread_join( ecfg->thread );
- if( cfg->stun.server.ptr )
- free( cfg->stun.server.ptr );
+ if( ecfg->cfg.turn.server.ptr )
+ free( ecfg->cfg.turn.server.ptr );
- if( cfg->stun_cfg.ioqueue )
- pj_ioqueue_destroy( cfg->stun_cfg.ioqueue );
+ if( ecfg->cfg.stun.server.ptr )
+ free( ecfg->cfg.stun.server.ptr );
- if( cfg->stun_cfg.timer_heap )
- pj_timer_heap_destroy( cfg->stun_cfg.timer_heap );
+ if( ecfg->cfg.stun_cfg.ioqueue )
+ pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue );
- free( cfg );
+ if( ecfg->cfg.stun_cfg.timer_heap )
+ pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap );
+
+ free( ecfg );
+}
+
+void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg )
+{
+ if( ! ecfg )
+ return;
+ ecfg->exit = true;
}
-pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb )
{
ice_init();
@@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role
.on_ice_complete = cb_on_ice_complete,
};
- pj_status_t status = pj_ice_strans_create( NULL, cfg, 1,
+ pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1,
udata, &icecb, &res );
if (status != PJ_SUCCESS)
diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h
index e4fcbdb..c31e227 100644
--- a/src/Erebos/ICE/pjproject.h
+++ b/src/Erebos/ICE/pjproject.h
@@ -3,11 +3,12 @@
#include <pjnath.h>
#include <HsFFI.h>
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port );
-void ice_cfg_free( pj_ice_strans_cfg * cfg );
+void ice_cfg_free( struct erebos_ice_cfg * cfg );
+void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg );
-pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb );
void ice_destroy(pj_ice_strans * strans);
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 54658de..8da4c8d 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -14,7 +14,12 @@ module Erebos.Network (
PeerIdentity(..), peerIdentity,
WaitingRef, wrDigest,
Service(..),
+
+ PeerAddressType(..),
+ receivedFromCustomAddress,
+
serverPeer,
+ serverPeerCustom,
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce,
#endif
@@ -24,6 +29,7 @@ module Erebos.Network (
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -36,13 +42,14 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
+import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
-import qualified Data.Map as M
+import Data.Map qualified as M
import Data.Maybe
import Data.Typeable
import Data.Word
@@ -56,7 +63,7 @@ import Foreign.Storable as F
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
-import qualified Network.Socket.ByteString as S
+import Network.Socket.ByteString qualified as S
import Erebos.Error
#ifdef ENABLE_ICE_SUPPORT
@@ -157,12 +164,19 @@ setPeerChannel Peer {..} ch = do
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
-data PeerAddress = DatagramAddress SockAddr
+class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where
+ sendBytesToAddress :: addr -> ByteString -> IO ()
+
+data PeerAddress
+ = forall addr. PeerAddressType addr => CustomPeerAddress addr
+ | DatagramAddress SockAddr
#ifdef ENABLE_ICE_SUPPORT
- | PeerIceSession IceSession
+ | PeerIceSession IceSession
#endif
instance Show PeerAddress where
+ show (CustomPeerAddress addr) = show addr
+
show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, port)
| (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
@@ -170,22 +184,32 @@ instance Show PeerAddress where
Just (addr, port)
-> [show addr, show port]
_ -> [show saddr]
+
#ifdef ENABLE_ICE_SUPPORT
show (PeerIceSession ice) = show ice
#endif
instance Eq PeerAddress where
+ CustomPeerAddress addr == CustomPeerAddress addr'
+ | Just addr'' <- cast addr' = addr == addr''
DatagramAddress addr == DatagramAddress addr' = addr == addr'
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice == PeerIceSession ice' = ice == ice'
- _ == _ = False
#endif
+ _ == _ = False
instance Ord PeerAddress where
+ compare (CustomPeerAddress addr) (CustomPeerAddress addr')
+ | Just addr'' <- cast addr' = compare addr addr''
+ | otherwise = compare (typeOf addr) (typeOf addr')
+ compare (CustomPeerAddress _ ) _ = LT
+ compare _ (CustomPeerAddress _ ) = GT
+
compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
#ifdef ENABLE_ICE_SUPPORT
compare (DatagramAddress _ ) _ = LT
compare _ (DatagramAddress _ ) = GT
+
compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
#endif
@@ -198,9 +222,10 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
- | PeerConnected (Connection PeerAddress)
- | PeerDropped
+data PeerState
+ = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
@@ -302,13 +327,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -316,8 +346,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
case paddr of
+ CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice -> iceSend ice msg
@@ -385,16 +416,29 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -502,9 +546,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -524,8 +566,8 @@ appendDistinct x (y:ys) | x == y = y : ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
+ -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
@@ -659,10 +701,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -787,9 +830,13 @@ notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer
+receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
+receivedFromCustomAddress Server {..} addr msg = do
+ writeFlowIO serverRawPath ( CustomPeerAddress addr, msg )
+
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
peerState <- newTVarIO (PeerInit [])
@@ -808,6 +855,9 @@ serverPeer server paddr = do
_ -> paddr
serverPeer' server (DatagramAddress paddr')
+serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer
+serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr)
+
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce :: Server -> IceSession -> IO Peer
serverPeerIce server@Server {..} ice = do
@@ -856,19 +906,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -901,17 +981,17 @@ sendToPeerWith peer fobj = do
Left err -> throwError $ fromErebosError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService = runPeerServiceOn Nothing []
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -936,6 +1016,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
@@ -956,6 +1037,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index c340503..025f52c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -3,6 +3,7 @@ module Erebos.Network.Protocol (
transportToObject,
TransportHeader(..),
TransportHeaderItem(..),
+ ServiceID(..),
SecurityRequirement(..),
WaitingRef(..),
@@ -22,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -36,6 +38,7 @@ import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
+import Control.Exception
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans
@@ -68,9 +71,9 @@ import Erebos.Flow
import Erebos.Identity
import Erebos.Network.Channel
import Erebos.Object
-import Erebos.Service
import Erebos.Storable
import Erebos.Storage
+import Erebos.UUID (UUID)
protocolVersion :: Text
@@ -107,6 +110,9 @@ data TransportHeaderItem
| StreamOpen Word8
deriving (Eq, Show)
+newtype ServiceID = ServiceID UUID
+ deriving (Eq, Ord, Show, StorableUUID)
+
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
@@ -283,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go cGlobalState streamNumber stream
+ )
where
go gs@GlobalState {..} streamNumber stream = do
@@ -356,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
+
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -398,7 +415,7 @@ streamClosed Connection {..} snum = atomically $ do
modifyTVar' cOutStreams $ filter ((snum /=) . fst)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
@@ -420,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
@@ -512,8 +529,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
race_ (waitTill next) waitForUpdate
- race_ signalTimeouts $ forever $ join $ atomically $
- passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ race_ signalTimeouts $ forever $ do
+ io <- atomically $ do
+ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e
getConnection :: GlobalState addr -> addr -> STM (Connection addr)
diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs
index 26ca09f..f00b63d 100644
--- a/src/Erebos/Object.hs
+++ b/src/Erebos/Object.hs
@@ -13,8 +13,9 @@ module Erebos.Object (
RecItem, RecItem'(..),
Ref, PartialRef, RefDigest,
- refDigest,
- readRef, showRef, showRefDigest,
+ refDigest, refFromDigest,
+ readRef, showRef,
+ readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
) where
diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs
index 6111d2a..fdb587a 100644
--- a/src/Erebos/Object/Internal.hs
+++ b/src/Erebos/Object/Internal.hs
@@ -2,8 +2,9 @@ module Erebos.Object.Internal (
Storage, PartialStorage, StorageCompleteness,
Ref, PartialRef, RefDigest,
- refDigest,
- readRef, showRef, showRefDigest,
+ refDigest, refFromDigest,
+ readRef, showRef,
+ readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
@@ -74,13 +75,14 @@ import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Format
import Data.Time.LocalTime
-import Data.UUID (UUID)
-import qualified Data.UUID as U
import System.IO.Unsafe
import Erebos.Error
import Erebos.Storage.Internal
+import Erebos.UUID (UUID)
+import Erebos.UUID qualified as U
+import Erebos.Util
zeroRef :: Storage' c -> Ref' c
@@ -701,8 +703,6 @@ loadRawWeaks name = mapMaybe p <$> loadRecItems
-type Stored a = Stored' Complete a
-
instance Storable a => Storable (Stored a) where
store st = copyRef st . storedRef
store' (Stored _ x) = store' x
@@ -712,10 +712,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where
fromZero st = Stored (zeroRef st) $ fromZero st
fromStored :: Stored a -> a
-fromStored (Stored _ x) = x
+fromStored = storedObject'
storedRef :: Stored a -> Ref
-storedRef (Stored ref _) = ref
+storedRef = storedRef'
wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a)
wrappedStore st x = do ref <- liftIO $ store st x
@@ -724,9 +724,8 @@ wrappedStore st x = do ref <- liftIO $ store st x
wrappedLoad :: Storable a => Ref -> Stored a
wrappedLoad ref = Stored ref (load ref)
-copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) =>
- Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a))
-copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref'
+copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a)
+copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref'
-- |Passed function needs to preserve the object representation to be safe
unsafeMapStored :: (a -> b) -> Stored a -> Stored b
diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs
index 703afcd..e3ebf2b 100644
--- a/src/Erebos/Pairing.hs
+++ b/src/Erebos/Pairing.hs
@@ -17,9 +17,10 @@ import Control.Monad.Reader
import Crypto.Random
import Data.Bits
-import Data.ByteArray (Bytes, convert)
-import qualified Data.ByteArray as BA
-import qualified Data.ByteString.Char8 as BC
+import Data.ByteArray qualified as BA
+import Data.ByteString (ByteString)
+import Data.ByteString qualified as BS
+import Data.ByteString.Char8 qualified as BC
import Data.Kind
import Data.Maybe
import Data.Typeable
@@ -34,16 +35,16 @@ import Erebos.State
import Erebos.Storable
data PairingService a = PairingRequest (Stored (Signed IdentityData)) (Stored (Signed IdentityData)) RefDigest
- | PairingResponse Bytes
- | PairingRequestNonce Bytes
+ | PairingResponse ByteString
+ | PairingRequestNonce ByteString
| PairingAccept a
| PairingReject
data PairingState a = NoPairing
- | OurRequest UnifiedIdentity UnifiedIdentity Bytes
+ | OurRequest UnifiedIdentity UnifiedIdentity ByteString
| OurRequestConfirm (Maybe (PairingVerifiedResult a))
| OurRequestReady
- | PeerRequest UnifiedIdentity UnifiedIdentity Bytes RefDigest
+ | PeerRequest UnifiedIdentity UnifiedIdentity ByteString RefDigest
| PeerRequestConfirm
| PairingDone
@@ -88,7 +89,7 @@ instance Storable a => Storable (PairingService a) where
load' = do
res <- loadRec $ do
- (req :: Maybe Bytes) <- loadMbBinary "request"
+ (req :: Maybe ByteString) <- loadMbBinary "request"
idReq <- loadMbRef "id-req"
idRsp <- loadMbRef "id-rsp"
rsp <- loadMbBinary "response"
@@ -171,7 +172,7 @@ instance PairingResult a => Service (PairingService a) where
x@(OurRequestReady, _) -> reject $ uncurry PairingUnexpectedMessage x
(PeerRequest peer self nonce dgst, PairingRequestNonce pnonce) -> do
- if dgst == nonceDigest peer self pnonce BA.empty
+ if dgst == nonceDigest peer self pnonce BS.empty
then do hook <- asks $ pairingHookRequestNonce . svcAttributes
hook $ confirmationNumber $ nonceDigest peer self pnonce nonce
svcSet PeerRequestConfirm
@@ -188,12 +189,12 @@ reject reason = do
replyPacket PairingReject
-nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> Bytes -> Bytes -> RefDigest
+nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> ByteString -> ByteString -> RefDigest
nonceDigest idReq idRsp nonceReq nonceRsp = hashToRefDigest $ serializeObject $ Rec
[ (BC.pack "id-req", RecRef $ storedRef $ idData idReq)
, (BC.pack "id-rsp", RecRef $ storedRef $ idData idRsp)
- , (BC.pack "nonce-req", RecBinary $ convert nonceReq)
- , (BC.pack "nonce-rsp", RecBinary $ convert nonceRsp)
+ , (BC.pack "nonce-req", RecBinary nonceReq)
+ , (BC.pack "nonce-rsp", RecBinary nonceRsp)
]
confirmationNumber :: RefDigest -> String
@@ -212,7 +213,7 @@ pairingRequest _ peer = do
PeerIdentityFull pid -> return pid
_ -> throwOtherError "incomplete peer identity"
sendToPeerWith @(PairingService a) peer $ \case
- NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BA.empty), OurRequest self pid nonce)
+ NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BS.empty), OurRequest self pid nonce)
_ -> throwOtherError "already in progress"
pairingAccept :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m ()
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index e95e700..4499ef9 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -29,14 +29,14 @@ import Control.Monad.Writer
import Data.Kind
import Data.Typeable
-import Data.UUID (UUID)
-import qualified Data.UUID as U
import Erebos.Identity
import {-# SOURCE #-} Erebos.Network
+import Erebos.Network.Protocol
import Erebos.State
import Erebos.Storable
import Erebos.Storage.Head
+import Erebos.UUID qualified as U
class (
Typeable s, Storable s,
@@ -72,6 +72,9 @@ class (
serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s]
serviceStorageWatchers _ = []
+ serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO ()
+ serviceStopServer _ _ _ _ = return ()
+
data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s)
@@ -101,11 +104,10 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
-data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
-
+data SomeStorageWatcher s
+ = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+ | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT ErebosError IO ())
-newtype ServiceID = ServiceID UUID
- deriving (Eq, Ord, Show, StorableUUID)
mkServiceID :: String -> ServiceID
mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString
@@ -116,10 +118,13 @@ data ServiceInput s = ServiceInput
, svcPeerIdentity :: UnifiedIdentity
, svcServer :: Server
, svcPrintOp :: String -> IO ()
+ , svcNewStreams :: [ RawStreamReader ]
}
-data ServiceReply s = ServiceReply (Either s (Stored s)) Bool
- | ServiceFinally (IO ())
+data ServiceReply s
+ = ServiceReply (Either s (Stored s)) Bool
+ | ServiceOpenStream (RawStreamWriter -> IO ())
+ | ServiceFinally (IO ())
data ServiceHandlerState s = ServiceHandlerState
{ svcValue :: ServiceState s
diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs
new file mode 100644
index 0000000..67df4d7
--- /dev/null
+++ b/src/Erebos/Service/Stream.hs
@@ -0,0 +1,74 @@
+module Erebos.Service.Stream (
+ StreamPacket(..),
+ StreamReader, getStreamReaderNumber,
+ StreamWriter, getStreamWriterNumber,
+ openStream, receivedStreams,
+ readStreamPacket, writeStreamPacket,
+ writeStream,
+ closeStream,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.Reader
+import Control.Monad.Writer
+
+import Data.ByteString (ByteString)
+import Data.Word
+
+import Erebos.Flow
+import Erebos.Network
+import Erebos.Network.Protocol
+import Erebos.Service
+
+
+data StreamReader = StreamReader RawStreamReader
+
+getStreamReaderNumber :: StreamReader -> IO Int
+getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
+
+data StreamWriter = StreamWriter (MVar StreamWriterData)
+
+data StreamWriterData = StreamWriterData
+ { swdStream :: RawStreamWriter
+ , swdSequence :: Maybe Word64
+ }
+
+getStreamWriterNumber :: StreamWriter -> IO Int
+getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
+
+
+openStream :: Service s => ServiceHandler s StreamWriter
+openStream = do
+ mvar <- liftIO newEmptyMVar
+ tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
+ return $ StreamWriter mvar
+
+receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
+receivedStreams = do
+ map StreamReader <$> asks svcNewStreams
+
+readStreamPacket :: StreamReader -> IO StreamPacket
+readStreamPacket (StreamReader stream) = do
+ readFlowIO (rsrFlow stream)
+
+writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
+writeStreamPacket (StreamWriter mvar) packet = do
+ withMVar mvar $ \swd -> do
+ writeFlowIO (rswFlow $ swdStream swd) packet
+
+writeStream :: StreamWriter -> ByteString -> IO ()
+writeStream (StreamWriter mvar) bytes = do
+ modifyMVar_ mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> do
+ writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
+ return swd { swdSequence = Just (seqNum + 1) }
+ Nothing -> do
+ fail "writeStream: stream closed"
+
+closeStream :: StreamWriter -> IO ()
+closeStream (StreamWriter mvar) = do
+ withMVar mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
+ Nothing -> fail "closeStream: stream already closed"
diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs
index 5ce9952..076a8c0 100644
--- a/src/Erebos/State.hs
+++ b/src/Erebos/State.hs
@@ -23,8 +23,6 @@ import Control.Monad.Reader
import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.Typeable
-import Data.UUID (UUID)
-import Data.UUID qualified as U
import Erebos.Identity
import Erebos.Object
@@ -32,6 +30,8 @@ import Erebos.PubKey
import Erebos.Storable
import Erebos.Storage.Head
import Erebos.Storage.Merge
+import Erebos.UUID (UUID)
+import Erebos.UUID qualified as U
data LocalState = LocalState
{ lsPrev :: Maybe RefDigest
diff --git a/src/Erebos/Storage/Disk.hs b/src/Erebos/Storage/Disk.hs
index 370c584..8e35940 100644
--- a/src/Erebos/Storage/Disk.hs
+++ b/src/Erebos/Storage/Disk.hs
@@ -18,7 +18,6 @@ import Data.ByteString.Lazy.Char8 qualified as BLC
import Data.Function
import Data.List
import Data.Maybe
-import Data.UUID qualified as U
import System.Directory
import System.FSNotify
@@ -31,6 +30,7 @@ import Erebos.Storage.Backend
import Erebos.Storage.Head
import Erebos.Storage.Internal
import Erebos.Storage.Platform
+import Erebos.UUID qualified as U
data DiskStorage = StorageDir
diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs
index 8f8e009..285902d 100644
--- a/src/Erebos/Storage/Head.hs
+++ b/src/Erebos/Storage/Head.hs
@@ -28,13 +28,12 @@ import Control.Monad.Reader
import Data.Bifunctor
import Data.Typeable
-import Data.UUID qualified as U
-import Data.UUID.V4 qualified as U
import Erebos.Object
import Erebos.Storable
import Erebos.Storage.Backend
import Erebos.Storage.Internal
+import Erebos.UUID qualified as U
-- | Represents loaded Erebos storage head, along with the object it pointed to
@@ -114,7 +113,7 @@ loadHeadRaw st@Storage {..} tid hid = do
-- | Reload the given head from storage, returning `Head' with updated object,
-- or `Nothing' if there is no longer head with the particular ID in storage.
reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a))
-reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid
+reloadHead (Head hid val) = loadHead (storedStorage val) hid
-- | Store a new `Head' of type 'a' in the storage.
storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a)
@@ -233,8 +232,8 @@ watchHeadWith
-> (Head a -> b) -- ^ Selector function
-> (b -> IO ()) -- ^ Callback
-> IO WatchedHead -- ^ Watched head handle
-watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do
- watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb
+watchHeadWith (Head hid val) sel cb = do
+ watchHeadRaw (storedStorage val) (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb
-- | Watch the given head using raw IDs and a selector from `Ref'.
watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead
diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs
index 6df1410..db211bb 100644
--- a/src/Erebos/Storage/Internal.hs
+++ b/src/Erebos/Storage/Internal.hs
@@ -1,32 +1,55 @@
-module Erebos.Storage.Internal where
+module Erebos.Storage.Internal (
+ Storage'(..), Storage, PartialStorage,
+ Ref'(..), Ref, PartialRef,
+ RefDigest(..),
+ WatchID, startWatchID, nextWatchID,
+ WatchList(..), WatchListItem(..), watchListAdd, watchListDel,
+
+ refStorage,
+ refDigest, refDigestFromByteString,
+ showRef, showRefDigest, showRefDigestParts,
+ readRefDigest,
+ hashToRefDigest,
+
+ StorageCompleteness(..),
+ StorageBackend(..),
+ Complete, Partial,
+
+ unsafeStoreRawBytes,
+ ioLoadBytesFromStorage,
+
+ Generation(..),
+ HeadID(..), HeadTypeID(..),
+ Stored(..), storedStorage,
+) where
import Control.Arrow
import Control.Concurrent
import Control.DeepSeq
import Control.Exception
-import Control.Monad
import Control.Monad.Identity
import Crypto.Hash
import Data.Bits
-import Data.ByteArray (ByteArray, ByteArrayAccess, ScrubbedBytes)
+import Data.ByteArray (ByteArrayAccess, ScrubbedBytes)
import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
-import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
-import Data.Char
+import Data.Function
import Data.HashTable.IO qualified as HT
import Data.Hashable
import Data.Kind
import Data.Typeable
-import Data.UUID (UUID)
import Foreign.Storable (peek)
import System.IO.Unsafe (unsafePerformIO)
+import Erebos.UUID (UUID)
+import Erebos.Util
+
data Storage' c = forall bck. (StorageBackend bck, BackendCompleteness bck ~ c) => Storage
{ stBackend :: bck
@@ -196,35 +219,15 @@ showRefDigest = showRefDigestParts >>> \(alg, hex) -> alg <> BC.pack "#" <> hex
readRefDigest :: ByteString -> Maybe RefDigest
readRefDigest x = case BC.split '#' x of
[alg, dgst] | BA.convert alg == BC.pack "blake2" ->
- refDigestFromByteString =<< readHex @ByteString dgst
+ refDigestFromByteString =<< readHex dgst
_ -> Nothing
-refDigestFromByteString :: ByteArrayAccess ba => ba -> Maybe RefDigest
+refDigestFromByteString :: ByteString -> Maybe RefDigest
refDigestFromByteString = fmap RefDigest . digestFromByteString
hashToRefDigest :: BL.ByteString -> RefDigest
hashToRefDigest = RefDigest . hashFinalize . hashUpdates hashInit . BL.toChunks
-showHex :: ByteArrayAccess ba => ba -> ByteString
-showHex = B.concat . map showHexByte . BA.unpack
- where showHexChar x | x < 10 = x + o '0'
- | otherwise = x + o 'a' - 10
- showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ]
- o = fromIntegral . ord
-
-readHex :: ByteArray ba => ByteString -> Maybe ba
-readHex = return . BA.concat <=< readHex'
- where readHex' bs | B.null bs = Just []
- readHex' bs = do (bx, bs') <- B.uncons bs
- (by, bs'') <- B.uncons bs'
- x <- hexDigit bx
- y <- hexDigit by
- (B.singleton (x * 16 + y) :) <$> readHex' bs''
- hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0'
- | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10
- | otherwise = Nothing
- o = fromIntegral . ord
-
newtype Generation = Generation Int
deriving (Eq, Show)
@@ -237,17 +240,20 @@ newtype HeadID = HeadID UUID
newtype HeadTypeID = HeadTypeID UUID
deriving (Eq, Ord)
-data Stored' c a = Stored (Ref' c) a
+data Stored a = Stored
+ { storedRef' :: Ref
+ , storedObject' :: a
+ }
deriving (Show)
-instance Eq (Stored' c a) where
- Stored r1 _ == Stored r2 _ = refDigest r1 == refDigest r2
+instance Eq (Stored a) where
+ (==) = (==) `on` (refDigest . storedRef')
-instance Ord (Stored' c a) where
- compare (Stored r1 _) (Stored r2 _) = compare (refDigest r1) (refDigest r2)
+instance Ord (Stored a) where
+ compare = compare `on` (refDigest . storedRef')
-storedStorage :: Stored' c a -> Storage' c
-storedStorage (Stored (Ref st _) _) = st
+storedStorage :: Stored a -> Storage
+storedStorage = refStorage . storedRef'
type Complete = Identity
diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs
index 41725af..a41a65f 100644
--- a/src/Erebos/Storage/Merge.hs
+++ b/src/Erebos/Storage/Merge.hs
@@ -52,7 +52,7 @@ merge xs = mergeSorted $ filterAncestors xs
storeMerge :: (Mergeable a, Storable a) => [Stored (Component a)] -> IO (Stored a)
storeMerge [] = error "merge: empty list"
-storeMerge xs@(Stored ref _ : _) = wrappedStore (refStorage ref) $ mergeSorted $ filterAncestors xs
+storeMerge xs@(x : _) = wrappedStore (storedStorage x) $ mergeSorted $ filterAncestors xs
previous :: Storable a => Stored a -> [Stored a]
previous (Stored ref _) = case load ref of
diff --git a/src/Erebos/UUID.hs b/src/Erebos/UUID.hs
new file mode 100644
index 0000000..128d450
--- /dev/null
+++ b/src/Erebos/UUID.hs
@@ -0,0 +1,24 @@
+module Erebos.UUID (
+ UUID,
+ toString, fromString,
+ toText, fromText,
+ toASCIIBytes, fromASCIIBytes,
+ nextRandom,
+) where
+
+import Crypto.Random.Entropy
+
+import Data.Bits
+import Data.ByteString qualified as BS
+import Data.ByteString.Lazy qualified as BSL
+import Data.Maybe
+import Data.UUID.Types
+
+nextRandom :: IO UUID
+nextRandom = do
+ [ b0, b1, b2, b3, b4, b5, b6, b7, b8, b9, ba, bb, bc, bd, be, bf ]
+ <- BS.unpack <$> getEntropy 16
+ let version = 4
+ b6' = b6 .&. 0x0f .|. (version `shiftL` 4)
+ b8' = b8 .&. 0x3f .|. 0x80
+ return $ fromJust $ fromByteString $ BSL.pack [ b0, b1, b2, b3, b4, b5, b6', b7, b8', b9, ba, bb, bc, bd, be, bf ]
diff --git a/src/Erebos/Util.hs b/src/Erebos/Util.hs
index ffca9c7..0381c3e 100644
--- a/src/Erebos/Util.hs
+++ b/src/Erebos/Util.hs
@@ -1,5 +1,14 @@
module Erebos.Util where
+import Control.Monad
+
+import Data.ByteArray (ByteArray, ByteArrayAccess)
+import Data.ByteArray qualified as BA
+import Data.ByteString (ByteString)
+import Data.ByteString qualified as B
+import Data.Char
+
+
uniq :: Eq a => [a] -> [a]
uniq (x:y:xs) | x == y = uniq (x:xs)
| otherwise = x : uniq (y:xs)
@@ -35,3 +44,24 @@ intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys)
| x > y = intersectsSorted (x:xs) ys
| otherwise = True
intersectsSorted _ _ = False
+
+
+showHex :: ByteArrayAccess ba => ba -> ByteString
+showHex = B.concat . map showHexByte . BA.unpack
+ where showHexChar x | x < 10 = x + o '0'
+ | otherwise = x + o 'a' - 10
+ showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ]
+ o = fromIntegral . ord
+
+readHex :: ByteArray ba => ByteString -> Maybe ba
+readHex = return . BA.concat <=< readHex'
+ where readHex' bs | B.null bs = Just []
+ readHex' bs = do (bx, bs') <- B.uncons bs
+ (by, bs'') <- B.uncons bs'
+ x <- hexDigit bx
+ y <- hexDigit by
+ (B.singleton (x * 16 + y) :) <$> readHex' bs''
+ hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0'
+ | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10
+ | otherwise = Nothing
+ o = fromIntegral . ord
diff --git a/test/common.test b/test/common.test
new file mode 100644
index 0000000..89941f0
--- /dev/null
+++ b/test/common.test
@@ -0,0 +1,3 @@
+module common
+
+export def refpat = /blake2#[0-9a-f]*/
diff --git a/test/discovery.test b/test/discovery.test
index f2dddb7..3be6275 100644
--- a/test/discovery.test
+++ b/test/discovery.test
@@ -1,7 +1,7 @@
module discovery
test ManualDiscovery:
- let services = "discovery,test"
+ let services = "discovery"
let refpat = /blake2#[0-9a-f]*/
subnet sd
@@ -24,27 +24,6 @@ test ManualDiscovery:
expect /create-identity-done ref $refpat.*/ from p2
expect /create-identity-done ref $refpat.*/ from pd
- # TODO: avoid the need to send identity objects with weak refs
- for p in [ p1, p2 ]:
- with p:
- send "start-server services $services"
- send "peer-add ${p2.node.ip}" to p1
- expect from p1:
- /peer 1 addr ${p2.node.ip} 29665/
- /peer 1 id Device2 Owner2/
- expect from p2:
- /peer 1 addr ${p1.node.ip} 29665/
- /peer 1 id Device1 Owner1/
- for r in [ p1base, p1obase ]:
- with p1:
- send "test-message-send 1 $r"
- expect /test-message-send done/
- with p2:
- expect /test-message-received rec [0-9]+ $r/
- for p in [ p1, p2 ]:
- send "stop-server" to p
- expect /stop-server-done/ from p
-
# Test discovery using owner and device identities:
for id in [ p1obase, p1base ]:
for p in [ pd, p1, p2 ]:
@@ -73,3 +52,40 @@ test ManualDiscovery:
send "stop-server" to p
for p in [ pd, p1, p2 ]:
expect /stop-server-done/ from p
+
+ # Test delayed discovery with new peer
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ with p1:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p1.node.ip} 29665/
+ /peer [12] id Device1 Owner1/
+
+ send "discovery-connect $id" to p2
+
+ with p2:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p2.node.ip} 29665/
+ /peer [12] id Device2 Owner2/
+
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr ${p1.node.ip} 29665/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
diff --git a/test/message.test b/test/message.test
index c0e251b..2990d0f 100644
--- a/test/message.test
+++ b/test/message.test
@@ -1,3 +1,7 @@
+module message
+
+import common
+
test DirectMessage:
let services = "contact,dm"
@@ -149,3 +153,113 @@ test DirectMessage:
send "start-server services $services" to p2
expect /dm-received from Owner1 text while_peer_offline/ from p2
+
+
+test DirectMessageDiscovery:
+ let services = "dm,discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+ subnet s3
+ subnet s4
+
+ spawn on sd as pd
+ spawn on s1 as p1
+ spawn on s2 as p2
+ spawn on s3 as p3
+ spawn on s4 as p4
+
+ send "create-identity Discovery" to pd
+
+ send "create-identity Device1 Owner1" to p1
+ expect /create-identity-done ref ($refpat)/ from p1 capture p1_id
+ send "identity-info $p1_id" to p1
+ expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner
+
+ send "create-identity Device2 Owner2" to p2
+ expect /create-identity-done ref ($refpat)/ from p2 capture p2_id
+ send "identity-info $p2_id" to p2
+ expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner
+ send "identity-info $p2_owner" to p2
+ expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase
+
+ send "create-identity Device3 Owner3" to p3
+ expect /create-identity-done ref ($refpat)/ from p3 capture p3_id
+ send "identity-info $p3_id" to p3
+ expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner
+
+ send "create-identity Device4 Owner4" to p4
+ expect /create-identity-done ref ($refpat)/ from p4 capture p4_id
+ send "identity-info $p4_id" to p4
+ expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner
+
+
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4 ]:
+ with p1:
+ send "peer-add ${p.node.ip}"
+ expect:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+ expect from p:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1 Owner1/
+
+ # Make sure p1 has other identities in storage:
+ for i in [ 1 .. 3 ]:
+ send "dm-send-peer $i init1" to p1
+ for p in [ p2, p3, p4 ]:
+ expect /dm-received from Owner1 text init1/ from p
+ send "dm-send-identity $p1_owner init2" to p
+ expect /dm-received from Owner. text init2/ from p1
+
+ # Restart servers to remove peers:
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "stop-server"
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ expect /stop-server-done/
+
+ # Prepare message before peers connect to discovery
+ send "dm-send-identity $p4_owner hello_to_p4" to p1
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4, p1 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+
+ multiply_timeout by 2.0
+
+ # Connect via discovery manually, then send message
+ send "discovery-connect $p2_obase" to p1
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ send "dm-send-identity $p2_owner hello_to_p2" to p1
+ expect /dm-received from Owner1 text hello_to_p2/ from p2
+
+ # Send message, expect automatic discovery
+ send "dm-send-identity $p3_owner hello_to_p3" to p1
+ expect /dm-received from Owner1 text hello_to_p3/ from p3
+
+ # Verify the first message
+ expect /dm-received from Owner1 text hello_to_p4/ from p4
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ send "stop-server" to p
+ for p in [ p1, p2, p3, p4, pd ]:
+ expect /stop-server-done/ from p
diff --git a/test/network.test b/test/network.test
index 52fcbee..0f49a1e 100644
--- a/test/network.test
+++ b/test/network.test
@@ -182,6 +182,58 @@ test ManyStreams:
expect /test-message-received blob 100[2-4] $ref/ from p2
+test ServiceStreams:
+ let services = "test"
+
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server services $services" to p1
+ send "start-server services $services" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ send "test-stream-open 1" to p1
+ expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1
+ expect /test-stream-open-from 1 $stream1/ from p2
+
+ send "test-stream-send 1 $stream1 hello" to p1
+ expect /test-stream-send-done 1 $stream1/ from p1
+ expect /test-stream-received 1 $stream1 0 hello/ from p2
+
+ send "test-stream-close 1 $stream1" to p1
+ expect /test-stream-close-done 1 $stream1/ from p1
+ expect /test-stream-closed-from 1 $stream1 1/ from p2
+
+ send "test-stream-open 1 8" to p2
+ expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8
+ expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1
+
+ let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ]
+ with p2:
+ for i in [ 1..20 ]:
+ for s in streams2:
+ send "test-stream-send 1 $s hello$i"
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-send-done 1 $s/
+ for s in streams2:
+ send "test-stream-close 1 $s"
+ for s in streams2:
+ expect /test-stream-close-done 1 $s/
+ with p1:
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-received 1 $s ${i-1} hello$i/
+ for s in streams2:
+ expect /test-stream-closed-from 1 $s 20/
+
+
test MultipleServiceRefs:
let services = "test"