diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/Main.hs | 63 | ||||
-rw-r--r-- | main/State.hs | 17 | ||||
-rw-r--r-- | main/Terminal.hs | 5 | ||||
-rw-r--r-- | main/Test.hs | 120 | ||||
-rw-r--r-- | main/Test/Service.hs | 21 | ||||
-rw-r--r-- | main/WebSocket.hs | 45 |
6 files changed, 213 insertions, 58 deletions
diff --git a/main/Main.hs b/main/Main.hs index 3f78db1..26f4b12 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -61,6 +61,7 @@ import State import Terminal import Test import Version +import WebSocket data Options = Options { optServer :: ServerOptions @@ -68,6 +69,7 @@ data Options = Options , optStorage :: StorageOption , optChatroomAutoSubscribe :: Maybe Int , optDmBotEcho :: Maybe Text + , optWebSocketServer :: Maybe Int , optShowHelp :: Bool , optShowVersion :: Bool } @@ -90,6 +92,7 @@ defaultOptions = Options , optStorage = DefaultStorage , optChatroomAutoSubscribe = Nothing , optDmBotEcho = Nothing + , optWebSocketServer = Nothing , optShowHelp = False , optShowVersion = False } @@ -144,6 +147,9 @@ options = , Option [] ["dm-bot-echo"] (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") "automatically reply to direct messages with the same text prefixed with <prefix>" + , Option [] [ "websocket-server" ] + (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>") + "start WebSocket server on given port" , Option ['h'] ["help"] (NoArg $ \opts -> opts { optShowHelp = True }) "show this help and exit" @@ -362,6 +368,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do startServer (optServer opts) erebosHead extPrintLn $ map soptService $ filter soptEnabled $ optServices opts + case optWebSocketServer opts of + Just port -> startWebsocketServer server "::" port extPrintLn + Nothing -> return () + void $ liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange server peerIdentity peer >>= \case @@ -493,7 +503,10 @@ getSelectedChatroom = gets csContext >>= \case _ -> throwOtherError "no chatroom selected" getSelectedConversation :: CommandM Conversation -getSelectedConversation = gets csContext >>= \case +getSelectedConversation = gets csContext >>= getConversationFromContext + +getConversationFromContext :: CommandContext -> CommandM Conversation +getConversationFromContext = \case SelectedPeer peer -> peerIdentity peer >>= \case PeerIdentityFull pid -> directMessageConversation $ finalOwner pid _ -> throwOtherError "incomplete peer identity" @@ -507,6 +520,13 @@ getSelectedConversation = gets csContext >>= \case SelectedConversation conv -> reloadConversation conv _ -> throwOtherError "no contact, peer or conversation selected" +getSelectedOrManualContext :: CommandM CommandContext +getSelectedOrManualContext = do + asks ciLine >>= \case + "" -> gets csContext + str | all isDigit str -> getContextByIndex (read str) + _ -> throwOtherError "invalid index" + commands :: [(String, Command)] commands = [ ("history", cmdHistory) @@ -628,19 +648,22 @@ cmdMembers = do forM_ (chatroomMembers room) $ \x -> do cmdPutStrLn $ maybe "<unnamed>" T.unpack $ idName x +getContextByIndex :: Int -> CommandM CommandContext +getContextByIndex n = do + join (asks ciContextOptions) >>= \ctxs -> if + | n > 0, (ctx : _) <- drop (n - 1) ctxs -> return ctx + | otherwise -> throwOtherError "invalid index" cmdSelectContext :: Command cmdSelectContext = do n <- read <$> asks ciLine - join (asks ciContextOptions) >>= \ctxs -> if - | n > 0, (ctx : _) <- drop (n - 1) ctxs -> do - modify $ \s -> s { csContext = ctx } - case ctx of - SelectedChatroom rstate -> do - when (not (roomStateSubscribe rstate)) $ do - chatroomSetSubscribe (head $ roomStateData rstate) True - _ -> return () - | otherwise -> throwOtherError "invalid index" + ctx <- getContextByIndex n + modify $ \s -> s { csContext = ctx } + case ctx of + SelectedChatroom rstate -> do + when (not (roomStateSubscribe rstate)) $ do + chatroomSetSubscribe (head $ roomStateData rstate) True + _ -> return () cmdSend :: Command cmdSend = void $ do @@ -654,12 +677,12 @@ cmdSend = void $ do cmdDelete :: Command cmdDelete = void $ do - deleteConversation =<< getSelectedConversation + deleteConversation =<< getConversationFromContext =<< getSelectedOrManualContext modify $ \s -> s { csContext = NoContext } cmdHistory :: Command cmdHistory = void $ do - conv <- getSelectedConversation + conv <- getConversationFromContext =<< getSelectedOrManualContext case conversationHistory conv of thread@(_:_) -> do tzone <- liftIO $ getCurrentTimeZone @@ -824,7 +847,7 @@ cmdConversations = do cmdDetails :: Command cmdDetails = do - gets csContext >>= \case + getSelectedOrManualContext >>= \case SelectedPeer peer -> do cmdPutStrLn $ unlines [ "Network peer:" @@ -900,17 +923,11 @@ cmdDiscoveryInit = void $ do cmdDiscovery :: Command cmdDiscovery = void $ do - Just peer <- gets csIcePeer - st <- getStorage + server <- asks ciServer sref <- asks ciLine - eprint <- asks ciPrint - liftIO $ readRef st (BC.pack sref) >>= \case - Nothing -> error "ref does not exist" - Just ref -> do - res <- runExceptT $ sendToPeer peer $ DiscoverySearch ref - case res of - Right _ -> return () - Left err -> eprint err + case readRefDigest (BC.pack sref) of + Nothing -> throwOtherError "failed to parse ref" + Just dgst -> discoverySearch server dgst #ifdef ENABLE_ICE_SUPPORT diff --git a/main/State.hs b/main/State.hs index d357844..150178e 100644 --- a/main/State.hs +++ b/main/State.hs @@ -58,9 +58,8 @@ updateSharedIdentity term = updateLocalState_ $ updateSharedState_ $ \case Nothing -> throwOtherError "no existing shared identity" interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => Terminal -> Identity f -> m UnifiedIdentity -interactiveIdentityUpdate term identity = do - let public = idKeyIdentity identity - +interactiveIdentityUpdate term fidentity = do + identity <- mergeIdentity fidentity name <- liftIO $ do setPrompt term $ T.unpack $ T.concat $ concat [ [ T.pack "Name" ] @@ -71,11 +70,11 @@ interactiveIdentityUpdate term identity = do ] getInputLine term $ KeepPrompt . maybe T.empty T.pack - if | T.null name -> mergeIdentity identity + if | T.null name -> return identity | otherwise -> do - secret <- loadKey public - maybe (throwOtherError "created invalid identity") return . validateIdentity =<< - mstore =<< sign secret =<< mstore (emptyIdentityData public) - { iddPrev = toList $ idDataF identity - , iddName = Just name + secret <- loadKey $ idKeyIdentity identity + maybe (throwOtherError "created invalid identity") return . validateExtendedIdentity =<< + mstore =<< sign secret =<< mstore . ExtendedIdentityData =<< return (emptyIdentityExtension $ idData identity) + { idePrev = toList $ idExtDataF identity + , ideName = Just name } diff --git a/main/Terminal.hs b/main/Terminal.hs index 5dc3612..150bd8c 100644 --- a/main/Terminal.hs +++ b/main/Terminal.hs @@ -31,8 +31,9 @@ import Data.List import Data.Text (Text) import Data.Text qualified as T -import System.IO import System.Console.ANSI +import System.IO +import System.IO.Error data Terminal = Terminal @@ -107,7 +108,7 @@ termPutStr Terminal {..} str = do getInput :: IO Input getInput = do - getChar >>= \case + handleJust (guard . isEOFError) (\() -> return InputEnd) $ getChar >>= \case '\ESC' -> do esc <- readEsc case parseEsc esc of diff --git a/main/Test.hs b/main/Test.hs index e54285a..fa8501e 100644 --- a/main/Test.hs +++ b/main/Test.hs @@ -26,7 +26,7 @@ import Data.Text qualified as T import Data.Text.Encoding import Data.Text.IO qualified as T import Data.Typeable -import Data.UUID qualified as U +import Data.UUID.Types qualified as U import Network.Socket @@ -44,6 +44,7 @@ import Erebos.Object import Erebos.Pairing import Erebos.PubKey import Erebos.Service +import Erebos.Service.Stream import Erebos.Set import Erebos.State import Erebos.Storable @@ -66,10 +67,17 @@ data TestState = TestState data RunningServer = RunningServer { rsServer :: Server - , rsPeers :: MVar (Int, [(Int, Peer)]) + , rsPeers :: MVar ( Int, [ TestPeer ] ) , rsPeerThread :: ThreadId } +data TestPeer = TestPeer + { tpIndex :: Int + , tpPeer :: Peer + , tpStreamReaders :: MVar [ (Int, StreamReader ) ] + , tpStreamWriters :: MVar [ (Int, StreamWriter ) ] + } + initTestState :: TestState initTestState = TestState { tsHead = Nothing @@ -137,17 +145,20 @@ cmdOut line = do getPeer :: Text -> CommandM Peer -getPeer spidx = do +getPeer spidx = tpPeer <$> getTestPeer spidx + +getTestPeer :: Text -> CommandM TestPeer +getTestPeer spidx = do Just RunningServer {..} <- gets tsServer - Just peer <- lookup (read $ T.unpack spidx) . snd <$> liftIO (readMVar rsPeers) + Just peer <- find (((read $ T.unpack spidx) ==) . tpIndex) . snd <$> liftIO (readMVar rsPeers) return peer -getPeerIndex :: MVar (Int, [(Int, Peer)]) -> ServiceHandler (PairingService a) Int +getPeerIndex :: MVar ( Int, [ TestPeer ] ) -> ServiceHandler s Int getPeerIndex pmvar = do peer <- asks svcPeer - maybe 0 fst . find ((==peer) . snd) . snd <$> liftIO (readMVar pmvar) + maybe 0 tpIndex . find ((peer ==) . tpPeer) . snd <$> liftIO (readMVar pmvar) -pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar (Int, [(Int, Peer)]) -> String -> PairingAttributes a +pairingAttributes :: PairingResult a => proxy (PairingService a) -> Output -> MVar ( Int, [ TestPeer ] ) -> String -> PairingAttributes a pairingAttributes _ out peers prefix = PairingAttributes { pairingHookRequest = return () @@ -267,6 +278,9 @@ commands = map (T.pack *** id) , ("peer-drop", cmdPeerDrop) , ("peer-list", cmdPeerList) , ("test-message-send", cmdTestMessageSend) + , ("test-stream-open", cmdTestStreamOpen) + , ("test-stream-close", cmdTestStreamClose) + , ("test-stream-send", cmdTestStreamSend) , ("local-state-get", cmdLocalStateGet) , ("local-state-replace", cmdLocalStateReplace) , ("local-state-wait", cmdLocalStateWait) @@ -286,6 +300,7 @@ commands = map (T.pack *** id) , ("contact-set-name", cmdContactSetName) , ("dm-send-peer", cmdDmSendPeer) , ("dm-send-contact", cmdDmSendContact) + , ("dm-send-identity", cmdDmSendIdentity) , ("dm-list-peer", cmdDmListPeer) , ("dm-list-contact", cmdDmListContact) , ("chatroom-create", cmdChatroomCreate) @@ -501,7 +516,20 @@ cmdStartServer = do { testMessageReceived = \obj otype len sref -> do liftIO $ do void $ store (headStorage h) obj - outLine out $ unwords ["test-message-received", otype, len, sref] + outLine out $ unwords [ "test-message-received", otype, len, sref ] + , testStreamsReceived = \streams -> do + pidx <- getPeerIndex rsPeers + liftIO $ do + nums <- mapM getStreamReaderNumber streams + outLine out $ unwords $ "test-stream-open-from" : show pidx : map show nums + forM_ (zip nums streams) $ \( num, stream ) -> void $ forkIO $ do + let go = readStreamPacket stream >>= \case + StreamData seqNum bytes -> do + outLine out $ unwords [ "test-stream-received", show pidx, show num, show seqNum, BC.unpack bytes ] + go + StreamClosed seqNum -> do + outLine out $ unwords [ "test-stream-closed-from", show pidx, show num, show seqNum ] + go } sname -> throwOtherError $ "unknown service `" <> T.unpack sname <> "'" @@ -510,15 +538,23 @@ cmdStartServer = do rsPeerThread <- liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange rsServer - let printPeer (idx, p) = do - params <- peerIdentity p >>= return . \case + let printPeer TestPeer {..} = do + params <- peerIdentity tpPeer >>= return . \case PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) - _ -> [ "addr", show (peerAddress p) ] - outLine out $ unwords $ [ "peer", show idx ] ++ params + _ -> [ "addr", show (peerAddress tpPeer) ] + outLine out $ unwords $ [ "peer", show tpIndex ] ++ params - update (nid, []) = printPeer (nid, peer) >> return (nid + 1, [(nid, peer)]) - update cur@(nid, p:ps) | snd p == peer = printPeer p >> return cur - | otherwise = fmap (p:) <$> update (nid, ps) + update ( tpIndex, [] ) = do + tpPeer <- return peer + tpStreamReaders <- newMVar [] + tpStreamWriters <- newMVar [] + let tp = TestPeer {..} + printPeer tp + return ( tpIndex + 1, [ tp ] ) + + update cur@( nid, p : ps ) + | tpPeer p == peer = printPeer p >> return cur + | otherwise = fmap (p :) <$> update ( nid, ps ) modifyMVar_ rsPeers update @@ -555,10 +591,10 @@ cmdPeerList = do peers <- liftIO $ getCurrentPeerList rsServer tpeers <- liftIO $ readMVar rsPeers forM_ peers $ \peer -> do - Just (n, _) <- return $ find ((peer==).snd) . snd $ tpeers + Just tp <- return $ find ((peer ==) . tpPeer) . snd $ tpeers mbpid <- peerIdentity peer cmdOut $ unwords $ concat - [ [ "peer-list-item", show n ] + [ [ "peer-list-item", show (tpIndex tp) ] , [ "addr", show (peerAddress peer) ] , case mbpid of PeerIdentityFull pid -> ("id":) $ map (maybe "<unnamed>" T.unpack . idName) (unfoldOwners pid) _ -> [] @@ -575,6 +611,40 @@ cmdTestMessageSend = do sendManyToPeer peer $ map (TestMessage . wrappedLoad) refs cmdOut "test-message-send done" +cmdTestStreamOpen :: Command +cmdTestStreamOpen = do + spidx : rest <- asks tiParams + tp <- getTestPeer spidx + count <- case rest of + [] -> return 1 + tcount : _ -> return $ read $ T.unpack tcount + + out <- asks tiOutput + runPeerService (tpPeer tp) $ do + streams <- openTestStreams count + afterCommit $ do + nums <- mapM getStreamWriterNumber streams + modifyMVar_ (tpStreamWriters tp) $ return . (++ zip nums streams) + outLine out $ unwords $ "test-stream-open-done" + : T.unpack spidx + : map show nums + +cmdTestStreamClose :: Command +cmdTestStreamClose = do + [ spidx, sid ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ closeStream stream + cmdOut $ unwords [ "test-stream-close-done", T.unpack spidx, T.unpack sid ] + +cmdTestStreamSend :: Command +cmdTestStreamSend = do + [ spidx, sid, content ] <- asks tiParams + tp <- getTestPeer spidx + Just stream <- lookup (read $ T.unpack sid) <$> liftIO (readMVar (tpStreamWriters tp)) + liftIO $ writeStream stream $ encodeUtf8 content + cmdOut $ unwords [ "test-stream-send-done", T.unpack spidx, T.unpack sid ] + cmdLocalStateGet :: Command cmdLocalStateGet = do h <- getHead @@ -747,6 +817,14 @@ cmdDmSendContact = do Just to <- contactIdentity <$> getContact cid void $ sendDirectMessage to msg +cmdDmSendIdentity :: Command +cmdDmSendIdentity = do + st <- asks tiStorage + [ tid, msg ] <- asks tiParams + Just ref <- liftIO $ readRef st $ encodeUtf8 tid + Just to <- return $ validateExtendedIdentity $ wrappedLoad ref + void $ sendDirectMessage to msg + dmList :: Foldable f => Identity f -> Command dmList peer = do threads <- toThreadList . lookupSharedValue . lsShared . headObject <$> getHead @@ -888,11 +966,7 @@ cmdChatroomMessageSend = do cmdDiscoveryConnect :: Command cmdDiscoveryConnect = do - st <- asks tiStorage [ tref ] <- asks tiParams - Just ref <- liftIO $ readRef st $ encodeUtf8 tref - + Just dgst <- return $ readRefDigest $ encodeUtf8 tref Just RunningServer {..} <- gets tsServer - peers <- liftIO $ getCurrentPeerList rsServer - forM_ peers $ \peer -> do - sendToPeer peer $ DiscoverySearch ref + discoverySearch rsServer dgst diff --git a/main/Test/Service.hs b/main/Test/Service.hs index 8c58dee..c0be07d 100644 --- a/main/Test/Service.hs +++ b/main/Test/Service.hs @@ -1,8 +1,11 @@ module Test.Service ( TestMessage(..), TestMessageAttributes(..), + + openTestStreams, ) where +import Control.Monad import Control.Monad.Reader import Data.ByteString.Lazy.Char8 qualified as BL @@ -10,12 +13,14 @@ import Data.ByteString.Lazy.Char8 qualified as BL import Erebos.Network import Erebos.Object import Erebos.Service +import Erebos.Service.Stream import Erebos.Storable data TestMessage = TestMessage (Stored Object) data TestMessageAttributes = TestMessageAttributes { testMessageReceived :: Object -> String -> String -> String -> ServiceHandler TestMessage () + , testStreamsReceived :: [ StreamReader ] -> ServiceHandler TestMessage () } instance Storable TestMessage where @@ -26,7 +31,10 @@ instance Service TestMessage where serviceID _ = mkServiceID "cb46b92c-9203-4694-8370-8742d8ac9dc8" type ServiceAttributes TestMessage = TestMessageAttributes - defaultServiceAttributes _ = TestMessageAttributes (\_ _ _ _ -> return ()) + defaultServiceAttributes _ = TestMessageAttributes + { testMessageReceived = \_ _ _ _ -> return () + , testStreamsReceived = \_ -> return () + } serviceHandler smsg = do let TestMessage sobj = fromStored smsg @@ -36,3 +44,14 @@ instance Service TestMessage where cb <- asks $ testMessageReceived . svcAttributes cb obj otype len (show $ refDigest $ storedRef sobj) _ -> return () + + streams <- receivedStreams + when (not $ null streams) $ do + cb <- asks $ testStreamsReceived . svcAttributes + cb streams + + +openTestStreams :: Int -> ServiceHandler TestMessage [ StreamWriter ] +openTestStreams count = do + replyPacket . TestMessage =<< mstore (Rec []) + replicateM count openStream diff --git a/main/WebSocket.hs b/main/WebSocket.hs new file mode 100644 index 0000000..fbdd65f --- /dev/null +++ b/main/WebSocket.hs @@ -0,0 +1,45 @@ +module WebSocket ( + startWebsocketServer, +) where + +import Control.Concurrent +import Control.Exception +import Control.Monad + +import Data.ByteString.Lazy qualified as BL +import Data.Unique + +import Erebos.Network + +import Network.WebSockets qualified as WS + + +data WebSocketAddress = WebSocketAddress Unique WS.Connection + +instance Eq WebSocketAddress where + WebSocketAddress u _ == WebSocketAddress u' _ = u == u' + +instance Ord WebSocketAddress where + compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u' + +instance Show WebSocketAddress where + show (WebSocketAddress _ _) = "websocket" + +instance PeerAddressType WebSocketAddress where + sendBytesToAddress (WebSocketAddress _ conn) msg = do + WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg + +startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO () +startWebsocketServer server addr port logd = do + void $ forkIO $ do + WS.runServer addr port $ \pending -> do + conn <- WS.acceptRequest pending + u <- newUnique + let paddr = WebSocketAddress u conn + void $ serverPeerCustom server paddr + handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do + WS.withPingThread conn 30 (return ()) $ do + forever $ do + WS.receiveDataMessage conn >>= \case + WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg + WS.Text {} -> logd $ "unexpected websocket text message" |