summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/Main.hs63
-rw-r--r--main/State.hs17
-rw-r--r--main/Terminal.hs5
-rw-r--r--main/Test.hs120
-rw-r--r--main/Test/Service.hs21
-rw-r--r--main/WebSocket.hs45
6 files changed, 213 insertions, 58 deletions
diff --git a/main/Main.hs b/main/Main.hs
index 3f78db1..26f4b12 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -61,6 +61,7 @@ import State
import Terminal
import Test
import Version
+import WebSocket
data Options = Options
{ optServer :: ServerOptions
@@ -68,6 +69,7 @@ data Options = Options
, optStorage :: StorageOption
, optChatroomAutoSubscribe :: Maybe Int
, optDmBotEcho :: Maybe Text
+ , optWebSocketServer :: Maybe Int
, optShowHelp :: Bool
, optShowVersion :: Bool
}
@@ -90,6 +92,7 @@ defaultOptions = Options
, optStorage = DefaultStorage
, optChatroomAutoSubscribe = Nothing
, optDmBotEcho = Nothing
+ , optWebSocketServer = Nothing
, optShowHelp = False
, optShowVersion = False
}
@@ -144,6 +147,9 @@ options =
, Option [] ["dm-bot-echo"]
(ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
"automatically reply to direct messages with the same text prefixed with <prefix>"
+ , Option [] [ "websocket-server" ]
+ (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>")
+ "start WebSocket server on given port"
, Option ['h'] ["help"]
(NoArg $ \opts -> opts { optShowHelp = True })
"show this help and exit"
@@ -362,6 +368,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
startServer (optServer opts) erebosHead extPrintLn $
map soptService $ filter soptEnabled $ optServices opts
+ case optWebSocketServer opts of
+ Just port -> startWebsocketServer server "::" port extPrintLn
+ Nothing -> return ()
+
void $ liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange server
peerIdentity peer >>= \case
@@ -493,7 +503,10 @@ getSelectedChatroom = gets csContext >>= \case
_ -> throwOtherError "no chatroom selected"
getSelectedConversation :: CommandM Conversation
-getSelectedConversation = gets csContext >>= \case
+getSelectedConversation = gets csContext >>= getConversationFromContext
+
+getConversationFromContext :: CommandContext -> CommandM Conversation
+getConversationFromContext = \case
SelectedPeer peer -> peerIdentity peer >>= \case
PeerIdentityFull pid -> directMessageConversation $ finalOwner pid
_ -> throwOtherError "incomplete peer identity"
@@ -507,6 +520,13 @@ getSelectedConversation = gets csContext >>= \case
SelectedConversation conv -> reloadConversation conv
_ -> throwOtherError "no contact, peer or conversation selected"
+getSelectedOrManualContext :: CommandM CommandContext
+getSelectedOrManualContext = do
+ asks ciLine >>= \case
+ "" -> gets csContext
+ str | all isDigit str -> getContextByIndex (read str)
+ _ -> throwOtherError "invalid index"
+
commands :: [(String, Command)]
commands =
[ ("history", cmdHistory)
@@ -628,19 +648,22 @@ cmdMembers = do
forM_ (chatroomMembers room) $ \x -> do
cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x
+getContextByIndex :: Int -> CommandM CommandContext
+getContextByIndex n = do
+ join (asks ciContextOptions) >>= \ctxs -> if
+ | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx
+ | otherwise -> throwOtherError "invalid index"
cmdSelectContext :: Command
cmdSelectContext = do
n <- read <$> asks ciLine
- join (asks ciContextOptions) >>= \ctxs -> if
- | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do
- modify $ \s -> s { csContext = ctx }
- case ctx of
- SelectedChatroom rstate -> do
- when (not (roomStateSubscribe rstate)) $ do
- chatroomSetSubscribe (head $ roomStateData rstate) True
- _ -> return ()
- | otherwise -> throwOtherError "invalid index"
+ ctx <- getContextByIndex n
+ modify $ \s -> s { csContext = ctx }
+ case ctx of
+ SelectedChatroom rstate -> do
+ when (not (roomStateSubscribe rstate)) $ do
+ chatroomSetSubscribe (head $ roomStateData rstate) True
+ _ -> return ()
cmdSend :: Command
cmdSend = void $ do
@@ -654,12 +677,12 @@ cmdSend = void $ do
cmdDelete :: Command
cmdDelete = void $ do
- deleteConversation =<< getSelectedConversation
+ deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext
modify $ \s -> s { csContext = NoContext }
cmdHistory :: Command
cmdHistory = void $ do
- conv <- getSelectedConversation
+ conv <- getConversationFromContext =<< getSelectedOrManualContext
case conversationHistory conv of
thread@(_:_) -> do
tzone <- liftIO $ getCurrentTimeZone
@@ -824,7 +847,7 @@ cmdConversations = do
cmdDetails :: Command
cmdDetails = do
- gets csContext >>= \case
+ getSelectedOrManualContext >>= \case
SelectedPeer peer -> do
cmdPutStrLn $ unlines
[ "Network peer:"
@@ -900,17 +923,11 @@ cmdDiscoveryInit = void $ do
cmdDiscovery :: Command
cmdDiscovery = void $ do
- Just peer <- gets csIcePeer
- st <- getStorage
+ server <- asks ciServer
sref <- asks ciLine
- eprint <- asks ciPrint
- liftIO $ readRef st (BC.pack sref) >>= \case
- Nothing -> error "ref does not exist"
- Just ref -> do
- res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref
- case res of
- Right _ -> return ()
- Left err -> eprint err
+ case readRefDigest (BC.pack sref) of
+ Nothing -> throwOtherError "failed to parse ref"
+ Just dgst -> discoverySearch server dgst
#ifdef ENABLE_ICE_SUPPORT
diff --git a/main/State.hs b/main/State.hs
index d357844..150178e 100644
--- a/main/State.hs
+++ b/main/State.hs
@@ -58,9 +58,8 @@ updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case
Nothing -> throwOtherError "no existing shared identity"
interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => Terminal -> Identity f -> m UnifiedIdentity
-interactiveIdentityUpdate term identity = do
- let public = idKeyIdentity identity
-
+interactiveIdentityUpdate term fidentity = do
+ identity <- mergeIdentity fidentity
name <- liftIO $ do
setPrompt term $ T.unpack $ T.concat $ concat
[ [ T.pack "Name" ]
@@ -71,11 +70,11 @@ interactiveIdentityUpdate term identity = do
]
getInputLine term $ KeepPrompt . maybe T.empty T.pack
- if | T.null name -> mergeIdentity identity
+ if | T.null name -> return identity
| otherwise -> do
- secret <- loadKey public
- maybe (throwOtherError "created invalid identity") return . validateIdentity =<<
- mstore =<< sign secret =<< mstore (emptyIdentityData public)
- { iddPrev = toList $ idDataF identity
- , iddName = Just name
+ secret <- loadKey $ idKeyIdentity identity
+ maybe (throwOtherError "created invalid identity") return . validateExtendedIdentity =<<
+ mstore =<< sign secret =<< mstore . ExtendedIdentityData =<< return (emptyIdentityExtension $ idData identity)
+ { idePrev = toList $ idExtDataF identity
+ , ideName = Just name
}
diff --git a/main/Terminal.hs b/main/Terminal.hs
index 5dc3612..150bd8c 100644
--- a/main/Terminal.hs
+++ b/main/Terminal.hs
@@ -31,8 +31,9 @@ import Data.List
import Data.Text (Text)
import Data.Text qualified as T
-import System.IO
import System.Console.ANSI
+import System.IO
+import System.IO.Error
data Terminal = Terminal
@@ -107,7 +108,7 @@ termPutStr Terminal {..} str = do
getInput :: IO Input
getInput = do
- getChar >>= \case
+ handleJust (guard . isEOFError) (\() -> return InputEnd) $ getChar >>= \case
'\ESC' -> do
esc <- readEsc
case parseEsc esc of
diff --git a/main/Test.hs b/main/Test.hs
index e54285a..fa8501e 100644
--- a/main/Test.hs
+++ b/main/Test.hs
@@ -26,7 +26,7 @@ import Data.Text qualified as T
import Data.Text.Encoding
import Data.Text.IO qualified as T
import Data.Typeable
-import Data.UUID qualified as U
+import Data.UUID.Types qualified as U
import Network.Socket
@@ -44,6 +44,7 @@ import Erebos.Object
import Erebos.Pairing
import Erebos.PubKey
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Set
import Erebos.State
import Erebos.Storable
@@ -66,10 +67,17 @@ data TestState = TestState
data RunningServer = RunningServer
{ rsServer :: Server
- , rsPeers :: MVar (Int, [(Int, Peer)])
+ , rsPeers :: MVar ( Int, [ TestPeer ] )
, rsPeerThread :: ThreadId
}
+data TestPeer = TestPeer
+ { tpIndex :: Int
+ , tpPeer :: Peer
+ , tpStreamReaders :: MVar [ (Int, StreamReader ) ]
+ , tpStreamWriters :: MVar [ (Int, StreamWriter ) ]
+ }
+
initTestState :: TestState
initTestState = TestState
{ tsHead = Nothing
@@ -137,17 +145,20 @@ cmdOut line = do
getPeer :: Text -> CommandM Peer
-getPeer spidx = do
+getPeer spidx = tpPeer <$> getTestPeer spidx
+
+getTestPeer :: Text -> CommandM TestPeer
+getTestPeer spidx = do
Just RunningServer {..} <- gets tsServer
- Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers)
+ Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers)
return peer
-getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int
+getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int
getPeerIndex pmvar = do
peer <- asks svcPeer
- maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar)
+ maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar)
-pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a
+pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a
pairingAttributes _ out peers prefix = PairingAttributes
{ pairingHookRequest = return ()
@@ -267,6 +278,9 @@ commands = map (T.pack *** id)
, ("peer-drop", cmdPeerDrop)
, ("peer-list", cmdPeerList)
, ("test-message-send", cmdTestMessageSend)
+ , ("test-stream-open", cmdTestStreamOpen)
+ , ("test-stream-close", cmdTestStreamClose)
+ , ("test-stream-send", cmdTestStreamSend)
, ("local-state-get", cmdLocalStateGet)
, ("local-state-replace", cmdLocalStateReplace)
, ("local-state-wait", cmdLocalStateWait)
@@ -286,6 +300,7 @@ commands = map (T.pack *** id)
, ("contact-set-name", cmdContactSetName)
, ("dm-send-peer", cmdDmSendPeer)
, ("dm-send-contact", cmdDmSendContact)
+ , ("dm-send-identity", cmdDmSendIdentity)
, ("dm-list-peer", cmdDmListPeer)
, ("dm-list-contact", cmdDmListContact)
, ("chatroom-create", cmdChatroomCreate)
@@ -501,7 +516,20 @@ cmdStartServer = do
{ testMessageReceived = \obj otype len sref -> do
liftIO $ do
void $ store (headStorage h) obj
- outLine out $ unwords ["test-message-received", otype, len, sref]
+ outLine out $ unwords [ "test-message-received", otype, len, sref ]
+ , testStreamsReceived = \streams -> do
+ pidx <- getPeerIndex rsPeers
+ liftIO $ do
+ nums <- mapM getStreamReaderNumber streams
+ outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums
+ forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do
+ let go = readStreamPacket stream >>= \case
+ StreamData seqNum bytes -> do
+ outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ]
+ go
+ StreamClosed seqNum -> do
+ outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ]
+ go
}
sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'"
@@ -510,15 +538,23 @@ cmdStartServer = do
rsPeerThread <- liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange rsServer
- let printPeer (idx, p) = do
- params <- peerIdentity p >>= return . \case
+ let printPeer TestPeer {..} = do
+ params <- peerIdentity tpPeer >>= return . \case
PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
- _ -> [ "addr", show (peerAddress p) ]
- outLine out $ unwords $ [ "peer", show idx ] ++ params
+ _ -> [ "addr", show (peerAddress tpPeer) ]
+ outLine out $ unwords $ [ "peer", show tpIndex ] ++ params
- update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)])
- update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur
- | otherwise = fmap (p:) <$> update (nid, ps)
+ update ( tpIndex, [] ) = do
+ tpPeer <- return peer
+ tpStreamReaders <- newMVar []
+ tpStreamWriters <- newMVar []
+ let tp = TestPeer {..}
+ printPeer tp
+ return ( tpIndex + 1, [ tp ] )
+
+ update cur@( nid, p : ps )
+ | tpPeer p == peer = printPeer p >> return cur
+ | otherwise = fmap (p :) <$> update ( nid, ps )
modifyMVar_ rsPeers update
@@ -555,10 +591,10 @@ cmdPeerList = do
peers <- liftIO $ getCurrentPeerList rsServer
tpeers <- liftIO $ readMVar rsPeers
forM_ peers $ \peer -> do
- Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers
+ Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers
mbpid <- peerIdentity peer
cmdOut $ unwords $ concat
- [ [ "peer-list-item", show n ]
+ [ [ "peer-list-item", show (tpIndex tp) ]
, [ "addr", show (peerAddress peer) ]
, case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid)
_ -> []
@@ -575,6 +611,40 @@ cmdTestMessageSend = do
sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs
cmdOut "test-message-send done"
+cmdTestStreamOpen :: Command
+cmdTestStreamOpen = do
+ spidx : rest <- asks tiParams
+ tp <- getTestPeer spidx
+ count <- case rest of
+ [] -> return 1
+ tcount : _ -> return $ read $ T.unpack tcount
+
+ out <- asks tiOutput
+ runPeerService (tpPeer tp) $ do
+ streams <- openTestStreams count
+ afterCommit $ do
+ nums <- mapM getStreamWriterNumber streams
+ modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams)
+ outLine out $ unwords $ "test-stream-open-done"
+ : T.unpack spidx
+ : map show nums
+
+cmdTestStreamClose :: Command
+cmdTestStreamClose = do
+ [ spidx, sid ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ closeStream stream
+ cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ]
+
+cmdTestStreamSend :: Command
+cmdTestStreamSend = do
+ [ spidx, sid, content ] <- asks tiParams
+ tp <- getTestPeer spidx
+ Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp))
+ liftIO $ writeStream stream $ encodeUtf8 content
+ cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ]
+
cmdLocalStateGet :: Command
cmdLocalStateGet = do
h <- getHead
@@ -747,6 +817,14 @@ cmdDmSendContact = do
Just to <- contactIdentity <$> getContact cid
void $ sendDirectMessage to msg
+cmdDmSendIdentity :: Command
+cmdDmSendIdentity = do
+ st <- asks tiStorage
+ [ tid, msg ] <- asks tiParams
+ Just ref <- liftIO $ readRef st $ encodeUtf8 tid
+ Just to <- return $ validateExtendedIdentity $ wrappedLoad ref
+ void $ sendDirectMessage to msg
+
dmList :: Foldable f => Identity f -> Command
dmList peer = do
threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead
@@ -888,11 +966,7 @@ cmdChatroomMessageSend = do
cmdDiscoveryConnect :: Command
cmdDiscoveryConnect = do
- st <- asks tiStorage
[ tref ] <- asks tiParams
- Just ref <- liftIO $ readRef st $ encodeUtf8 tref
-
+ Just dgst <- return $ readRefDigest $ encodeUtf8 tref
Just RunningServer {..} <- gets tsServer
- peers <- liftIO $ getCurrentPeerList rsServer
- forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ discoverySearch rsServer dgst
diff --git a/main/Test/Service.hs b/main/Test/Service.hs
index 8c58dee..c0be07d 100644
--- a/main/Test/Service.hs
+++ b/main/Test/Service.hs
@@ -1,8 +1,11 @@
module Test.Service (
TestMessage(..),
TestMessageAttributes(..),
+
+ openTestStreams,
) where
+import Control.Monad
import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as BL
@@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL
import Erebos.Network
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
data TestMessage = TestMessage (Stored Object)
data TestMessageAttributes = TestMessageAttributes
{ testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage ()
+ , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage ()
}
instance Storable TestMessage where
@@ -26,7 +31,10 @@ instance Service TestMessage where
serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8"
type ServiceAttributes TestMessage = TestMessageAttributes
- defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ())
+ defaultServiceAttributes _ = TestMessageAttributes
+ { testMessageReceived = \_ _ _ _ -> return ()
+ , testStreamsReceived = \_ -> return ()
+ }
serviceHandler smsg = do
let TestMessage sobj = fromStored smsg
@@ -36,3 +44,14 @@ instance Service TestMessage where
cb <- asks $ testMessageReceived . svcAttributes
cb obj otype len (show $ refDigest $ storedRef sobj)
_ -> return ()
+
+ streams <- receivedStreams
+ when (not $ null streams) $ do
+ cb <- asks $ testStreamsReceived . svcAttributes
+ cb streams
+
+
+openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ]
+openTestStreams count = do
+ replyPacket . TestMessage =<< mstore (Rec [])
+ replicateM count openStream
diff --git a/main/WebSocket.hs b/main/WebSocket.hs
new file mode 100644
index 0000000..fbdd65f
--- /dev/null
+++ b/main/WebSocket.hs
@@ -0,0 +1,45 @@
+module WebSocket (
+ startWebsocketServer,
+) where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+
+import Data.ByteString.Lazy qualified as BL
+import Data.Unique
+
+import Erebos.Network
+
+import Network.WebSockets qualified as WS
+
+
+data WebSocketAddress = WebSocketAddress Unique WS.Connection
+
+instance Eq WebSocketAddress where
+ WebSocketAddress u _ == WebSocketAddress u' _ = u == u'
+
+instance Ord WebSocketAddress where
+ compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u'
+
+instance Show WebSocketAddress where
+ show (WebSocketAddress _ _) = "websocket"
+
+instance PeerAddressType WebSocketAddress where
+ sendBytesToAddress (WebSocketAddress _ conn) msg = do
+ WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg
+
+startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO ()
+startWebsocketServer server addr port logd = do
+ void $ forkIO $ do
+ WS.runServer addr port $ \pending -> do
+ conn <- WS.acceptRequest pending
+ u <- newUnique
+ let paddr = WebSocketAddress u conn
+ void $ serverPeerCustom server paddr
+ handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do
+ WS.withPingThread conn 30 (return ()) $ do
+ forever $ do
+ WS.receiveDataMessage conn >>= \case
+ WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg
+ WS.Text {} -> logd $ "unexpected websocket text message"