summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2025-04-26 19:54:48 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2025-04-28 22:08:39 +0200
commitacb147022e867f7c1df03d8ac175b6de98a0d598 (patch)
tree101282d8de0742f9ffa7f1d36509167cd2a01109
parent31436dbed550c76f2165f29f48e255f19cc3561a (diff)
WebSocket server
Changelog: Experimental WebSocket server
-rw-r--r--erebos.cabal2
-rw-r--r--main/Main.hs10
-rw-r--r--main/WebSocket.hs45
-rw-r--r--src/Erebos/Network.hs50
4 files changed, 98 insertions, 9 deletions
diff --git a/erebos.cabal b/erebos.cabal
index a0a0d4b..54e4b47 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -206,6 +206,7 @@ executable erebos
Test.Service
Version
Version.Git
+ WebSocket
autogen-modules:
Paths_erebos
@@ -223,6 +224,7 @@ executable erebos
time,
transformers >= 0.5 && <0.7,
uuid-types,
+ websockets ^>= { 0.12.7, 0.13 },
if !flag(cryptonite)
build-depends:
diff --git a/main/Main.hs b/main/Main.hs
index 3f78db1..a1a8b50 100644
--- a/main/Main.hs
+++ b/main/Main.hs
@@ -61,6 +61,7 @@ import State
import Terminal
import Test
import Version
+import WebSocket
data Options = Options
{ optServer :: ServerOptions
@@ -68,6 +69,7 @@ data Options = Options
, optStorage :: StorageOption
, optChatroomAutoSubscribe :: Maybe Int
, optDmBotEcho :: Maybe Text
+ , optWebSocketServer :: Maybe Int
, optShowHelp :: Bool
, optShowVersion :: Bool
}
@@ -90,6 +92,7 @@ defaultOptions = Options
, optStorage = DefaultStorage
, optChatroomAutoSubscribe = Nothing
, optDmBotEcho = Nothing
+ , optWebSocketServer = Nothing
, optShowHelp = False
, optShowVersion = False
}
@@ -144,6 +147,9 @@ options =
, Option [] ["dm-bot-echo"]
(ReqArg (\prefix -> \opts -> opts { optDmBotEcho = Just (T.pack prefix) }) "<prefix>")
"automatically reply to direct messages with the same text prefixed with <prefix>"
+ , Option [] [ "websocket-server" ]
+ (ReqArg (\value -> \opts -> opts { optWebSocketServer = Just (read value) }) "<port>")
+ "start WebSocket server on given port"
, Option ['h'] ["help"]
(NoArg $ \opts -> opts { optShowHelp = True })
"show this help and exit"
@@ -362,6 +368,10 @@ interactiveLoop st opts = withTerminal commandCompletion $ \term -> do
startServer (optServer opts) erebosHead extPrintLn $
map soptService $ filter soptEnabled $ optServices opts
+ case optWebSocketServer opts of
+ Just port -> startWebsocketServer server "::" port extPrintLn
+ Nothing -> return ()
+
void $ liftIO $ forkIO $ void $ forever $ do
peer <- getNextPeerChange server
peerIdentity peer >>= \case
diff --git a/main/WebSocket.hs b/main/WebSocket.hs
new file mode 100644
index 0000000..fbdd65f
--- /dev/null
+++ b/main/WebSocket.hs
@@ -0,0 +1,45 @@
+module WebSocket (
+ startWebsocketServer,
+) where
+
+import Control.Concurrent
+import Control.Exception
+import Control.Monad
+
+import Data.ByteString.Lazy qualified as BL
+import Data.Unique
+
+import Erebos.Network
+
+import Network.WebSockets qualified as WS
+
+
+data WebSocketAddress = WebSocketAddress Unique WS.Connection
+
+instance Eq WebSocketAddress where
+ WebSocketAddress u _ == WebSocketAddress u' _ = u == u'
+
+instance Ord WebSocketAddress where
+ compare (WebSocketAddress u _) (WebSocketAddress u' _) = compare u u'
+
+instance Show WebSocketAddress where
+ show (WebSocketAddress _ _) = "websocket"
+
+instance PeerAddressType WebSocketAddress where
+ sendBytesToAddress (WebSocketAddress _ conn) msg = do
+ WS.sendDataMessage conn $ WS.Binary $ BL.fromStrict msg
+
+startWebsocketServer :: Server -> String -> Int -> (String -> IO ()) -> IO ()
+startWebsocketServer server addr port logd = do
+ void $ forkIO $ do
+ WS.runServer addr port $ \pending -> do
+ conn <- WS.acceptRequest pending
+ u <- newUnique
+ let paddr = WebSocketAddress u conn
+ void $ serverPeerCustom server paddr
+ handle (\(e :: SomeException) -> logd $ "WebSocket thread exception: " ++ show e) $ do
+ WS.withPingThread conn 30 (return ()) $ do
+ forever $ do
+ WS.receiveDataMessage conn >>= \case
+ WS.Binary msg -> receivedFromCustomAddress server paddr $ BL.toStrict msg
+ WS.Text {} -> logd $ "unexpected websocket text message"
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 54658de..08f4e5c 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -14,7 +14,12 @@ module Erebos.Network (
PeerIdentity(..), peerIdentity,
WaitingRef, wrDigest,
Service(..),
+
+ PeerAddressType(..),
+ receivedFromCustomAddress,
+
serverPeer,
+ serverPeerCustom,
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce,
#endif
@@ -36,13 +41,14 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
+import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
-import qualified Data.Map as M
+import Data.Map qualified as M
import Data.Maybe
import Data.Typeable
import Data.Word
@@ -56,7 +62,7 @@ import Foreign.Storable as F
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
-import qualified Network.Socket.ByteString as S
+import Network.Socket.ByteString qualified as S
import Erebos.Error
#ifdef ENABLE_ICE_SUPPORT
@@ -157,12 +163,19 @@ setPeerChannel Peer {..} ch = do
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
-data PeerAddress = DatagramAddress SockAddr
+class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where
+ sendBytesToAddress :: addr -> ByteString -> IO ()
+
+data PeerAddress
+ = forall addr. PeerAddressType addr => CustomPeerAddress addr
+ | DatagramAddress SockAddr
#ifdef ENABLE_ICE_SUPPORT
- | PeerIceSession IceSession
+ | PeerIceSession IceSession
#endif
instance Show PeerAddress where
+ show (CustomPeerAddress addr) = show addr
+
show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, port)
| (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
@@ -170,22 +183,32 @@ instance Show PeerAddress where
Just (addr, port)
-> [show addr, show port]
_ -> [show saddr]
+
#ifdef ENABLE_ICE_SUPPORT
show (PeerIceSession ice) = show ice
#endif
instance Eq PeerAddress where
+ CustomPeerAddress addr == CustomPeerAddress addr'
+ | Just addr'' <- cast addr' = addr == addr''
DatagramAddress addr == DatagramAddress addr' = addr == addr'
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice == PeerIceSession ice' = ice == ice'
- _ == _ = False
#endif
+ _ == _ = False
instance Ord PeerAddress where
+ compare (CustomPeerAddress addr) (CustomPeerAddress addr')
+ | Just addr'' <- cast addr' = compare addr addr''
+ | otherwise = compare (typeOf addr) (typeOf addr')
+ compare (CustomPeerAddress _ ) _ = LT
+ compare _ (CustomPeerAddress _ ) = GT
+
compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
#ifdef ENABLE_ICE_SUPPORT
compare (DatagramAddress _ ) _ = LT
compare _ (DatagramAddress _ ) = GT
+
compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
#endif
@@ -198,9 +221,10 @@ peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
- | PeerConnected (Connection PeerAddress)
- | PeerDropped
+data PeerState
+ = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
@@ -316,8 +340,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
case paddr of
+ CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice -> iceSend ice msg
@@ -790,6 +815,10 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
+receivedFromCustomAddress Server {..} addr msg = do
+ writeFlowIO serverRawPath ( CustomPeerAddress addr, msg )
+
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
peerState <- newTVarIO (PeerInit [])
@@ -808,6 +837,9 @@ serverPeer server paddr = do
_ -> paddr
serverPeer' server (DatagramAddress paddr')
+serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer
+serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr)
+
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce :: Server -> IceSession -> IO Peer
serverPeerIce server@Server {..} ice = do