summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md23
-rw-r--r--main/Main.hs43
-rw-r--r--main/Test.hs14
-rw-r--r--src/Erebos/Conversation.hs2
-rw-r--r--src/Erebos/Discovery.hs278
-rw-r--r--src/Erebos/Flow.hs37
-rw-r--r--src/Erebos/ICE.chs7
-rw-r--r--src/Erebos/ICE/pjproject.c97
-rw-r--r--src/Erebos/ICE/pjproject.h7
-rw-r--r--src/Erebos/Message.hs21
-rw-r--r--src/Erebos/Network.hs60
-rw-r--r--src/Erebos/Service.hs7
-rw-r--r--src/Erebos/Storage.hs2
-rw-r--r--test/common.test3
-rw-r--r--test/discovery.test37
-rw-r--r--test/message.test114
16 files changed, 543 insertions, 209 deletions
diff --git a/README.md b/README.md
index 9f9d8ff..b730215 100644
--- a/README.md
+++ b/README.md
@@ -102,11 +102,13 @@ Test chatroom [19:03] Some Name: Hi
`<message>`
: Send `<message>` to selected conversation.
-`/history`
-: Show message history of the selected conversation.
+`/history [<number>]`
+: Show message history of the selected conversation, or the one identified by
+ `<number>` if given.
-`/details`
-: Show information about the selected conversations, contact or peer.
+`/details [<number>]`
+: Show information about the selected conversations, contact or peer; or the
+ one identified by `<number>` if given.
### Chatrooms
@@ -137,12 +139,13 @@ are signed, so message author can not be forged.
: Leave the chatroom. User will no longer be listed as a member and erebos tool
will no longer collect message of this chatroom.
-`/delete`
-: Delete the chatroom; this action is only synchronized with devices belonging
-to the current user and does not affect the chatroom state for others. Due to
-the storage design, the chatroom data will not be purged from the local state
-history, but the chatroom will no longer be listed as available and no futher
-updates for this chatroom will be collected or shared with other peers.
+`/delete [<number>]`
+: Delete the chatroom (currently selected one, or the one identified by
+ `<number>`); this action is only synchronized with devices belonging to the
+ current user and does not affect the chatroom state for others. Due to the
+ storage design, the chatroom data will not be purged from the local state
+ history, but the chatroom will no longer be listed as available and no futher
+ updates for this chatroom will be collected or shared with other peers.
### Add contacts
diff --git a/main/Main.hs b/main/Main.hs
index 8a4729f..59ea7c3 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -479,7 +479,10 @@ getSelectedChatroom = gets csContext >>= \case
_ -> throwError "no chatroom selected"
getSelectedConversation :: CommandM Conversation
-getSelectedConversation = gets csContext >>= \case
+getSelectedConversation = gets csContext >>= getConversationFromContext
+
+getConversationFromContext :: CommandContext -> CommandM Conversation
+getConversationFromContext = \case
SelectedPeer peer -> peerIdentity peer >>= \case
PeerIdentityFull pid -> directMessageConversation $ finalOwner pid
_ -> throwError "incomplete peer identity"
@@ -493,6 +496,13 @@ getSelectedConversation = gets csContext >>= \case
SelectedConversation conv -> reloadConversation conv
_ -> throwError "no contact, peer or conversation selected"
+getSelectedOrManualContext :: CommandM CommandContext
+getSelectedOrManualContext = do
+ asks ciLine >>= \case
+ "" -> gets csContext
+ str | all isDigit str -> getContextByIndex (read str)
+ _ -> throwError "invalid index"
+
commands :: [(String, Command)]
commands =
[ ("history", cmdHistory)
@@ -609,19 +619,22 @@ cmdMembers = do
forM_ (chatroomMembers room) $ \x -> do
liftIO $ putStrLn $ maybe "<unnamed>" T.unpack $ idName x
+getContextByIndex :: Int -> CommandM CommandContext
+getContextByIndex n = do
+ join (asks ciContextOptions) >>= \ctxs -> if
+ | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx
+ | otherwise -> throwError "invalid index"
cmdSelectContext :: Command
cmdSelectContext = do
n <- read <$> asks ciLine
- join (asks ciContextOptions) >>= \ctxs -> if
- | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do
- modify $ \s -> s { csContext = ctx }
- case ctx of
- SelectedChatroom rstate -> do
- when (not (roomStateSubscribe rstate)) $ do
- chatroomSetSubscribe (head $ roomStateData rstate) True
- _ -> return ()
- | otherwise -> throwError "invalid index"
+ ctx <- getContextByIndex n
+ modify $ \s -> s { csContext = ctx }
+ case ctx of
+ SelectedChatroom rstate -> do
+ when (not (roomStateSubscribe rstate)) $ do
+ chatroomSetSubscribe (head $ roomStateData rstate) True
+ _ -> return ()
cmdSend :: Command
cmdSend = void $ do
@@ -635,12 +648,12 @@ cmdSend = void $ do
cmdDelete :: Command
cmdDelete = void $ do
- deleteConversation =<< getSelectedConversation
+ deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext
modify $ \s -> s { csContext = NoContext }
cmdHistory :: Command
cmdHistory = void $ do
- conv <- getSelectedConversation
+ conv <- getConversationFromContext =<< getSelectedOrManualContext
case conversationHistory conv of
thread@(_:_) -> do
tzone <- liftIO $ getCurrentTimeZone
@@ -804,7 +817,7 @@ cmdConversations = do
cmdDetails :: Command
cmdDetails = do
- gets csContext >>= \case
+ getSelectedOrManualContext >>= \case
SelectedPeer peer -> do
liftIO $ putStr $ unlines
[ "Network peer:"
@@ -880,14 +893,14 @@ cmdDiscoveryInit = void $ do
cmdDiscovery :: Command
cmdDiscovery = void $ do
- Just peer <- gets csIcePeer
+ server <- asks ciServer
st <- getStorage
sref <- asks ciLine
eprint <- asks ciPrint
liftIO $ readRef st (BC.pack sref) >>= \case
Nothing -> error "ref does not exist"
Just ref -> do
- res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref
+ res <- runExceptT $ discoverySearch server ref
case res of
Right _ -> return ()
Left err -> eprint err
diff --git a/main/Test.hs b/main/Test.hs
index 0181575..adb3c39 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -284,6 +284,7 @@ commands = map (T.pack *** id)
, ("contact-set-name", cmdContactSetName)
, ("dm-send-peer", cmdDmSendPeer)
, ("dm-send-contact", cmdDmSendContact)
+ , ("dm-send-identity", cmdDmSendIdentity)
, ("dm-list-peer", cmdDmListPeer)
, ("dm-list-contact", cmdDmListContact)
, ("chatroom-create", cmdChatroomCreate)
@@ -736,6 +737,14 @@ cmdDmSendContact = do
Just to <- contactIdentity <$> getContact cid
void $ sendDirectMessage to msg
+cmdDmSendIdentity :: Command
+cmdDmSendIdentity = do
+ st <- asks tiStorage
+ [ tid, msg ] <- asks tiParams
+ Just ref <- liftIO $ readRef st $ encodeUtf8 tid
+ Just to <- return $ validateExtendedIdentity $ wrappedLoad ref
+ void $ sendDirectMessage to msg
+
dmList :: Foldable f => Identity f -> Command
dmList peer = do
threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead
@@ -880,8 +889,5 @@ cmdDiscoveryConnect = do
st <- asks tiStorage
[ tref ] <- asks tiParams
Just ref <- liftIO $ readRef st $ encodeUtf8 tref
-
Just RunningServer {..} <- gets tsServer
- peers <- liftIO $ getCurrentPeerList rsServer
- forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ discoverySearch rsServer ref
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index f0ffa70..c165343 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con
directMessageConversation peer = do
(find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
Just thread -> return $ DirectMessageConversation thread
- Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+ Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] []
chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index f156c85..d098f98 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -3,7 +3,9 @@
module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
- DiscoveryConnection(..)
+ DiscoveryConnection(..),
+
+ discoverySearch,
) where
import Control.Concurrent
@@ -12,9 +14,13 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
@@ -30,6 +36,13 @@ import Erebos.Service
import Erebos.Storage
+#ifndef ENABLE_ICE_SUPPORT
+type IceConfig = ()
+type IceSession = ()
+type IceRemoteInfo = Stored Object
+#endif
+
+
data DiscoveryService
= DiscoverySelf [ Text ] (Maybe Int)
| DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
@@ -57,11 +70,7 @@ data DiscoveryConnection = DiscoveryConnection
{ dconnSource :: Ref
, dconnTarget :: Ref
, dconnAddress :: Maybe Text
-#ifdef ENABLE_ICE_SUPPORT
, dconnIceInfo :: Maybe IceRemoteInfo
-#else
- , dconnIceInfo :: Maybe (Stored Object)
-#endif
}
emptyConnection :: Ref -> Ref -> DiscoveryConnection
@@ -90,12 +99,13 @@ instance Storable DiscoveryService where
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
- where storeConnection ctype conn = do
- storeText "connection" $ ctype
- storeRawRef "source" $ dconnSource conn
- storeRawRef "target" $ dconnTarget conn
- storeMbText "address" $ dconnAddress conn
- storeMbRef "ice-info" $ dconnIceInfo conn
+ where
+ storeConnection ctype DiscoveryConnection {..} = do
+ storeText "connection" $ ctype
+ storeRawRef "source" dconnSource
+ storeRawRef "target" dconnTarget
+ storeMbText "address" dconnAddress
+ storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
[ do
@@ -120,22 +130,40 @@ instance Storable DiscoveryService where
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
]
- where loadConnection ctype ctor = do
- ctype' <- loadText "connection"
- guard $ ctype == ctype'
- return . ctor =<< DiscoveryConnection
- <$> loadRawRef "source"
- <*> loadRawRef "target"
- <*> loadMbText "address"
- <*> loadMbRef "ice-info"
+ where
+ loadConnection ctype ctor = do
+ ctype' <- loadText "connection"
+ guard $ ctype == ctype'
+ dconnSource <- loadRawRef "source"
+ dconnTarget <- loadRawRef "target"
+ dconnAddress <- loadMbText "address"
+ dconnIceInfo <- loadMbRef "ice-info"
+ return $ ctor DiscoveryConnection {..}
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
, dpAddress :: [ Text ]
-#ifdef ENABLE_ICE_SUPPORT
, dpIceSession :: Maybe IceSession
-#endif
+ }
+
+emptyPeer :: DiscoveryPeer
+emptyPeer = DiscoveryPeer
+ { dpPriority = 0
+ , dpPeer = Nothing
+ , dpAddress = []
+ , dpIceSession = Nothing
+ }
+
+data DiscoveryPeerState = DiscoveryPeerState
+ { dpsStunServer :: Maybe ( Text, Word16 )
+ , dpsTurnServer :: Maybe ( Text, Word16 )
+ , dpsIceConfig :: Maybe IceConfig
+ }
+
+data DiscoveryGlobalState = DiscoveryGlobalState
+ { dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -144,13 +172,18 @@ instance Service DiscoveryService where
type ServiceAttributes DiscoveryService = DiscoveryAttributes
defaultServiceAttributes _ = defaultDiscoveryAttributes
-#ifdef ENABLE_ICE_SUPPORT
- type ServiceState DiscoveryService = Maybe IceConfig
- emptyServiceState _ = Nothing
-#endif
+ type ServiceState DiscoveryService = DiscoveryPeerState
+ emptyServiceState _ = DiscoveryPeerState
+ { dpsStunServer = Nothing
+ , dpsTurnServer = Nothing
+ , dpsIceConfig = Nothing
+ }
- type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer
- emptyServiceGlobalState _ = M.empty
+ type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
+ emptyServiceGlobalState _ = DiscoveryGlobalState
+ { dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
+ }
serviceHandler msg = case fromStored msg of
DiscoverySelf addrs priority -> do
@@ -171,15 +204,14 @@ instance Service DiscoveryService where
| otherwise -> return Nothing
- forM_ (idDataF =<< unfoldOwners pid) $ \s ->
- svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer
- { dpPriority = fromMaybe 0 priority
- , dpPeer = Just peer
- , dpAddress = addrs
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
+ let dp = DiscoveryPeer
+ { dpPriority = fromMaybe 0 priority
+ , dpPeer = Just peer
+ , dpAddress = addrs
+ , dpIceSession = Nothing
+ }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
attrs <- asks svcAttributes
replyPacket $ DiscoveryAcknowledged matchedAddrs
(discoveryStunServer attrs)
@@ -188,7 +220,6 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
-#ifdef ENABLE_ICE_SUPPORT
paddr <- asks (peerAddress . svcPeer) >>= return . \case
(DatagramAddress saddr) -> case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, _)
@@ -204,100 +235,90 @@ instance Service DiscoveryService where
toIceServer (Just server) Nothing = Just ( server, 0 )
toIceServer (Just server) (Just port) = Just ( server, port )
- cfg <- liftIO $ iceCreateConfig
- (toIceServer stunServer stunPort)
- (toIceServer turnServer turnPort)
- svcSet cfg
-#endif
- return ()
+ svcModify $ \s -> s
+ { dpsStunServer = toIceServer stunServer stunPort
+ , dpsTurnServer = toIceServer turnServer turnPort
+ }
DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal
+ dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal
replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
DiscoveryResult ref [] -> do
svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found"
DiscoveryResult ref addrs -> do
+ let dgst = refDigest ref
-- TODO: check if we really requested that
server <- asks svcServer
self <- svcSelf
- mbIceConfig <- svcGet
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if
+ forM_ addrs $ \addr -> if
| addr == T.pack "ICE"
-#ifdef ENABLE_ICE_SUPPORT
- , Just config <- mbIceConfig
-> do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Nothing
- , dpAddress = []
- , dpIceSession = Just ice
- }
-#else
- -> do
- return ()
+#ifdef ENABLE_ICE_SUPPORT
+ getIceConfig >>= \case
+ Just config -> void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
+ return ()
#endif
+ return ()
| [ ipaddr, port ] <- words (T.unpack addr) -> do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Just peer
- , dpAddress = []
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ void $ liftIO $ forkIO $ do
+ saddr <- head <$>
+ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
+ peer <- serverPeer server (addrAddress saddr)
+ runAsService $ do
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
| otherwise -> do
- runAsService $ do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
DiscoveryConnectionRequest conn -> do
self <- svcSelf
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ if refDigest (dconnTarget conn) `elem` identityDigests self
+ then if
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
+ -- request for us, create ICE sesssion
+ | Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
- svcGet >>= \case
+ getIceConfig >>= \case
Just config -> do
liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do
rinfo <- iceRemoteInfo ice
res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo }
case res of
- Right _ -> do
- case dconnIceInfo conn of
- Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice
- Nothing -> putStrLn $ "Discovery: connection request without ICE remote info"
+ Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice
Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err
Nothing -> do
- svcPrint $ "Discovery: ICE request from peer without ICE configuration"
-#else
- return ()
+ return ()
#endif
- else do
+ | otherwise -> do
+ svcPrint $ "Discovery: unsupported connection request"
+
+ else do
-- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal
+ mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
@@ -307,29 +328,28 @@ instance Service DiscoveryService where
DiscoveryConnectionResponse conn -> do
self <- svcSelf
- dpeers <- svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
+ dpeers <- dgsPeers <$> svcGetGlobal
+ if refDigest (dconnSource conn) `elem` identityDigests self
then do
-- response to our request, try to connect to the peer
-#ifdef ENABLE_ICE_SUPPORT
server <- asks svcServer
if | Just addr <- dconnAddress conn
, [ipaddr, port] <- words (T.unpack addr) -> do
saddr <- liftIO $ head <$>
getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
peer <- liftIO $ serverPeer server (addrAddress saddr)
- svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $
- DiscoveryPeer 0 (Just peer) [] Nothing
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s }
+#ifdef ENABLE_ICE_SUPPORT
| Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
+#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
-#else
- return ()
-#endif
else do
-- response to relayed request
case M.lookup (refDigest $ dconnSource conn) dpeers of
@@ -340,6 +360,7 @@ instance Service DiscoveryService where
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
+ st <- getStorage
let addrToText saddr = do
( addr, port ) <- IP.fromSockAddr saddr
@@ -351,5 +372,60 @@ instance Service DiscoveryService where
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ liftIO (refFromDigest st dgst) >>= \case
+ Just ref -> sendToPeer peer $ DiscoverySearch ref
+ Nothing -> return ()
+
+#ifdef ENABLE_ICE_SUPPORT
+ serviceStopServer _ _ _ pstates = do
+ forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do
+ mapM_ iceStopThread dpsIceConfig
+#endif
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig)
+getIceConfig = do
+ dpsIceConfig <$> svcGet >>= \case
+ Just cfg -> return $ Just cfg
+ Nothing -> do
+#ifdef ENABLE_ICE_SUPPORT
+ stun <- dpsStunServer <$> svcGet
+ turn <- dpsTurnServer <$> svcGet
+ liftIO (iceCreateConfig stun turn) >>= \case
+ Just cfg -> do
+ svcModify $ \s -> s { dpsIceConfig = Just cfg }
+ return $ Just cfg
+ Nothing -> do
+ svcPrint $ "Discovery: failed to create ICE config"
+ return Nothing
+#else
+ return Nothing
+#endif
+
+
+discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m ()
+discoverySearch server ref = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ peerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ refDigest ref `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch ref
diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs
index ba2607a..1e1a521 100644
--- a/src/Erebos/Flow.hs
+++ b/src/Erebos/Flow.hs
@@ -11,54 +11,53 @@ module Erebos.Flow (
import Control.Concurrent.STM
-data Flow r w = Flow (TMVar [r]) (TMVar [w])
- | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+data Flow r w
+ = Flow (TBQueue r) (TBQueue w)
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
type SymFlow a = Flow a a
newFlow :: STM (Flow a b, Flow b a)
newFlow = do
- x <- newEmptyTMVar
- y <- newEmptyTMVar
+ x <- newTBQueue 16
+ y <- newTBQueue 16
return (Flow x y, Flow y x)
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = atomically newFlow
readFlow :: Flow r w -> STM r
-readFlow (Flow rvar _) = takeTMVar rvar >>= \case
- (x:[]) -> return x
- (x:xs) -> putTMVar rvar xs >> return x
- [] -> error "Flow: empty list"
+readFlow (Flow rvar _) = readTBQueue rvar
readFlow (MappedFlow f _ up) = f <$> readFlow up
tryReadFlow :: Flow r w -> STM (Maybe r)
-tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case
- Just (x:[]) -> return (Just x)
- Just (x:xs) -> putTMVar rvar xs >> return (Just x)
- Just [] -> error "Flow: empty list"
- Nothing -> return Nothing
+tryReadFlow (Flow rvar _) = tryReadTBQueue rvar
tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up
canReadFlow :: Flow r w -> STM Bool
-canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar
+canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar
canReadFlow (MappedFlow _ _ up) = canReadFlow up
writeFlow :: Flow r w -> w -> STM ()
-writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (Flow _ wvar) = writeTBQueue wvar
writeFlow (MappedFlow _ f up) = writeFlow up . f
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk _ [] = return ()
-writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs
writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
tryWriteFlow :: Flow r w -> w -> STM Bool
-tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[])
-tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f
+tryWriteFlow (Flow _ wvar) x = do
+ isFullTBQueue wvar >>= \case
+ True -> return False
+ False -> do
+ writeTBQueue wvar x
+ return True
+tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x
canWriteFlow :: Flow r w -> STM Bool
-canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar
+canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar
canWriteFlow (MappedFlow _ _ up) = canWriteFlow up
readFlowIO :: Flow r w -> IO r
diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs
index 6f61451..cc2fdcc 100644
--- a/src/Erebos/ICE.chs
+++ b/src/Erebos/ICE.chs
@@ -8,6 +8,7 @@ module Erebos.ICE (
IceRemoteInfo,
iceCreateConfig,
+ iceStopThread,
iceCreateSession,
iceDestroy,
iceRemoteInfo,
@@ -138,6 +139,12 @@ iceCreateConfig stun turn =
then return Nothing
else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg
+foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread"
+ ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO ()
+
+iceStopThread :: IceConfig -> IO ()
+iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread
+
{#pointer *pj_ice_strans as ^ #}
iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession
diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c
index e79fb9d..e9446fe 100644
--- a/src/Erebos/ICE/pjproject.c
+++ b/src/Erebos/ICE/pjproject.c
@@ -1,6 +1,7 @@
#include "pjproject.h"
#include "Erebos/ICE_stub.h"
+#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -15,6 +16,13 @@ static struct
pj_sockaddr def_addr;
} ice;
+struct erebos_ice_cfg
+{
+ pj_ice_strans_cfg cfg;
+ pj_thread_t * thread;
+ atomic_bool exit;
+};
+
struct user_data
{
pj_ice_sess_role role;
@@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status)
fprintf(stderr, "ICE: %s: %s\n", msg, err);
}
-static int ice_worker_thread(void * vcfg)
+static int ice_worker_thread( void * vcfg )
{
- pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg;
+ struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg );
- while (true) {
+ while( ! ecfg->exit ){
pj_time_val max_timeout = { 0, 0 };
pj_time_val timeout = { 0, 0 };
max_timeout.msec = 500;
- pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout);
+ pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout );
pj_assert(timeout.sec >= 0 && timeout.msec >= 0);
if (timeout.msec >= 1000)
@@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg)
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
- int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout);
+ int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout );
if (c < 0)
pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
}
@@ -131,80 +139,91 @@ exit:
pthread_mutex_unlock(&mutex);
}
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port )
{
ice_init();
- pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) );
- pj_ice_strans_cfg_default( cfg );
+ struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) );
+ pj_ice_strans_cfg_default( &ecfg->cfg );
+ ecfg->exit = false;
+ ecfg->thread = NULL;
- cfg->stun_cfg.pf = &ice.cp.factory;
+ ecfg->cfg.stun_cfg.pf = &ice.cp.factory;
if( pj_timer_heap_create( ice.pool, 100,
- &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){
+ &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){
fprintf( stderr, "pj_timer_heap_create failed\n" );
goto fail;
}
- if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){
+ if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){
fprintf( stderr, "pj_ioqueue_create failed\n" );
goto fail;
}
- pj_thread_t * thread;
if( pj_thread_create( ice.pool, NULL, &ice_worker_thread,
- cfg, 0, 0, &thread ) != PJ_SUCCESS ){
+ ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){
fprintf( stderr, "pj_thread_create failed\n" );
goto fail;
}
- cfg->af = pj_AF_INET();
- cfg->opt.aggressive = PJ_TRUE;
+ ecfg->cfg.af = pj_AF_INET();
+ ecfg->cfg.opt.aggressive = PJ_TRUE;
if( stun_server ){
- cfg->stun.server.ptr = malloc( strlen( stun_server ));
- pj_strcpy2( &cfg->stun.server, stun_server );
+ ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server ));
+ pj_strcpy2( &ecfg->cfg.stun.server, stun_server );
if( stun_port )
- cfg->stun.port = stun_port;
+ ecfg->cfg.stun.port = stun_port;
}
if( turn_server ){
- cfg->turn.server.ptr = malloc( strlen( turn_server ));
- pj_strcpy2( &cfg->turn.server, turn_server );
+ ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server ));
+ pj_strcpy2( &ecfg->cfg.turn.server, turn_server );
if( turn_port )
- cfg->turn.port = turn_port;
- cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
- cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
- cfg->turn.conn_type = PJ_TURN_TP_UDP;
+ ecfg->cfg.turn.port = turn_port;
+ ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
+ ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+ ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP;
}
- return cfg;
+ return ecfg;
fail:
- ice_cfg_free( cfg );
+ ice_cfg_free( ecfg );
return NULL;
}
-void ice_cfg_free( pj_ice_strans_cfg * cfg )
+void ice_cfg_free( struct erebos_ice_cfg * ecfg )
{
- if( ! cfg )
+ if( ! ecfg )
return;
- if( cfg->turn.server.ptr )
- free( cfg->turn.server.ptr );
+ ecfg->exit = true;
+ pj_thread_join( ecfg->thread );
- if( cfg->stun.server.ptr )
- free( cfg->stun.server.ptr );
+ if( ecfg->cfg.turn.server.ptr )
+ free( ecfg->cfg.turn.server.ptr );
- if( cfg->stun_cfg.ioqueue )
- pj_ioqueue_destroy( cfg->stun_cfg.ioqueue );
+ if( ecfg->cfg.stun.server.ptr )
+ free( ecfg->cfg.stun.server.ptr );
- if( cfg->stun_cfg.timer_heap )
- pj_timer_heap_destroy( cfg->stun_cfg.timer_heap );
+ if( ecfg->cfg.stun_cfg.ioqueue )
+ pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue );
- free( cfg );
+ if( ecfg->cfg.stun_cfg.timer_heap )
+ pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap );
+
+ free( ecfg );
+}
+
+void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg )
+{
+ if( ! ecfg )
+ return;
+ ecfg->exit = true;
}
-pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb )
{
ice_init();
@@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role
.on_ice_complete = cb_on_ice_complete,
};
- pj_status_t status = pj_ice_strans_create( NULL, cfg, 1,
+ pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1,
udata, &icecb, &res );
if (status != PJ_SUCCESS)
diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h
index e4fcbdb..c31e227 100644
--- a/src/Erebos/ICE/pjproject.h
+++ b/src/Erebos/ICE/pjproject.h
@@ -3,11 +3,12 @@
#include <pjnath.h>
#include <HsFFI.h>
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port );
-void ice_cfg_free( pj_ice_strans_cfg * cfg );
+void ice_cfg_free( struct erebos_ice_cfg * cfg );
+void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg );
-pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb );
void ice_destroy(pj_ice_strans * strans);
diff --git a/src/Erebos/Message.hs b/src/Erebos/Message.hs
index 5ef27f3..78fb5e7 100644
--- a/src/Erebos/Message.hs
+++ b/src/Erebos/Message.hs
@@ -29,6 +29,7 @@ import qualified Data.Text as T
import Data.Time.Format
import Data.Time.LocalTime
+import Erebos.Discovery
import Erebos.Identity
import Erebos.Network
import Erebos.Service
@@ -103,8 +104,10 @@ instance Service DirectMessage where
serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
- serviceStorageWatchers _ = (:[]) $
- SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ serviceStorageWatchers _ =
+ [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers
+ ]
data MessageState = MessageState
@@ -210,12 +213,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do
else do
return unchanged
+findMissingPeers :: Server -> DirectMessageThreads -> ExceptT String IO ()
+findMissingPeers server threads = do
+ forM_ (toThreadList threads) $ \thread -> do
+ when (msgHead thread /= msgReceived thread) $ do
+ mapM_ (discoverySearch server) $ map storedRef $ idDataF $ msgPeer thread
+
data DirectMessageThread = DirectMessageThread
{ msgPeer :: ComposedIdentity
- , msgHead :: [Stored DirectMessage]
- , msgSent :: [Stored DirectMessage]
- , msgSeen :: [Stored DirectMessage]
+ , msgHead :: [ Stored DirectMessage ]
+ , msgSent :: [ Stored DirectMessage ]
+ , msgSeen :: [ Stored DirectMessage ]
+ , msgReceived :: [ Stored DirectMessage ]
}
threadToList :: DirectMessageThread -> [DirectMessage]
@@ -249,6 +259,7 @@ messageThreadFor peer mss =
, msgHead = filterAncestors $ ready ++ received
, msgSent = filterAncestors $ sent ++ received
, msgSeen = filterAncestors $ ready ++ seen
+ , msgReceived = filterAncestors $ received
}
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index d8e868a..fb2b5e9 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -24,6 +24,7 @@ module Erebos.Network (
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -300,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -391,8 +397,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -899,7 +918,7 @@ sendToPeerWith peer fobj = do
Left err -> throwError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
@@ -954,6 +973,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m proxy. (Service s, MonadIO m, MonadError String m)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index d1943e1..b5e52dd 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -71,6 +71,9 @@ class (
serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s]
serviceStorageWatchers _ = []
+ serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO ()
+ serviceStopServer _ _ _ _ = return ()
+
data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s)
@@ -100,7 +103,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
-data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+data SomeStorageWatcher s
+ = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+ | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT String IO ())
newtype ServiceID = ServiceID UUID
diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs
index c1e9664..9ccfdde 100644
--- a/src/Erebos/Storage.hs
+++ b/src/Erebos/Storage.hs
@@ -4,7 +4,7 @@ module Erebos.Storage (
deriveEphemeralStorage, derivePartialStorage,
Ref, PartialRef, RefDigest,
- refDigest,
+ refDigest, refFromDigest,
readRef, showRef, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
diff --git a/test/common.test b/test/common.test
new file mode 100644
index 0000000..89941f0
--- /dev/null
+++ b/test/common.test
@@ -0,0 +1,3 @@
+module common
+
+export def refpat = /blake2#[0-9a-f]*/
diff --git a/test/discovery.test b/test/discovery.test
index f2dddb7..5f6c443 100644
--- a/test/discovery.test
+++ b/test/discovery.test
@@ -73,3 +73,40 @@ test ManualDiscovery:
send "stop-server" to p
for p in [ pd, p1, p2 ]:
expect /stop-server-done/ from p
+
+ # Test delayed discovery with new peer
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ with p1:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p1.node.ip} 29665/
+ /peer [12] id Device1 Owner1/
+
+ send "discovery-connect $id" to p2
+
+ with p2:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p2.node.ip} 29665/
+ /peer [12] id Device2 Owner2/
+
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr ${p1.node.ip} 29665/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
diff --git a/test/message.test b/test/message.test
index c0e251b..2990d0f 100644
--- a/test/message.test
+++ b/test/message.test
@@ -1,3 +1,7 @@
+module message
+
+import common
+
test DirectMessage:
let services = "contact,dm"
@@ -149,3 +153,113 @@ test DirectMessage:
send "start-server services $services" to p2
expect /dm-received from Owner1 text while_peer_offline/ from p2
+
+
+test DirectMessageDiscovery:
+ let services = "dm,discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+ subnet s3
+ subnet s4
+
+ spawn on sd as pd
+ spawn on s1 as p1
+ spawn on s2 as p2
+ spawn on s3 as p3
+ spawn on s4 as p4
+
+ send "create-identity Discovery" to pd
+
+ send "create-identity Device1 Owner1" to p1
+ expect /create-identity-done ref ($refpat)/ from p1 capture p1_id
+ send "identity-info $p1_id" to p1
+ expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner
+
+ send "create-identity Device2 Owner2" to p2
+ expect /create-identity-done ref ($refpat)/ from p2 capture p2_id
+ send "identity-info $p2_id" to p2
+ expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner
+ send "identity-info $p2_owner" to p2
+ expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase
+
+ send "create-identity Device3 Owner3" to p3
+ expect /create-identity-done ref ($refpat)/ from p3 capture p3_id
+ send "identity-info $p3_id" to p3
+ expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner
+
+ send "create-identity Device4 Owner4" to p4
+ expect /create-identity-done ref ($refpat)/ from p4 capture p4_id
+ send "identity-info $p4_id" to p4
+ expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner
+
+
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4 ]:
+ with p1:
+ send "peer-add ${p.node.ip}"
+ expect:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+ expect from p:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1 Owner1/
+
+ # Make sure p1 has other identities in storage:
+ for i in [ 1 .. 3 ]:
+ send "dm-send-peer $i init1" to p1
+ for p in [ p2, p3, p4 ]:
+ expect /dm-received from Owner1 text init1/ from p
+ send "dm-send-identity $p1_owner init2" to p
+ expect /dm-received from Owner. text init2/ from p1
+
+ # Restart servers to remove peers:
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "stop-server"
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ expect /stop-server-done/
+
+ # Prepare message before peers connect to discovery
+ send "dm-send-identity $p4_owner hello_to_p4" to p1
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4, p1 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+
+ multiply_timeout by 2.0
+
+ # Connect via discovery manually, then send message
+ send "discovery-connect $p2_obase" to p1
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ send "dm-send-identity $p2_owner hello_to_p2" to p1
+ expect /dm-received from Owner1 text hello_to_p2/ from p2
+
+ # Send message, expect automatic discovery
+ send "dm-send-identity $p3_owner hello_to_p3" to p1
+ expect /dm-received from Owner1 text hello_to_p3/ from p3
+
+ # Verify the first message
+ expect /dm-received from Owner1 text hello_to_p4/ from p4
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ send "stop-server" to p
+ for p in [ p1, p2, p3, p4, pd ]:
+ expect /stop-server-done/ from p