summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md34
-rw-r--r--README.md26
-rw-r--r--erebos-tester.yaml2
-rw-r--r--erebos.cabal37
-rw-r--r--main/Main.hs556
-rw-r--r--main/State.hs90
-rw-r--r--main/Terminal.hs125
-rw-r--r--main/Test.hs345
-rw-r--r--main/Test/Service.hs21
-rw-r--r--main/WebSocket.hs48
-rw-r--r--src/Erebos/Attach.hs2
-rw-r--r--src/Erebos/Chatroom.hs12
-rw-r--r--src/Erebos/Contact.hs10
-rw-r--r--src/Erebos/Conversation.hs36
-rw-r--r--src/Erebos/DirectMessage.hs176
-rw-r--r--src/Erebos/Discovery.hs629
-rw-r--r--src/Erebos/Flow.hs37
-rw-r--r--src/Erebos/ICE.chs53
-rw-r--r--src/Erebos/ICE/pjproject.c107
-rw-r--r--src/Erebos/ICE/pjproject.h15
-rw-r--r--src/Erebos/Identity.hs41
-rw-r--r--src/Erebos/Network.hs335
-rw-r--r--src/Erebos/Network.hs-boot1
-rw-r--r--src/Erebos/Network/Address.hs65
-rw-r--r--src/Erebos/Network/Protocol.hs76
-rw-r--r--src/Erebos/Network/ifaddrs.c10
-rw-r--r--src/Erebos/Network/ifaddrs.h6
-rw-r--r--src/Erebos/Object.hs5
-rw-r--r--src/Erebos/Object/Internal.hs21
-rw-r--r--src/Erebos/Pairing.hs29
-rw-r--r--src/Erebos/Service.hs25
-rw-r--r--src/Erebos/Service/Stream.hs74
-rw-r--r--src/Erebos/Set.hs9
-rw-r--r--src/Erebos/State.hs47
-rw-r--r--src/Erebos/Storage/Disk.hs2
-rw-r--r--src/Erebos/Storage/Head.hs9
-rw-r--r--src/Erebos/Storage/Internal.hs76
-rw-r--r--src/Erebos/Storage/Memory.hs20
-rw-r--r--src/Erebos/Storage/Merge.hs45
-rw-r--r--src/Erebos/Sync.hs1
-rw-r--r--src/Erebos/UUID.hs24
-rw-r--r--src/Erebos/Util.hs43
-rw-r--r--test/attach.et (renamed from test/attach.test)0
-rw-r--r--test/chatroom.et (renamed from test/chatroom.test)0
-rw-r--r--test/common.et3
-rw-r--r--test/contact.et (renamed from test/contact.test)0
-rw-r--r--test/discovery.et218
-rw-r--r--test/discovery.test75
-rw-r--r--test/graph.et111
-rw-r--r--test/message.et (renamed from test/message.test)126
-rw-r--r--test/network.et (renamed from test/network.test)63
-rw-r--r--test/storage.et (renamed from test/storage.test)0
-rw-r--r--test/sync.et (renamed from test/sync.test)0
53 files changed, 2792 insertions, 1129 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cddb159..5295389 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,39 @@
# Revision history for erebos
+## 0.2.0 -- 2025-08-06
+
+* Weak references in records
+* Use XDG data directory for default storage path
+* Added `/identity` command to show details of current identity
+* Support tunnel for peers in discovery service
+* New CLI prompt implementation providing cleaner interface
+ * Avoids displaying sent messages twice – both in previous prompt and in message history
+ * Print received messages only for selected conversation
+ * Clear tab completion options after use
+
+* API
+ * Split `Erebos.Storage` into multiple modules
+ * Removed deprecated `Message.formatMessage` alias
+ * Renamed `Erebos.Message` module to `Erebos.DirectMessage`
+ * Added `StorageBackend` type class to allow custom storage implementation
+ * `MonadError` constraints use generic error type
+ * Replaced `Erebos.Network.peerAddress` with `getPeerAddress` and added `getPeerAddresses`
+ * Renamed `Erebos.Network.peerIdentity` to `getPeerIdentity`
+ * Renamed some functions in `Erebos.DirectMessage` module to make clear they are related only to direct messages
+ * `Erebos.Storage.Merge.generations`/`generationsBy` return `NonEmpty`
+ * Replaced `watchReceivedDirectMessages` with `watchDirectMessageThreads`
+ * Return type of `sendMessage` and `sendDirectMessage` is now `()`
+ * Some functions use `MonadStorage` instead of explicit `Storage` parameter:
+ * `Erebos.Set.storeSetAdd`
+ * `Erebos.State.makeSharedStateUpdate`
+ * `Erebos.Identity.createIdentity`
+
+## 0.1.9 -- 2025-07-08
+
+* Option to show details or delete a conversation by giving index parameter without first selecting it
+* Improved handling of ICE connections
+* Automatic discovery of peers for pending direct messages
+
## 0.1.8.1 -- 2025-03-29
* Fix build from sdist (add missing include)
diff --git a/README.md b/README.md
index 07cee2a..f7ad6ae 100644
--- a/README.md
+++ b/README.md
@@ -102,11 +102,13 @@ Test chatroom [19:03] Some Name: Hi
`<message>`
: Send `<message>` to selected conversation.
-`/history`
-: Show message history of the selected conversation.
+`/history [<number>]`
+: Show message history of the selected conversation, or the one identified by
+ `<number>` if given.
-`/details`
-: Show information about the selected conversations, contact or peer.
+`/details [<number>]`
+: Show information about the selected conversations, contact or peer; or the
+ one identified by `<number>` if given.
### Chatrooms
@@ -137,12 +139,13 @@ are signed, so message author can not be forged.
: Leave the chatroom. User will no longer be listed as a member and erebos tool
will no longer collect message of this chatroom.
-`/delete`
-: Delete the chatroom; this action is only synchronized with devices belonging
-to the current user and does not affect the chatroom state for others. Due to
-the storage design, the chatroom data will not be purged from the local state
-history, but the chatroom will no longer be listed as available and no futher
-updates for this chatroom will be collected or shared with other peers.
+`/delete [<number>]`
+: Delete the chatroom (currently selected one, or the one identified by
+ `<number>`); this action is only synchronized with devices belonging to the
+ current user and does not affect the chatroom state for others. Due to the
+ storage design, the chatroom data will not be purged from the local state
+ history, but the chatroom will no longer be listed as available and no futher
+ updates for this chatroom will be collected or shared with other peers.
### Add contacts
@@ -213,6 +216,9 @@ target device with `/<number>`.
: Drop the currently selected peer. Afterwards, the connection can be
re-established by either side.
+`/identity`
+: Show details of current identity
+
`/update-identity`
: Interactively update current identity information
diff --git a/erebos-tester.yaml b/erebos-tester.yaml
index a44f080..c8239df 100644
--- a/erebos-tester.yaml
+++ b/erebos-tester.yaml
@@ -1 +1 @@
-tests: test/**/*.test
+tests: test/**/*.et
diff --git a/erebos.cabal b/erebos.cabal
index 7f96d80..c3080ed 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -1,7 +1,7 @@
Cabal-Version: 3.0
Name: erebos
-Version: 0.1.8.1
+Version: 0.2.0
Synopsis: Decentralized messaging and synchronization
Description:
Library and simple CLI interface implementing the Erebos identity
@@ -39,6 +39,10 @@ Flag ci
default: False
manual: True
+Flag cryptonite
+ description: Use deprecated 'cryptonite' package
+ default: False
+
source-repository head
type: git
location: https://code.erebosprotocol.net/erebos
@@ -104,12 +108,11 @@ library
Erebos.Error
Erebos.Identity
Erebos.Network
- Erebos.Network.Channel
- Erebos.Network.Protocol
Erebos.Object
Erebos.Pairing
Erebos.PubKey
Erebos.Service
+ Erebos.Service.Stream
Erebos.Set
Erebos.State
Erebos.Storable
@@ -122,11 +125,15 @@ library
other-modules:
Erebos.Flow
+ Erebos.Network.Address
+ Erebos.Network.Channel
+ Erebos.Network.Protocol
Erebos.Object.Internal
Erebos.Storage.Disk
Erebos.Storage.Internal
Erebos.Storage.Memory
Erebos.Storage.Platform
+ Erebos.UUID
Erebos.Util
c-sources:
@@ -137,7 +144,7 @@ library
src/Erebos/Network/ifaddrs.h
if flag(ice)
- exposed-modules:
+ other-modules:
Erebos.ICE
c-sources:
src/Erebos/ICE/pjproject.c
@@ -154,7 +161,6 @@ library
bytestring >=0.10 && <0.13,
clock >=0.8 && < 0.9,
containers ^>= { 0.6, 0.7, 0.8 },
- crypton ^>= { 0.34, 1.0 },
deepseq >= 1.4 && <1.6,
directory >= 1.3 && <1.4,
filepath >=1.4 && <1.6,
@@ -168,9 +174,16 @@ library
stm >=2.5 && <2.6,
text >= 1.2 && <2.2,
time ^>= { 1.8, 1.9, 1.10, 1.11, 1.12, 1.13, 1.14 },
- uuid >=1.3 && <1.4,
+ uuid-types ^>= { 1.0.4 },
zlib >=0.6 && <0.8
+ if !flag(cryptonite)
+ build-depends:
+ crypton ^>= { 0.34, 1.0 },
+ else
+ build-depends:
+ cryptonite >=0.25 && <0.31,
+
if os(windows)
hs-source-dirs: src/windows
build-depends:
@@ -195,13 +208,13 @@ executable erebos
Test.Service
Version
Version.Git
+ WebSocket
autogen-modules:
Paths_erebos
build-depends:
ansi-terminal ^>= { 0.11, 1.0, 1.1 },
bytestring,
- crypton,
directory,
erebos,
mtl,
@@ -212,4 +225,12 @@ executable erebos
text,
time,
transformers >= 0.5 && <0.7,
- uuid,
+ uuid-types,
+ websockets ^>= { 0.12.7, 0.13 },
+
+ if !flag(cryptonite)
+ build-depends:
+ crypton,
+ else
+ build-depends:
+ cryptonite,
diff --git a/main/Main.hs b/main/Main.hs
index 3f78db1..5bda7e7 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -1,9 +1,7 @@
-{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where
-import Control.Arrow (first)
import Control.Concurrent
import Control.Exception
import Control.Monad
@@ -11,11 +9,13 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
import Control.Monad.Trans.Maybe
+import Control.Monad.Writer
import Crypto.Random
-import qualified Data.ByteString.Char8 as BC
-import qualified Data.ByteString.Lazy as BL
+import Data.Bifunctor
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
import Data.Char
import Data.List
import Data.Maybe
@@ -42,9 +42,6 @@ import Erebos.Chatroom
import Erebos.Conversation
import Erebos.DirectMessage
import Erebos.Discovery
-#ifdef ENABLE_ICE_SUPPORT
-import Erebos.ICE
-#endif
import Erebos.Identity
import Erebos.Network
import Erebos.Object
@@ -61,13 +58,16 @@ import State
import Terminal
import Test
import Version
+import WebSocket
data Options = Options
{ optServer :: ServerOptions
, optServices :: [ServiceOption]
, optStorage :: StorageOption
+ , optCreateIdentity :: Maybe ( Maybe Text, [ Maybe Text ] )
, optChatroomAutoSubscribe :: Maybe Int
, optDmBotEcho :: Maybe Text
+ , optWebSocketServer :: Maybe Int
, optShowHelp :: Bool
, optShowVersion :: Bool
}
@@ -88,8 +88,10 @@ defaultOptions = Options
{ optServer = defaultServerOptions
, optServices = availableServices
, optStorage = DefaultStorage
+ , optCreateIdentity = Nothing
, optChatroomAutoSubscribe = Nothing
, optDmBotEcho = Nothing
+ , optWebSocketServer = Nothing
, optShowHelp = False
, optShowVersion = False
}
@@ -110,7 +112,7 @@ availableServices =
True "peer discovery"
]
-options :: [OptDescr (Options -> Options)]
+options :: [ OptDescr (Options -> Writer [ String ] Options) ]
options =
[ Option ['p'] ["port"]
(ReqArg (\p -> so $ \opts -> opts { serverPort = read p }) "<port>")
@@ -119,57 +121,92 @@ options =
(NoArg (so $ \opts -> opts { serverLocalDiscovery = False }))
"do not send announce packets for local discovery"
, Option [] [ "storage" ]
- (ReqArg (\path -> \opts -> opts { optStorage = FilesystemStorage path }) "<path>")
+ (ReqArg (\path -> \opts -> return opts { optStorage = FilesystemStorage path }) "<path>")
"use storage in <path>"
, Option [] [ "memory-storage" ]
- (NoArg (\opts -> opts { optStorage = MemoryStorage }))
+ (NoArg (\opts -> return opts { optStorage = MemoryStorage }))
"use memory storage"
+ , Option [] [ "create-identity" ]
+ (OptArg (\value -> \opts -> return opts
+ { optCreateIdentity =
+ let devName = T.pack <$> value
+ in maybe (Just ( devName, [] )) (Just . first (const devName)) (optCreateIdentity opts)
+ }) "<name>")
+ "create a new (device) identity in a new local state"
+ , Option [] [ "create-owner" ]
+ (OptArg (\value -> \opts -> return opts
+ { optCreateIdentity =
+ let ownerName = T.pack <$> value
+ in maybe (Just ( Nothing, [ ownerName ] )) (Just . second (ownerName :)) (optCreateIdentity opts)
+ }) "<name>")
+ "create owner for a new device identity"
, Option [] ["chatroom-auto-subscribe"]
- (ReqArg (\count -> \opts -> opts { optChatroomAutoSubscribe = Just (read count) }) "<count>")
+ (ReqArg (\count -> \opts -> return opts { optChatroomAutoSubscribe = Just (read count) }) "<count>")
"automatically subscribe for up to <count> chatrooms"
-#ifdef ENABLE_ICE_SUPPORT
, Option [] [ "discovery-stun-port" ]
- (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunPort = Just (read value) }) "<port>")
+ (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunPort = Just (read value) }) "<port>")
"offer specified <port> to discovery peers for STUN protocol"
, Option [] [ "discovery-stun-server" ]
- (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryStunServer = Just (read value) }) "<server>")
+ (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryStunServer = Just (read value) }) "<server>")
"offer <server> (domain name or IP address) to discovery peers for STUN protocol"
, Option [] [ "discovery-turn-port" ]
- (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnPort = Just (read value) }) "<port>")
+ (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnPort = Just (read value) }) "<port>")
"offer specified <port> to discovery peers for TURN protocol"
, Option [] [ "discovery-turn-server" ]
- (ReqArg (\value -> serviceAttr $ \attrs -> attrs { discoveryTurnServer = Just (read value) }) "<server>")
+ (ReqArg (\value -> serviceAttr $ \attrs -> return attrs { discoveryTurnServer = Just (read value) }) "<server>")
"offer <server> (domain name or IP address) to discovery peers for TURN protocol"
-#endif
+ , Option [] [ "discovery-tunnel" ]
+ (OptArg (\value -> \opts -> do
+ fun <- provideTunnelFun value
+ serviceAttr (\attrs -> return attrs { discoveryProvideTunnel = fun }) opts) "<peer-type>")
+ "offer to provide tunnel for peers of given <peer-type>, possible values: all, none, websocket"
, Option [] ["dm-bot-echo"]
- (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
+ (ReqArg (\prefix -> \opts -> return opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
"automatically reply to direct messages with the same text prefixed with <prefix>"
+ , Option [] [ "websocket-server" ]
+ (ReqArg (\value -> \opts -> return opts { optWebSocketServer = Just (read value) }) "<port>")
+ "start WebSocket server on given port"
, Option ['h'] ["help"]
- (NoArg $ \opts -> opts { optShowHelp = True })
+ (NoArg $ \opts -> return opts { optShowHelp = True })
"show this help and exit"
, Option ['V'] ["version"]
- (NoArg $ \opts -> opts { optShowVersion = True })
+ (NoArg $ \opts -> return opts { optShowVersion = True })
"show version and exit"
]
where
- so f opts = opts { optServer = f $ optServer opts }
+ so f opts = return opts { optServer = f $ optServer opts }
- updateService :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> SomeService -> SomeService
+ updateService :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> SomeService -> m SomeService
updateService f some@(SomeService proxy attrs)
- | Just f' <- cast f = SomeService proxy (f' attrs)
- | otherwise = some
-
- serviceAttr :: Service s => (ServiceAttributes s -> ServiceAttributes s) -> Options -> Options
- serviceAttr f opts = opts { optServices = map (\sopt -> sopt { soptService = updateService f (soptService sopt) }) (optServices opts) }
-
-servicesOptions :: [OptDescr (Options -> Options)]
+ | Just f' <- cast f = SomeService proxy <$> f' attrs
+ | otherwise = return some
+
+ serviceAttr :: (Service s, Monad m, Typeable m) => (ServiceAttributes s -> m (ServiceAttributes s)) -> Options -> m Options
+ serviceAttr f opts = do
+ services' <- forM (optServices opts) $ \sopt -> do
+ service <- updateService f (soptService sopt)
+ return sopt { soptService = service }
+ return opts { optServices = services' }
+
+ provideTunnelFun :: Maybe String -> Writer [ String ] (Peer -> PeerAddress -> Bool)
+ provideTunnelFun Nothing = return $ \_ _ -> True
+ provideTunnelFun (Just "all") = return $ \_ _ -> True
+ provideTunnelFun (Just "none") = return $ \_ _ -> False
+ provideTunnelFun (Just "websocket") = return $ \_ -> \case
+ CustomPeerAddress addr | Just WebSocketAddress {} <- cast addr -> True
+ _ -> False
+ provideTunnelFun (Just name) = do
+ tell [ "Invalid value of --discovery-tunnel: ‘" <> name <> "’\n" ]
+ return $ \_ _ -> False
+
+servicesOptions :: [ OptDescr (Options -> Writer [ String ] Options) ]
servicesOptions = concatMap helper $ "all" : map soptName availableServices
where
helper name =
- [ Option [] ["enable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) ""
- , Option [] ["disable-" <> name] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) ""
+ [ Option [] [ "enable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = True }) ""
+ , Option [] [ "disable-" <> name ] (NoArg $ so $ change name $ \sopt -> sopt { soptEnabled = False }) ""
]
- so f opts = opts { optServices = f $ optServices opts }
+ so f opts = return opts { optServices = f $ optServices opts }
change :: String -> (ServiceOption -> ServiceOption) -> [ServiceOption] -> [ServiceOption]
change name f (s : ss)
| soptName s == name || name == "all"
@@ -187,13 +224,16 @@ getDefaultStorageDir = do
main :: IO ()
main = do
- (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case
- (o, args, []) -> do
- return (foldl (flip id) defaultOptions o, args)
- (_, _, errs) -> do
+ let printErrors errs = do
progName <- getProgName
hPutStrLn stderr $ concat errs <> "Try `" <> progName <> " --help' for more information."
exitFailure
+ (opts, args) <- (getOpt RequireOrder (options ++ servicesOptions) <$> getArgs) >>= \case
+ (wo, args, []) ->
+ case runWriter (foldM (flip ($)) defaultOptions wo) of
+ ( o, [] ) -> return ( o, args )
+ ( _, errs ) -> printErrors errs
+ (_, _, errs) -> printErrors errs
st <- liftIO $ case optStorage opts of
DefaultStorage -> openStorage =<< getDefaultStorageDir
@@ -236,10 +276,18 @@ main = do
Nothing -> error "ref does not exist"
Just ref -> print $ storedGeneration (wrappedLoad ref :: Stored Object)
+ [ "identity" ] -> do
+ loadHeads st >>= \case
+ (h : _) -> do
+ T.putStr $ showIdentityDetails $ headLocalIdentity h
+ [] -> do
+ T.putStrLn "no local state head"
+ exitFailure
+
["update-identity"] -> do
withTerminal noCompletion $ \term -> do
either (fail . showErebosError) return <=< runExceptT $ do
- runReaderT (updateSharedIdentity term) =<< loadLocalStateHead term st
+ runReaderT (updateSharedIdentity term) =<< runReaderT (loadLocalStateHead term) st
("update-identity" : srefs) -> do
withTerminal noCompletion $ \term -> do
@@ -281,7 +329,10 @@ main = do
interactiveLoop :: Storage -> Options -> IO ()
interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
- erebosHead <- liftIO $ loadLocalStateHead term st
+ erebosHead <- either (fail . showErebosError) return <=< runExceptT . flip runReaderT st $ do
+ case optCreateIdentity opts of
+ Nothing -> loadLocalStateHead term
+ Just ( devName, names ) -> createLocalStateHead (names ++ [ devName ])
void $ printLine term $ T.unpack $ displayIdentity $ headLocalIdentity erebosHead
let tui = hasTerminalUI term
@@ -293,7 +344,7 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
Left cstate -> do
pname <- case csContext cstate of
NoContext -> return ""
- SelectedPeer peer -> peerIdentity peer >>= return . \case
+ SelectedPeer peer -> getPeerIdentity peer >>= return . \case
PeerIdentityFull pid -> maybe "<unnamed>" T.unpack $ idName $ finalOwner pid
PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">"
PeerIdentityUnknown _ -> "<unknown>"
@@ -309,65 +360,85 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
_ | all isSpace input -> getInputLinesTui eprompt
'\\':rest -> (reverse ('\n':rest) ++) <$> getInputLinesTui (Right ">> ")
_ -> return input
- Nothing -> KeepPrompt mzero
+ Nothing
+ | tui -> KeepPrompt mzero
+ | otherwise -> KeepPrompt $ liftIO $ forever $ threadDelay 100000000
getInputCommandTui cstate = do
- input <- getInputLinesTui cstate
- let (CommandM cmd, line) = case input of
- '/':rest -> let (scmd, args) = dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') rest
- in if not (null scmd) && all isDigit scmd
- then (cmdSelectContext, scmd)
- else (fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args)
- _ -> (cmdSend, input)
- return (cmd, line)
-
- getInputLinesPipe = do
- join $ lift $ getInputLine term $ KeepPrompt . \case
- Just input -> return input
- Nothing -> liftIO $ forever $ threadDelay 100000000
-
- getInputCommandPipe _ = do
- input <- getInputLinesPipe
- let (scmd, args) = dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') input
- let (CommandM cmd, line) = (fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args)
- return (cmd, line)
-
- let getInputCommand = if tui then getInputCommandTui . Left
- else getInputCommandPipe
+ let parseCommand cmdline =
+ case dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') cmdline of
+ ( scmd, args )
+ | not (null scmd) && all isDigit scmd
+ -> ( cmdSelectContext, scmd )
+
+ | otherwise
+ -> ( fromMaybe (cmdUnknown scmd) $ lookup scmd commands, args )
+
+ ( CommandM cmd, line ) <- getInputLinesTui cstate >>= return . \case
+ '/' : input -> parseCommand input
+ input | not tui -> parseCommand input
+ input -> ( cmdSend, input )
+ return ( cmd, line )
+
+ let getInputCommand = getInputCommandTui . Left
+
+ contextVar <- liftIO $ newMVar NoContext
_ <- liftIO $ do
tzone <- getCurrentTimeZone
- watchReceivedMessages erebosHead $ \smsg -> do
- let msg = fromStored smsg
- extPrintLn $ formatDirectMessage tzone msg
- case optDmBotEcho opts of
- Nothing -> return ()
- Just prefix -> do
- res <- runExceptT $ flip runReaderT erebosHead $ sendDirectMessage (msgFrom msg) (prefix <> msgText msg)
- case res of
- Right reply -> extPrintLn $ formatDirectMessage tzone $ fromStored reply
- Left err -> extPrintLn $ "Failed to send dm echo: " <> err
+ let self = finalOwner $ headLocalIdentity erebosHead
+ watchDirectMessageThreads erebosHead $ \prev cur -> do
+ forM_ (reverse $ dmThreadToListSince prev cur) $ \msg -> do
+ withMVar contextVar $ \ctx -> do
+ mbpid <- case ctx of
+ SelectedPeer peer -> getPeerIdentity peer >>= return . \case
+ PeerIdentityFull pid -> Just $ finalOwner pid
+ _ -> Nothing
+ SelectedContact contact
+ | Just cid <- contactIdentity contact -> return (Just cid)
+ SelectedConversation conv -> return $ conversationPeer conv
+ _ -> return Nothing
+ when (not tui || maybe False (msgPeer cur `sameIdentity`) mbpid) $ do
+ extPrintLn $ formatDirectMessage tzone msg
+
+ case optDmBotEcho opts of
+ Just prefix
+ | not (msgFrom msg `sameIdentity` self)
+ -> do
+ void $ forkIO $ do
+ res <- runExceptT $ flip runReaderT erebosHead $ sendDirectMessage (msgFrom msg) (prefix <> msgText msg)
+ case res of
+ Right _ -> return ()
+ Left err -> extPrintLn $ "Failed to send dm echo: " <> err
+ _ -> return ()
peers <- liftIO $ newMVar []
- contextOptions <- liftIO $ newMVar []
+ contextOptions <- liftIO $ newMVar ( Nothing, [] )
chatroomSetVar <- liftIO $ newEmptyMVar
let autoSubscribe = optChatroomAutoSubscribe opts
chatroomList = fromSetBy (comparing roomStateData) . lookupSharedValue . lsShared . headObject $ erebosHead
watched <- if isJust autoSubscribe || any roomStateSubscribe chatroomList
- then fmap Just $ liftIO $ watchChatroomsForCli extPrintLn erebosHead chatroomSetVar contextOptions autoSubscribe
- else return Nothing
+ then do
+ fmap Just $ liftIO $ watchChatroomsForCli tui extPrintLn erebosHead
+ chatroomSetVar contextVar contextOptions autoSubscribe
+ else do
+ return Nothing
server <- liftIO $ do
startServer (optServer opts) erebosHead extPrintLn $
map soptService $ filter soptEnabled $ optServices opts
+ case optWebSocketServer opts of
+ Just port -> startWebsocketServer server "::" port extPrintLn
+ Nothing -> return ()
+
void $ liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange server
- peerIdentity peer >>= \case
+ getPeerIdentity peer >>= \case
pid@(PeerIdentityFull _) -> do
dropped <- isPeerDropped peer
- let shown = showPeer pid $ peerAddress peer
+ shown <- showPeer pid <$> getPeerAddress peer
let update [] = ([(peer, shown)], (Nothing, "NEW"))
update ((p,s):ps)
| p == peer && dropped = (ps, (Nothing, "DEL"))
@@ -379,8 +450,15 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
| otherwise = first (ctx:) $ ctxUpdate (n + 1) ctxs
(op, updateType) <- modifyMVar peers (return . update)
let updateType' = if dropped then "DEL" else updateType
- idx <- modifyMVar contextOptions (return . ctxUpdate (1 :: Int))
- when (Just shown /= op) $ extPrintLn $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown
+ modifyMVar_ contextOptions $ \case
+ ( watch, clist )
+ | watch == Just WatchPeers || not tui
+ -> do
+ let ( clist', idx ) = ctxUpdate (1 :: Int) clist
+ when (Just shown /= op) $ do
+ extPrintLn $ "[" <> show idx <> "] PEER " <> updateType' <> " " <> shown
+ return ( Just WatchPeers, clist' )
+ cur -> return cur
_ -> return ()
let process :: CommandState -> MaybeT IO CommandState
@@ -390,20 +468,23 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
Just h -> return h
Nothing -> do lift $ extPrintLn "current head deleted"
mzero
- res <- liftIO $ runExceptT $ flip execStateT cstate { csHead = h } $ runReaderT cmd CommandInput
- { ciServer = server
- , ciTerminal = term
- , ciLine = line
- , ciPrint = extPrintLn
- , ciOptions = opts
- , ciPeers = liftIO $ modifyMVar peers $ \ps -> do
- ps' <- filterM (fmap not . isPeerDropped . fst) ps
- return (ps', ps')
- , ciContextOptions = liftIO $ readMVar contextOptions
- , ciSetContextOptions = \ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ctxs
- , ciContextOptionsVar = contextOptions
- , ciChatroomSetVar = chatroomSetVar
- }
+ res <- liftIO $ modifyMVar contextVar $ \ctx -> do
+ res <- runExceptT $ flip execStateT cstate { csHead = h, csContext = ctx } $ runReaderT cmd CommandInput
+ { ciServer = server
+ , ciTerminal = term
+ , ciLine = line
+ , ciPrint = extPrintLn
+ , ciOptions = opts
+ , ciPeers = liftIO $ modifyMVar peers $ \ps -> do
+ ps' <- filterM (fmap not . isPeerDropped . fst) ps
+ return (ps', ps')
+ , ciContextOptions = liftIO $ snd <$> readMVar contextOptions
+ , ciSetContextOptions = \watch ctxs -> liftIO $ modifyMVar_ contextOptions $ const $ return ( Just watch, ctxs )
+ , ciContextVar = contextVar
+ , ciContextOptionsVar = contextOptions
+ , ciChatroomSetVar = chatroomSetVar
+ }
+ return ( either (const ctx) csContext res, res )
case res of
Right cstate'
| csQuit cstate' -> mzero
@@ -417,10 +498,6 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
loop $ Just $ CommandState
{ csHead = erebosHead
, csContext = NoContext
-#ifdef ENABLE_ICE_SUPPORT
- , csIceSessions = []
-#endif
- , csIcePeer = Nothing
, csWatchChatrooms = watched
, csQuit = False
}
@@ -433,28 +510,33 @@ data CommandInput = CommandInput
, ciPrint :: String -> IO ()
, ciOptions :: Options
, ciPeers :: CommandM [(Peer, String)]
- , ciContextOptions :: CommandM [CommandContext]
- , ciSetContextOptions :: [CommandContext] -> Command
- , ciContextOptionsVar :: MVar [ CommandContext ]
+ , ciContextOptions :: CommandM [ CommandContext ]
+ , ciSetContextOptions :: ContextWatchOptions -> [ CommandContext ] -> Command
+ , ciContextVar :: MVar CommandContext
+ , ciContextOptionsVar :: MVar ( Maybe ContextWatchOptions, [ CommandContext ] )
, ciChatroomSetVar :: MVar (Set ChatroomState)
}
data CommandState = CommandState
{ csHead :: Head LocalState
, csContext :: CommandContext
-#ifdef ENABLE_ICE_SUPPORT
- , csIceSessions :: [IceSession]
-#endif
- , csIcePeer :: Maybe Peer
, csWatchChatrooms :: Maybe WatchedHead
, csQuit :: Bool
}
-data CommandContext = NoContext
- | SelectedPeer Peer
- | SelectedContact Contact
- | SelectedChatroom ChatroomState
- | SelectedConversation Conversation
+data CommandContext
+ = NoContext
+ | SelectedPeer Peer
+ | SelectedContact Contact
+ | SelectedChatroom ChatroomState
+ | SelectedConversation Conversation
+
+data ContextWatchOptions
+ = WatchPeers
+ | WatchContacts
+ | WatchChatrooms
+ | WatchConversations
+ deriving (Eq)
newtype CommandM a = CommandM (ReaderT CommandInput (StateT CommandState (ExceptT ErebosError IO)) a)
deriving (Functor, Applicative, Monad, MonadReader CommandInput, MonadState CommandState, MonadError ErebosError)
@@ -493,8 +575,11 @@ getSelectedChatroom = gets csContext >>= \case
_ -> throwOtherError "no chatroom selected"
getSelectedConversation :: CommandM Conversation
-getSelectedConversation = gets csContext >>= \case
- SelectedPeer peer -> peerIdentity peer >>= \case
+getSelectedConversation = gets csContext >>= getConversationFromContext
+
+getConversationFromContext :: CommandContext -> CommandM Conversation
+getConversationFromContext = \case
+ SelectedPeer peer -> getPeerIdentity peer >>= \case
PeerIdentityFull pid -> directMessageConversation $ finalOwner pid
_ -> throwOtherError "incomplete peer identity"
SelectedContact contact -> case contactIdentity contact of
@@ -507,9 +592,17 @@ getSelectedConversation = gets csContext >>= \case
SelectedConversation conv -> reloadConversation conv
_ -> throwOtherError "no contact, peer or conversation selected"
+getSelectedOrManualContext :: CommandM CommandContext
+getSelectedOrManualContext = do
+ asks ciLine >>= \case
+ "" -> gets csContext
+ str | all isDigit str -> getContextByIndex id (read str)
+ _ -> throwOtherError "invalid index"
+
commands :: [(String, Command)]
commands =
[ ("history", cmdHistory)
+ , ("identity", cmdIdentity)
, ("peers", cmdPeers)
, ("peer-add", cmdPeerAdd)
, ("peer-add-public", cmdPeerAddPublic)
@@ -528,15 +621,7 @@ commands =
, ("contact-reject", cmdContactReject)
, ("conversations", cmdConversations)
, ("details", cmdDetails)
- , ("discovery-init", cmdDiscoveryInit)
, ("discovery", cmdDiscovery)
-#ifdef ENABLE_ICE_SUPPORT
- , ("ice-create", cmdIceCreate)
- , ("ice-destroy", cmdIceDestroy)
- , ("ice-show", cmdIceShow)
- , ("ice-connect", cmdIceConnect)
- , ("ice-send", cmdIceSend)
-#endif
, ("join", cmdJoin)
, ("join-as", cmdJoinAs)
, ("leave", cmdLeave)
@@ -565,7 +650,7 @@ cmdPeers :: Command
cmdPeers = do
peers <- join $ asks ciPeers
set <- asks ciSetContextOptions
- set $ map (SelectedPeer . fst) peers
+ set WatchPeers $ map (SelectedPeer . fst) peers
forM_ (zip [1..] peers) $ \(i :: Int, (_, name)) -> do
cmdPutStrLn $ "[" ++ show i ++ "] " ++ name
@@ -577,11 +662,15 @@ cmdPeerAdd = void $ do
[hostname] -> return (hostname, show discoveryPort)
[] -> throwOtherError "missing peer address"
addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port)
+ contextOptsVar <- asks ciContextOptionsVar
+ liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchPeers)
liftIO $ serverPeer server (addrAddress addr)
cmdPeerAddPublic :: Command
cmdPeerAddPublic = do
server <- asks ciServer
+ contextOptsVar <- asks ciContextOptionsVar
+ liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchPeers)
liftIO $ mapM_ (serverPeer server . addrAddress) =<< gather 'a'
where
gather c
@@ -615,8 +704,7 @@ cmdJoin = joinChatroom =<< getSelectedChatroom
cmdJoinAs :: Command
cmdJoinAs = do
name <- asks ciLine
- st <- getStorage
- identity <- liftIO $ createIdentity st (Just $ T.pack name) Nothing
+ identity <- createIdentity (Just $ T.pack name) Nothing
joinChatroomAs identity =<< getSelectedChatroom
cmdLeave :: Command
@@ -628,38 +716,41 @@ cmdMembers = do
forM_ (chatroomMembers room) $ \x -> do
cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x
+getContextByIndex :: (Maybe ContextWatchOptions -> Maybe ContextWatchOptions) -> Int -> CommandM CommandContext
+getContextByIndex f n = do
+ contextOptsVar <- asks ciContextOptionsVar
+ join $ liftIO $ modifyMVar contextOptsVar $ \cur@( watch, ctxs ) -> if
+ | n > 0, (ctx : _) <- drop (n - 1) ctxs
+ -> return ( ( f watch, ctxs ), return ctx )
+
+ | otherwise
+ -> return ( cur, throwOtherError "invalid index" )
cmdSelectContext :: Command
cmdSelectContext = do
n <- read <$> asks ciLine
- join (asks ciContextOptions) >>= \ctxs -> if
- | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do
- modify $ \s -> s { csContext = ctx }
- case ctx of
- SelectedChatroom rstate -> do
- when (not (roomStateSubscribe rstate)) $ do
- chatroomSetSubscribe (head $ roomStateData rstate) True
- _ -> return ()
- | otherwise -> throwOtherError "invalid index"
+ ctx <- getContextByIndex (const Nothing) n
+ modify $ \s -> s { csContext = ctx }
+ case ctx of
+ SelectedChatroom rstate -> do
+ when (not (roomStateSubscribe rstate)) $ do
+ chatroomSetSubscribe (head $ roomStateData rstate) True
+ _ -> return ()
cmdSend :: Command
cmdSend = void $ do
text <- asks ciLine
conv <- getSelectedConversation
- sendMessage conv (T.pack text) >>= \case
- Just msg -> do
- tzone <- liftIO $ getCurrentTimeZone
- cmdPutStrLn $ formatMessage tzone msg
- Nothing -> return ()
+ sendMessage conv (T.pack text)
cmdDelete :: Command
cmdDelete = void $ do
- deleteConversation =<< getSelectedConversation
+ deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext
modify $ \s -> s { csContext = NoContext }
cmdHistory :: Command
cmdHistory = void $ do
- conv <- getSelectedConversation
+ conv <- getConversationFromContext =<< getSelectedOrManualContext
case conversationHistory conv of
thread@(_:_) -> do
tzone <- liftIO $ getCurrentTimeZone
@@ -667,6 +758,24 @@ cmdHistory = void $ do
[] -> do
cmdPutStrLn $ "<empty history>"
+showIdentityDetails :: Foldable f => Identity f -> Text
+showIdentityDetails identity = T.unlines $ go $ reverse $ unfoldOwners identity
+ where
+ go (i : is) = concat
+ [ maybeToList $ ("Name: " <>) <$> idName i
+ , map (("Ref: " <>) . T.pack . show . refDigest . storedRef) $ idDataF i
+ , map (("ExtRef: " <>) . T.pack . show . refDigest . storedRef) $ filter isExtension $ idExtDataF i
+ , do guard $ not (null is)
+ "" : "Device:" : map (" " <>) (go is)
+ ]
+ go [] = []
+ isExtension x = case fromSigned x of BaseIdentityData {} -> False
+ _ -> True
+
+cmdIdentity :: Command
+cmdIdentity = do
+ cmdPutStrLn . T.unpack . showIdentityDetails . localIdentity . fromStored =<< getLocalHead
+
cmdUpdateIdentity :: Command
cmdUpdateIdentity = void $ do
term <- asks ciTerminal
@@ -681,8 +790,11 @@ cmdAttachAccept = attachAccept =<< getSelectedPeer
cmdAttachReject :: Command
cmdAttachReject = attachReject =<< getSelectedPeer
-watchChatroomsForCli :: (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState) -> MVar [ CommandContext ] -> Maybe Int -> IO WatchedHead
-watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do
+watchChatroomsForCli
+ :: Bool -> (String -> IO ()) -> Head LocalState -> MVar (Set ChatroomState)
+ -> MVar CommandContext -> MVar ( Maybe ContextWatchOptions, [ CommandContext ] )
+ -> Maybe Int -> IO WatchedHead
+watchChatroomsForCli tui eprint h chatroomSetVar contextVar contextOptsVar autoSubscribe = do
subscribedNumVar <- newEmptyMVar
let ctxUpdate updateType (idx :: Int) rstate = \case
@@ -721,28 +833,44 @@ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe = do
Just diff -> do
modifyMVar_ chatroomSetVar $ return . const set
+ modifyMVar_ contextOptsVar $ \case
+ ( watch, clist )
+ | watch == Just WatchChatrooms || not tui
+ -> do
+ let upd c = \case
+ AddedChatroom rstate -> ctxUpdate "NEW" 1 rstate c
+ RemovedChatroom rstate -> ctxUpdate "DEL" 1 rstate c
+ UpdatedChatroom _ rstate
+ | any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate)
+ -> do
+ ctxUpdate "UPD" 1 rstate c
+ | otherwise -> return c
+ ( watch, ) <$> foldM upd clist diff
+ cur -> return cur
+
forM_ diff $ \case
AddedChatroom rstate -> do
- modifyMVar_ contextVar $ ctxUpdate "NEW" 1 rstate
modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then (+ 1) else id
RemovedChatroom rstate -> do
- modifyMVar_ contextVar $ ctxUpdate "DEL" 1 rstate
modifyMVar_ subscribedNumVar $ return . if roomStateSubscribe rstate then subtract 1 else id
UpdatedChatroom oldroom rstate -> do
- when (any ((\rsd -> not (null (rsdRoom rsd))) . fromStored) (roomStateData rstate)) $ do
- modifyMVar_ contextVar $ ctxUpdate "UPD" 1 rstate
when (any (not . null . rsdMessages . fromStored) (roomStateData rstate)) $ do
- tzone <- getCurrentTimeZone
- forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do
- eprint $ concat $
- [ maybe "<unnamed>" T.unpack $ roomName =<< cmsgRoom msg
- , formatTime defaultTimeLocale " [%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg
- , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg
- , if cmsgLeave msg then " left" else ""
- , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg
- ]
+ withMVar contextVar $ \ctx -> do
+ isSelected <- case ctx of
+ SelectedChatroom rstate' -> return $ isSameChatroom rstate' rstate
+ SelectedConversation conv -> return $ isChatroomStateConversation rstate conv
+ _ -> return False
+ when (not tui || isSelected) $ do
+ tzone <- getCurrentTimeZone
+ forM_ (reverse $ getMessagesSinceState rstate oldroom) $ \msg -> do
+ eprint $ concat $
+ [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ cmsgTime msg
+ , maybe "<unnamed>" T.unpack $ idName $ cmsgFrom msg
+ , if cmsgLeave msg then " left" else ""
+ , maybe (if cmsgLeave msg then "" else " joined") ((": " ++) . T.unpack) $ cmsgText msg
+ ]
modifyMVar_ subscribedNumVar $ return
. (if roomStateSubscribe rstate then (+ 1) else id)
. (if roomStateSubscribe oldroom then subtract 1 else id)
@@ -754,9 +882,11 @@ ensureWatchedChatrooms = do
eprint <- asks ciPrint
h <- gets csHead
chatroomSetVar <- asks ciChatroomSetVar
- contextVar <- asks ciContextOptionsVar
+ contextVar <- asks ciContextVar
+ contextOptsVar <- asks ciContextOptionsVar
autoSubscribe <- asks $ optChatroomAutoSubscribe . ciOptions
- watched <- liftIO $ watchChatroomsForCli eprint h chatroomSetVar contextVar autoSubscribe
+ tui <- asks $ hasTerminalUI . ciTerminal
+ watched <- liftIO $ watchChatroomsForCli tui eprint h chatroomSetVar contextVar contextOptsVar autoSubscribe
modify $ \s -> s { csWatchChatrooms = Just watched }
Just _ -> return ()
@@ -766,7 +896,7 @@ cmdChatrooms = do
chatroomSetVar <- asks ciChatroomSetVar
chatroomList <- filter (not . roomStateDeleted) . fromSetBy (comparing roomStateData) <$> liftIO (readMVar chatroomSetVar)
set <- asks ciSetContextOptions
- set $ map SelectedChatroom chatroomList
+ set WatchChatrooms $ map SelectedChatroom chatroomList
forM_ (zip [1..] chatroomList) $ \(i :: Int, rstate) -> do
cmdPutStrLn $ "[" ++ show i ++ "] " ++ maybe "<unnamed>" T.unpack (roomName =<< roomStateRoom rstate)
@@ -780,6 +910,8 @@ cmdChatroomCreatePublic = do
getInputLine term $ KeepPrompt . maybe T.empty T.pack
ensureWatchedChatrooms
+ contextOptsVar <- asks ciContextOptionsVar
+ liftIO $ modifyMVar_ contextOptsVar $ return . first (const $ Just WatchChatrooms)
void $ createChatroom
(if T.null name then Nothing else Just name)
Nothing
@@ -792,7 +924,7 @@ cmdContacts = do
let contacts = fromSetBy (comparing contactName) $ lookupSharedValue $ lsShared $ headObject ehead
verbose = "-v" `elem` args
set <- asks ciSetContextOptions
- set $ map SelectedContact contacts
+ set WatchContacts $ map SelectedContact contacts
forM_ (zip [1..] contacts) $ \(i :: Int, c) -> do
cmdPutStrLn $ T.unpack $ T.concat
[ "[", T.pack (show i), "] ", contactName c
@@ -818,19 +950,20 @@ cmdConversations :: Command
cmdConversations = do
conversations <- lookupConversations
set <- asks ciSetContextOptions
- set $ map SelectedConversation conversations
+ set WatchConversations $ map SelectedConversation conversations
forM_ (zip [1..] conversations) $ \(i :: Int, conv) -> do
cmdPutStrLn $ "[" ++ show i ++ "] " ++ T.unpack (conversationName conv)
cmdDetails :: Command
cmdDetails = do
- gets csContext >>= \case
+ getSelectedOrManualContext >>= \case
SelectedPeer peer -> do
+ paddr <- getPeerAddress peer
cmdPutStrLn $ unlines
[ "Network peer:"
- , " " <> show (peerAddress peer)
+ , " " <> show paddr
]
- peerIdentity peer >>= \case
+ getPeerIdentity peer >>= \case
PeerIdentityUnknown _ -> do
cmdPutStrLn $ "unknown identity"
PeerIdentityRef wref _ -> do
@@ -885,106 +1018,13 @@ cmdDetails = do
, map (BC.unpack . showRefDigest . refDigest . storedRef) $ idExtDataF cpid
]
-cmdDiscoveryInit :: Command
-cmdDiscoveryInit = void $ do
- server <- asks ciServer
-
- (hostname, port) <- (words <$> asks ciLine) >>= return . \case
- hostname:p:_ -> (hostname, p)
- [hostname] -> (hostname, show discoveryPort)
- [] -> ("discovery.erebosprotocol.net", show discoveryPort)
- addr:_ <- liftIO $ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just hostname) (Just port)
- peer <- liftIO $ serverPeer server (addrAddress addr)
- sendToPeer peer $ DiscoverySelf [ T.pack "ICE" ] Nothing
- modify $ \s -> s { csIcePeer = Just peer }
-
cmdDiscovery :: Command
cmdDiscovery = void $ do
- Just peer <- gets csIcePeer
- st <- getStorage
- sref <- asks ciLine
- eprint <- asks ciPrint
- liftIO $ readRef st (BC.pack sref) >>= \case
- Nothing -> error "ref does not exist"
- Just ref -> do
- res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref
- case res of
- Right _ -> return ()
- Left err -> eprint err
-
-#ifdef ENABLE_ICE_SUPPORT
-
-cmdIceCreate :: Command
-cmdIceCreate = do
- let getRole = \case
- 'm':_ -> PjIceSessRoleControlling
- 's':_ -> PjIceSessRoleControlled
- _ -> PjIceSessRoleUnknown
-
- ( role, stun, turn ) <- asks (words . ciLine) >>= \case
- [] -> return ( PjIceSessRoleControlling, Nothing, Nothing )
- [ role ] -> return
- ( getRole role, Nothing, Nothing )
- [ role, server ] -> return
- ( getRole role
- , Just ( T.pack server, 0 )
- , Just ( T.pack server, 0 )
- )
- [ role, server, port ] -> return
- ( getRole role
- , Just ( T.pack server, read port )
- , Just ( T.pack server, read port )
- )
- [ role, stunServer, stunPort, turnServer, turnPort ] -> return
- ( getRole role
- , Just ( T.pack stunServer, read stunPort )
- , Just ( T.pack turnServer, read turnPort )
- )
- _ -> throwOtherError "invalid parameters"
-
- eprint <- asks ciPrint
- Just cfg <- liftIO $ iceCreateConfig stun turn
- sess <- liftIO $ iceCreateSession cfg role $ eprint <=< iceShow
- modify $ \s -> s { csIceSessions = sess : csIceSessions s }
-
-cmdIceDestroy :: Command
-cmdIceDestroy = do
- s:ss <- gets csIceSessions
- modify $ \st -> st { csIceSessions = ss }
- liftIO $ iceDestroy s
-
-cmdIceShow :: Command
-cmdIceShow = do
- sess <- gets csIceSessions
- eprint <- asks ciPrint
- liftIO $ forM_ (zip [1::Int ..] sess) $ \(i, s) -> do
- eprint $ "[" ++ show i ++ "]"
- eprint =<< iceShow s
-
-cmdIceConnect :: Command
-cmdIceConnect = do
- s:_ <- gets csIceSessions
server <- asks ciServer
- term <- asks ciTerminal
- let loadInfo =
- getInputLine term (KeepPrompt . maybe BC.empty BC.pack) >>= \case
- line | BC.null line -> return []
- | otherwise -> (line :) <$> loadInfo
- Right remote <- liftIO $ do
- st <- memoryStorage
- pst <- derivePartialStorage st
- setPrompt term ""
- rbytes <- (BL.fromStrict . BC.unlines) <$> loadInfo
- copyRef st =<< storeRawBytes pst (BL.fromChunks [ BC.pack "rec ", BC.pack (show (BL.length rbytes)), BC.singleton '\n' ] `BL.append` rbytes)
- liftIO $ iceConnect s (load remote) $ void $ serverPeerIce server s
-
-cmdIceSend :: Command
-cmdIceSend = void $ do
- s:_ <- gets csIceSessions
- server <- asks ciServer
- liftIO $ serverPeerIce server s
-
-#endif
+ sref <- asks ciLine
+ case readRefDigest (BC.pack sref) of
+ Nothing -> throwOtherError "failed to parse ref"
+ Just dgst -> discoverySearch server dgst
cmdQuit :: Command
cmdQuit = modify $ \s -> s { csQuit = True }
diff --git a/main/State.hs b/main/State.hs
index d357844..5d66ba9 100644
--- a/main/State.hs
+++ b/main/State.hs
@@ -1,15 +1,17 @@
module State (
loadLocalStateHead,
+ createLocalStateHead,
updateSharedIdentity,
interactiveIdentityUpdate,
) where
+import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Data.Foldable
-import Data.Maybe
import Data.Proxy
+import Data.Text (Text)
import Data.Text qualified as T
import Erebos.Error
@@ -22,34 +24,67 @@ import Erebos.Storage
import Terminal
-loadLocalStateHead :: MonadIO m => Terminal -> Storage -> m (Head LocalState)
-loadLocalStateHead term st = loadHeads st >>= \case
- (h:_) -> return h
- [] -> liftIO $ do
- setPrompt term "Name: "
- name <- getInputLine term $ KeepPrompt . maybe T.empty T.pack
+loadLocalStateHead
+ :: (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m)
+ => Terminal -> m (Head LocalState)
+loadLocalStateHead term = getStorage >>= loadHeads >>= \case
+ (h : _) -> return h
+ [] -> do
+ name <- liftIO $ do
+ setPrompt term "Name: "
+ getInputLine term $ KeepPrompt . maybe T.empty T.pack
- setPrompt term "Device: "
- devName <- getInputLine term $ KeepPrompt . maybe T.empty T.pack
+ devName <- liftIO $ do
+ setPrompt term "Device: "
+ getInputLine term $ KeepPrompt . maybe T.empty T.pack
- owner <- if
- | T.null name -> return Nothing
- | otherwise -> Just <$> createIdentity st (Just name) Nothing
+ ( owner, shared ) <- if
+ | T.null name -> do
+ return ( Nothing, [] )
+ | otherwise -> do
+ owner <- createIdentity (Just name) Nothing
+ shared <- mstore SharedState
+ { ssPrev = []
+ , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy
+ , ssValue = [ storedRef $ idExtData owner ]
+ }
+ return ( Just owner, [ shared ] )
- identity <- createIdentity st (if T.null devName then Nothing else Just devName) owner
+ identity <- createIdentity (if T.null devName then Nothing else Just devName) owner
- shared <- wrappedStore st $ SharedState
- { ssPrev = []
- , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy
- , ssValue = [ storedRef $ idExtData $ fromMaybe identity owner ]
- }
+ st <- getStorage
storeHead st $ LocalState
{ lsPrev = Nothing
, lsIdentity = idExtData identity
- , lsShared = [ shared ]
+ , lsShared = shared
, lsOther = []
}
+createLocalStateHead
+ :: (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m)
+ => [ Maybe Text ] -> m (Head LocalState)
+createLocalStateHead [] = throwOtherError "createLocalStateHead: empty name list"
+createLocalStateHead ( ownerName : names ) = do
+ owner <- createIdentity ownerName Nothing
+ identity <- foldM createSingleIdentity owner names
+ shared <- case names of
+ [] -> return []
+ _ : _ -> do
+ fmap (: []) $ mstore SharedState
+ { ssPrev = []
+ , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy
+ , ssValue = [ storedRef $ idExtData owner ]
+ }
+ st <- getStorage
+ storeHead st $ LocalState
+ { lsPrev = Nothing
+ , lsIdentity = idExtData identity
+ , lsShared = shared
+ , lsOther = []
+ }
+ where
+ createSingleIdentity owner name = createIdentity name (Just owner)
+
updateSharedIdentity :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Terminal -> m ()
updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case
@@ -58,9 +93,8 @@ updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case
Nothing -> throwOtherError "no existing shared identity"
interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => Terminal -> Identity f -> m UnifiedIdentity
-interactiveIdentityUpdate term identity = do
- let public = idKeyIdentity identity
-
+interactiveIdentityUpdate term fidentity = do
+ identity <- mergeIdentity fidentity
name <- liftIO $ do
setPrompt term $ T.unpack $ T.concat $ concat
[ [ T.pack "Name" ]
@@ -71,11 +105,11 @@ interactiveIdentityUpdate term identity = do
]
getInputLine term $ KeepPrompt . maybe T.empty T.pack
- if | T.null name -> mergeIdentity identity
+ if | T.null name -> return identity
| otherwise -> do
- secret <- loadKey public
- maybe (throwOtherError "created invalid identity") return . validateIdentity =<<
- mstore =<< sign secret =<< mstore (emptyIdentityData public)
- { iddPrev = toList $ idDataF identity
- , iddName = Just name
+ secret <- loadKey $ idKeyIdentity identity
+ maybe (throwOtherError "created invalid identity") return . validateExtendedIdentity =<<
+ mstore =<< sign secret =<< mstore . ExtendedIdentityData =<< return (emptyIdentityExtension $ idData identity)
+ { idePrev = toList $ idExtDataF identity
+ , ideName = Just name
}
diff --git a/main/Terminal.hs b/main/Terminal.hs
index 5dc3612..b8b953f 100644
--- a/main/Terminal.hs
+++ b/main/Terminal.hs
@@ -31,8 +31,9 @@ import Data.List
import Data.Text (Text)
import Data.Text qualified as T
-import System.IO
import System.Console.ANSI
+import System.IO
+import System.IO.Error
data Terminal = Terminal
@@ -43,14 +44,20 @@ data Terminal = Terminal
, termShowPrompt :: TVar Bool
, termInput :: TVar ( String, String )
, termBottomLines :: TVar [ String ]
+ , termHistory :: TVar [ String ]
+ , termHistoryPos :: TVar Int
+ , termHistoryStash :: TVar ( String, String )
}
data TerminalLine = TerminalLine
{ tlTerminal :: Terminal
+ , tlLineCount :: Int
}
data Input
= InputChar Char
+ | InputMoveUp
+ | InputMoveDown
| InputMoveRight
| InputMoveLeft
| InputMoveEnd
@@ -83,6 +90,9 @@ initTerminal termCompletionFunc = do
termShowPrompt <- newTVarIO False
termInput <- newTVarIO ( "", "" )
termBottomLines <- newTVarIO []
+ termHistory <- newTVarIO []
+ termHistoryPos <- newTVarIO 0
+ termHistoryStash <- newTVarIO ( "", "" )
return Terminal {..}
bracketSet :: IO a -> (a -> IO b) -> a -> IO c -> IO c
@@ -107,10 +117,12 @@ termPutStr Terminal {..} str = do
getInput :: IO Input
getInput = do
- getChar >>= \case
+ handleJust (guard . isEOFError) (\() -> return InputEnd) $ getChar >>= \case
'\ESC' -> do
esc <- readEsc
case parseEsc esc of
+ Just ( 'A' , [] ) -> return InputMoveUp
+ Just ( 'B' , [] ) -> return InputMoveDown
Just ( 'C' , [] ) -> return InputMoveRight
Just ( 'D' , [] ) -> return InputMoveLeft
_ -> return (InputEscape esc)
@@ -118,6 +130,8 @@ getInput = do
'\DEL' -> return InputBackspace
'\NAK' -> return InputClear
'\ETB' -> return InputBackWord
+ '\DLE' -> return InputMoveUp
+ '\SO' -> return InputMoveDown
'\SOH' -> return InputMoveStart
'\ENQ' -> return InputMoveEnd
'\EOT' -> return InputEnd
@@ -135,19 +149,33 @@ getInput = do
getInputLine :: Terminal -> (Maybe String -> InputHandling a) -> IO a
getInputLine term@Terminal {..} handleResult = do
- withMVar termLock $ \_ -> do
- prompt <- atomically $ do
- writeTVar termShowPrompt True
- readTVar termPrompt
- putStr $ prompt <> "\ESC[K"
- drawBottomLines term
- hFlush stdout
- (handleResult <$> go) >>= \case
+ when termAnsi $ do
+ withMVar termLock $ \_ -> do
+ prompt <- atomically $ do
+ writeTVar termShowPrompt True
+ readTVar termPrompt
+ putStr $ prompt <> "\ESC[K"
+ drawBottomLines term
+ hFlush stdout
+
+ mbLine <- go
+ forM_ mbLine $ \line -> do
+ let addLine xs
+ | null line = xs
+ | (x : _) <- xs, x == line = xs
+ | otherwise = line : xs
+ atomically $ do
+ writeTVar termHistory . addLine =<< readTVar termHistory
+ writeTVar termHistoryPos 0
+
+ case handleResult mbLine of
KeepPrompt x -> do
- termPutStr term "\n\ESC[J"
+ when termAnsi $ do
+ termPutStr term "\n\ESC[J"
return x
ErasePrompt x -> do
- termPutStr term "\r\ESC[J"
+ when termAnsi $ do
+ termPutStr term "\r\ESC[J"
return x
where
go = getInput >>= \case
@@ -155,11 +183,12 @@ getInputLine term@Terminal {..} handleResult = do
atomically $ do
( pre, post ) <- readTVar termInput
writeTVar termInput ( "", "" )
- writeTVar termShowPrompt False
- writeTVar termBottomLines []
+ when termAnsi $ do
+ writeTVar termShowPrompt False
+ writeTVar termBottomLines []
return $ Just $ pre ++ post
- InputChar '\t' -> do
+ InputChar '\t' | termAnsi -> do
options <- withMVar termLock $ const $ do
( pre, post ) <- atomically $ readTVar termInput
let updatePrompt pre' = do
@@ -178,9 +207,11 @@ getInputLine term@Terminal {..} handleResult = do
( unused, completions@(c : cs) ) -> do
let commonPrefixes' x y = fmap (\( common, _, _ ) -> common) $ T.commonPrefixes x y
case foldl' (\mbcommon cur -> commonPrefixes' cur =<< mbcommon) (Just $ replacement c) (fmap replacement cs) of
- Just common -> updatePrompt $ T.unpack unused ++ T.unpack common
- Nothing -> return ()
- return $ map replacement completions
+ Just common | T.unpack common /= pre -> do
+ updatePrompt $ T.unpack unused ++ T.unpack common
+ return []
+ _ -> do
+ return $ map replacement completions
( _, [] ) -> do
return []
@@ -195,6 +226,37 @@ getInputLine term@Terminal {..} handleResult = do
InputChar _ -> go
+ InputMoveUp -> withInput $ \prepost -> do
+ hist <- readTVar termHistory
+ pos <- readTVar termHistoryPos
+ case drop pos hist of
+ ( h : _ ) -> do
+ when (pos == 0) $ do
+ writeTVar termHistoryStash prepost
+ writeTVar termHistoryPos (pos + 1)
+ writeTVar termInput ( h, "" )
+ ("\r\ESC[K" <>) <$> getCurrentPromptLine term
+ [] -> do
+ return ""
+
+ InputMoveDown -> withInput $ \_ -> do
+ readTVar termHistoryPos >>= \case
+ 0 -> do
+ return ""
+ 1 -> do
+ writeTVar termHistoryPos 0
+ writeTVar termInput =<< readTVar termHistoryStash
+ ("\r\ESC[K" <>) <$> getCurrentPromptLine term
+ pos -> do
+ writeTVar termHistoryPos (pos - 1)
+ hist <- readTVar termHistory
+ case drop (pos - 2) hist of
+ ( h : _ ) -> do
+ writeTVar termInput ( h, "" )
+ ("\r\ESC[K" <>) <$> getCurrentPromptLine term
+ [] -> do
+ return ""
+
InputMoveRight -> withInput $ \case
( pre, c : post ) -> do
writeTVar termInput ( pre ++ [ c ], post )
@@ -240,7 +302,7 @@ getInputLine term@Terminal {..} handleResult = do
withInput f = do
withMVar termLock $ const $ do
str <- atomically $ f =<< readTVar termInput
- when (not $ null str) $ do
+ when (termAnsi && not (null str)) $ do
putStr str
hFlush stdout
go
@@ -253,6 +315,8 @@ getCurrentPromptLine Terminal {..} = do
return $ prompt <> pre <> "\ESC[s" <> post <> "\ESC[u"
setPrompt :: Terminal -> String -> IO ()
+setPrompt Terminal { termAnsi = False } _ = do
+ return ()
setPrompt term@Terminal {..} prompt = do
withMVar termLock $ \_ -> do
join $ atomically $ do
@@ -268,17 +332,26 @@ setPrompt term@Terminal {..} prompt = do
printLine :: Terminal -> String -> IO TerminalLine
printLine tlTerminal@Terminal {..} str = do
withMVar termLock $ \_ -> do
- promptLine <- atomically $ do
- readTVar termShowPrompt >>= \case
- True -> getCurrentPromptLine tlTerminal
- False -> return ""
- putStr $ "\r\ESC[K" <> str <> "\n\ESC[K" <> promptLine
- drawBottomLines tlTerminal
+ let strLines = lines str
+ tlLineCount = length strLines
+ if termAnsi
+ then do
+ promptLine <- atomically $ do
+ readTVar termShowPrompt >>= \case
+ True -> getCurrentPromptLine tlTerminal
+ False -> return ""
+ putStr $ "\r\ESC[K" <> unlines strLines <> "\ESC[K" <> promptLine
+ drawBottomLines tlTerminal
+ else do
+ putStr $ unlines strLines
+
hFlush stdout
return TerminalLine {..}
printBottomLines :: Terminal -> String -> IO ()
+printBottomLines Terminal { termAnsi = False } _ = do
+ return ()
printBottomLines term@Terminal {..} str = do
case lines str of
[] -> clearBottomLines term
@@ -289,6 +362,8 @@ printBottomLines term@Terminal {..} str = do
hFlush stdout
clearBottomLines :: Terminal -> IO ()
+clearBottomLines Terminal { termAnsi = False } = do
+ return ()
clearBottomLines Terminal {..} = do
withMVar termLock $ \_ -> do
atomically (readTVar termBottomLines) >>= \case
diff --git a/main/Test.hs b/main/Test.hs
index e54285a..c3dca14 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -26,7 +26,7 @@ import Data.Text qualified as T
import Data.Text.Encoding
import Data.Text.IO qualified as T
import Data.Typeable
-import Data.UUID qualified as U
+import Data.UUID.Types qualified as U
import Network.Socket
@@ -44,6 +44,7 @@ import Erebos.Object
import Erebos.Pairing
import Erebos.PubKey
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Set
import Erebos.State
import Erebos.Storable
@@ -66,10 +67,17 @@ data TestState = TestState
data RunningServer = RunningServer
{ rsServer :: Server
- , rsPeers :: MVar (Int, [(Int, Peer)])
+ , rsPeers :: MVar ( Int, [ TestPeer ] )
, rsPeerThread :: ThreadId
}
+data TestPeer = TestPeer
+ { tpIndex :: Int
+ , tpPeer :: Peer
+ , tpStreamReaders :: MVar [ (Int, StreamReader ) ]
+ , tpStreamWriters :: MVar [ (Int, StreamWriter ) ]
+ }
+
initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
@@ -137,17 +145,20 @@ cmdOut line = do
getPeer :: Text -> CommandM Peer
-getPeer spidx = do
+getPeer spidx = tpPeer <$> getTestPeer spidx
+
+getTestPeer :: Text -> CommandM TestPeer
+getTestPeer spidx = do
Just RunningServer {..} <- gets tsServer
- Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)
+ Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers)
return peer
-getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int
+getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int
getPeerIndex pmvar = do
peer <- asks svcPeer
- maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar)
+ maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar)
-pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a
+pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a
pairingAttributes _ out peers prefix = PairingAttributes
{ pairingHookRequest = return ()
@@ -216,14 +227,21 @@ directMessageAttributes out = DirectMessageAttributes
{ dmOwnerMismatch = afterCommit $ outLine out "dm-owner-mismatch"
}
-dmReceivedWatcher :: Output -> Stored DirectMessage -> IO ()
-dmReceivedWatcher out smsg = do
- let msg = fromStored smsg
- outLine out $ unwords
- [ "dm-received"
- , "from", maybe "<unnamed>" T.unpack $ idName $ msgFrom msg
- , "text", T.unpack $ msgText msg
- ]
+discoveryAttributes :: DiscoveryAttributes
+discoveryAttributes = (defaultServiceAttributes Proxy)
+ { discoveryProvideTunnel = \_ _ -> False
+ }
+
+dmThreadWatcher :: ComposedIdentity -> Output -> DirectMessageThread -> DirectMessageThread -> IO ()
+dmThreadWatcher self out prev cur = do
+ forM_ (reverse $ dmThreadToListSince prev cur) $ \msg -> do
+ outLine out $ unwords
+ [ if sameIdentity self (msgFrom msg)
+ then "dm-sent"
+ else "dm-received"
+ , "from", maybe "<unnamed>" T.unpack $ idName $ msgFrom msg
+ , "text", T.unpack $ msgText msg
+ ]
newtype CommandM a = CommandM (ReaderT TestInput (StateT TestState (ExceptT ErebosError IO)) a)
@@ -247,60 +265,66 @@ instance MonadHead LocalState CommandM where
type Command = CommandM ()
-commands :: [(Text, Command)]
-commands = map (T.pack *** id)
- [ ("store", cmdStore)
- , ("load", cmdLoad)
- , ("stored-generation", cmdStoredGeneration)
- , ("stored-roots", cmdStoredRoots)
- , ("stored-set-add", cmdStoredSetAdd)
- , ("stored-set-list", cmdStoredSetList)
- , ("head-create", cmdHeadCreate)
- , ("head-replace", cmdHeadReplace)
- , ("head-watch", cmdHeadWatch)
- , ("head-unwatch", cmdHeadUnwatch)
- , ("create-identity", cmdCreateIdentity)
- , ("identity-info", cmdIdentityInfo)
- , ("start-server", cmdStartServer)
- , ("stop-server", cmdStopServer)
- , ("peer-add", cmdPeerAdd)
- , ("peer-drop", cmdPeerDrop)
- , ("peer-list", cmdPeerList)
- , ("test-message-send", cmdTestMessageSend)
- , ("local-state-get", cmdLocalStateGet)
- , ("local-state-replace", cmdLocalStateReplace)
- , ("local-state-wait", cmdLocalStateWait)
- , ("shared-state-get", cmdSharedStateGet)
- , ("shared-state-wait", cmdSharedStateWait)
- , ("watch-local-identity", cmdWatchLocalIdentity)
- , ("watch-shared-identity", cmdWatchSharedIdentity)
- , ("update-local-identity", cmdUpdateLocalIdentity)
- , ("update-shared-identity", cmdUpdateSharedIdentity)
- , ("attach-to", cmdAttachTo)
- , ("attach-accept", cmdAttachAccept)
- , ("attach-reject", cmdAttachReject)
- , ("contact-request", cmdContactRequest)
- , ("contact-accept", cmdContactAccept)
- , ("contact-reject", cmdContactReject)
- , ("contact-list", cmdContactList)
- , ("contact-set-name", cmdContactSetName)
- , ("dm-send-peer", cmdDmSendPeer)
- , ("dm-send-contact", cmdDmSendContact)
- , ("dm-list-peer", cmdDmListPeer)
- , ("dm-list-contact", cmdDmListContact)
- , ("chatroom-create", cmdChatroomCreate)
- , ("chatroom-delete", cmdChatroomDelete)
- , ("chatroom-list-local", cmdChatroomListLocal)
- , ("chatroom-watch-local", cmdChatroomWatchLocal)
- , ("chatroom-set-name", cmdChatroomSetName)
- , ("chatroom-subscribe", cmdChatroomSubscribe)
- , ("chatroom-unsubscribe", cmdChatroomUnsubscribe)
- , ("chatroom-members", cmdChatroomMembers)
- , ("chatroom-join", cmdChatroomJoin)
- , ("chatroom-join-as", cmdChatroomJoinAs)
- , ("chatroom-leave", cmdChatroomLeave)
- , ("chatroom-message-send", cmdChatroomMessageSend)
- , ("discovery-connect", cmdDiscoveryConnect)
+commands :: [ ( Text, Command ) ]
+commands =
+ [ ( "store", cmdStore )
+ , ( "load", cmdLoad )
+ , ( "stored-generation", cmdStoredGeneration )
+ , ( "stored-roots", cmdStoredRoots )
+ , ( "stored-set-add", cmdStoredSetAdd )
+ , ( "stored-set-list", cmdStoredSetList )
+ , ( "stored-difference", cmdStoredDifference )
+ , ( "head-create", cmdHeadCreate )
+ , ( "head-replace", cmdHeadReplace )
+ , ( "head-watch", cmdHeadWatch )
+ , ( "head-unwatch", cmdHeadUnwatch )
+ , ( "create-identity", cmdCreateIdentity )
+ , ( "identity-info", cmdIdentityInfo )
+ , ( "start-server", cmdStartServer )
+ , ( "stop-server", cmdStopServer )
+ , ( "peer-add", cmdPeerAdd )
+ , ( "peer-drop", cmdPeerDrop )
+ , ( "peer-list", cmdPeerList )
+ , ( "test-message-send", cmdTestMessageSend )
+ , ( "test-stream-open", cmdTestStreamOpen )
+ , ( "test-stream-close", cmdTestStreamClose )
+ , ( "test-stream-send", cmdTestStreamSend )
+ , ( "local-state-get", cmdLocalStateGet )
+ , ( "local-state-replace", cmdLocalStateReplace )
+ , ( "local-state-wait", cmdLocalStateWait )
+ , ( "shared-state-get", cmdSharedStateGet )
+ , ( "shared-state-wait", cmdSharedStateWait )
+ , ( "watch-local-identity", cmdWatchLocalIdentity )
+ , ( "watch-shared-identity", cmdWatchSharedIdentity )
+ , ( "update-local-identity", cmdUpdateLocalIdentity )
+ , ( "update-shared-identity", cmdUpdateSharedIdentity )
+ , ( "attach-to", cmdAttachTo )
+ , ( "attach-accept", cmdAttachAccept )
+ , ( "attach-reject", cmdAttachReject )
+ , ( "contact-request", cmdContactRequest )
+ , ( "contact-accept", cmdContactAccept )
+ , ( "contact-reject", cmdContactReject )
+ , ( "contact-list", cmdContactList )
+ , ( "contact-set-name", cmdContactSetName )
+ , ( "dm-send-peer", cmdDmSendPeer )
+ , ( "dm-send-contact", cmdDmSendContact )
+ , ( "dm-send-identity", cmdDmSendIdentity )
+ , ( "dm-list-peer", cmdDmListPeer )
+ , ( "dm-list-contact", cmdDmListContact )
+ , ( "chatroom-create", cmdChatroomCreate )
+ , ( "chatroom-delete", cmdChatroomDelete )
+ , ( "chatroom-list-local", cmdChatroomListLocal )
+ , ( "chatroom-watch-local", cmdChatroomWatchLocal )
+ , ( "chatroom-set-name", cmdChatroomSetName )
+ , ( "chatroom-subscribe", cmdChatroomSubscribe )
+ , ( "chatroom-unsubscribe", cmdChatroomUnsubscribe )
+ , ( "chatroom-members", cmdChatroomMembers )
+ , ( "chatroom-join", cmdChatroomJoin )
+ , ( "chatroom-join-as", cmdChatroomJoinAs )
+ , ( "chatroom-leave", cmdChatroomLeave )
+ , ( "chatroom-message-send", cmdChatroomMessageSend )
+ , ( "discovery-connect", cmdDiscoveryConnect )
+ , ( "discovery-tunnel", cmdDiscoveryTunnel )
]
cmdStore :: Command
@@ -354,7 +378,7 @@ cmdStoredSetAdd = do
[Just iref, Just sref] -> return (wrappedLoad iref, loadSet @[Stored Object] sref)
[Just iref] -> return (wrappedLoad iref, emptySet)
_ -> fail "unexpected parameters"
- set' <- storeSetAdd st [item] set
+ set' <- storeSetAdd [ item ] set
cmdOut $ "stored-set-add" ++ concatMap ((' ':) . show . refDigest . storedRef) (toComponents set')
cmdStoredSetList :: Command
@@ -367,6 +391,19 @@ cmdStoredSetList = do
cmdOut $ "stored-set-item" ++ concatMap ((' ':) . show . refDigest . storedRef) item
cmdOut $ "stored-set-done"
+cmdStoredDifference :: Command
+cmdStoredDifference = do
+ st <- asks tiStorage
+ ( trefs1, "|" : trefs2 ) <- span (/= "|") <$> asks tiParams
+
+ let loadObjs = mapM (maybe (fail "invalid ref") (return . wrappedLoad @Object) <=< liftIO . readRef st . encodeUtf8)
+ objs1 <- loadObjs trefs1
+ objs2 <- loadObjs trefs2
+
+ forM_ (storedDifference objs1 objs2) $ \item -> do
+ cmdOut $ "stored-difference-item " ++ (show $ refDigest $ storedRef item)
+ cmdOut $ "stored-difference-done"
+
cmdHeadCreate :: Command
cmdHeadCreate = do
[ ttid, tref ] <- asks tiParams
@@ -421,7 +458,8 @@ cmdHeadUnwatch = do
initTestHead :: Head LocalState -> Command
initTestHead h = do
- _ <- liftIO . watchReceivedMessages h . dmReceivedWatcher =<< asks tiOutput
+ let self = finalOwner $ headLocalIdentity h
+ _ <- liftIO . watchDirectMessageThreads h . dmThreadWatcher self =<< asks tiOutput
modify $ \s -> s { tsHead = Just h }
loadTestHead :: CommandM (Head LocalState)
@@ -444,13 +482,13 @@ cmdCreateIdentity = do
st <- asks tiStorage
names <- asks tiParams
- h <- liftIO $ do
+ h <- do
Just identity <- if null names
- then Just <$> createIdentity st Nothing Nothing
- else foldrM (\n o -> Just <$> createIdentity st (Just n) o) Nothing names
+ then Just <$> createIdentity Nothing Nothing
+ else foldrM (\n o -> Just <$> createIdentity (Just n) o) Nothing names
shared <- case names of
- _:_:_ -> (:[]) <$> makeSharedStateUpdate st (Just $ finalOwner identity) []
+ _:_:_ -> (: []) <$> makeSharedStateUpdate (Just $ finalOwner identity) []
_ -> return []
storeHead st $ LocalState
@@ -483,42 +521,78 @@ cmdStartServer = do
let parseParams = \case
(name : value : rest)
- | name == "services" -> T.splitOn "," value
+ | name == "services" -> second ( map splitServiceParams (T.splitOn "," value) ++ ) (parseParams rest)
+ (name : rest)
+ | name == "test-log" -> first (\o -> o { serverTestLog = True }) (parseParams rest)
| otherwise -> parseParams rest
- _ -> []
- serviceNames <- parseParams <$> asks tiParams
+ _ -> ( defaultServerOptions { serverErrorPrefix = "server-error-message " }, [] )
+
+ splitServiceParams svc =
+ case T.splitOn ":" svc of
+ name : params -> ( name, params )
+ _ -> ( svc, [] )
+
+ ( serverOptions, serviceNames ) <- parseParams <$> asks tiParams
h <- getOrLoadHead
rsPeers <- liftIO $ newMVar (1, [])
services <- forM serviceNames $ \case
- "attach" -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach"
- "chatroom" -> return $ someService @ChatroomService Proxy
- "contact" -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact"
- "discovery" -> return $ someService @DiscoveryService Proxy
- "dm" -> return $ someServiceAttr $ directMessageAttributes out
- "sync" -> return $ someService @SyncService Proxy
- "test" -> return $ someServiceAttr $ (defaultServiceAttributes Proxy)
+ ( "attach", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @AttachService) out rsPeers "attach"
+ ( "chatroom", _ ) -> return $ someService @ChatroomService Proxy
+ ( "contact", _ ) -> return $ someServiceAttr $ pairingAttributes (Proxy @ContactService) out rsPeers "contact"
+ ( "discovery", params ) -> return $ someServiceAttr $ discoveryAttributes
+ { discoveryProvideTunnel = \_ _ -> "tunnel" `elem` params
+ }
+ ( "dm", _ ) -> return $ someServiceAttr $ directMessageAttributes out
+ ( "sync", _ ) -> return $ someService @SyncService Proxy
+ ( "test", _ ) -> return $ someServiceAttr $ (defaultServiceAttributes Proxy)
{ testMessageReceived = \obj otype len sref -> do
liftIO $ do
void $ store (headStorage h) obj
- outLine out $ unwords ["test-message-received", otype, len, sref]
+ outLine out $ unwords [ "test-message-received", otype, len, sref ]
+ , testStreamsReceived = \streams -> do
+ pidx <- getPeerIndex rsPeers
+ liftIO $ do
+ nums <- mapM getStreamReaderNumber streams
+ outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums
+ forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do
+ let go = readStreamPacket stream >>= \case
+ StreamData seqNum bytes -> do
+ outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ]
+ go
+ StreamClosed seqNum -> do
+ outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ]
+ go
}
- sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
+ ( sname, _ ) -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
- rsServer <- liftIO $ startServer defaultServerOptions h (B.hPutStr stderr . (`BC.snoc` '\n') . BC.pack) services
+ let logPrint str = do BC.hPutStrLn stdout (BC.pack str)
+ hFlush stdout
+ rsServer <- liftIO $ startServer serverOptions h logPrint services
rsPeerThread <- liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange rsServer
- let printPeer (idx, p) = do
- params <- peerIdentity p >>= return . \case
- PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
- _ -> [ "addr", show (peerAddress p) ]
- outLine out $ unwords $ [ "peer", show idx ] ++ params
-
- update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)])
- update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur
- | otherwise = fmap (p:) <$> update (nid, ps)
+ let printPeer TestPeer {..} = do
+ params <- getPeerIdentity tpPeer >>= \case
+ PeerIdentityFull pid -> do
+ return $ ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
+ _ -> do
+ paddr <- getPeerAddress tpPeer
+ return $ [ "addr", show paddr ]
+ outLine out $ unwords $ [ "peer", show tpIndex ] ++ params
+
+ update ( tpIndex, [] ) = do
+ tpPeer <- return peer
+ tpStreamReaders <- newMVar []
+ tpStreamWriters <- newMVar []
+ let tp = TestPeer {..}
+ printPeer tp
+ return ( tpIndex + 1, [ tp ] )
+
+ update cur@( nid, p : ps )
+ | tpPeer p == peer = printPeer p >> return cur
+ | otherwise = fmap (p :) <$> update ( nid, ps )
modifyMVar_ rsPeers update
@@ -555,11 +629,12 @@ cmdPeerList = do
peers <- liftIO $ getCurrentPeerList rsServer
tpeers <- liftIO $ readMVar rsPeers
forM_ peers $ \peer -> do
- Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers
- mbpid <- peerIdentity peer
+ Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers
+ mbpid <- getPeerIdentity peer
+ paddr <- getPeerAddress peer
cmdOut $ unwords $ concat
- [ [ "peer-list-item", show n ]
- , [ "addr", show (peerAddress peer) ]
+ [ [ "peer-list-item", show (tpIndex tp) ]
+ , [ "addr", show paddr ]
, case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
_ -> []
]
@@ -575,6 +650,40 @@ cmdTestMessageSend = do
sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
+cmdTestStreamOpen :: Command
+cmdTestStreamOpen = do
+ spidx : rest <- asks tiParams
+ tp <- getTestPeer spidx
+ count <- case rest of
+ [] -> return 1
+ tcount : _ -> return $ read $ T.unpack tcount
+
+ out <- asks tiOutput
+ runPeerService (tpPeer tp) $ do
+ streams <- openTestStreams count
+ afterCommit $ do
+ nums <- mapM getStreamWriterNumber streams
+ modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams)
+ outLine out $ unwords $ "test-stream-open-done"
+ : T.unpack spidx
+ : map show nums
+
+cmdTestStreamClose :: Command
+cmdTestStreamClose = do
+ [ spidx, sid ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ closeStream stream
+ cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ]
+
+cmdTestStreamSend :: Command
+cmdTestStreamSend = do
+ [ spidx, sid, content ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ writeStream stream $ encodeUtf8 content
+ cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ]
+
cmdLocalStateGet :: Command
cmdLocalStateGet = do
h <- getHead
@@ -738,7 +847,7 @@ cmdContactSetName = do
cmdDmSendPeer :: Command
cmdDmSendPeer = do
[spidx, msg] <- asks tiParams
- PeerIdentityFull to <- peerIdentity =<< getPeer spidx
+ PeerIdentityFull to <- getPeerIdentity =<< getPeer spidx
void $ sendDirectMessage to msg
cmdDmSendContact :: Command
@@ -747,12 +856,20 @@ cmdDmSendContact = do
Just to <- contactIdentity <$> getContact cid
void $ sendDirectMessage to msg
+cmdDmSendIdentity :: Command
+cmdDmSendIdentity = do
+ st <- asks tiStorage
+ [ tid, msg ] <- asks tiParams
+ Just ref <- liftIO $ readRef st $ encodeUtf8 tid
+ Just to <- return $ validateExtendedIdentity $ wrappedLoad ref
+ void $ sendDirectMessage to msg
+
dmList :: Foldable f => Identity f -> Command
dmList peer = do
- threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead
+ threads <- dmThreadList . lookupSharedValue . lsShared . headObject <$> getHead
case find (sameIdentity peer . msgPeer) threads of
Just thread -> do
- forM_ (reverse $ threadToList thread) $ \DirectMessage {..} -> cmdOut $ "dm-list-item"
+ forM_ (reverse $ dmThreadToList thread) $ \DirectMessage {..} -> cmdOut $ "dm-list-item"
<> " from " <> (maybe "<unnamed>" T.unpack $ idName msgFrom)
<> " text " <> (T.unpack msgText)
Nothing -> return ()
@@ -761,7 +878,7 @@ dmList peer = do
cmdDmListPeer :: Command
cmdDmListPeer = do
[spidx] <- asks tiParams
- PeerIdentityFull to <- peerIdentity =<< getPeer spidx
+ PeerIdentityFull to <- getPeerIdentity =<< getPeer spidx
dmList to
cmdDmListContact :: Command
@@ -869,8 +986,7 @@ cmdChatroomJoin = do
cmdChatroomJoinAs :: Command
cmdChatroomJoinAs = do
[ cid, name ] <- asks tiParams
- st <- asks tiStorage
- identity <- liftIO $ createIdentity st (Just name) Nothing
+ identity <- createIdentity (Just name) Nothing
joinChatroomAsByStateData identity =<< getChatroomStateData cid
cmdOut $ unwords [ "chatroom-join-as-done", T.unpack cid ]
@@ -888,11 +1004,14 @@ cmdChatroomMessageSend = do
cmdDiscoveryConnect :: Command
cmdDiscoveryConnect = do
- st <- asks tiStorage
[ tref ] <- asks tiParams
- Just ref <- liftIO $ readRef st $ encodeUtf8 tref
-
+ Just dgst <- return $ readRefDigest $ encodeUtf8 tref
Just RunningServer {..} <- gets tsServer
- peers <- liftIO $ getCurrentPeerList rsServer
- forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ discoverySearch rsServer dgst
+
+cmdDiscoveryTunnel :: Command
+cmdDiscoveryTunnel = do
+ [ tvia, ttarget ] <- asks tiParams
+ via <- getPeer tvia
+ Just target <- return $ readRefDigest $ encodeUtf8 ttarget
+ liftIO $ discoverySetupTunnel via target
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index 8c58dee..c0be07d 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -1,8 +1,11 @@
module Test.Service (
TestMessage(..),
TestMessageAttributes(..),
+
+ openTestStreams,
) where
+import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as BL
@@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
data TestMessage = TestMessage (Stored Object)
data TestMessageAttributes = TestMessageAttributes
{ testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage ()
+ , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage ()
}
instance Storable TestMessage where
@@ -26,7 +31,10 @@ instance Service TestMessage where
serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8"
type ServiceAttributes TestMessage = TestMessageAttributes
- defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ())
+ defaultServiceAttributes _ = TestMessageAttributes
+ { testMessageReceived = \_ _ _ _ -> return ()
+ , testStreamsReceived = \_ -> return ()
+ }
serviceHandler smsg = do
let TestMessage sobj = fromStored smsg
@@ -36,3 +44,14 @@ instance Service TestMessage where
cb <- asks $ testMessageReceived . svcAttributes
cb obj otype len (show $ refDigest $ storedRef sobj)
_ -> return ()
+
+ streams <- receivedStreams
+ when (not $ null streams) $ do
+ cb <- asks $ testStreamsReceived . svcAttributes
+ cb streams
+
+
+openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ]
+openTestStreams count = do
+ replyPacket . TestMessage =<< mstore (Rec [])
+ replicateM count openStream
diff --git a/main/WebSocket.hs b/main/WebSocket.hs
new file mode 100644
index 0000000..7a957e2
--- /dev/null
+++ b/main/WebSocket.hs
@@ -0,0 +1,48 @@
+module WebSocket (
+ WebSocketAddress(..),
+ startWebsocketServer,
+) where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+
+import Data.ByteString.Lazy qualified as BL
+import Data.Unique
+
+import Erebos.Network
+
+import Network.WebSockets qualified as WS
+
+
+data WebSocketAddress = WebSocketAddress Unique WS.Connection
+
+instance Eq WebSocketAddress where
+ WebSocketAddress u _ == WebSocketAddress u' _ = u == u'
+
+instance Ord WebSocketAddress where
+ compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u'
+
+instance Show WebSocketAddress where
+ show (WebSocketAddress _ _) = "websocket"
+
+instance PeerAddressType WebSocketAddress where
+ sendBytesToAddress (WebSocketAddress _ conn) msg = do
+ WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg
+ connectionToAddressClosed (WebSocketAddress _ conn) = do
+ WS.sendClose conn BL.empty
+
+startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO ()
+startWebsocketServer server addr port logd = do
+ void $ forkIO $ do
+ WS.runServer addr port $ \pending -> do
+ conn <- WS.acceptRequest pending
+ u <- newUnique
+ let paddr = WebSocketAddress u conn
+ void $ serverPeerCustom server paddr
+ handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do
+ WS.withPingThread conn 30 (return ()) $ do
+ forever $ do
+ WS.receiveDataMessage conn >>= \case
+ WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg
+ WS.Text {} -> logd $ "unexpected websocket text message"
diff --git a/src/Erebos/Attach.hs b/src/Erebos/Attach.hs
index fad6197..b7c642f 100644
--- a/src/Erebos/Attach.hs
+++ b/src/Erebos/Attach.hs
@@ -59,7 +59,7 @@ instance PairingResult AttachIdentity where
liftIO $ mapM_ storeKey $ catMaybes [ keyFromData sec pub | sec <- keys, pub <- pkeys ]
identity' <- mergeIdentity $ updateIdentity [ lsIdentity $ fromStored slocal ] identity
- shared <- makeSharedStateUpdate st (Just owner) (lsShared $ fromStored slocal)
+ shared <- makeSharedStateUpdate (Just owner) (lsShared $ fromStored slocal)
mstore (fromStored slocal)
{ lsIdentity = idExtData identity'
, lsShared = [ shared ]
diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs
index 74456ff..f9bf545 100644
--- a/src/Erebos/Chatroom.hs
+++ b/src/Erebos/Chatroom.hs
@@ -17,6 +17,7 @@ module Erebos.Chatroom (
joinChatroomAs, joinChatroomAsByStateData,
leaveChatroom, leaveChatroomByStateData,
getMessagesSinceState,
+ isSameChatroom,
ChatroomSetChange(..),
watchChatrooms,
@@ -294,8 +295,7 @@ createChatroom rdName rdDescription = do
}
updateLocalState $ updateSharedState $ \rooms -> do
- st <- getStorage
- (, cstate) <$> storeSetAdd st cstate rooms
+ (, cstate) <$> storeSetAdd cstate rooms
findAndUpdateChatroomState
:: (MonadStorage m, MonadHead LocalState m)
@@ -309,8 +309,7 @@ findAndUpdateChatroomState f = do
upd <- act
if roomStateData orig /= roomStateData upd
then do
- st <- getStorage
- roomSet' <- storeSetAdd st upd roomSet
+ roomSet' <- storeSetAdd upd roomSet
return (roomSet', Just upd)
else do
return (roomSet, Just upd)
@@ -422,6 +421,11 @@ leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupDa
getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage]
getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur)
+isSameChatroom :: ChatroomState -> ChatroomState -> Bool
+isSameChatroom rstate rstate' =
+ let roots = filterAncestors . concatMap storedRoots . roomStateData
+ in intersectsSorted (roots rstate) (roots rstate')
+
data ChatroomSetChange = AddedChatroom ChatroomState
| RemovedChatroom ChatroomState
diff --git a/src/Erebos/Contact.hs b/src/Erebos/Contact.hs
index 88e6c44..b081ddb 100644
--- a/src/Erebos/Contact.hs
+++ b/src/Erebos/Contact.hs
@@ -83,13 +83,12 @@ contactName c = fromJust $ msum
contactSetName :: MonadHead LocalState m => Contact -> Text -> Set Contact -> m (Set Contact)
contactSetName contact name set = do
- st <- getStorage
- cdata <- wrappedStore st ContactData
+ cdata <- mstore ContactData
{ cdPrev = toComponents contact
, cdIdentity = []
, cdName = Just name
}
- storeSetAdd st (mergeSorted @Contact [cdata]) set
+ storeSetAdd (mergeSorted @Contact [cdata]) set
type ContactService = PairingService ContactAccepted
@@ -166,10 +165,9 @@ contactReject = pairingReject @ContactAccepted Proxy
finalizeContact :: MonadHead LocalState m => UnifiedIdentity -> m ()
finalizeContact identity = updateLocalState_ $ updateSharedState_ $ \contacts -> do
- st <- getStorage
- cdata <- wrappedStore st ContactData
+ cdata <- mstore ContactData
{ cdPrev = []
, cdIdentity = idExtDataF $ finalOwner identity
, cdName = Nothing
}
- storeSetAdd st (mergeSorted @Contact [cdata]) contacts
+ storeSetAdd (mergeSorted @Contact [cdata]) contacts
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index dee6faa..2d007c9 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -7,9 +7,11 @@ module Erebos.Conversation (
formatMessage,
Conversation,
+ isSameConversation,
directMessageConversation,
chatroomConversation,
chatroomConversationByStateData,
+ isChatroomStateConversation,
reloadConversation,
lookupConversations,
@@ -64,14 +66,22 @@ formatMessage tzone msg = concat
]
-data Conversation = DirectMessageConversation DirectMessageThread
- | ChatroomConversation ChatroomState
+data Conversation
+ = DirectMessageConversation DirectMessageThread
+ | ChatroomConversation ChatroomState
+
+isSameConversation :: Conversation -> Conversation -> Bool
+isSameConversation (DirectMessageConversation t) (DirectMessageConversation t')
+ = sameIdentity (msgPeer t) (msgPeer t')
+isSameConversation (ChatroomConversation rstate) (ChatroomConversation rstate') = isSameChatroom rstate rstate'
+isSameConversation _ _ = False
directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation
directMessageConversation peer = do
- (find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
+ createOrUpdateDirectMessagePeer peer
+ (find (sameIdentity peer . msgPeer) . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
Just thread -> return $ DirectMessageConversation thread
- Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+ Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] []
chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
@@ -79,13 +89,17 @@ chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateD
chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation)
chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata
+isChatroomStateConversation :: ChatroomState -> Conversation -> Bool
+isChatroomStateConversation rstate (ChatroomConversation rstate') = isSameChatroom rstate rstate'
+isChatroomStateConversation _ _ = False
+
reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation
reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread)
reloadConversation cur@(ChatroomConversation rstate) =
fromMaybe cur <$> chatroomConversation rstate
-lookupConversations :: MonadHead LocalState m => m [Conversation]
-lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead
+lookupConversations :: MonadHead LocalState m => m [ Conversation ]
+lookupConversations = map DirectMessageConversation . dmThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead
conversationName :: Conversation -> Text
@@ -96,14 +110,14 @@ conversationPeer :: Conversation -> Maybe ComposedIdentity
conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread
conversationPeer (ChatroomConversation _) = Nothing
-conversationHistory :: Conversation -> [Message]
-conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread
+conversationHistory :: Conversation -> [ Message ]
+conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ dmThreadToList thread
conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate
-sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m (Maybe Message)
-sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False
-sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing
+sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m ()
+sendMessage (DirectMessageConversation thread) text = sendDirectMessage (msgPeer thread) text
+sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text
deleteConversation :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> m ()
deleteConversation (DirectMessageConversation _) = throwOtherError "deleting direct message conversation is not supported"
diff --git a/src/Erebos/DirectMessage.hs b/src/Erebos/DirectMessage.hs
index 05da865..f518b57 100644
--- a/src/Erebos/DirectMessage.hs
+++ b/src/Erebos/DirectMessage.hs
@@ -1,34 +1,41 @@
module Erebos.DirectMessage (
DirectMessage(..),
sendDirectMessage,
+ updateDirectMessagePeer,
+ createOrUpdateDirectMessagePeer,
DirectMessageAttributes(..),
defaultDirectMessageAttributes,
DirectMessageThreads,
- toThreadList,
+ dmThreadList,
DirectMessageThread(..),
- threadToList,
- messageThreadView,
+ dmThreadToList, dmThreadToListSince,
+ dmThreadView,
- watchReceivedMessages,
+ watchDirectMessageThreads,
formatDirectMessage,
) where
+import Control.Concurrent.MVar
import Control.Monad
+import Control.Monad.Except
import Control.Monad.Reader
import Data.List
import Data.Ord
-import qualified Data.Set as S
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
-import qualified Data.Text as T
+import Data.Text qualified as T
import Data.Time.Format
import Data.Time.LocalTime
+import Erebos.Discovery
import Erebos.Identity
import Erebos.Network
+import Erebos.Object
import Erebos.Service
import Erebos.State
import Erebos.Storable
@@ -37,7 +44,7 @@ import Erebos.Storage.Merge
data DirectMessage = DirectMessage
{ msgFrom :: ComposedIdentity
- , msgPrev :: [Stored DirectMessage]
+ , msgPrev :: [ Stored DirectMessage ]
, msgTime :: ZonedTime
, msgText :: Text
}
@@ -74,7 +81,6 @@ instance Service DirectMessage where
let msg = fromStored smsg
powner <- asks $ finalOwner . svcPeerIdentity
erb <- svcGetLocal
- st <- getStorage
let DirectMessageThreads prev _ = lookupSharedValue $ lsShared $ fromStored erb
sent = findMsgProperty powner msSent prev
received = findMsgProperty powner msReceived prev
@@ -83,7 +89,7 @@ instance Service DirectMessage where
filterAncestors sent == filterAncestors (smsg : sent)
then do
when (received' /= received) $ do
- next <- wrappedStore st $ MessageState
+ next <- mstore MessageState
{ msPrev = prev
, msPeer = powner
, msReady = []
@@ -91,37 +97,43 @@ instance Service DirectMessage where
, msReceived = received'
, msSeen = []
}
- let threads = DirectMessageThreads [next] (messageThreadView [next])
- shared <- makeSharedStateUpdate st threads (lsShared $ fromStored erb)
- svcSetLocal =<< wrappedStore st (fromStored erb) { lsShared = [shared] }
+ let threads = DirectMessageThreads [ next ] (dmThreadView [ next ])
+ shared <- makeSharedStateUpdate threads (lsShared $ fromStored erb)
+ svcSetLocal =<< mstore (fromStored erb) { lsShared = [ shared ] }
when (powner `sameIdentity` msgFrom msg) $ do
replyStoredRef smsg
else join $ asks $ dmOwnerMismatch . svcAttributes
- serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
+ serviceNewPeer = do
+ syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
- serviceStorageWatchers _ = (:[]) $
- SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ serviceUpdatedPeer = do
+ updateDirectMessagePeer . finalOwner =<< asks svcPeerIdentity
+
+ serviceStorageWatchers _ =
+ [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers
+ ]
data MessageState = MessageState
- { msPrev :: [Stored MessageState]
+ { msPrev :: [ Stored MessageState ]
, msPeer :: ComposedIdentity
- , msReady :: [Stored DirectMessage]
- , msSent :: [Stored DirectMessage]
- , msReceived :: [Stored DirectMessage]
- , msSeen :: [Stored DirectMessage]
+ , msReady :: [ Stored DirectMessage ]
+ , msSent :: [ Stored DirectMessage ]
+ , msReceived :: [ Stored DirectMessage ]
+ , msSeen :: [ Stored DirectMessage ]
}
-data DirectMessageThreads = DirectMessageThreads [Stored MessageState] [DirectMessageThread]
+data DirectMessageThreads = DirectMessageThreads [ Stored MessageState ] [ DirectMessageThread ]
instance Eq DirectMessageThreads where
DirectMessageThreads mss _ == DirectMessageThreads mss' _ = mss == mss'
-toThreadList :: DirectMessageThreads -> [DirectMessageThread]
-toThreadList (DirectMessageThreads _ threads) = threads
+dmThreadList :: DirectMessageThreads -> [ DirectMessageThread ]
+dmThreadList (DirectMessageThreads _ threads) = threads
instance Storable MessageState where
store' MessageState {..} = storeRec $ do
@@ -143,13 +155,13 @@ instance Storable MessageState where
instance Mergeable DirectMessageThreads where
type Component DirectMessageThreads = MessageState
- mergeSorted mss = DirectMessageThreads mss (messageThreadView mss)
+ mergeSorted mss = DirectMessageThreads mss (dmThreadView mss)
toComponents (DirectMessageThreads mss _) = mss
instance SharedType DirectMessageThreads where
sharedTypeID _ = mkSharedTypeID "ee793681-5976-466a-b0f0-4e1907d3fade"
-findMsgProperty :: Foldable m => Identity m -> (MessageState -> [a]) -> [Stored MessageState] -> [a]
+findMsgProperty :: Foldable m => Identity m -> (MessageState -> [ a ]) -> [ Stored MessageState ] -> [ a ]
findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do
guard $ msPeer x `sameIdentity` pid
guard $ not $ null $ sel x
@@ -157,11 +169,11 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do
sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m)
- => Identity f -> Text -> m (Stored DirectMessage)
-sendDirectMessage pid text = updateLocalState $ \ls -> do
+ => Identity f -> Text -> m ()
+sendDirectMessage pid text = updateLocalState_ $ \ls -> do
let self = localIdentity $ fromStored ls
powner = finalOwner pid
- flip updateSharedState ls $ \(DirectMessageThreads prev _) -> do
+ flip updateSharedState_ ls $ \(DirectMessageThreads prev _) -> do
let ready = findMsgProperty powner msReady prev
received = findMsgProperty powner msReceived prev
@@ -175,12 +187,52 @@ sendDirectMessage pid text = updateLocalState $ \ls -> do
next <- mstore MessageState
{ msPrev = prev
, msPeer = powner
- , msReady = [smsg]
+ , msReady = [ smsg ]
, msSent = []
, msReceived = []
, msSeen = []
}
- return (DirectMessageThreads [next] (messageThreadView [next]), smsg)
+ return $ DirectMessageThreads [ next ] (dmThreadView [ next ])
+
+updateDirectMessagePeer
+ :: (Foldable f, Applicative f, MonadHead LocalState m)
+ => Identity f -> m ()
+updateDirectMessagePeer = createOrUpdateDirectMessagePeer' False
+
+createOrUpdateDirectMessagePeer
+ :: (Foldable f, Applicative f, MonadHead LocalState m)
+ => Identity f -> m ()
+createOrUpdateDirectMessagePeer = createOrUpdateDirectMessagePeer' True
+
+createOrUpdateDirectMessagePeer'
+ :: (Foldable f, Applicative f, MonadHead LocalState m)
+ => Bool -> Identity f -> m ()
+createOrUpdateDirectMessagePeer' create pid = do
+ let powner = finalOwner pid
+ updateLocalState_ $ updateSharedState_ $ \old@(DirectMessageThreads prev threads) -> do
+ let updatePeerThread = do
+ next <- mstore MessageState
+ { msPrev = prev
+ , msPeer = powner
+ , msReady = []
+ , msSent = []
+ , msReceived = []
+ , msSeen = []
+ }
+ return $ DirectMessageThreads [ next ] (dmThreadView [ next ])
+ case find (sameIdentity powner . msgPeer) threads of
+ Nothing
+ | create
+ -> updatePeerThread
+
+ Just thread
+ | oldPeer <- msgPeer thread
+ , newPeer <- updateIdentity (idExtDataF powner) oldPeer
+ , oldPeer /= newPeer
+ -> updatePeerThread
+
+ _ -> return old
+
syncDirectMessageToPeer :: DirectMessageThreads -> ServiceHandler DirectMessage ()
syncDirectMessageToPeer (DirectMessageThreads mss _) = do
@@ -205,28 +257,41 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do
, msReceived = []
, msSeen = []
}
- return $ DirectMessageThreads [next] (messageThreadView [next])
+ return $ DirectMessageThreads [ next ] (dmThreadView [ next ])
else do
return unchanged
+findMissingPeers :: Server -> DirectMessageThreads -> ExceptT ErebosError IO ()
+findMissingPeers server threads = do
+ forM_ (dmThreadList threads) $ \thread -> do
+ when (msgHead thread /= msgReceived thread) $ do
+ mapM_ (discoverySearch server) $ map (refDigest . storedRef) $ idDataF $ msgPeer thread
+
data DirectMessageThread = DirectMessageThread
{ msgPeer :: ComposedIdentity
- , msgHead :: [Stored DirectMessage]
- , msgSent :: [Stored DirectMessage]
- , msgSeen :: [Stored DirectMessage]
+ , msgHead :: [ Stored DirectMessage ]
+ , msgSent :: [ Stored DirectMessage ]
+ , msgSeen :: [ Stored DirectMessage ]
+ , msgReceived :: [ Stored DirectMessage ]
}
-threadToList :: DirectMessageThread -> [DirectMessage]
-threadToList thread = helper S.empty $ msgHead thread
- where helper seen msgs
- | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs =
- fromStored msg : helper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg))
- | otherwise = []
- cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg)
+dmThreadToList :: DirectMessageThread -> [ DirectMessage ]
+dmThreadToList thread = threadToListHelper S.empty $ msgHead thread
+
+dmThreadToListSince :: DirectMessageThread -> DirectMessageThread -> [ DirectMessage ]
+dmThreadToListSince since thread = threadToListHelper (S.fromAscList $ msgHead since) (msgHead thread)
+
+threadToListHelper :: Set (Stored DirectMessage) -> [ Stored DirectMessage ] -> [ DirectMessage ]
+threadToListHelper seen msgs
+ | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs =
+ fromStored msg : threadToListHelper (S.insert msg seen) (msgs' ++ msgPrev (fromStored msg))
+ | otherwise = []
+ where
+ cmpView msg = (zonedTimeToUTC $ msgTime $ fromStored msg, msg)
-messageThreadView :: [Stored MessageState] -> [DirectMessageThread]
-messageThreadView = helper []
+dmThreadView :: [ Stored MessageState ] -> [ DirectMessageThread ]
+dmThreadView = helper []
where helper used ms' = case filterAncestors ms' of
mss@(sms : rest)
| any (sameIdentity $ msPeer $ fromStored sms) used ->
@@ -236,7 +301,7 @@ messageThreadView = helper []
in messageThreadFor peer mss : helper (peer : used) (msPrev (fromStored sms) ++ rest)
_ -> []
-messageThreadFor :: ComposedIdentity -> [Stored MessageState] -> DirectMessageThread
+messageThreadFor :: ComposedIdentity -> [ Stored MessageState ] -> DirectMessageThread
messageThreadFor peer mss =
let ready = findMsgProperty peer msReady mss
sent = findMsgProperty peer msSent mss
@@ -248,15 +313,28 @@ messageThreadFor peer mss =
, msgHead = filterAncestors $ ready ++ received
, msgSent = filterAncestors $ sent ++ received
, msgSeen = filterAncestors $ ready ++ seen
+ , msgReceived = filterAncestors $ received
}
-watchReceivedMessages :: Head LocalState -> (Stored DirectMessage -> IO ()) -> IO WatchedHead
-watchReceivedMessages h f = do
- let self = finalOwner $ localIdentity $ headObject h
+watchDirectMessageThreads :: Head LocalState -> (DirectMessageThread -> DirectMessageThread -> IO ()) -> IO WatchedHead
+watchDirectMessageThreads h f = do
+ prevVar <- newMVar Nothing
watchHeadWith h (lookupSharedValue . lsShared . headObject) $ \(DirectMessageThreads sms _) -> do
- forM_ (map fromStored sms) $ \ms -> do
- mapM_ f $ filter (not . sameIdentity self . msgFrom . fromStored) $ msReceived ms
+ modifyMVar_ prevVar $ \case
+ Just prev -> do
+ let addPeer (p : ps) p'
+ | p `sameIdentity` p' = p : ps
+ | otherwise = p : addPeer ps p'
+ addPeer [] p' = [ p' ]
+
+ let peers = foldl' addPeer [] $ map (msPeer . fromStored) $ storedDifference prev sms
+ forM_ peers $ \peer -> do
+ f (messageThreadFor peer prev) (messageThreadFor peer sms)
+ return (Just sms)
+
+ Nothing -> do
+ return (Just sms)
formatDirectMessage :: TimeZone -> DirectMessage -> String
formatDirectMessage tzone msg = concat
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 48500d7..5590e4c 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -1,9 +1,13 @@
{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
- DiscoveryConnection(..)
+ DiscoveryConnection(..),
+
+ discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -11,39 +15,58 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
-import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
-import Network.Socket
+import Text.Read
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
#endif
import Erebos.Identity
import Erebos.Network
+import Erebos.Network.Address
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
+#ifndef ENABLE_ICE_SUPPORT
+type IceConfig = ()
+type IceSession = ()
+type IceRemoteInfo = Stored Object
+#endif
+
+
data DiscoveryService
- = DiscoverySelf [ Text ] (Maybe Int)
- | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
- | DiscoverySearch Ref
- | DiscoveryResult Ref [ Text ]
+ = DiscoverySelf [ DiscoveryAddress ] (Maybe Int)
+ | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
+ | DiscoverySearch (Either Ref RefDigest)
+ | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ]
| DiscoveryConnectionRequest DiscoveryConnection
| DiscoveryConnectionResponse DiscoveryConnection
+data DiscoveryAddress
+ = DiscoveryIP InetAddress PortNumber
+ | DiscoveryICE
+ | DiscoveryTunnel
+ | DiscoveryOther Text
+
data DiscoveryAttributes = DiscoveryAttributes
{ discoveryStunPort :: Maybe Word16
, discoveryStunServer :: Maybe Text
, discoveryTurnPort :: Maybe Word16
, discoveryTurnServer :: Maybe Text
+ , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool
}
defaultDiscoveryAttributes :: DiscoveryAttributes
@@ -52,23 +75,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes
, discoveryStunServer = Nothing
, discoveryTurnPort = Nothing
, discoveryTurnServer = Nothing
+ , discoveryProvideTunnel = \_ _ -> False
}
data DiscoveryConnection = DiscoveryConnection
- { dconnSource :: Ref
- , dconnTarget :: Ref
+ { dconnSource :: Either Ref RefDigest
+ , dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
-#ifdef ENABLE_ICE_SUPPORT
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
-#else
- , dconnIceInfo :: Maybe (Stored Object)
-#endif
}
-emptyConnection :: Ref -> Ref -> DiscoveryConnection
+emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -84,19 +106,21 @@ instance Storable DiscoveryService where
storeMbInt "stun-port" stunPort
storeMbText "turn-server" turnServer
storeMbInt "turn-port" turnPort
- DiscoverySearch ref -> storeRawRef "search" ref
- DiscoveryResult ref addr -> do
- storeRawRef "result" ref
+ DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst
+ DiscoveryResult edgst addr -> do
+ either (storeRawRef "result") (storeRawWeak "result") edgst
mapM_ (storeText "address") addr
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
- where storeConnection ctype conn = do
- storeText "connection" $ ctype
- storeRawRef "source" $ dconnSource conn
- storeRawRef "target" $ dconnTarget conn
- storeMbText "address" $ dconnAddress conn
- storeMbRef "ice-info" $ dconnIceInfo conn
+ where
+ storeConnection (ctype :: Text) DiscoveryConnection {..} = do
+ storeText "connection" $ ctype
+ either (storeRawRef "source") (storeRawWeak "source") dconnSource
+ either (storeRawRef "target") (storeRawWeak "target") dconnTarget
+ storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
+ storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
[ do
@@ -114,29 +138,87 @@ instance Storable DiscoveryService where
<*> loadMbInt "stun-port"
<*> loadMbText "turn-server"
<*> loadMbInt "turn-port"
- , DiscoverySearch <$> loadRawRef "search"
+ , DiscoverySearch <$> msum
+ [ Left <$> loadRawRef "search"
+ , Right <$> loadRawWeak "search"
+ ]
, DiscoveryResult
- <$> loadRawRef "result"
+ <$> msum
+ [ Left <$> loadRawRef "result"
+ , Right <$> loadRawWeak "result"
+ ]
<*> loadTexts "address"
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
]
- where loadConnection ctype ctor = do
- ctype' <- loadText "connection"
- guard $ ctype == ctype'
- return . ctor =<< DiscoveryConnection
- <$> loadRawRef "source"
- <*> loadRawRef "target"
- <*> loadMbText "address"
- <*> loadMbRef "ice-info"
+ where
+ loadConnection (ctype :: Text) ctor = do
+ ctype' <- loadText "connection"
+ guard $ ctype == ctype'
+ dconnSource <- msum
+ [ Left <$> loadRawRef "source"
+ , Right <$> loadRawWeak "source"
+ ]
+ dconnTarget <- msum
+ [ Left <$> loadRawRef "target"
+ , Right <$> loadRawWeak "target"
+ ]
+ dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
+ dconnIceInfo <- loadMbRef "ice-info"
+ return $ ctor DiscoveryConnection {..}
+
+instance StorableText DiscoveryAddress where
+ toText = \case
+ DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ]
+ DiscoveryICE -> "ICE"
+ DiscoveryTunnel -> "tunnel"
+ DiscoveryOther str -> str
+
+ fromText str = return $ if
+ | [ addrStr, portStr ] <- T.words str
+ , Just addr <- readMaybe $ T.unpack addrStr
+ , Just port <- readMaybe $ T.unpack portStr
+ -> DiscoveryIP addr port
+
+ | "ice" <- T.toLower str
+ -> DiscoveryICE
+
+ | "tunnel" <- str
+ -> DiscoveryTunnel
+
+ | otherwise
+ -> DiscoveryOther str
+
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
- , dpAddress :: [ Text ]
-#ifdef ENABLE_ICE_SUPPORT
+ , dpAddress :: [ DiscoveryAddress ]
, dpIceSession :: Maybe IceSession
-#endif
+ }
+
+emptyPeer :: DiscoveryPeer
+emptyPeer = DiscoveryPeer
+ { dpPriority = 0
+ , dpPeer = Nothing
+ , dpAddress = []
+ , dpIceSession = Nothing
+ }
+
+data DiscoveryPeerState = DiscoveryPeerState
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
+ , dpsTurnServer :: Maybe ( Text, Word16 )
+ , dpsIceConfig :: Maybe IceConfig
+ }
+
+data DiscoveryGlobalState = DiscoveryGlobalState
+ { dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -145,42 +227,44 @@ instance Service DiscoveryService where
type ServiceAttributes DiscoveryService = DiscoveryAttributes
defaultServiceAttributes _ = defaultDiscoveryAttributes
-#ifdef ENABLE_ICE_SUPPORT
- type ServiceState DiscoveryService = Maybe IceConfig
- emptyServiceState _ = Nothing
-#endif
-
- type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer
- emptyServiceGlobalState _ = M.empty
+ type ServiceState DiscoveryService = DiscoveryPeerState
+ emptyServiceState _ = DiscoveryPeerState
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
+ , dpsTurnServer = Nothing
+ , dpsIceConfig = Nothing
+ }
+
+ type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
+ emptyServiceGlobalState _ = DiscoveryGlobalState
+ { dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
+ }
serviceHandler msg = case fromStored msg of
DiscoverySelf addrs priority -> do
pid <- asks svcPeerIdentity
peer <- asks svcPeer
+ paddrs <- getPeerAddresses peer
+
let insertHelper new old | dpPriority new > dpPriority old = new
| otherwise = old
- matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if
- | addr == T.pack "ICE" -> do
- return $ Just addr
-
- | [ ipaddr, port ] <- words (T.unpack addr)
- , DatagramAddress paddr <- peerAddress peer -> do
- saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- return $ if paddr == addrAddress saddr
- then Just addr
- else Nothing
-
- | otherwise -> return Nothing
-
- forM_ (idDataF =<< unfoldOwners pid) $ \s ->
- svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer
- { dpPriority = fromMaybe 0 priority
- , dpPeer = Just peer
- , dpAddress = addrs
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+
+ let matchedAddrs = flip filter addrs $ \case
+ DiscoveryICE -> True
+ DiscoveryIP ipaddr port ->
+ DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs
+ _ -> False
+
+ forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
+ let dp = DiscoveryPeer
+ { dpPriority = fromMaybe 0 priority
+ , dpPeer = Just peer
+ , dpAddress = matchedAddrs
+ , dpIceSession = Nothing
+ }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
attrs <- asks svcAttributes
replyPacket $ DiscoveryAcknowledged matchedAddrs
(discoveryStunServer attrs)
@@ -189,15 +273,8 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
-#ifdef ENABLE_ICE_SUPPORT
- paddr <- asks (peerAddress . svcPeer) >>= return . \case
- (DatagramAddress saddr) -> case IP.fromSockAddr saddr of
- Just (IP.IPv6 ipv6, _)
- | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
- -> Just $ T.pack $ show (IP.toIPv4w ipv4)
- Just (addr, _)
- -> Just $ T.pack $ show addr
- _ -> Nothing
+ paddr <- asks svcPeerAddress >>= return . \case
+ (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr
_ -> Nothing
let toIceServer Nothing Nothing = Nothing
@@ -205,152 +282,368 @@ instance Service DiscoveryService where
toIceServer (Just server) Nothing = Just ( server, 0 )
toIceServer (Just server) (Just port) = Just ( server, port )
- cfg <- liftIO $ iceCreateConfig
- (toIceServer stunServer stunPort)
- (toIceServer turnServer turnPort)
- svcSet cfg
-#endif
- return ()
+ svcModify $ \s -> s
+ { dpsStunServer = toIceServer stunServer stunPort
+ , dpsTurnServer = toIceServer turnServer turnPort
+ }
- DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal
- replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
+ DiscoverySearch edgst -> do
+ dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ attrs <- asks svcAttributes
+ let offerTunnel
+ | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ])
+ | otherwise = id
+ replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer
- DiscoveryResult ref [] -> do
- svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found"
+ DiscoveryResult _ [] -> do
+ -- not found
+ return ()
- DiscoveryResult ref addrs -> do
+ DiscoveryResult edgst addrs -> do
+ let dgst = either refDigest id edgst
-- TODO: check if we really requested that
server <- asks svcServer
+ st <- getStorage
self <- svcSelf
- mbIceConfig <- svcGet
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if
- | addr == T.pack "ICE"
-#ifdef ENABLE_ICE_SUPPORT
- , Just config <- mbIceConfig
- -> do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Nothing
- , dpAddress = []
- , dpIceSession = Just ice
- }
-#else
- -> do
- return ()
-#endif
+ let tryAddresses = \case
+ DiscoveryIP ipaddr port : _ -> do
+ void $ liftIO $ forkIO $ do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- serverPeer server saddr
+ runAsService $ do
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
- | [ ipaddr, port ] <- words (T.unpack addr) -> do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Just peer
- , dpAddress = []
+ DiscoveryICE : rest -> do
#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
+ getIceConfig >>= \case
+ Just config -> do
+ void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ -- Try to promote weak ref to normal one for older peers:
+ edgst' <- case edgst of
+ Left r -> return (Left r)
+ Right d -> refFromDigest st d >>= \case
+ Just r -> return (Left r)
+ Nothing -> return (Right d)
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
#endif
- }
+ tryAddresses rest
- | otherwise -> do
- runAsService $ do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ DiscoveryTunnel : _ -> do
+ discoverySetupTunnelResponse dgst
+
+ addr : rest -> do
+ svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr)
+ tryAddresses rest
+
+ [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst
+
+ tryAddresses addrs
DiscoveryConnectionRequest conn -> do
self <- svcSelf
+ attrs <- asks svcAttributes
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ if either refDigest id (dconnTarget conn) `elem` identityDigests self
+ then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
+ | Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
- svcGet >>= \case
+ getIceConfig >>= \case
Just config -> do
liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do
rinfo <- iceRemoteInfo ice
res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo }
case res of
- Right _ -> do
- case dconnIceInfo conn of
- Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice
- Nothing -> putStrLn $ "Discovery: connection request without ICE remote info"
+ Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice
Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err
Nothing -> do
- svcPrint $ "Discovery: ICE request from peer without ICE configuration"
-#else
- return ()
+ return ()
#endif
- else do
- -- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal
- case mbdp of
+ | otherwise -> do
+ svcPrint $ "Discovery: unsupported connection request"
+
+ else do
+ -- request to some of our peers, relay
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
+ case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | not (discoveryProvideTunnel attrs peer paddr) -> do
+ replyPacket $ DiscoveryConnectionResponse rconn
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
- dpeers <- svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ dps <- svcGet
+ dpeers <- dgsPeers <$> svcGetGlobal
+
+ if either refDigest id (dconnSource conn) `elem` identityDigests self
+ then do
-- response to our request, try to connect to the peer
-#ifdef ENABLE_ICE_SUPPORT
server <- asks svcServer
- if | Just addr <- dconnAddress conn
- , [ipaddr, port] <- words (T.unpack addr) -> do
- saddr <- liftIO $ head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- liftIO $ serverPeer server (addrAddress saddr)
- svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $
- DiscoveryPeer 0 (Just peer) [] Nothing
-
- | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
+ if
+ | Just addr <- dconnAddress conn
+ , [ addrStr, portStr ] <- words (T.unpack addr)
+ , Just ipaddr <- readMaybe addrStr
+ , Just port <- readMaybe portStr
+ -> do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- liftIO $ serverPeer server saddr
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> do
+ svcPrint $ "Discovery: missing stream in tunnel response"
+ liftIO $ closeStream tunnelWriter
+
+ | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ svcPrint $ "Discovery: tunnel request failed"
+ liftIO $ closeStream tunnelWriter
+
+#ifdef ENABLE_ICE_SUPPORT
+ | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
+#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
-#else
- return ()
-#endif
- else do
- -- response to relayed request
- case M.lookup (refDigest $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ -- successful tunnel request
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ -- failed tunnel request
+ | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ -> do
+ liftIO $ closeStream toTarget
+ sendToPeer dpeer $ DiscoveryConnectionResponse conn
+
+ | otherwise -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
- let addrToText saddr = do
- ( addr, port ) <- IP.fromSockAddr saddr
- Just $ T.pack $ show addr <> " " <> show port
addrs <- concat <$> sequence
- [ catMaybes . map addrToText <$> liftIO (getServerAddresses server)
+ [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server)
#ifdef ENABLE_ICE_SUPPORT
- , return [ T.pack "ICE" ]
+ , return [ DiscoveryICE ]
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ sendToPeer peer $ DiscoverySearch (Right dgst)
+
+#ifdef ENABLE_ICE_SUPPORT
+ serviceStopServer _ _ _ pstates = do
+ forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do
+ mapM_ iceStopThread dpsIceConfig
+#endif
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig)
+getIceConfig = do
+ dpsIceConfig <$> svcGet >>= \case
+ Just cfg -> return $ Just cfg
+ Nothing -> do
+#ifdef ENABLE_ICE_SUPPORT
+ stun <- dpsStunServer <$> svcGet
+ turn <- dpsTurnServer <$> svcGet
+ liftIO (iceCreateConfig stun turn) >>= \case
+ Just cfg -> do
+ svcModify $ \s -> s { dpsIceConfig = Just cfg }
+ return $ Just cfg
+ Nothing -> do
+ svcPrint $ "Discovery: failed to create ICE config"
+ return Nothing
+#else
+ return Nothing
+#endif
+
+
+discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m ()
+discoverySearch server dgst = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ getPeerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ dgst `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+ connectionToAddressClosed TunnelAddress {..} = do
+ closeStream tunnelWriter
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ discoverySetupTunnelResponse target
+
+discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService ()
+discoverySetupTunnelResponse target = do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }
diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs
index ba2607a..1e1a521 100644
--- a/src/Erebos/Flow.hs
+++ b/src/Erebos/Flow.hs
@@ -11,54 +11,53 @@ module Erebos.Flow (
import Control.Concurrent.STM
-data Flow r w = Flow (TMVar [r]) (TMVar [w])
- | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+data Flow r w
+ = Flow (TBQueue r) (TBQueue w)
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
type SymFlow a = Flow a a
newFlow :: STM (Flow a b, Flow b a)
newFlow = do
- x <- newEmptyTMVar
- y <- newEmptyTMVar
+ x <- newTBQueue 16
+ y <- newTBQueue 16
return (Flow x y, Flow y x)
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = atomically newFlow
readFlow :: Flow r w -> STM r
-readFlow (Flow rvar _) = takeTMVar rvar >>= \case
- (x:[]) -> return x
- (x:xs) -> putTMVar rvar xs >> return x
- [] -> error "Flow: empty list"
+readFlow (Flow rvar _) = readTBQueue rvar
readFlow (MappedFlow f _ up) = f <$> readFlow up
tryReadFlow :: Flow r w -> STM (Maybe r)
-tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case
- Just (x:[]) -> return (Just x)
- Just (x:xs) -> putTMVar rvar xs >> return (Just x)
- Just [] -> error "Flow: empty list"
- Nothing -> return Nothing
+tryReadFlow (Flow rvar _) = tryReadTBQueue rvar
tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up
canReadFlow :: Flow r w -> STM Bool
-canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar
+canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar
canReadFlow (MappedFlow _ _ up) = canReadFlow up
writeFlow :: Flow r w -> w -> STM ()
-writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (Flow _ wvar) = writeTBQueue wvar
writeFlow (MappedFlow _ f up) = writeFlow up . f
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk _ [] = return ()
-writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs
writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
tryWriteFlow :: Flow r w -> w -> STM Bool
-tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[])
-tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f
+tryWriteFlow (Flow _ wvar) x = do
+ isFullTBQueue wvar >>= \case
+ True -> return False
+ False -> do
+ writeTBQueue wvar x
+ return True
+tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x
canWriteFlow :: Flow r w -> STM Bool
-canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar
+canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar
canWriteFlow (MappedFlow _ _ up) = canWriteFlow up
readFlowIO :: Flow r w -> IO r
diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs
index 2c6f500..a3dd9bc 100644
--- a/src/Erebos/ICE.chs
+++ b/src/Erebos/ICE.chs
@@ -8,6 +8,7 @@ module Erebos.ICE (
IceRemoteInfo,
iceCreateConfig,
+ iceStopThread,
iceCreateSession,
iceDestroy,
iceRemoteInfo,
@@ -15,7 +16,7 @@ module Erebos.ICE (
iceConnect,
iceSend,
- iceSetChan,
+ serverPeerIce,
) where
import Control.Arrow
@@ -31,7 +32,6 @@ import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
import Data.Text.Read qualified as T
-import Data.Void
import Data.Word
import Foreign.C.String
@@ -42,7 +42,7 @@ import Foreign.Marshal.Array
import Foreign.Ptr
import Foreign.StablePtr
-import Erebos.Flow
+import Erebos.Network
import Erebos.Object
import Erebos.Storable
import Erebos.Storage
@@ -52,7 +52,7 @@ import Erebos.Storage
data IceSession = IceSession
{ isStrans :: PjIceStrans
, _isConfig :: IceConfig
- , isChan :: MVar (Either [ByteString] (Flow Void ByteString))
+ , isChan :: MVar (Either [ ByteString ] (ByteString -> IO ()))
}
instance Eq IceSession where
@@ -64,6 +64,10 @@ instance Ord IceSession where
instance Show IceSession where
show _ = "<ICE>"
+instance PeerAddressType IceSession where
+ sendBytesToAddress = iceSend
+ connectionToAddressClosed = iceDestroy
+
data IceRemoteInfo = IceRemoteInfo
{ iriUsernameFrament :: Text
@@ -125,9 +129,9 @@ instance StorableText IceCandidate where
data PjIceStransCfg
newtype IceConfig = IceConfig (ForeignPtr PjIceStransCfg)
-foreign import ccall unsafe "pjproject.h &ice_cfg_free"
+foreign import ccall unsafe "pjproject.h &erebos_ice_cfg_free"
ice_cfg_free :: FunPtr (Ptr PjIceStransCfg -> IO ())
-foreign import ccall unsafe "pjproject.h ice_cfg_create"
+foreign import ccall unsafe "pjproject.h erebos_ice_cfg_create"
ice_cfg_create :: CString -> Word16 -> CString -> Word16 -> IO (Ptr PjIceStransCfg)
iceCreateConfig :: Maybe ( Text, Word16 ) -> Maybe ( Text, Word16 ) -> IO (Maybe IceConfig)
@@ -139,6 +143,12 @@ iceCreateConfig stun turn =
then return Nothing
else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg
+foreign import ccall unsafe "pjproject.h erebos_ice_cfg_stop_thread"
+ ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO ()
+
+iceStopThread :: IceConfig -> IO ()
+iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread
+
{#pointer *pj_ice_strans as ^ #}
iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession
@@ -151,13 +161,13 @@ iceCreateSession icfg@(IceConfig fcfg) role cb = do
forkIO $ cb sess
sess <- IceSession
<$> (withForeignPtr fcfg $ \cfg ->
- {#call ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr)
+ {#call erebos_ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr)
)
<*> pure icfg
<*> (newMVar $ Left [])
return $ sess
-{#fun ice_destroy as ^ { isStrans `IceSession' } -> `()' #}
+{#fun erebos_ice_destroy as iceDestroy { isStrans `IceSession' } -> `()' #}
iceRemoteInfo :: IceSession -> IO IceRemoteInfo
iceRemoteInfo sess = do
@@ -172,7 +182,7 @@ iceRemoteInfo sess = do
let cptrs = take maxcand $ iterate (`plusPtr` maxlen) bytes
pokeArray carr $ take maxcand cptrs
- ncand <- {#call ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand)
+ ncand <- {#call erebos_ice_encode_session #} (isStrans sess) ufrag pass def carr (fromIntegral maxlen) (fromIntegral maxcand)
if ncand < 0 then fail "failed to generate ICE remote info"
else IceRemoteInfo
<$> (T.pack <$> peekCString ufrag)
@@ -189,13 +199,13 @@ iceShow sess = do
iceConnect :: IceSession -> IceRemoteInfo -> (IO ()) -> IO ()
iceConnect sess remote cb = do
cbptr <- newStablePtr $ cb
- ice_connect sess cbptr
+ erebos_ice_connect sess cbptr
(iriUsernameFrament remote)
(iriPassword remote)
(iriDefaultCandidate remote)
(iriCandidates remote)
-{#fun ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())',
+{#fun erebos_ice_connect { isStrans `IceSession', castStablePtrToPtr `StablePtr (IO ())',
withText* `Text', withText* `Text', withText* `Text', withTextArray* `[Text]'& } -> `()' #}
withText :: Text -> (Ptr CChar -> IO a) -> IO a
@@ -211,19 +221,19 @@ withTextArray tsAll f = helper tsAll []
withByteStringLen :: Num n => ByteString -> ((Ptr CChar, n) -> IO a) -> IO a
withByteStringLen t f = unsafeUseAsCStringLen t (f . (id *** fromIntegral))
-{#fun ice_send as ^ { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #}
+{#fun erebos_ice_send as iceSend { isStrans `IceSession', withByteStringLen* `ByteString'& } -> `()' #}
foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO ()
ice_call_cb :: StablePtr (IO ()) -> IO ()
ice_call_cb = join . deRefStablePtr
-iceSetChan :: IceSession -> Flow Void ByteString -> IO ()
-iceSetChan sess chan = do
+iceSetServer :: IceSession -> Server -> IO ()
+iceSetServer sess server = do
modifyMVar_ (isChan sess) $ \orig -> do
case orig of
- Left buf -> mapM_ (writeFlowIO chan) $ reverse buf
+ Left buf -> mapM_ (receivedFromCustomAddress server sess) $ reverse buf
Right _ -> return ()
- return $ Right chan
+ return $ Right $ receivedFromCustomAddress server sess
foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO ()
ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO ()
@@ -231,5 +241,12 @@ ice_rx_data sptr buf len = do
sess <- deRefStablePtr sptr
bs <- packCStringLen (buf, len)
modifyMVar_ (isChan sess) $ \case
- mc@(Right chan) -> writeFlowIO chan bs >> return mc
- Left bss -> return $ Left (bs:bss)
+ mc@(Right sendToServer) -> sendToServer bs >> return mc
+ Left bss -> return $ Left (bs : bss)
+
+
+serverPeerIce :: Server -> IceSession -> IO Peer
+serverPeerIce server ice = do
+ peer <- serverPeerCustom server ice
+ iceSetServer ice server
+ return peer
diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c
index e79fb9d..8d91eac 100644
--- a/src/Erebos/ICE/pjproject.c
+++ b/src/Erebos/ICE/pjproject.c
@@ -1,6 +1,7 @@
#include "pjproject.h"
#include "Erebos/ICE_stub.h"
+#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -15,6 +16,13 @@ static struct
pj_sockaddr def_addr;
} ice;
+struct erebos_ice_cfg
+{
+ pj_ice_strans_cfg cfg;
+ pj_thread_t * thread;
+ atomic_bool exit;
+};
+
struct user_data
{
pj_ice_sess_role role;
@@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status)
fprintf(stderr, "ICE: %s: %s\n", msg, err);
}
-static int ice_worker_thread(void * vcfg)
+static int ice_worker_thread( void * vcfg )
{
- pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg;
+ struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg );
- while (true) {
+ while( ! ecfg->exit ){
pj_time_val max_timeout = { 0, 0 };
pj_time_val timeout = { 0, 0 };
max_timeout.msec = 500;
- pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout);
+ pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout );
pj_assert(timeout.sec >= 0 && timeout.msec >= 0);
if (timeout.msec >= 1000)
@@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg)
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
- int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout);
+ int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout );
if (c < 0)
pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
}
@@ -70,7 +78,7 @@ static void cb_on_ice_complete(pj_ice_strans * strans,
{
if (status != PJ_SUCCESS) {
ice_perror("cb_on_ice_complete", status);
- ice_destroy(strans);
+ erebos_ice_destroy(strans);
return;
}
@@ -131,80 +139,91 @@ exit:
pthread_mutex_unlock(&mutex);
}
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port )
{
ice_init();
- pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) );
- pj_ice_strans_cfg_default( cfg );
+ struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) );
+ pj_ice_strans_cfg_default( &ecfg->cfg );
+ ecfg->exit = false;
+ ecfg->thread = NULL;
- cfg->stun_cfg.pf = &ice.cp.factory;
+ ecfg->cfg.stun_cfg.pf = &ice.cp.factory;
if( pj_timer_heap_create( ice.pool, 100,
- &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){
+ &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){
fprintf( stderr, "pj_timer_heap_create failed\n" );
goto fail;
}
- if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){
+ if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){
fprintf( stderr, "pj_ioqueue_create failed\n" );
goto fail;
}
- pj_thread_t * thread;
if( pj_thread_create( ice.pool, NULL, &ice_worker_thread,
- cfg, 0, 0, &thread ) != PJ_SUCCESS ){
+ ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){
fprintf( stderr, "pj_thread_create failed\n" );
goto fail;
}
- cfg->af = pj_AF_INET();
- cfg->opt.aggressive = PJ_TRUE;
+ ecfg->cfg.af = pj_AF_INET();
+ ecfg->cfg.opt.aggressive = PJ_TRUE;
if( stun_server ){
- cfg->stun.server.ptr = malloc( strlen( stun_server ));
- pj_strcpy2( &cfg->stun.server, stun_server );
+ ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server ));
+ pj_strcpy2( &ecfg->cfg.stun.server, stun_server );
if( stun_port )
- cfg->stun.port = stun_port;
+ ecfg->cfg.stun.port = stun_port;
}
if( turn_server ){
- cfg->turn.server.ptr = malloc( strlen( turn_server ));
- pj_strcpy2( &cfg->turn.server, turn_server );
+ ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server ));
+ pj_strcpy2( &ecfg->cfg.turn.server, turn_server );
if( turn_port )
- cfg->turn.port = turn_port;
- cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
- cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
- cfg->turn.conn_type = PJ_TURN_TP_UDP;
+ ecfg->cfg.turn.port = turn_port;
+ ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
+ ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+ ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP;
}
- return cfg;
+ return ecfg;
fail:
- ice_cfg_free( cfg );
+ erebos_ice_cfg_free( ecfg );
return NULL;
}
-void ice_cfg_free( pj_ice_strans_cfg * cfg )
+void erebos_ice_cfg_free( struct erebos_ice_cfg * ecfg )
{
- if( ! cfg )
+ if( ! ecfg )
return;
- if( cfg->turn.server.ptr )
- free( cfg->turn.server.ptr );
+ ecfg->exit = true;
+ pj_thread_join( ecfg->thread );
- if( cfg->stun.server.ptr )
- free( cfg->stun.server.ptr );
+ if( ecfg->cfg.turn.server.ptr )
+ free( ecfg->cfg.turn.server.ptr );
- if( cfg->stun_cfg.ioqueue )
- pj_ioqueue_destroy( cfg->stun_cfg.ioqueue );
+ if( ecfg->cfg.stun.server.ptr )
+ free( ecfg->cfg.stun.server.ptr );
- if( cfg->stun_cfg.timer_heap )
- pj_timer_heap_destroy( cfg->stun_cfg.timer_heap );
+ if( ecfg->cfg.stun_cfg.ioqueue )
+ pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue );
- free( cfg );
+ if( ecfg->cfg.stun_cfg.timer_heap )
+ pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap );
+
+ free( ecfg );
+}
+
+void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg )
+{
+ if( ! ecfg )
+ return;
+ ecfg->exit = true;
}
-pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role,
+pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb )
{
ice_init();
@@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role
.on_ice_complete = cb_on_ice_complete,
};
- pj_status_t status = pj_ice_strans_create( NULL, cfg, 1,
+ pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1,
udata, &icecb, &res );
if (status != PJ_SUCCESS)
@@ -230,7 +249,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role
return res;
}
-void ice_destroy(pj_ice_strans * strans)
+void erebos_ice_destroy(pj_ice_strans * strans)
{
struct user_data * udata = pj_ice_strans_get_user_data(strans);
if (udata->sptr)
@@ -245,7 +264,7 @@ void ice_destroy(pj_ice_strans * strans)
pj_ice_strans_destroy(strans);
}
-ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass,
+ssize_t erebos_ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass,
char * def, char * candidates[], size_t maxlen, size_t maxcand)
{
int n;
@@ -299,7 +318,7 @@ ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass,
return cand_cnt;
}
-void ice_connect(pj_ice_strans * strans, HsStablePtr cb,
+void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb,
const char * ufrag, const char * pass,
const char * defcand, const char * tcandidates[], size_t ncand)
{
@@ -390,7 +409,7 @@ void ice_connect(pj_ice_strans * strans, HsStablePtr cb,
}
}
-void ice_send(pj_ice_strans * strans, const char * data, size_t len)
+void erebos_ice_send(pj_ice_strans * strans, const char * data, size_t len)
{
if (!pj_ice_strans_sess_is_complete(strans)) {
fprintf(stderr, "ICE: negotiation has not been started or is in progress\n");
diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h
index e4fcbdb..7a1b96d 100644
--- a/src/Erebos/ICE/pjproject.h
+++ b/src/Erebos/ICE/pjproject.h
@@ -3,17 +3,18 @@
#include <pjnath.h>
#include <HsFFI.h>
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * erebos_ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port );
-void ice_cfg_free( pj_ice_strans_cfg * cfg );
+void erebos_ice_cfg_free( struct erebos_ice_cfg * cfg );
+void erebos_ice_cfg_stop_thread( struct erebos_ice_cfg * cfg );
-pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role,
+pj_ice_strans * erebos_ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb );
-void ice_destroy(pj_ice_strans * strans);
+void erebos_ice_destroy(pj_ice_strans * strans);
-ssize_t ice_encode_session(pj_ice_strans *, char * ufrag, char * pass,
+ssize_t erebos_ice_encode_session(pj_ice_strans *, char * ufrag, char * pass,
char * def, char * candidates[], size_t maxlen, size_t maxcand);
-void ice_connect(pj_ice_strans * strans, HsStablePtr cb,
+void erebos_ice_connect(pj_ice_strans * strans, HsStablePtr cb,
const char * ufrag, const char * pass,
const char * defcand, const char * candidates[], size_t ncand);
-void ice_send(pj_ice_strans *, const char * data, size_t len);
+void erebos_ice_send(pj_ice_strans *, const char * data, size_t len);
diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs
index a3f17b5..bd5acb3 100644
--- a/src/Erebos/Identity.hs
+++ b/src/Erebos/Identity.hs
@@ -214,29 +214,33 @@ isExtension x = case fromSigned x of BaseIdentityData {} -> False
_ -> True
-createIdentity :: Storage -> Maybe Text -> Maybe UnifiedIdentity -> IO UnifiedIdentity
-createIdentity st name owner = do
- (secret, public) <- generateKeys st
- (_secretMsg, publicMsg) <- generateKeys st
-
- let signOwner :: Signed a -> ReaderT Storage IO (Signed a)
+createIdentity
+ :: forall m e. (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m)
+ => Maybe Text -> Maybe UnifiedIdentity -> m UnifiedIdentity
+createIdentity name owner = do
+ st <- getStorage
+ ( secret, public ) <- liftIO $ generateKeys st
+ ( _secretMsg, publicMsg ) <- liftIO $ generateKeys st
+
+ let signOwner :: Signed a -> m (Signed a)
signOwner idd
| Just o <- owner = do
- Just ownerSecret <- loadKeyMb (iddKeyIdentity $ fromSigned $ idData o)
+ ownerSecret <- maybe (throwOtherError "failed to load private key") return =<<
+ loadKeyMb (iddKeyIdentity $ fromSigned $ idData o)
signAdd ownerSecret idd
| otherwise = return idd
- Just identity <- flip runReaderT st $ do
- baseData <- mstore =<< signOwner =<< sign secret =<<
- mstore (emptyIdentityData public)
- { iddOwner = idData <$> owner
- , iddKeyMessage = Just publicMsg
- }
- let extOwner = do
- odata <- idExtData <$> owner
- guard $ isExtension odata
- return odata
-
+ baseData <- mstore =<< signOwner =<< sign secret =<<
+ mstore (emptyIdentityData public)
+ { iddOwner = idData <$> owner
+ , iddKeyMessage = Just publicMsg
+ }
+ let extOwner = do
+ odata <- idExtData <$> owner
+ guard $ isExtension odata
+ return odata
+
+ maybe (throwOtherError "created invalid identity") return =<< do
validateExtendedIdentityF . I.Identity <$>
if isJust name || isJust extOwner
then mstore =<< signOwner =<< sign secret =<<
@@ -245,7 +249,6 @@ createIdentity st name owner = do
, ideOwner = extOwner
}
else return $ baseToExtended baseData
- return identity
validateIdentity :: Stored (Signed IdentityData) -> Maybe UnifiedIdentity
validateIdentity = validateIdentityF . I.Identity
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 54658de..6265bbf 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -1,5 +1,3 @@
-{-# LANGUAGE CPP #-}
-
module Erebos.Network (
Server,
startServer,
@@ -10,20 +8,24 @@ module Erebos.Network (
ServerOptions(..), serverIdentity, defaultServerOptions,
Peer, peerServer, peerStorage,
- PeerAddress(..), peerAddress,
- PeerIdentity(..), peerIdentity,
+ PeerAddress(..), getPeerAddress, getPeerAddresses,
+ PeerIdentity(..), getPeerIdentity,
WaitingRef, wrDigest,
Service(..),
+
+ PeerAddressType(..),
+ receivedFromCustomAddress,
+
serverPeer,
-#ifdef ENABLE_ICE_SUPPORT
- serverPeerIce,
-#endif
+ serverPeerCustom,
+ findPeer,
dropPeer,
isPeerDropped,
sendToPeer, sendManyToPeer,
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -36,13 +38,14 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
+import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
-import qualified Data.Map as M
+import Data.Map qualified as M
import Data.Maybe
import Data.Typeable
import Data.Word
@@ -56,13 +59,11 @@ import Foreign.Storable as F
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
-import qualified Network.Socket.ByteString as S
+import Network.Socket.ByteString qualified as S
import Erebos.Error
-#ifdef ENABLE_ICE_SUPPORT
-import Erebos.ICE
-#endif
import Erebos.Identity
+import Erebos.Network.Address
import Erebos.Network.Channel
import Erebos.Network.Protocol
import Erebos.Object.Internal
@@ -114,12 +115,16 @@ getNextPeerChange = atomically . readTChan . serverChanPeer
data ServerOptions = ServerOptions
{ serverPort :: PortNumber
, serverLocalDiscovery :: Bool
+ , serverErrorPrefix :: String
+ , serverTestLog :: Bool
}
defaultServerOptions :: ServerOptions
defaultServerOptions = ServerOptions
{ serverPort = discoveryPort
, serverLocalDiscovery = True
+ , serverErrorPrefix = ""
+ , serverTestLog = False
}
@@ -134,6 +139,14 @@ data Peer = Peer
, peerWaitingRefs :: TMVar [WaitingRef]
}
+-- | Get current main address of the peer (used to send new packets).
+getPeerAddress :: MonadIO m => Peer -> m PeerAddress
+getPeerAddress = liftIO . return . peerAddress
+
+-- | Get all known addresses of given peer.
+getPeerAddresses :: MonadIO m => Peer -> m [ PeerAddress ]
+getPeerAddresses = fmap (: []) . getPeerAddress
+
peerServer :: Peer -> Server
peerServer = peerServer_
@@ -157,50 +170,52 @@ setPeerChannel Peer {..} ch = do
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
-data PeerAddress = DatagramAddress SockAddr
-#ifdef ENABLE_ICE_SUPPORT
- | PeerIceSession IceSession
-#endif
+class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where
+ sendBytesToAddress :: addr -> ByteString -> IO ()
+ connectionToAddressClosed :: addr -> IO ()
+
+data PeerAddress
+ = forall addr. PeerAddressType addr => CustomPeerAddress addr
+ | DatagramAddress SockAddr
instance Show PeerAddress where
- show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
- Just (IP.IPv6 ipv6, port)
- | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
- -> [show (IP.toIPv4w ipv4), show port]
- Just (addr, port)
- -> [show addr, show port]
- _ -> [show saddr]
-#ifdef ENABLE_ICE_SUPPORT
- show (PeerIceSession ice) = show ice
-#endif
+ show (CustomPeerAddress addr) = show addr
+
+ show (DatagramAddress saddr) =
+ case inetFromSockAddr saddr of
+ Just ( addr, port ) -> unwords [ show addr, show port ]
+ _ -> show saddr
instance Eq PeerAddress where
+ CustomPeerAddress addr == CustomPeerAddress addr'
+ | Just addr'' <- cast addr' = addr == addr''
DatagramAddress addr == DatagramAddress addr' = addr == addr'
-#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice == PeerIceSession ice' = ice == ice'
_ == _ = False
-#endif
instance Ord PeerAddress where
+ compare (CustomPeerAddress addr) (CustomPeerAddress addr')
+ | Just addr'' <- cast addr' = compare addr addr''
+ | otherwise = compare (typeOf addr) (typeOf addr')
+ compare (CustomPeerAddress _ ) _ = LT
+ compare _ (CustomPeerAddress _ ) = GT
+
compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
-#ifdef ENABLE_ICE_SUPPORT
- compare (DatagramAddress _ ) _ = LT
- compare _ (DatagramAddress _ ) = GT
- compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
-#endif
-data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
- | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
- | PeerIdentityFull UnifiedIdentity
+data PeerIdentity
+ = PeerIdentityUnknown (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ])
+ | PeerIdentityRef WaitingRef (TVar [ UnifiedIdentity -> ExceptT ErebosError IO () ])
+ | PeerIdentityFull UnifiedIdentity
-peerIdentity :: MonadIO m => Peer -> m PeerIdentity
-peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
+-- | Get currently known identity of the given peer
+getPeerIdentity :: MonadIO m => Peer -> m PeerIdentity
+getPeerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
- | PeerConnected (Connection PeerAddress)
- | PeerDropped
+data PeerState
+ = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
@@ -252,7 +267,16 @@ startServer serverOptions serverOrigHead logd' serverServices = do
let logd = writeTQueue serverErrorLog
forkServerThread server $ forever $ do
- logd' =<< atomically (readTQueue serverErrorLog)
+ logd' . (serverErrorPrefix serverOptions <>) =<< atomically (readTQueue serverErrorLog)
+
+ logt <- if
+ | serverTestLog serverOptions -> do
+ serverTestLog <- newTQueueIO
+ forkServerThread server $ forever $ do
+ logd' =<< atomically (readTQueue serverTestLog)
+ return $ writeTQueue serverTestLog
+ | otherwise -> do
+ return $ \_ -> return ()
forkServerThread server $ dataResponseWorker server
forkServerThread server $ forever $ do
@@ -302,13 +326,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -316,12 +345,10 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
case paddr of
+ CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
-#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice -> iceSend ice msg
-#endif
forkServerThread server $ forever $ do
readFlowIO serverControlFlow >>= \case
@@ -363,9 +390,13 @@ startServer serverOptions serverOrigHead logd' serverServices = do
prefs <- forM objs $ storeObject $ peerInStorage peer
identity <- readMVar serverIdentity_
let svcs = map someServiceID serverServices
- handlePacket identity secure peer chanSvc svcs header prefs
+ handlePacket paddr identity secure peer chanSvc svcs header prefs
peerLoop
Nothing -> do
+ case paddr of
+ DatagramAddress _ -> return ()
+ CustomPeerAddress caddr -> connectionToAddressClosed caddr
+
dropPeer peer
atomically $ writeTChan serverChanPeer peer
peerLoop
@@ -373,7 +404,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
ReceivedAnnounce addr _ -> do
void $ serverPeer' server addr
- erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow
+ erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd logt protocolRawPath protocolControlFlow
forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
@@ -385,16 +416,29 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, paddr, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams paddr peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -502,9 +546,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -523,10 +565,10 @@ appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
-handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
-handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+handlePacket :: PeerAddress -> UnifiedIdentity -> Bool
+ -> Peer -> TQueue ( Peer, PeerAddress, ServiceID, Ref, [ RawStreamReader ] ) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
+handlePacket paddr identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
let sidentity = idData identity
@@ -659,10 +701,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, paddr, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
@@ -741,7 +784,7 @@ finalizedChannel peer@Peer {..} ch self = do
-- Notify services about new peer
readTVar peerIdentityVar >>= \case
- PeerIdentityFull _ -> notifyServicesOfPeer peer
+ PeerIdentityFull _ -> notifyServicesOfPeer True peer
_ -> return ()
@@ -767,7 +810,7 @@ handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
PeerIdentityFull pid
| idData pid `precedes` wrappedLoad ref
-> validateAndUpdate (idUpdates pid) $ \_ -> do
- notifyServicesOfPeer peer
+ notifyServicesOfPeer False peer
_ -> return ()
@@ -779,16 +822,23 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do
-> do
writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid'
writeTChan (serverChanPeer $ peerServer peer) peer
- when (idData pid /= idData pid') $ notifyServicesOfPeer peer
+ when (pid /= pid') $ do
+ notifyServicesOfPeer False peer
| otherwise -> return ()
-notifyServicesOfPeer :: Peer -> STM ()
-notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
+notifyServicesOfPeer :: Bool -> Peer -> STM ()
+notifyServicesOfPeer new peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
+ paddr <- getPeerAddress peer
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] paddr peer $
+ if new then serviceNewPeer else serviceUpdatedPeer
+
+receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
+receivedFromCustomAddress Server {..} addr msg = do
+ writeFlowIO serverRawPath ( CustomPeerAddress addr, msg )
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
@@ -808,14 +858,8 @@ serverPeer server paddr = do
_ -> paddr
serverPeer' server (DatagramAddress paddr')
-#ifdef ENABLE_ICE_SUPPORT
-serverPeerIce :: Server -> IceSession -> IO Peer
-serverPeerIce server@Server {..} ice = do
- let paddr = PeerIceSession ice
- peer <- serverPeer' server paddr
- iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath
- return peer
-#endif
+serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer
+serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr)
serverPeer' :: Server -> PeerAddress -> IO Peer
serverPeer' server paddr = do
@@ -829,6 +873,13 @@ serverPeer' server paddr = do
writeFlow (serverControlFlow server) (RequestConnection paddr)
return peer
+findPeer :: Server -> (Peer -> IO Bool) -> IO (Maybe Peer)
+findPeer server test = withMVar (serverPeers server) (helper . M.elems)
+ where
+ helper (p : ps) = test p >>= \case True -> return (Just p)
+ False -> helper ps
+ helper [] = return Nothing
+
dropPeer :: MonadIO m => Peer -> m ()
dropPeer peer = liftIO $ do
modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do
@@ -856,19 +907,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -901,17 +982,19 @@ sendToPeerWith peer fobj = do
Left err -> throwError $ fromErebosError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService peer handler = do
+ paddr <- getPeerAddress peer
+ runPeerServiceOn Nothing [] paddr peer handler
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> PeerAddress -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams paddr peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -933,9 +1016,11 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
let inp = ServiceInput
{ svcAttributes = attr
, svcPeer = peer
+ , svcPeerAddress = paddr
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
@@ -951,35 +1036,37 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global
_ -> do
- atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer)
+ atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show paddr
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
-foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
-foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
-foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
-foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO ()
-
-data InetAddress = InetAddress { fromInetAddress :: IP.IP }
-
-instance F.Storable InetAddress where
- sizeOf _ = sizeOf (undefined :: CInt) + 16
- alignment _ = 8
-
- peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case
- AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt))
- AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8)
- _ -> fail "InetAddress: unknown family"
- poke ptr (InetAddress addr) = case addr of
- IP.IPv4 ip -> do
- pokeByteOff ptr 0 (packFamily AF_INET)
- pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip)
- IP.IPv6 ip -> do
- pokeByteOff ptr 0 (packFamily AF_INET6)
- pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
+foreign import ccall unsafe "Network/ifaddrs.h erebos_broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
+foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO ()
joinMulticast :: Socket -> IO [ Word32 ]
joinMulticast sock =
@@ -1007,7 +1094,7 @@ getServerAddresses Server {..} = do
count <- fromIntegral <$> peek pcount
res <- peekArray count ptr
cFree ptr
- return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res
+ return $ map (inetToSockAddr . (, serverPort serverOptions )) res
getBroadcastAddresses :: PortNumber -> IO [SockAddr]
getBroadcastAddresses port = do
diff --git a/src/Erebos/Network.hs-boot b/src/Erebos/Network.hs-boot
index af77581..17a5275 100644
--- a/src/Erebos/Network.hs-boot
+++ b/src/Erebos/Network.hs-boot
@@ -4,5 +4,6 @@ import Erebos.Object.Internal
data Server
data Peer
+data PeerAddress
peerStorage :: Peer -> Storage
diff --git a/src/Erebos/Network/Address.hs b/src/Erebos/Network/Address.hs
new file mode 100644
index 0000000..63f6af1
--- /dev/null
+++ b/src/Erebos/Network/Address.hs
@@ -0,0 +1,65 @@
+module Erebos.Network.Address (
+ InetAddress(..),
+ inetFromSockAddr,
+ inetToSockAddr,
+
+ SockAddr, PortNumber,
+) where
+
+import Data.Bifunctor
+import Data.IP qualified as IP
+import Data.Word
+
+import Foreign.C.Types
+import Foreign.Marshal.Array
+import Foreign.Ptr
+import Foreign.Storable as F
+
+import Network.Socket
+
+import Text.Read
+
+
+newtype InetAddress = InetAddress { fromInetAddress :: IP.IP }
+ deriving (Eq, Ord)
+
+instance Show InetAddress where
+ show (InetAddress ipaddr)
+ | IP.IPv6 ipv6 <- ipaddr
+ , ( 0, 0, 0xffff, ipv4 ) <- IP.fromIPv6w ipv6
+ = show (IP.toIPv4w ipv4)
+
+ | otherwise
+ = show ipaddr
+
+instance Read InetAddress where
+ readPrec = do
+ readPrec >>= return . InetAddress . \case
+ IP.IPv4 ipv4 -> IP.IPv6 $ IP.toIPv6w ( 0, 0, 0xffff, IP.fromIPv4w ipv4 )
+ ipaddr -> ipaddr
+
+ readListPrec = readListPrecDefault
+
+instance F.Storable InetAddress where
+ sizeOf _ = sizeOf (undefined :: CInt) + 16
+ alignment _ = 8
+
+ peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case
+ AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt))
+ AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8)
+ _ -> fail "InetAddress: unknown family"
+
+ poke ptr (InetAddress addr) = case addr of
+ IP.IPv4 ip -> do
+ pokeByteOff ptr 0 (packFamily AF_INET)
+ pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip)
+ IP.IPv6 ip -> do
+ pokeByteOff ptr 0 (packFamily AF_INET6)
+ pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip)
+
+
+inetFromSockAddr :: SockAddr -> Maybe ( InetAddress, PortNumber )
+inetFromSockAddr saddr = first InetAddress <$> IP.fromSockAddr saddr
+
+inetToSockAddr :: ( InetAddress, PortNumber ) -> SockAddr
+inetToSockAddr = IP.toSockAddr . first fromInetAddress
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index c340503..f67e296 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -3,6 +3,7 @@ module Erebos.Network.Protocol (
transportToObject,
TransportHeader(..),
TransportHeaderItem(..),
+ ServiceID(..),
SecurityRequirement(..),
WaitingRef(..),
@@ -22,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -36,6 +38,7 @@ import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
+import Control.Exception
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans
@@ -68,9 +71,9 @@ import Erebos.Flow
import Erebos.Identity
import Erebos.Network.Channel
import Erebos.Object
-import Erebos.Service
import Erebos.Storable
import Erebos.Storage
+import Erebos.UUID (UUID)
protocolVersion :: Text
@@ -107,6 +110,9 @@ data TransportHeaderItem
| StreamOpen Word8
deriving (Eq, Show)
+newtype ServiceID = ServiceID UUID
+ deriving (Eq, Ord, Show, StorableUUID)
+
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
@@ -207,6 +213,7 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
, gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)
, gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
, gLog :: String -> STM ()
+ , gTestLog :: String -> STM ()
, gStorage :: PartialStorage
, gStartTime :: TimeSpec
, gNowVar :: TVar TimeSpec
@@ -243,6 +250,12 @@ instance Eq (Connection addr) where
connAddress :: Connection addr -> addr
connAddress = cAddress
+showConnAddress :: forall addr. Connection addr -> String
+showConnAddress Connection {..} = helper cGlobalState cAddress
+ where
+ helper :: GlobalState addr -> addr -> String
+ helper GlobalState {} = show
+
connData :: Connection addr -> Flow
(Maybe (Bool, TransportPacket PartialObject))
(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])
@@ -267,6 +280,7 @@ connClose conn@Connection {..} = do
connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ()))
connAddWriteStream conn@Connection {..} = do
+ let GlobalState {..} = cGlobalState
outStreams <- readTVar cOutStreams
let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)])
doInsert n (s@(n', _) : rest) | n == n' =
@@ -283,10 +297,16 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ lift $ gTestLog $ "net-ostream-open " <> showConnAddress conn <> " " <> show streamNumber <> " " <> show (length outStreams')
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go streamNumber stream
+ )
where
- go gs@GlobalState {..} streamNumber stream = do
+ go streamNumber stream = do
+ let GlobalState {..} = cGlobalState
(reserved, msg) <- atomically $ do
readTVar (sState stream) >>= \case
StreamRunning -> return ()
@@ -299,6 +319,8 @@ connAddWriteStream conn@Connection {..} = do
return (stpData, True, return ())
StreamClosed {} -> do
atomically $ do
+ gTestLog $ "net-ostream-close-send " <> showConnAddress conn <> " " <> show streamNumber
+ atomically $ do
-- wait for ack on all sent stream data
waits <- readTVar (sWaitingForAck stream)
when (waits > 0) retry
@@ -342,7 +364,7 @@ connAddWriteStream conn@Connection {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
- when cont $ go gs streamNumber stream
+ when cont $ go streamNumber stream
connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader
connAddReadStream Connection {..} streamNumber = do
@@ -356,14 +378,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
+
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -394,11 +423,13 @@ streamAccepted Connection {..} snum = atomically $ do
Nothing -> return ()
streamClosed :: Connection addr -> Word8 -> IO ()
-streamClosed Connection {..} snum = atomically $ do
- modifyTVar' cOutStreams $ filter ((snum /=) . fst)
+streamClosed conn@Connection {..} snum = atomically $ do
+ streams <- filter ((snum /=) . fst) <$> readTVar cOutStreams
+ writeTVar cOutStreams streams
+ gTestLog cGlobalState $ "net-ostream-close-ack " <> showConnAddress conn <> " " <> show snum <> " " <> show (length streams)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
@@ -420,10 +451,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
@@ -477,10 +508,11 @@ data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest)
erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
=> UnifiedIdentity
-> (String -> STM ())
+ -> (String -> STM ())
-> SymFlow (addr, ByteString)
-> Flow (ControlRequest addr) (ControlMessage addr)
-> IO ()
-erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
+erebosNetworkProtocol initialIdentity gLog gTestLog gDataFlow gControlFlow = do
gIdentity <- newTVarIO (initialIdentity, [])
gConnections <- newTVarIO []
gNextUp <- newEmptyTMVarIO
@@ -512,8 +544,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
race_ (waitTill next) waitForUpdate
- race_ signalTimeouts $ forever $ join $ atomically $
- passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ race_ signalTimeouts $ forever $ do
+ io <- atomically $ do
+ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e
getConnection :: GlobalState addr -> addr -> STM (Connection addr)
@@ -542,6 +576,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
cOutStreams <- newTVar []
let conn = Connection {..}
+ gTestLog $ "net-conn-new " <> show cAddress
writeTVar gConnections (conn : conns)
return conn
@@ -898,7 +933,10 @@ processOutgoing gs@GlobalState {..} = do
, rsOnAck = rsOnAck rs >> onAck
}) <$> mbReserved
sendBytes conn mbReserved' bs
- Nothing -> return ()
+ Nothing -> do
+ when (isJust mbReserved) $ do
+ atomically $ do
+ modifyTVar' cReservedPackets (subtract 1)
let waitUntil :: TimeSpec -> TimeSpec -> STM ()
waitUntil now till = do
diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c
index ff4382a..8139b5e 100644
--- a/src/Erebos/Network/ifaddrs.c
+++ b/src/Erebos/Network/ifaddrs.c
@@ -22,7 +22,7 @@
#define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1"
-uint32_t * join_multicast(int fd, size_t * count)
+uint32_t * erebos_join_multicast(int fd, size_t * count)
{
size_t capacity = 16;
*count = 0;
@@ -117,7 +117,7 @@ static bool copy_local_address( struct InetAddress * dst, const struct sockaddr
#ifndef _WIN32
-struct InetAddress * local_addresses( size_t * count )
+struct InetAddress * erebos_local_addresses( size_t * count )
{
struct ifaddrs * addrs;
if( getifaddrs( &addrs ) < 0 )
@@ -153,7 +153,7 @@ struct InetAddress * local_addresses( size_t * count )
return ret;
}
-uint32_t * broadcast_addresses(void)
+uint32_t * erebos_broadcast_addresses(void)
{
struct ifaddrs * addrs;
if (getifaddrs(&addrs) < 0)
@@ -196,7 +196,7 @@ uint32_t * broadcast_addresses(void)
#pragma comment(lib, "ws2_32.lib")
-struct InetAddress * local_addresses( size_t * count )
+struct InetAddress * erebos_local_addresses( size_t * count )
{
* count = 0;
struct InetAddress * ret = NULL;
@@ -237,7 +237,7 @@ cleanup:
return ret;
}
-uint32_t * broadcast_addresses(void)
+uint32_t * erebos_broadcast_addresses(void)
{
uint32_t * ret = NULL;
SOCKET wsock = INVALID_SOCKET;
diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h
index 2ee45a7..2b3c014 100644
--- a/src/Erebos/Network/ifaddrs.h
+++ b/src/Erebos/Network/ifaddrs.h
@@ -13,6 +13,6 @@ struct InetAddress
uint8_t addr[16];
} __attribute__((packed));
-uint32_t * join_multicast(int fd, size_t * count);
-struct InetAddress * local_addresses( size_t * count );
-uint32_t * broadcast_addresses(void);
+uint32_t * erebos_join_multicast(int fd, size_t * count);
+struct InetAddress * erebos_local_addresses( size_t * count );
+uint32_t * erebos_broadcast_addresses(void);
diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs
index 26ca09f..f00b63d 100644
--- a/src/Erebos/Object.hs
+++ b/src/Erebos/Object.hs
@@ -13,8 +13,9 @@ module Erebos.Object (
RecItem, RecItem'(..),
Ref, PartialRef, RefDigest,
- refDigest,
- readRef, showRef, showRefDigest,
+ refDigest, refFromDigest,
+ readRef, showRef,
+ readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
) where
diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs
index 6111d2a..fdb587a 100644
--- a/src/Erebos/Object/Internal.hs
+++ b/src/Erebos/Object/Internal.hs
@@ -2,8 +2,9 @@ module Erebos.Object.Internal (
Storage, PartialStorage, StorageCompleteness,
Ref, PartialRef, RefDigest,
- refDigest,
- readRef, showRef, showRefDigest,
+ refDigest, refFromDigest,
+ readRef, showRef,
+ readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
@@ -74,13 +75,14 @@ import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Format
import Data.Time.LocalTime
-import Data.UUID (UUID)
-import qualified Data.UUID as U
import System.IO.Unsafe
import Erebos.Error
import Erebos.Storage.Internal
+import Erebos.UUID (UUID)
+import Erebos.UUID qualified as U
+import Erebos.Util
zeroRef :: Storage' c -> Ref' c
@@ -701,8 +703,6 @@ loadRawWeaks name = mapMaybe p <$> loadRecItems
-type Stored a = Stored' Complete a
-
instance Storable a => Storable (Stored a) where
store st = copyRef st . storedRef
store' (Stored _ x) = store' x
@@ -712,10 +712,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where
fromZero st = Stored (zeroRef st) $ fromZero st
fromStored :: Stored a -> a
-fromStored (Stored _ x) = x
+fromStored = storedObject'
storedRef :: Stored a -> Ref
-storedRef (Stored ref _) = ref
+storedRef = storedRef'
wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a)
wrappedStore st x = do ref <- liftIO $ store st x
@@ -724,9 +724,8 @@ wrappedStore st x = do ref <- liftIO $ store st x
wrappedLoad :: Storable a => Ref -> Stored a
wrappedLoad ref = Stored ref (load ref)
-copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) =>
- Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a))
-copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref'
+copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a)
+copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref'
-- |Passed function needs to preserve the object representation to be safe
unsafeMapStored :: (a -> b) -> Stored a -> Stored b
diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs
index 703afcd..d1fdc79 100644
--- a/src/Erebos/Pairing.hs
+++ b/src/Erebos/Pairing.hs
@@ -17,9 +17,10 @@ import Control.Monad.Reader
import Crypto.Random
import Data.Bits
-import Data.ByteArray (Bytes, convert)
-import qualified Data.ByteArray as BA
-import qualified Data.ByteString.Char8 as BC
+import Data.ByteArray qualified as BA
+import Data.ByteString (ByteString)
+import Data.ByteString qualified as BS
+import Data.ByteString.Char8 qualified as BC
import Data.Kind
import Data.Maybe
import Data.Typeable
@@ -34,16 +35,16 @@ import Erebos.State
import Erebos.Storable
data PairingService a = PairingRequest (Stored (Signed IdentityData)) (Stored (Signed IdentityData)) RefDigest
- | PairingResponse Bytes
- | PairingRequestNonce Bytes
+ | PairingResponse ByteString
+ | PairingRequestNonce ByteString
| PairingAccept a
| PairingReject
data PairingState a = NoPairing
- | OurRequest UnifiedIdentity UnifiedIdentity Bytes
+ | OurRequest UnifiedIdentity UnifiedIdentity ByteString
| OurRequestConfirm (Maybe (PairingVerifiedResult a))
| OurRequestReady
- | PeerRequest UnifiedIdentity UnifiedIdentity Bytes RefDigest
+ | PeerRequest UnifiedIdentity UnifiedIdentity ByteString RefDigest
| PeerRequestConfirm
| PairingDone
@@ -88,7 +89,7 @@ instance Storable a => Storable (PairingService a) where
load' = do
res <- loadRec $ do
- (req :: Maybe Bytes) <- loadMbBinary "request"
+ (req :: Maybe ByteString) <- loadMbBinary "request"
idReq <- loadMbRef "id-req"
idRsp <- loadMbRef "id-rsp"
rsp <- loadMbBinary "response"
@@ -171,7 +172,7 @@ instance PairingResult a => Service (PairingService a) where
x@(OurRequestReady, _) -> reject $ uncurry PairingUnexpectedMessage x
(PeerRequest peer self nonce dgst, PairingRequestNonce pnonce) -> do
- if dgst == nonceDigest peer self pnonce BA.empty
+ if dgst == nonceDigest peer self pnonce BS.empty
then do hook <- asks $ pairingHookRequestNonce . svcAttributes
hook $ confirmationNumber $ nonceDigest peer self pnonce nonce
svcSet PeerRequestConfirm
@@ -188,12 +189,12 @@ reject reason = do
replyPacket PairingReject
-nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> Bytes -> Bytes -> RefDigest
+nonceDigest :: UnifiedIdentity -> UnifiedIdentity -> ByteString -> ByteString -> RefDigest
nonceDigest idReq idRsp nonceReq nonceRsp = hashToRefDigest $ serializeObject $ Rec
[ (BC.pack "id-req", RecRef $ storedRef $ idData idReq)
, (BC.pack "id-rsp", RecRef $ storedRef $ idData idRsp)
- , (BC.pack "nonce-req", RecBinary $ convert nonceReq)
- , (BC.pack "nonce-rsp", RecBinary $ convert nonceRsp)
+ , (BC.pack "nonce-req", RecBinary nonceReq)
+ , (BC.pack "nonce-rsp", RecBinary nonceRsp)
]
confirmationNumber :: RefDigest -> String
@@ -208,11 +209,11 @@ pairingRequest :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e
pairingRequest _ peer = do
self <- liftIO $ serverIdentity $ peerServer peer
nonce <- liftIO $ getRandomBytes 32
- pid <- peerIdentity peer >>= \case
+ pid <- getPeerIdentity peer >>= \case
PeerIdentityFull pid -> return pid
_ -> throwOtherError "incomplete peer identity"
sendToPeerWith @(PairingService a) peer $ \case
- NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BA.empty), OurRequest self pid nonce)
+ NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BS.empty), OurRequest self pid nonce)
_ -> throwOtherError "already in progress"
pairingAccept :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m ()
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index e95e700..303f9db 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -29,14 +29,14 @@ import Control.Monad.Writer
import Data.Kind
import Data.Typeable
-import Data.UUID (UUID)
-import qualified Data.UUID as U
import Erebos.Identity
import {-# SOURCE #-} Erebos.Network
+import Erebos.Network.Protocol
import Erebos.State
import Erebos.Storable
import Erebos.Storage.Head
+import Erebos.UUID qualified as U
class (
Typeable s, Storable s,
@@ -51,6 +51,9 @@ class (
serviceNewPeer :: ServiceHandler s ()
serviceNewPeer = return ()
+ serviceUpdatedPeer :: ServiceHandler s ()
+ serviceUpdatedPeer = return ()
+
type ServiceAttributes s = attr | attr -> s
type ServiceAttributes s = Proxy s
defaultServiceAttributes :: proxy s -> ServiceAttributes s
@@ -72,6 +75,9 @@ class (
serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s]
serviceStorageWatchers _ = []
+ serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO ()
+ serviceStopServer _ _ _ _ = return ()
+
data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s)
@@ -101,11 +107,10 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
-data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
-
+data SomeStorageWatcher s
+ = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+ | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT ErebosError IO ())
-newtype ServiceID = ServiceID UUID
- deriving (Eq, Ord, Show, StorableUUID)
mkServiceID :: String -> ServiceID
mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString
@@ -113,13 +118,17 @@ mkServiceID = maybe (error "Invalid service ID") ServiceID . U.fromString
data ServiceInput s = ServiceInput
{ svcAttributes :: ServiceAttributes s
, svcPeer :: Peer
+ , svcPeerAddress :: PeerAddress
, svcPeerIdentity :: UnifiedIdentity
, svcServer :: Server
, svcPrintOp :: String -> IO ()
+ , svcNewStreams :: [ RawStreamReader ]
}
-data ServiceReply s = ServiceReply (Either s (Stored s)) Bool
- | ServiceFinally (IO ())
+data ServiceReply s
+ = ServiceReply (Either s (Stored s)) Bool
+ | ServiceOpenStream (RawStreamWriter -> IO ())
+ | ServiceFinally (IO ())
data ServiceHandlerState s = ServiceHandlerState
{ svcValue :: ServiceState s
diff --git a/src/Erebos/Service/Stream.hs b/src/Erebos/Service/Stream.hs
new file mode 100644
index 0000000..67df4d7
--- /dev/null
+++ b/src/Erebos/Service/Stream.hs
@@ -0,0 +1,74 @@
+module Erebos.Service.Stream (
+ StreamPacket(..),
+ StreamReader, getStreamReaderNumber,
+ StreamWriter, getStreamWriterNumber,
+ openStream, receivedStreams,
+ readStreamPacket, writeStreamPacket,
+ writeStream,
+ closeStream,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.Reader
+import Control.Monad.Writer
+
+import Data.ByteString (ByteString)
+import Data.Word
+
+import Erebos.Flow
+import Erebos.Network
+import Erebos.Network.Protocol
+import Erebos.Service
+
+
+data StreamReader = StreamReader RawStreamReader
+
+getStreamReaderNumber :: StreamReader -> IO Int
+getStreamReaderNumber (StreamReader stream) = return $ rsrNum stream
+
+data StreamWriter = StreamWriter (MVar StreamWriterData)
+
+data StreamWriterData = StreamWriterData
+ { swdStream :: RawStreamWriter
+ , swdSequence :: Maybe Word64
+ }
+
+getStreamWriterNumber :: StreamWriter -> IO Int
+getStreamWriterNumber (StreamWriter stream) = rswNum . swdStream <$> readMVar stream
+
+
+openStream :: Service s => ServiceHandler s StreamWriter
+openStream = do
+ mvar <- liftIO newEmptyMVar
+ tell [ ServiceOpenStream $ \stream -> putMVar mvar $ StreamWriterData stream (Just 0) ]
+ return $ StreamWriter mvar
+
+receivedStreams :: Service s => ServiceHandler s [ StreamReader ]
+receivedStreams = do
+ map StreamReader <$> asks svcNewStreams
+
+readStreamPacket :: StreamReader -> IO StreamPacket
+readStreamPacket (StreamReader stream) = do
+ readFlowIO (rsrFlow stream)
+
+writeStreamPacket :: StreamWriter -> StreamPacket -> IO ()
+writeStreamPacket (StreamWriter mvar) packet = do
+ withMVar mvar $ \swd -> do
+ writeFlowIO (rswFlow $ swdStream swd) packet
+
+writeStream :: StreamWriter -> ByteString -> IO ()
+writeStream (StreamWriter mvar) bytes = do
+ modifyMVar_ mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> do
+ writeFlowIO (rswFlow $ swdStream swd) $ StreamData seqNum bytes
+ return swd { swdSequence = Just (seqNum + 1) }
+ Nothing -> do
+ fail "writeStream: stream closed"
+
+closeStream :: StreamWriter -> IO ()
+closeStream (StreamWriter mvar) = do
+ withMVar mvar $ \swd -> do
+ case swdSequence swd of
+ Just seqNum -> writeFlowIO (rswFlow $ swdStream swd) $ StreamClosed seqNum
+ Nothing -> fail "closeStream: stream already closed"
diff --git a/src/Erebos/Set.hs b/src/Erebos/Set.hs
index 270c0ba..7453be4 100644
--- a/src/Erebos/Set.hs
+++ b/src/Erebos/Set.hs
@@ -10,7 +10,6 @@ module Erebos.Set (
) where
import Control.Arrow
-import Control.Monad.IO.Class
import Data.Function
import Data.List
@@ -53,14 +52,14 @@ emptySet = Set []
loadSet :: Mergeable a => Ref -> Set a
loadSet = mergeSorted . (:[]) . wrappedLoad
-storeSetAdd :: (Mergeable a, MonadIO m) => Storage -> a -> Set a -> m (Set a)
-storeSetAdd st x (Set prev) = Set . (:[]) <$> wrappedStore st SetItem
+storeSetAdd :: (Mergeable a, MonadStorage m) => a -> Set a -> m (Set a)
+storeSetAdd x (Set prev) = Set . (: []) <$> mstore SetItem
{ siPrev = prev
, siItem = toComponents x
}
-storeSetAddComponent :: (Mergeable a, MonadStorage m, MonadIO m) => Stored (Component a) -> Set a -> m (Set a)
-storeSetAddComponent component (Set prev) = Set . (:[]) <$> mstore SetItem
+storeSetAddComponent :: (Mergeable a, MonadStorage m) => Stored (Component a) -> Set a -> m (Set a)
+storeSetAddComponent component (Set prev) = Set . (: []) <$> mstore SetItem
{ siPrev = prev
, siItem = [ component ]
}
diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs
index 5ce9952..06e5c54 100644
--- a/src/Erebos/State.hs
+++ b/src/Erebos/State.hs
@@ -6,6 +6,7 @@ module Erebos.State (
MonadStorage(..),
MonadHead(..),
updateLocalHead_,
+ LocalHeadT(..),
updateLocalState, updateLocalState_,
updateSharedState, updateSharedState_,
@@ -17,14 +18,14 @@ module Erebos.State (
mergeSharedIdentity,
) where
+import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
+import Data.Bifunctor
import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.Typeable
-import Data.UUID (UUID)
-import Data.UUID qualified as U
import Erebos.Identity
import Erebos.Object
@@ -32,6 +33,8 @@ import Erebos.PubKey
import Erebos.Storable
import Erebos.Storage.Head
import Erebos.Storage.Merge
+import Erebos.UUID (UUID)
+import Erebos.UUID qualified as U
data LocalState = LocalState
{ lsPrev :: Maybe RefDigest
@@ -66,7 +69,7 @@ instance Storable LocalState where
lsPrev <- loadMbRawWeak "PREV"
lsIdentity <- loadRef "id"
lsShared <- loadRefs "shared"
- lsOther <- filter ((`notElem` [ BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems
+ lsOther <- filter ((`notElem` [ BC.pack "PREV", BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems
return LocalState {..}
instance HeadType LocalState where
@@ -101,6 +104,35 @@ instance (HeadType a, MonadIO m) => MonadHead a (ReaderT (Head a) m) where
snd <$> updateHead h f
+newtype LocalHeadT h m a = LocalHeadT { runLocalHeadT :: Storage -> Stored h -> m ( a, Stored h ) }
+
+instance Functor m => Functor (LocalHeadT h m) where
+ fmap f (LocalHeadT act) = LocalHeadT $ \st h -> first f <$> act st h
+
+instance Monad m => Applicative (LocalHeadT h m) where
+ pure x = LocalHeadT $ \_ h -> pure ( x, h )
+ (<*>) = ap
+
+instance Monad m => Monad (LocalHeadT h m) where
+ return = pure
+ LocalHeadT act >>= f = LocalHeadT $ \st h -> do
+ ( x, h' ) <- act st h
+ let (LocalHeadT act') = f x
+ act' st h'
+
+instance MonadIO m => MonadIO (LocalHeadT h m) where
+ liftIO act = LocalHeadT $ \_ h -> ( , h ) <$> liftIO act
+
+instance MonadIO m => MonadStorage (LocalHeadT h m) where
+ getStorage = LocalHeadT $ \st h -> return ( st, h )
+
+instance (HeadType h, MonadIO m) => MonadHead h (LocalHeadT h m) where
+ updateLocalHead f = LocalHeadT $ \st h -> do
+ let LocalHeadT act = f h
+ ( ( h', x ), _ ) <- act st h
+ return ( x, h' )
+
+
localIdentity :: LocalState -> UnifiedIdentity
localIdentity ls = maybe (error "failed to verify local identity")
(updateOwners $ maybe [] idExtDataF $ lookupSharedValue $ lsShared ls)
@@ -128,12 +160,11 @@ updateSharedState :: forall a b m. (SharedType a, MonadHead LocalState m) => (a
updateSharedState f = \ls -> do
let shared = lsShared $ fromStored ls
val = lookupSharedValue shared
- st <- getStorage
(val', x) <- f val
(,x) <$> if toComponents val' == toComponents val
then return ls
- else do shared' <- makeSharedStateUpdate st val' shared
- wrappedStore st (fromStored ls) { lsShared = [shared'] }
+ else do shared' <- makeSharedStateUpdate val' shared
+ mstore (fromStored ls) { lsShared = [shared'] }
lookupSharedValue :: forall a. SharedType a => [Stored SharedState] -> a
lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap (ssValue . fromStored) . filterAncestors . helper
@@ -141,8 +172,8 @@ lookupSharedValue = mergeSorted . filterAncestors . map wrappedLoad . concatMap
| otherwise = helper $ ssPrev (fromStored x) ++ xs
helper [] = []
-makeSharedStateUpdate :: forall a m. MonadIO m => SharedType a => Storage -> a -> [Stored SharedState] -> m (Stored SharedState)
-makeSharedStateUpdate st val prev = liftIO $ wrappedStore st SharedState
+makeSharedStateUpdate :: forall a m. (SharedType a, MonadStorage m) => a -> [ Stored SharedState ] -> m (Stored SharedState)
+makeSharedStateUpdate val prev = mstore SharedState
{ ssPrev = prev
, ssType = Just $ sharedTypeID @a Proxy
, ssValue = storedRef <$> toComponents val
diff --git a/src/Erebos/Storage/Disk.hs b/src/Erebos/Storage/Disk.hs
index 370c584..8e35940 100644
--- a/src/Erebos/Storage/Disk.hs
+++ b/src/Erebos/Storage/Disk.hs
@@ -18,7 +18,6 @@ import Data.ByteString.Lazy.Char8 qualified as BLC
import Data.Function
import Data.List
import Data.Maybe
-import Data.UUID qualified as U
import System.Directory
import System.FSNotify
@@ -31,6 +30,7 @@ import Erebos.Storage.Backend
import Erebos.Storage.Head
import Erebos.Storage.Internal
import Erebos.Storage.Platform
+import Erebos.UUID qualified as U
data DiskStorage = StorageDir
diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs
index 8f8e009..285902d 100644
--- a/src/Erebos/Storage/Head.hs
+++ b/src/Erebos/Storage/Head.hs
@@ -28,13 +28,12 @@ import Control.Monad.Reader
import Data.Bifunctor
import Data.Typeable
-import Data.UUID qualified as U
-import Data.UUID.V4 qualified as U
import Erebos.Object
import Erebos.Storable
import Erebos.Storage.Backend
import Erebos.Storage.Internal
+import Erebos.UUID qualified as U
-- | Represents loaded Erebos storage head, along with the object it pointed to
@@ -114,7 +113,7 @@ loadHeadRaw st@Storage {..} tid hid = do
-- | Reload the given head from storage, returning `Head' with updated object,
-- or `Nothing' if there is no longer head with the particular ID in storage.
reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a))
-reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid
+reloadHead (Head hid val) = loadHead (storedStorage val) hid
-- | Store a new `Head' of type 'a' in the storage.
storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a)
@@ -233,8 +232,8 @@ watchHeadWith
-> (Head a -> b) -- ^ Selector function
-> (b -> IO ()) -- ^ Callback
-> IO WatchedHead -- ^ Watched head handle
-watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do
- watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb
+watchHeadWith (Head hid val) sel cb = do
+ watchHeadRaw (storedStorage val) (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb
-- | Watch the given head using raw IDs and a selector from `Ref'.
watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead
diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs
index 6df1410..db211bb 100644
--- a/src/Erebos/Storage/Internal.hs
+++ b/src/Erebos/Storage/Internal.hs
@@ -1,32 +1,55 @@
-module Erebos.Storage.Internal where
+module Erebos.Storage.Internal (
+ Storage'(..), Storage, PartialStorage,
+ Ref'(..), Ref, PartialRef,
+ RefDigest(..),
+ WatchID, startWatchID, nextWatchID,
+ WatchList(..), WatchListItem(..), watchListAdd, watchListDel,
+
+ refStorage,
+ refDigest, refDigestFromByteString,
+ showRef, showRefDigest, showRefDigestParts,
+ readRefDigest,
+ hashToRefDigest,
+
+ StorageCompleteness(..),
+ StorageBackend(..),
+ Complete, Partial,
+
+ unsafeStoreRawBytes,
+ ioLoadBytesFromStorage,
+
+ Generation(..),
+ HeadID(..), HeadTypeID(..),
+ Stored(..), storedStorage,
+) where
import Control.Arrow
import Control.Concurrent
import Control.DeepSeq
import Control.Exception
-import Control.Monad
import Control.Monad.Identity
import Crypto.Hash
import Data.Bits
-import Data.ByteArray (ByteArray, ByteArrayAccess, ScrubbedBytes)
+import Data.ByteArray (ByteArrayAccess, ScrubbedBytes)
import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
-import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
-import Data.Char
+import Data.Function
import Data.HashTable.IO qualified as HT
import Data.Hashable
import Data.Kind
import Data.Typeable
-import Data.UUID (UUID)
import Foreign.Storable (peek)
import System.IO.Unsafe (unsafePerformIO)
+import Erebos.UUID (UUID)
+import Erebos.Util
+
data Storage' c = forall bck. (StorageBackend bck, BackendCompleteness bck ~ c) => Storage
{ stBackend :: bck
@@ -196,35 +219,15 @@ showRefDigest = showRefDigestParts >>> \(alg, hex) -> alg <> BC.pack "#" <> hex
readRefDigest :: ByteString -> Maybe RefDigest
readRefDigest x = case BC.split '#' x of
[alg, dgst] | BA.convert alg == BC.pack "blake2" ->
- refDigestFromByteString =<< readHex @ByteString dgst
+ refDigestFromByteString =<< readHex dgst
_ -> Nothing
-refDigestFromByteString :: ByteArrayAccess ba => ba -> Maybe RefDigest
+refDigestFromByteString :: ByteString -> Maybe RefDigest
refDigestFromByteString = fmap RefDigest . digestFromByteString
hashToRefDigest :: BL.ByteString -> RefDigest
hashToRefDigest = RefDigest . hashFinalize . hashUpdates hashInit . BL.toChunks
-showHex :: ByteArrayAccess ba => ba -> ByteString
-showHex = B.concat . map showHexByte . BA.unpack
- where showHexChar x | x < 10 = x + o '0'
- | otherwise = x + o 'a' - 10
- showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ]
- o = fromIntegral . ord
-
-readHex :: ByteArray ba => ByteString -> Maybe ba
-readHex = return . BA.concat <=< readHex'
- where readHex' bs | B.null bs = Just []
- readHex' bs = do (bx, bs') <- B.uncons bs
- (by, bs'') <- B.uncons bs'
- x <- hexDigit bx
- y <- hexDigit by
- (B.singleton (x * 16 + y) :) <$> readHex' bs''
- hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0'
- | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10
- | otherwise = Nothing
- o = fromIntegral . ord
-
newtype Generation = Generation Int
deriving (Eq, Show)
@@ -237,17 +240,20 @@ newtype HeadID = HeadID UUID
newtype HeadTypeID = HeadTypeID UUID
deriving (Eq, Ord)
-data Stored' c a = Stored (Ref' c) a
+data Stored a = Stored
+ { storedRef' :: Ref
+ , storedObject' :: a
+ }
deriving (Show)
-instance Eq (Stored' c a) where
- Stored r1 _ == Stored r2 _ = refDigest r1 == refDigest r2
+instance Eq (Stored a) where
+ (==) = (==) `on` (refDigest . storedRef')
-instance Ord (Stored' c a) where
- compare (Stored r1 _) (Stored r2 _) = compare (refDigest r1) (refDigest r2)
+instance Ord (Stored a) where
+ compare = compare `on` (refDigest . storedRef')
-storedStorage :: Stored' c a -> Storage' c
-storedStorage (Stored (Ref st _) _) = st
+storedStorage :: Stored a -> Storage
+storedStorage = refStorage . storedRef'
type Complete = Identity
diff --git a/src/Erebos/Storage/Memory.hs b/src/Erebos/Storage/Memory.hs
index 677e8c5..26bb181 100644
--- a/src/Erebos/Storage/Memory.hs
+++ b/src/Erebos/Storage/Memory.hs
@@ -4,7 +4,8 @@ module Erebos.Storage.Memory (
derivePartialStorage,
) where
-import Control.Concurrent.MVar
+import Control.Concurrent
+import Control.Monad
import Data.ByteArray (ScrubbedBytes)
import Data.ByteString.Lazy qualified as BL
@@ -62,14 +63,19 @@ instance (StorageCompleteness c, Typeable p) => StorageBackend (MemoryStorage p
backendReplaceHead StorageMemory {..} tid hid expected new = do
res <- modifyMVar memHeads $ \hs -> do
- ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers
- return $ case partition ((==(tid, hid)) . fst) hs of
- ( [] , _ ) -> ( hs, Left Nothing )
+ case partition ((==(tid, hid)) . fst) hs of
+ ( [] , _ ) -> return ( hs, Left Nothing )
(( _, dgst ) : _, hs' )
- | dgst == expected -> ((( tid, hid ), new ) : hs', Right ( new, ws ))
- | otherwise -> ( hs, Left $ Just dgst )
+ | dgst == expected -> do
+ ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers
+ return ((( tid, hid ), new ) : hs', Right ( new, ws ))
+ | otherwise -> do
+ return ( hs, Left $ Just dgst )
case res of
- Right ( dgst, ws ) -> mapM_ ($ dgst) ws >> return (Right dgst)
+ Right ( dgst, ws ) -> do
+ void $ forkIO $ do
+ mapM_ ($ dgst) ws
+ return (Right dgst)
Left x -> return $ Left x
backendWatchHead StorageMemory {..} tid hid cb = modifyMVar memWatchers $ return . watchListAdd tid hid cb
diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs
index 41725af..8221e91 100644
--- a/src/Erebos/Storage/Merge.hs
+++ b/src/Erebos/Storage/Merge.hs
@@ -7,7 +7,7 @@ module Erebos.Storage.Merge (
compareGeneration, generationMax,
storedGeneration,
- generations,
+ generations, generationsBy,
ancestors,
precedes,
precedesOrEquals,
@@ -17,6 +17,8 @@ module Erebos.Storage.Merge (
findProperty,
findPropertyFirst,
+
+ storedDifference,
) where
import Control.Concurrent.MVar
@@ -25,6 +27,8 @@ import Data.ByteString.Char8 qualified as BC
import Data.HashTable.IO qualified as HT
import Data.Kind
import Data.List
+import Data.List.NonEmpty (NonEmpty)
+import Data.List.NonEmpty qualified as NE
import Data.Maybe
import Data.Set (Set)
import Data.Set qualified as S
@@ -52,7 +56,7 @@ merge xs = mergeSorted $ filterAncestors xs
storeMerge :: (Mergeable a, Storable a) => [Stored (Component a)] -> IO (Stored a)
storeMerge [] = error "merge: empty list"
-storeMerge xs@(Stored ref _ : _) = wrappedStore (refStorage ref) $ mergeSorted $ filterAncestors xs
+storeMerge xs@(x : _) = wrappedStore (storedStorage x) $ mergeSorted $ filterAncestors xs
previous :: Storable a => Stored a -> [Stored a]
previous (Stored ref _) = case load ref of
@@ -100,16 +104,24 @@ storedGeneration x =
-- |Returns list of sets starting with the set of given objects and
-- intcrementally adding parents.
-generations :: Storable a => [Stored a] -> [Set (Stored a)]
-generations = unfoldr gen . (,S.empty)
- where gen (hs, cur) = case filter (`S.notMember` cur) hs of
- [] -> Nothing
- added -> let next = foldr S.insert cur added
- in Just (next, (previous =<< added, next))
+generations :: Storable a => [ Stored a ] -> NonEmpty (Set (Stored a))
+generations = generationsBy previous
+
+-- |Returns list of sets starting with the set of given objects and
+-- intcrementally adding parents, with the first parameter being
+-- a function to get all the parents of given object.
+generationsBy :: Ord a => (a -> [ a ]) -> [ a ] -> NonEmpty (Set a)
+generationsBy parents xs = NE.unfoldr gen ( xs, S.fromList xs )
+ where
+ gen ( hs, cur ) = ( cur, ) $
+ case filter (`S.notMember` cur) (parents =<< hs) of
+ [] -> Nothing
+ added -> let next = foldr S.insert cur added
+ in Just ( added, next )
-- |Returns set containing all given objects and their ancestors
ancestors :: Storable a => [Stored a] -> Set (Stored a)
-ancestors = last . (S.empty:) . generations
+ancestors = NE.last . generations
precedes :: Storable a => Stored a -> Stored a -> Bool
precedes x y = not $ x `elem` filterAncestors [x, y]
@@ -162,3 +174,18 @@ findPropertyFirst sel = fmap (fromJust . sel . fromStored) . listToMaybe . filte
findPropHeads :: forall a b. Storable a => (a -> Maybe b) -> Stored a -> [Stored a]
findPropHeads sel sobj | Just _ <- sel $ fromStored sobj = [sobj]
| otherwise = findPropHeads sel =<< previous sobj
+
+
+-- | Compute symmetrict difference between two stored histories. In other
+-- words, return all 'Stored a' objects reachable (via 'previous') from first
+-- given set, but not from the second; and vice versa.
+storedDifference :: Storable a => [ Stored a ] -> [ Stored a ] -> [ Stored a ]
+storedDifference xs' ys' =
+ let xs = filterAncestors xs'
+ ys = filterAncestors ys'
+
+ filteredPrevious blocked zs = filterAncestors (previous zs ++ blocked) `diffSorted` blocked
+ xg = S.toAscList $ NE.last $ generationsBy (filteredPrevious ys) $ filterAncestors (xs ++ ys) `diffSorted` ys
+ yg = S.toAscList $ NE.last $ generationsBy (filteredPrevious xs) $ filterAncestors (ys ++ xs) `diffSorted` xs
+
+ in xg `mergeUniq` yg
diff --git a/src/Erebos/Sync.hs b/src/Erebos/Sync.hs
index d837a14..5f5fdec 100644
--- a/src/Erebos/Sync.hs
+++ b/src/Erebos/Sync.hs
@@ -31,6 +31,7 @@ instance Service SyncService where
else return ls
serviceNewPeer = notifyPeer . lsShared . fromStored =<< svcGetLocal
+ serviceUpdatedPeer = serviceNewPeer
serviceStorageWatchers _ = (:[]) $ SomeStorageWatcher (lsShared . fromStored) notifyPeer
instance Storable SyncService where
diff --git a/src/Erebos/UUID.hs b/src/Erebos/UUID.hs
new file mode 100644
index 0000000..128d450
--- /dev/null
+++ b/src/Erebos/UUID.hs
@@ -0,0 +1,24 @@
+module Erebos.UUID (
+ UUID,
+ toString, fromString,
+ toText, fromText,
+ toASCIIBytes, fromASCIIBytes,
+ nextRandom,
+) where
+
+import Crypto.Random.Entropy
+
+import Data.Bits
+import Data.ByteString qualified as BS
+import Data.ByteString.Lazy qualified as BSL
+import Data.Maybe
+import Data.UUID.Types
+
+nextRandom :: IO UUID
+nextRandom = do
+ [ b0, b1, b2, b3, b4, b5, b6, b7, b8, b9, ba, bb, bc, bd, be, bf ]
+ <- BS.unpack <$> getEntropy 16
+ let version = 4
+ b6' = b6 .&. 0x0f .|. (version `shiftL` 4)
+ b8' = b8 .&. 0x3f .|. 0x80
+ return $ fromJust $ fromByteString $ BSL.pack [ b0, b1, b2, b3, b4, b5, b6', b7, b8', b9, ba, bb, bc, bd, be, bf ]
diff --git a/src/Erebos/Util.hs b/src/Erebos/Util.hs
index ffca9c7..0d53e98 100644
--- a/src/Erebos/Util.hs
+++ b/src/Erebos/Util.hs
@@ -1,5 +1,14 @@
module Erebos.Util where
+import Control.Monad
+
+import Data.ByteArray (ByteArray, ByteArrayAccess)
+import Data.ByteArray qualified as BA
+import Data.ByteString (ByteString)
+import Data.ByteString qualified as B
+import Data.Char
+
+
uniq :: Eq a => [a] -> [a]
uniq (x:y:xs) | x == y = uniq (x:xs)
| otherwise = x : uniq (y:xs)
@@ -13,15 +22,16 @@ mergeBy cmp (x : xs) (y : ys) = case cmp x y of
mergeBy _ xs [] = xs
mergeBy _ [] ys = ys
-mergeUniqBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
-mergeUniqBy cmp (x : xs) (y : ys) = case cmp x y of
- LT -> x : mergeBy cmp xs (y : ys)
- EQ -> x : mergeBy cmp xs ys
- GT -> y : mergeBy cmp (x : xs) ys
+mergeUniqBy :: (a -> a -> Ordering) -> [ a ] -> [ a ] -> [ a ]
+mergeUniqBy cmp (x : xs) (y : ys) =
+ case cmp x y of
+ LT -> x : mergeUniqBy cmp xs (y : ys)
+ EQ -> x : mergeUniqBy cmp xs ys
+ GT -> y : mergeUniqBy cmp (x : xs) ys
mergeUniqBy _ xs [] = xs
mergeUniqBy _ [] ys = ys
-mergeUniq :: Ord a => [a] -> [a] -> [a]
+mergeUniq :: Ord a => [ a ] -> [ a ] -> [ a ]
mergeUniq = mergeUniqBy compare
diffSorted :: Ord a => [a] -> [a] -> [a]
@@ -35,3 +45,24 @@ intersectsSorted (x:xs) (y:ys) | x < y = intersectsSorted xs (y:ys)
| x > y = intersectsSorted (x:xs) ys
| otherwise = True
intersectsSorted _ _ = False
+
+
+showHex :: ByteArrayAccess ba => ba -> ByteString
+showHex = B.concat . map showHexByte . BA.unpack
+ where showHexChar x | x < 10 = x + o '0'
+ | otherwise = x + o 'a' - 10
+ showHexByte x = B.pack [ showHexChar (x `div` 16), showHexChar (x `mod` 16) ]
+ o = fromIntegral . ord
+
+readHex :: ByteArray ba => ByteString -> Maybe ba
+readHex = return . BA.concat <=< readHex'
+ where readHex' bs | B.null bs = Just []
+ readHex' bs = do (bx, bs') <- B.uncons bs
+ (by, bs'') <- B.uncons bs'
+ x <- hexDigit bx
+ y <- hexDigit by
+ (B.singleton (x * 16 + y) :) <$> readHex' bs''
+ hexDigit x | x >= o '0' && x <= o '9' = Just $ x - o '0'
+ | x >= o 'a' && x <= o 'z' = Just $ x - o 'a' + 10
+ | otherwise = Nothing
+ o = fromIntegral . ord
diff --git a/test/attach.test b/test/attach.et
index afbdd0e..afbdd0e 100644
--- a/test/attach.test
+++ b/test/attach.et
diff --git a/test/chatroom.test b/test/chatroom.et
index 54f9b2a..54f9b2a 100644
--- a/test/chatroom.test
+++ b/test/chatroom.et
diff --git a/test/common.et b/test/common.et
new file mode 100644
index 0000000..89941f0
--- /dev/null
+++ b/test/common.et
@@ -0,0 +1,3 @@
+module common
+
+export def refpat = /blake2#[0-9a-f]*/
diff --git a/test/contact.test b/test/contact.et
index 978f8a6..978f8a6 100644
--- a/test/contact.test
+++ b/test/contact.et
diff --git a/test/discovery.et b/test/discovery.et
new file mode 100644
index 0000000..e80a755
--- /dev/null
+++ b/test/discovery.et
@@ -0,0 +1,218 @@
+module discovery
+
+def refpat = /blake2#[0-9a-f]*/
+
+test ManualDiscovery:
+ let services = "discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+
+ spawn as pd on sd
+ spawn as p1 on s1
+ spawn as p2 on s2
+ send "create-identity Discovery" to pd
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ expect /create-identity-done ref ($refpat).*/ from p1 capture p1id
+ send "identity-info $p1id" to p1
+ expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner
+ send "identity-info $p1owner" to p1
+ expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase
+
+ expect /create-identity-done ref $refpat.*/ from p2
+ expect /create-identity-done ref $refpat.*/ from pd
+
+ # Test discovery using owner and device identities:
+ for id in [ p1obase, p1base ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p.node.ip} 29665/
+ /peer [12] id .*/
+
+ send "discovery-connect $id" to p2
+
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr ${p1.node.ip} 29665/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
+
+ # Test delayed discovery with new peer
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services" to p
+
+ with p1:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p1.node.ip} 29665/
+ /peer [12] id Device1 Owner1/
+
+ send "discovery-connect $id" to p2
+
+ with p2:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p2.node.ip} 29665/
+ /peer [12] id Device2 Owner2/
+
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ expect from p2:
+ /peer [0-9]+ addr ${p1.node.ip} 29665/
+ /peer [0-9]+ id Device1 Owner1/
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
+
+
+test DiscoveryTunnel:
+ let services = "discovery:tunnel"
+
+ subnet sd
+ subnet s1
+ subnet s2
+
+ spawn as pd on sd
+ spawn as p1 on s1
+ spawn as p2 on s2
+
+ for n in [ p1.node, p2.node ]:
+ shell on n:
+ nft add table inet filter
+ nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }'
+ nft add rule inet filter input 'ct state { established, related } accept'
+
+ send "create-identity Discovery" to pd
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ expect /create-identity-done ref ($refpat).*/ from p1 capture p1id
+ send "identity-info $p1id" to p1
+ expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner
+ send "identity-info $p1owner" to p1
+ expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase
+
+ expect /create-identity-done ref $refpat.*/ from p2
+ expect /create-identity-done ref $refpat.*/ from pd
+
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services test-log" to p
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p.node.ip} 29665/
+ /peer [12] id .*/
+
+ send "discovery-tunnel 1 $id" to p2
+
+ expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p2
+ expect /net-ostream-open ${p1.node.ip} 29665 1 1/ from pd
+ expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p1
+ expect /net-ostream-open ${p2.node.ip} 29665 1 1/ from pd
+
+ expect from p1:
+ /peer 2 addr tunnel@.*/
+ /peer 2 id Device2 Owner2/
+ expect from p2:
+ /peer 2 addr tunnel@.*/
+ /peer 2 id Device1 Owner1/
+
+ send "peer-drop 2" to p1
+ send "peer-drop 2" to p2
+
+ expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p2
+ expect /net-ostream-close-ack ${p1.node.ip} 29665 1 0/ from pd
+ expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p1
+ expect /net-ostream-close-ack ${p2.node.ip} 29665 1 0/ from pd
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
+
+
+test DiscoveryTunnelRefused:
+ let services = "discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+
+ spawn as pd on sd
+ spawn as p1 on s1
+ spawn as p2 on s2
+
+ for n in [ p1.node, p2.node ]:
+ shell on n:
+ nft add table inet filter
+ nft add chain inet filter input '{ type filter hook input priority filter ; policy drop; }'
+ nft add rule inet filter input 'ct state { established, related } accept'
+
+ send "create-identity Discovery" to pd
+ send "create-identity Device1 Owner1" to p1
+ send "create-identity Device2 Owner2" to p2
+
+ expect /create-identity-done ref ($refpat).*/ from p1 capture p1id
+ send "identity-info $p1id" to p1
+ expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner
+ send "identity-info $p1owner" to p1
+ expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase
+
+ expect /create-identity-done ref $refpat.*/ from p2
+ expect /create-identity-done ref $refpat.*/ from pd
+
+ for id in [ p1obase ]:
+ for p in [ pd, p1, p2 ]:
+ send "start-server services $services test-log" to p
+
+ for p in [ p1, p2 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [12] addr ${p.node.ip} 29665/
+ /peer [12] id .*/
+
+ send "discovery-tunnel 1 $id" to p2
+ expect /net-ostream-open ${pd.node.ip} 29665 1 1/ from p2
+ expect /net-ostream-close-ack ${pd.node.ip} 29665 1 0/ from p2
+
+ for p in [ pd, p1, p2 ]:
+ send "stop-server" to p
+ for p in [ pd, p1, p2 ]:
+ expect /stop-server-done/ from p
diff --git a/test/discovery.test b/test/discovery.test
deleted file mode 100644
index f2dddb7..0000000
--- a/test/discovery.test
+++ /dev/null
@@ -1,75 +0,0 @@
-module discovery
-
-test ManualDiscovery:
- let services = "discovery,test"
- let refpat = /blake2#[0-9a-f]*/
-
- subnet sd
- subnet s1
- subnet s2
-
- spawn as pd on sd
- spawn as p1 on s1
- spawn as p2 on s2
- send "create-identity Discovery" to pd
- send "create-identity Device1 Owner1" to p1
- send "create-identity Device2 Owner2" to p2
-
- expect /create-identity-done ref ($refpat).*/ from p1 capture p1id
- send "identity-info $p1id" to p1
- expect /identity-info ref $p1id base ($refpat) owner ($refpat).*/ from p1 capture p1base, p1owner
- send "identity-info $p1owner" to p1
- expect /identity-info ref $p1owner base ($refpat).*/ from p1 capture p1obase
-
- expect /create-identity-done ref $refpat.*/ from p2
- expect /create-identity-done ref $refpat.*/ from pd
-
- # TODO: avoid the need to send identity objects with weak refs
- for p in [ p1, p2 ]:
- with p:
- send "start-server services $services"
- send "peer-add ${p2.node.ip}" to p1
- expect from p1:
- /peer 1 addr ${p2.node.ip} 29665/
- /peer 1 id Device2 Owner2/
- expect from p2:
- /peer 1 addr ${p1.node.ip} 29665/
- /peer 1 id Device1 Owner1/
- for r in [ p1base, p1obase ]:
- with p1:
- send "test-message-send 1 $r"
- expect /test-message-send done/
- with p2:
- expect /test-message-received rec [0-9]+ $r/
- for p in [ p1, p2 ]:
- send "stop-server" to p
- expect /stop-server-done/ from p
-
- # Test discovery using owner and device identities:
- for id in [ p1obase, p1base ]:
- for p in [ pd, p1, p2 ]:
- send "start-server services $services" to p
-
- for p in [ p1, p2 ]:
- with p:
- send "peer-add ${pd.node.ip}"
- expect:
- /peer 1 addr ${pd.node.ip} 29665/
- /peer 1 id Discovery/
- expect from pd:
- /peer [12] addr ${p.node.ip} 29665/
- /peer [12] id .*/
-
- send "discovery-connect $id" to p2
-
- expect from p1:
- /peer [0-9]+ addr ${p2.node.ip} 29665/
- /peer [0-9]+ id Device2 Owner2/
- expect from p2:
- /peer [0-9]+ addr ${p1.node.ip} 29665/
- /peer [0-9]+ id Device1 Owner1/
-
- for p in [ pd, p1, p2 ]:
- send "stop-server" to p
- for p in [ pd, p1, p2 ]:
- expect /stop-server-done/ from p
diff --git a/test/graph.et b/test/graph.et
new file mode 100644
index 0000000..38ec3c4
--- /dev/null
+++ b/test/graph.et
@@ -0,0 +1,111 @@
+module graph
+
+test StoredDifference:
+ spawn as p1
+ with p1:
+ # ref names: r<level>_<num>
+
+ send:
+ "store rec"
+ "num:i 1"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r1_1
+
+ send:
+ "store rec"
+ "PREV:r $r1_1"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r2_1
+
+ send "stored-difference $r2_1 |"
+ expect /stored-difference-item $r1_1/
+ expect /stored-difference-item $r2_1/
+ local:
+ expect /stored-difference-(.*)/ capture done
+ guard (done == "done")
+
+ send:
+ "store rec"
+ "PREV:r $r2_1"
+ "num:i 1"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r3_1
+
+ send "stored-difference $r1_1 | $r3_1"
+ expect /stored-difference-item $r2_1/
+ expect /stored-difference-item $r3_1/
+ local:
+ expect /stored-difference-(.*)/ capture done
+ guard (done == "done")
+
+ send:
+ "store rec"
+ "PREV:r $r2_1"
+ "num:i 2"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r3_2
+
+ send:
+ "store rec"
+ "PREV:r $r3_1"
+ "num:i 1"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r4_1
+
+ send:
+ "store rec"
+ "PREV:r $r3_2"
+ "num:i 2"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r4_2
+
+ send "stored-difference $r4_1 | $r4_2"
+ expect /stored-difference-item $r3_1/
+ expect /stored-difference-item $r3_2/
+ expect /stored-difference-item $r4_1/
+ expect /stored-difference-item $r4_2/
+ local:
+ expect /stored-difference-(.*)/ capture done
+ guard (done == "done")
+
+
+ send:
+ "store rec"
+ "PREV:r $r2_1"
+ "num:i 3"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r3_3
+
+ send:
+ "store rec"
+ "PREV:r $r3_2"
+ "PREV:r $r3_3"
+ "num:i 3"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r4_3
+
+ send:
+ "store rec"
+ "PREV:r $r3_3"
+ "num:i 4"
+ ""
+ expect /store-done (blake2#[0-9a-f]*)/ capture r4_4
+
+ send "stored-difference $r4_1 $r4_2 | $r4_3 $r4_4"
+ expect /stored-difference-item $r3_1/
+ expect /stored-difference-item $r3_3/
+ expect /stored-difference-item $r4_1/
+ expect /stored-difference-item $r4_2/
+ expect /stored-difference-item $r4_3/
+ expect /stored-difference-item $r4_4/
+ local:
+ expect /stored-difference-(.*)/ capture done
+ guard (done == "done")
+
+ send "stored-difference $r1_1 $r2_1 $r3_2 $r3_3 | $r4_1 $r4_3"
+ expect /stored-difference-item $r3_1/
+ expect /stored-difference-item $r4_1/
+ expect /stored-difference-item $r4_3/
+ local:
+ expect /stored-difference-(.*)/ capture done
+ guard (done == "done")
diff --git a/test/message.test b/test/message.et
index c0e251b..acdfc27 100644
--- a/test/message.test
+++ b/test/message.et
@@ -1,3 +1,7 @@
+module message
+
+import common
+
test DirectMessage:
let services = "contact,dm"
@@ -24,16 +28,20 @@ test DirectMessage:
for i in [1..2]:
send "dm-send-peer $peer1_2 hello$i" to p1
+ expect /dm-sent from Owner1 text hello$i/ from p1
expect /dm-received from Owner1 text hello$i/ from p2
for i in [1..2]:
send "dm-send-peer $peer2_1 hi$i" to p2
+ expect /dm-sent from Owner2 text hi$i/ from p2
expect /dm-received from Owner2 text hi$i/ from p1
for i in [3..4]:
send "dm-send-peer $peer1_2 hello$i" to p1
+ expect /dm-sent from Owner1 text hello$i/ from p1
expect /dm-received from Owner1 text hello$i/ from p2
send "dm-send-peer $peer2_1 hi$i" to p2
+ expect /dm-sent from Owner2 text hi$i/ from p2
expect /dm-received from Owner2 text hi$i/ from p1
# Create contacts
@@ -63,16 +71,20 @@ test DirectMessage:
for i in [1..2]:
send "dm-send-contact $c1_2 hello_c_$i" to p1
+ expect /dm-sent from Owner1 text hello_c_$i/ from p1
expect /dm-received from Owner1 text hello_c_$i/ from p2
for i in [1..2]:
send "dm-send-contact $c2_1 hi_c_$i" to p2
+ expect /dm-sent from Owner2 text hi_c_$i/ from p2
expect /dm-received from Owner2 text hi_c_$i/ from p1
for i in [3..4]:
send "dm-send-contact $c1_2 hello_c_$i" to p1
+ expect /dm-sent from Owner1 text hello_c_$i/ from p1
expect /dm-received from Owner1 text hello_c_$i/ from p2
send "dm-send-contact $c2_1 hi_c_$i" to p2
+ expect /dm-sent from Owner2 text hi_c_$i/ from p2
expect /dm-received from Owner2 text hi_c_$i/ from p1
send "dm-list-contact $c1_2" to p1
@@ -131,6 +143,7 @@ test DirectMessage:
send "start-server services $services" to p2
send "dm-send-contact $c1_2 while_offline" to p1
+ expect /dm-sent from Owner1 text while_offline/ from p1
send "start-server services $services" to p1
expect /dm-received from Owner1 text while_offline/ from p2
@@ -144,8 +157,117 @@ test DirectMessage:
send "start-server services $services" to p1
send "dm-send-contact $c1_2 while_peer_offline" to p1
- # TODO: sync from p1 on peer p2 discovery not ensured without addition wait
- #wait
+ expect /dm-sent from Owner1 text while_peer_offline/ from p1
send "start-server services $services" to p2
expect /dm-received from Owner1 text while_peer_offline/ from p2
+
+
+test DirectMessageDiscovery:
+ let services = "dm,discovery"
+
+ subnet sd
+ subnet s1
+ subnet s2
+ subnet s3
+ subnet s4
+
+ spawn on sd as pd
+ spawn on s1 as p1
+ spawn on s2 as p2
+ spawn on s3 as p3
+ spawn on s4 as p4
+
+ send "create-identity Discovery" to pd
+
+ send "create-identity Device1 Owner1" to p1
+ expect /create-identity-done ref ($refpat)/ from p1 capture p1_id
+ send "identity-info $p1_id" to p1
+ expect /identity-info ref $p1_id base ($refpat) owner ($refpat).*/ from p1 capture p1_base, p1_owner
+
+ send "create-identity Device2 Owner2" to p2
+ expect /create-identity-done ref ($refpat)/ from p2 capture p2_id
+ send "identity-info $p2_id" to p2
+ expect /identity-info ref $p2_id base ($refpat) owner ($refpat).*/ from p2 capture p2_base, p2_owner
+ send "identity-info $p2_owner" to p2
+ expect /identity-info ref $p2_owner base ($refpat).*/ from p2 capture p2_obase
+
+ send "create-identity Device3 Owner3" to p3
+ expect /create-identity-done ref ($refpat)/ from p3 capture p3_id
+ send "identity-info $p3_id" to p3
+ expect /identity-info ref $p3_id base ($refpat) owner ($refpat).*/ from p3 capture p3_base, p3_owner
+
+ send "create-identity Device4 Owner4" to p4
+ expect /create-identity-done ref ($refpat)/ from p4 capture p4_id
+ send "identity-info $p4_id" to p4
+ expect /identity-info ref $p4_id base ($refpat) owner ($refpat).*/ from p4 capture p4_base, p4_owner
+
+
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4 ]:
+ with p1:
+ send "peer-add ${p.node.ip}"
+ expect:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+ expect from p:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1 Owner1/
+
+ # Make sure p1 has other identities in storage:
+ for i in [ 1 .. 3 ]:
+ send "dm-send-peer $i init1" to p1
+ for p in [ p2, p3, p4 ]:
+ expect /dm-received from Owner1 text init1/ from p
+ send "dm-send-identity $p1_owner init2" to p
+ expect /dm-received from Owner. text init2/ from p1
+
+ # Restart servers to remove peers:
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ send "stop-server"
+ for p in [ p1, p2, p3, p4 ]:
+ with p:
+ expect /stop-server-done/
+
+ # Prepare message before peers connect to discovery
+ send "dm-send-identity $p4_owner hello_to_p4" to p1
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ with p:
+ send "start-server services $services"
+
+ for p in [ p2, p3, p4, p1 ]:
+ with p:
+ send "peer-add ${pd.node.ip}"
+ expect:
+ /peer 1 addr ${pd.node.ip} 29665/
+ /peer 1 id Discovery/
+ expect from pd:
+ /peer [0-9]+ addr ${p.node.ip} 29665/
+ /peer [0-9]+ id Device. Owner./
+
+ multiply_timeout by 2.0
+
+ # Connect via discovery manually, then send message
+ send "discovery-connect $p2_obase" to p1
+ expect from p1:
+ /peer [0-9]+ addr ${p2.node.ip} 29665/
+ /peer [0-9]+ id Device2 Owner2/
+ send "dm-send-identity $p2_owner hello_to_p2" to p1
+ expect /dm-received from Owner1 text hello_to_p2/ from p2
+
+ # Send message, expect automatic discovery
+ send "dm-send-identity $p3_owner hello_to_p3" to p1
+ expect /dm-received from Owner1 text hello_to_p3/ from p3
+
+ # Verify the first message
+ expect /dm-received from Owner1 text hello_to_p4/ from p4
+
+ for p in [ p1, p2, p3, p4, pd ]:
+ send "stop-server" to p
+ for p in [ p1, p2, p3, p4, pd ]:
+ expect /stop-server-done/ from p
diff --git a/test/network.test b/test/network.et
index 52fcbee..a670f35 100644
--- a/test/network.test
+++ b/test/network.et
@@ -182,6 +182,69 @@ test ManyStreams:
expect /test-message-received blob 100[2-4] $ref/ from p2
+test ServiceStreams:
+ let services = "test"
+
+ spawn as p1
+ spawn as p2
+ send "create-identity Device1" to p1
+ send "create-identity Device2" to p2
+ send "start-server services $services test-log" to p1
+ send "start-server services $services test-log" to p2
+ expect from p1:
+ /peer 1 addr ${p2.node.ip} 29665/
+ /peer 1 id Device2/
+ expect from p2:
+ /peer 1 addr ${p1.node.ip} 29665/
+ /peer 1 id Device1/
+
+ send "test-stream-open 1" to p1
+ expect /test-stream-open-done 1 ([0-9]+)/ from p1 capture stream1
+ expect /test-stream-open-from 1 $stream1/ from p2
+
+ expect /net-ostream-open ${p2.node.ip} 29665 1 1/ from p1
+
+ send "test-stream-send 1 $stream1 hello" to p1
+ expect /test-stream-send-done 1 $stream1/ from p1
+ expect /test-stream-received 1 $stream1 0 hello/ from p2
+
+ send "test-stream-close 1 $stream1" to p1
+ expect /test-stream-close-done 1 $stream1/ from p1
+ expect /test-stream-closed-from 1 $stream1 1/ from p2
+
+ expect /net-ostream-close-send ${p2.node.ip} 29665 1/ from p1
+ expect /net-ostream-close-ack ${p2.node.ip} 29665 1 0/ from p1
+
+ send "test-stream-open 1 8" to p2
+ expect /test-stream-open-done 1 ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+) ([0-9]+)/ from p2 capture stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8
+ expect /test-stream-open-from 1 $stream2_1 $stream2_2 $stream2_3 $stream2_4 $stream2_5 $stream2_6 $stream2_7 $stream2_8/ from p1
+
+ let streams2 = [ stream2_1, stream2_2, stream2_3, stream2_4, stream2_5, stream2_6, stream2_7, stream2_8 ]
+ with p2:
+ expect /net-ostream-open ${p1.node.ip} 29665 . 8/
+ flush matching /net-ostream-open ${p1.node.ip} 29665.*/
+
+ for i in [ 1..20 ]:
+ for s in streams2:
+ send "test-stream-send 1 $s hello$i"
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-send-done 1 $s/
+ for s in streams2:
+ send "test-stream-close 1 $s"
+ for s in streams2:
+ expect /test-stream-close-done 1 $s/
+
+ expect /net-ostream-close-ack ${p1.node.ip} 29665 . 0/
+ flush matching /net-ostream-close-[a-z]* ${p1.node.ip} 29665.*/
+ with p1:
+ for i in [ 1..20 ]:
+ for s in streams2:
+ expect /test-stream-received 1 $s ${i-1} hello$i/
+ for s in streams2:
+ expect /test-stream-closed-from 1 $s 20/
+
+
test MultipleServiceRefs:
let services = "test"
diff --git a/test/storage.test b/test/storage.et
index 2230eac..2230eac 100644
--- a/test/storage.test
+++ b/test/storage.et
diff --git a/test/sync.test b/test/sync.et
index d465b11..d465b11 100644
--- a/test/sync.test
+++ b/test/sync.et