diff options
author | Roman Smrž <roman.smrz@seznam.cz> | 2025-04-26 19:54:48 +0200 |
---|---|---|
committer | Roman Smrž <roman.smrz@seznam.cz> | 2025-04-28 22:08:39 +0200 |
commit | acb147022e867f7c1df03d8ac175b6de98a0d598 (patch) | |
tree | 101282d8de0742f9ffa7f1d36509167cd2a01109 | |
parent | 31436dbed550c76f2165f29f48e255f19cc3561a (diff) |
WebSocket server
Changelog: Experimental WebSocket server
-rw-r--r-- | erebos.cabal | 2 | ||||
-rw-r--r-- | main/Main.hs | 10 | ||||
-rw-r--r-- | main/WebSocket.hs | 45 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 50 |
4 files changed, 98 insertions, 9 deletions
diff --git a/erebos.cabal b/erebos.cabal index a0a0d4b..54e4b47 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -206,6 +206,7 @@ executable erebos Test.Service Version Version.Git + WebSocket autogen-modules: Paths_erebos @@ -223,6 +224,7 @@ executable erebos time, transformers >= 0.5 && <0.7, uuid-types, + websockets ^>= { 0.12.7, 0.13 }, if !flag(cryptonite) build-depends: diff --git a/main/Main.hs b/main/Main.hs index 3f78db1..a1a8b50 100644 --- a/main/Main.hs +++ b/main/Main.hs @@ -61,6 +61,7 @@ import State import Terminal import Test import Version +import WebSocket data Options = Options { optServer :: ServerOptions @@ -68,6 +69,7 @@ data Options = Options , optStorage :: StorageOption , optChatroomAutoSubscribe :: Maybe Int , optDmBotEcho :: Maybe Text + , optWebSocketServer :: Maybe Int , optShowHelp :: Bool , optShowVersion :: Bool } @@ -90,6 +92,7 @@ defaultOptions = Options , optStorage = DefaultStorage , optChatroomAutoSubscribe = Nothing , optDmBotEcho = Nothing + , optWebSocketServer = Nothing , optShowHelp = False , optShowVersion = False } @@ -144,6 +147,9 @@ options = , Option [] ["dm-bot-echo"] (ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>") "automatically reply to direct messages with the same text prefixed with <prefix>" + , Option [] [ "websocket-server" ] + (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>") + "start WebSocket server on given port" , Option ['h'] ["help"] (NoArg $ \opts -> opts { optShowHelp = True }) "show this help and exit" @@ -362,6 +368,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do startServer (optServer opts) erebosHead extPrintLn $ map soptService $ filter soptEnabled $ optServices opts + case optWebSocketServer opts of + Just port -> startWebsocketServer server "::" port extPrintLn + Nothing -> return () + void $ liftIO $ forkIO $ void $ forever $ do peer <- getNextPeerChange server peerIdentity peer >>= \case diff --git a/main/WebSocket.hs b/main/WebSocket.hs new file mode 100644 index 0000000..fbdd65f --- /dev/null +++ b/main/WebSocket.hs @@ -0,0 +1,45 @@ +module WebSocket ( + startWebsocketServer, +) where + +import Control.Concurrent +import Control.Exception +import Control.Monad + +import Data.ByteString.Lazy qualified as BL +import Data.Unique + +import Erebos.Network + +import Network.WebSockets qualified as WS + + +data WebSocketAddress = WebSocketAddress Unique WS.Connection + +instance Eq WebSocketAddress where + WebSocketAddress u _ == WebSocketAddress u' _ = u == u' + +instance Ord WebSocketAddress where + compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u' + +instance Show WebSocketAddress where + show (WebSocketAddress _ _) = "websocket" + +instance PeerAddressType WebSocketAddress where + sendBytesToAddress (WebSocketAddress _ conn) msg = do + WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg + +startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO () +startWebsocketServer server addr port logd = do + void $ forkIO $ do + WS.runServer addr port $ \pending -> do + conn <- WS.acceptRequest pending + u <- newUnique + let paddr = WebSocketAddress u conn + void $ serverPeerCustom server paddr + handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do + WS.withPingThread conn 30 (return ()) $ do + forever $ do + WS.receiveDataMessage conn >>= \case + WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg + WS.Text {} -> logd $ "unexpected websocket text message" diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 54658de..08f4e5c 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -14,7 +14,12 @@ module Erebos.Network ( PeerIdentity(..), peerIdentity, WaitingRef, wrDigest, Service(..), + + PeerAddressType(..), + receivedFromCustomAddress, + serverPeer, + serverPeerCustom, #ifdef ENABLE_ICE_SUPPORT serverPeerIce, #endif @@ -36,13 +41,14 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State +import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List import Data.Map (Map) -import qualified Data.Map as M +import Data.Map qualified as M import Data.Maybe import Data.Typeable import Data.Word @@ -56,7 +62,7 @@ import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) -import qualified Network.Socket.ByteString as S +import Network.Socket.ByteString qualified as S import Erebos.Error #ifdef ENABLE_ICE_SUPPORT @@ -157,12 +163,19 @@ setPeerChannel Peer {..} ch = do instance Eq Peer where (==) = (==) `on` peerIdentityVar -data PeerAddress = DatagramAddress SockAddr +class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where + sendBytesToAddress :: addr -> ByteString -> IO () + +data PeerAddress + = forall addr. PeerAddressType addr => CustomPeerAddress addr + | DatagramAddress SockAddr #ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession + | PeerIceSession IceSession #endif instance Show PeerAddress where + show (CustomPeerAddress addr) = show addr + show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, port) | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 @@ -170,22 +183,32 @@ instance Show PeerAddress where Just (addr, port) -> [show addr, show port] _ -> [show saddr] + #ifdef ENABLE_ICE_SUPPORT show (PeerIceSession ice) = show ice #endif instance Eq PeerAddress where + CustomPeerAddress addr == CustomPeerAddress addr' + | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice == PeerIceSession ice' = ice == ice' - _ == _ = False #endif + _ == _ = False instance Ord PeerAddress where + compare (CustomPeerAddress addr) (CustomPeerAddress addr') + | Just addr'' <- cast addr' = compare addr addr'' + | otherwise = compare (typeOf addr) (typeOf addr') + compare (CustomPeerAddress _ ) _ = LT + compare _ (CustomPeerAddress _ ) = GT + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' #ifdef ENABLE_ICE_SUPPORT compare (DatagramAddress _ ) _ = LT compare _ (DatagramAddress _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' #endif @@ -198,9 +221,10 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] - | PeerConnected (Connection PeerAddress) - | PeerDropped +data PeerState + = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ] + | PeerConnected (Connection PeerAddress) + | PeerDropped lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID @@ -316,8 +340,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do case paddr of + CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice -> iceSend ice msg @@ -790,6 +815,10 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer +receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () +receivedFromCustomAddress Server {..} addr msg = do + writeFlowIO serverRawPath ( CustomPeerAddress addr, msg ) + mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do peerState <- newTVarIO (PeerInit []) @@ -808,6 +837,9 @@ serverPeer server paddr = do _ -> paddr serverPeer' server (DatagramAddress paddr') +serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer +serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) + #ifdef ENABLE_ICE_SUPPORT serverPeerIce :: Server -> IceSession -> IO Peer serverPeerIce server@Server {..} ice = do |