summaryrefslogtreecommitdiff
path: root/src/Erebos/Discovery.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r--src/Erebos/Discovery.hs629
1 files changed, 461 insertions, 168 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index 48500d7..5590e4c 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -1,9 +1,13 @@
{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
- DiscoveryConnection(..)
+ DiscoveryConnection(..),
+
+ discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -11,39 +15,58 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
-import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
-import Network.Socket
+import Text.Read
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
#endif
import Erebos.Identity
import Erebos.Network
+import Erebos.Network.Address
import Erebos.Object
import Erebos.Service
+import Erebos.Service.Stream
import Erebos.Storable
+#ifndef ENABLE_ICE_SUPPORT
+type IceConfig = ()
+type IceSession = ()
+type IceRemoteInfo = Stored Object
+#endif
+
+
data DiscoveryService
- = DiscoverySelf [ Text ] (Maybe Int)
- | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
- | DiscoverySearch Ref
- | DiscoveryResult Ref [ Text ]
+ = DiscoverySelf [ DiscoveryAddress ] (Maybe Int)
+ | DiscoveryAcknowledged [ DiscoveryAddress ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
+ | DiscoverySearch (Either Ref RefDigest)
+ | DiscoveryResult (Either Ref RefDigest) [ DiscoveryAddress ]
| DiscoveryConnectionRequest DiscoveryConnection
| DiscoveryConnectionResponse DiscoveryConnection
+data DiscoveryAddress
+ = DiscoveryIP InetAddress PortNumber
+ | DiscoveryICE
+ | DiscoveryTunnel
+ | DiscoveryOther Text
+
data DiscoveryAttributes = DiscoveryAttributes
{ discoveryStunPort :: Maybe Word16
, discoveryStunServer :: Maybe Text
, discoveryTurnPort :: Maybe Word16
, discoveryTurnServer :: Maybe Text
+ , discoveryProvideTunnel :: Peer -> PeerAddress -> Bool
}
defaultDiscoveryAttributes :: DiscoveryAttributes
@@ -52,23 +75,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes
, discoveryStunServer = Nothing
, discoveryTurnPort = Nothing
, discoveryTurnServer = Nothing
+ , discoveryProvideTunnel = \_ _ -> False
}
data DiscoveryConnection = DiscoveryConnection
- { dconnSource :: Ref
- , dconnTarget :: Ref
+ { dconnSource :: Either Ref RefDigest
+ , dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
-#ifdef ENABLE_ICE_SUPPORT
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
-#else
- , dconnIceInfo :: Maybe (Stored Object)
-#endif
}
-emptyConnection :: Ref -> Ref -> DiscoveryConnection
+emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -84,19 +106,21 @@ instance Storable DiscoveryService where
storeMbInt "stun-port" stunPort
storeMbText "turn-server" turnServer
storeMbInt "turn-port" turnPort
- DiscoverySearch ref -> storeRawRef "search" ref
- DiscoveryResult ref addr -> do
- storeRawRef "result" ref
+ DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst
+ DiscoveryResult edgst addr -> do
+ either (storeRawRef "result") (storeRawWeak "result") edgst
mapM_ (storeText "address") addr
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
- where storeConnection ctype conn = do
- storeText "connection" $ ctype
- storeRawRef "source" $ dconnSource conn
- storeRawRef "target" $ dconnTarget conn
- storeMbText "address" $ dconnAddress conn
- storeMbRef "ice-info" $ dconnIceInfo conn
+ where
+ storeConnection (ctype :: Text) DiscoveryConnection {..} = do
+ storeText "connection" $ ctype
+ either (storeRawRef "source") (storeRawWeak "source") dconnSource
+ either (storeRawRef "target") (storeRawWeak "target") dconnTarget
+ storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
+ storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
[ do
@@ -114,29 +138,87 @@ instance Storable DiscoveryService where
<*> loadMbInt "stun-port"
<*> loadMbText "turn-server"
<*> loadMbInt "turn-port"
- , DiscoverySearch <$> loadRawRef "search"
+ , DiscoverySearch <$> msum
+ [ Left <$> loadRawRef "search"
+ , Right <$> loadRawWeak "search"
+ ]
, DiscoveryResult
- <$> loadRawRef "result"
+ <$> msum
+ [ Left <$> loadRawRef "result"
+ , Right <$> loadRawWeak "result"
+ ]
<*> loadTexts "address"
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
]
- where loadConnection ctype ctor = do
- ctype' <- loadText "connection"
- guard $ ctype == ctype'
- return . ctor =<< DiscoveryConnection
- <$> loadRawRef "source"
- <*> loadRawRef "target"
- <*> loadMbText "address"
- <*> loadMbRef "ice-info"
+ where
+ loadConnection (ctype :: Text) ctor = do
+ ctype' <- loadText "connection"
+ guard $ ctype == ctype'
+ dconnSource <- msum
+ [ Left <$> loadRawRef "source"
+ , Right <$> loadRawWeak "source"
+ ]
+ dconnTarget <- msum
+ [ Left <$> loadRawRef "target"
+ , Right <$> loadRawWeak "target"
+ ]
+ dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
+ dconnIceInfo <- loadMbRef "ice-info"
+ return $ ctor DiscoveryConnection {..}
+
+instance StorableText DiscoveryAddress where
+ toText = \case
+ DiscoveryIP addr port -> T.unwords [ T.pack $ show addr, T.pack $ show port ]
+ DiscoveryICE -> "ICE"
+ DiscoveryTunnel -> "tunnel"
+ DiscoveryOther str -> str
+
+ fromText str = return $ if
+ | [ addrStr, portStr ] <- T.words str
+ , Just addr <- readMaybe $ T.unpack addrStr
+ , Just port <- readMaybe $ T.unpack portStr
+ -> DiscoveryIP addr port
+
+ | "ice" <- T.toLower str
+ -> DiscoveryICE
+
+ | "tunnel" <- str
+ -> DiscoveryTunnel
+
+ | otherwise
+ -> DiscoveryOther str
+
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
- , dpAddress :: [ Text ]
-#ifdef ENABLE_ICE_SUPPORT
+ , dpAddress :: [ DiscoveryAddress ]
, dpIceSession :: Maybe IceSession
-#endif
+ }
+
+emptyPeer :: DiscoveryPeer
+emptyPeer = DiscoveryPeer
+ { dpPriority = 0
+ , dpPeer = Nothing
+ , dpAddress = []
+ , dpIceSession = Nothing
+ }
+
+data DiscoveryPeerState = DiscoveryPeerState
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
+ , dpsTurnServer :: Maybe ( Text, Word16 )
+ , dpsIceConfig :: Maybe IceConfig
+ }
+
+data DiscoveryGlobalState = DiscoveryGlobalState
+ { dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -145,42 +227,44 @@ instance Service DiscoveryService where
type ServiceAttributes DiscoveryService = DiscoveryAttributes
defaultServiceAttributes _ = defaultDiscoveryAttributes
-#ifdef ENABLE_ICE_SUPPORT
- type ServiceState DiscoveryService = Maybe IceConfig
- emptyServiceState _ = Nothing
-#endif
-
- type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer
- emptyServiceGlobalState _ = M.empty
+ type ServiceState DiscoveryService = DiscoveryPeerState
+ emptyServiceState _ = DiscoveryPeerState
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
+ , dpsTurnServer = Nothing
+ , dpsIceConfig = Nothing
+ }
+
+ type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
+ emptyServiceGlobalState _ = DiscoveryGlobalState
+ { dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
+ }
serviceHandler msg = case fromStored msg of
DiscoverySelf addrs priority -> do
pid <- asks svcPeerIdentity
peer <- asks svcPeer
+ paddrs <- getPeerAddresses peer
+
let insertHelper new old | dpPriority new > dpPriority old = new
| otherwise = old
- matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if
- | addr == T.pack "ICE" -> do
- return $ Just addr
-
- | [ ipaddr, port ] <- words (T.unpack addr)
- , DatagramAddress paddr <- peerAddress peer -> do
- saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- return $ if paddr == addrAddress saddr
- then Just addr
- else Nothing
-
- | otherwise -> return Nothing
-
- forM_ (idDataF =<< unfoldOwners pid) $ \s ->
- svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer
- { dpPriority = fromMaybe 0 priority
- , dpPeer = Just peer
- , dpAddress = addrs
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+
+ let matchedAddrs = flip filter addrs $ \case
+ DiscoveryICE -> True
+ DiscoveryIP ipaddr port ->
+ DatagramAddress (inetToSockAddr ( ipaddr, port )) `elem` paddrs
+ _ -> False
+
+ forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
+ let dp = DiscoveryPeer
+ { dpPriority = fromMaybe 0 priority
+ , dpPeer = Just peer
+ , dpAddress = matchedAddrs
+ , dpIceSession = Nothing
+ }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
attrs <- asks svcAttributes
replyPacket $ DiscoveryAcknowledged matchedAddrs
(discoveryStunServer attrs)
@@ -189,15 +273,8 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
-#ifdef ENABLE_ICE_SUPPORT
- paddr <- asks (peerAddress . svcPeer) >>= return . \case
- (DatagramAddress saddr) -> case IP.fromSockAddr saddr of
- Just (IP.IPv6 ipv6, _)
- | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
- -> Just $ T.pack $ show (IP.toIPv4w ipv4)
- Just (addr, _)
- -> Just $ T.pack $ show addr
- _ -> Nothing
+ paddr <- asks svcPeerAddress >>= return . \case
+ (DatagramAddress saddr) -> T.pack . show . fst <$> inetFromSockAddr saddr
_ -> Nothing
let toIceServer Nothing Nothing = Nothing
@@ -205,152 +282,368 @@ instance Service DiscoveryService where
toIceServer (Just server) Nothing = Just ( server, 0 )
toIceServer (Just server) (Just port) = Just ( server, port )
- cfg <- liftIO $ iceCreateConfig
- (toIceServer stunServer stunPort)
- (toIceServer turnServer turnPort)
- svcSet cfg
-#endif
- return ()
+ svcModify $ \s -> s
+ { dpsStunServer = toIceServer stunServer stunPort
+ , dpsTurnServer = toIceServer turnServer turnPort
+ }
- DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal
- replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
+ DiscoverySearch edgst -> do
+ dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ attrs <- asks svcAttributes
+ let offerTunnel
+ | discoveryProvideTunnel attrs peer paddr = (++ [ DiscoveryTunnel ])
+ | otherwise = id
+ replyPacket $ DiscoveryResult edgst $ maybe [] (offerTunnel . dpAddress) dpeer
- DiscoveryResult ref [] -> do
- svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found"
+ DiscoveryResult _ [] -> do
+ -- not found
+ return ()
- DiscoveryResult ref addrs -> do
+ DiscoveryResult edgst addrs -> do
+ let dgst = either refDigest id edgst
-- TODO: check if we really requested that
server <- asks svcServer
+ st <- getStorage
self <- svcSelf
- mbIceConfig <- svcGet
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if
- | addr == T.pack "ICE"
-#ifdef ENABLE_ICE_SUPPORT
- , Just config <- mbIceConfig
- -> do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Nothing
- , dpAddress = []
- , dpIceSession = Just ice
- }
-#else
- -> do
- return ()
-#endif
+ let tryAddresses = \case
+ DiscoveryIP ipaddr port : _ -> do
+ void $ liftIO $ forkIO $ do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- serverPeer server saddr
+ runAsService $ do
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
- | [ ipaddr, port ] <- words (T.unpack addr) -> do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Just peer
- , dpAddress = []
+ DiscoveryICE : rest -> do
#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
+ getIceConfig >>= \case
+ Just config -> do
+ void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ -- Try to promote weak ref to normal one for older peers:
+ edgst' <- case edgst of
+ Left r -> return (Left r)
+ Right d -> refFromDigest st d >>= \case
+ Just r -> return (Left r)
+ Nothing -> return (Right d)
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
#endif
- }
+ tryAddresses rest
- | otherwise -> do
- runAsService $ do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ DiscoveryTunnel : _ -> do
+ discoverySetupTunnelResponse dgst
+
+ addr : rest -> do
+ svcPrint $ "Discovery: unsupported address in result: " ++ T.unpack (toText addr)
+ tryAddresses rest
+
+ [] -> svcPrint $ "Discovery: no (supported) address received for " <> show dgst
+
+ tryAddresses addrs
DiscoveryConnectionRequest conn -> do
self <- svcSelf
+ attrs <- asks svcAttributes
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ if either refDigest id (dconnTarget conn) `elem` identityDigests self
+ then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
+ | Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
- svcGet >>= \case
+ getIceConfig >>= \case
Just config -> do
liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do
rinfo <- iceRemoteInfo ice
res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo }
case res of
- Right _ -> do
- case dconnIceInfo conn of
- Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice
- Nothing -> putStrLn $ "Discovery: connection request without ICE remote info"
+ Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice
Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err
Nothing -> do
- svcPrint $ "Discovery: ICE request from peer without ICE configuration"
-#else
- return ()
+ return ()
#endif
- else do
- -- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal
- case mbdp of
+ | otherwise -> do
+ svcPrint $ "Discovery: unsupported connection request"
+
+ else do
+ -- request to some of our peers, relay
+ peer <- asks svcPeer
+ paddr <- asks svcPeerAddress
+ mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
+ case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | not (discoveryProvideTunnel attrs peer paddr) -> do
+ replyPacket $ DiscoveryConnectionResponse rconn
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
- dpeers <- svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ dps <- svcGet
+ dpeers <- dgsPeers <$> svcGetGlobal
+
+ if either refDigest id (dconnSource conn) `elem` identityDigests self
+ then do
-- response to our request, try to connect to the peer
-#ifdef ENABLE_ICE_SUPPORT
server <- asks svcServer
- if | Just addr <- dconnAddress conn
- , [ipaddr, port] <- words (T.unpack addr) -> do
- saddr <- liftIO $ head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- liftIO $ serverPeer server (addrAddress saddr)
- svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $
- DiscoveryPeer 0 (Just peer) [] Nothing
-
- | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
+ if
+ | Just addr <- dconnAddress conn
+ , [ addrStr, portStr ] <- words (T.unpack addr)
+ , Just ipaddr <- readMaybe addrStr
+ , Just port <- readMaybe portStr
+ -> do
+ let saddr = inetToSockAddr ( ipaddr, port )
+ peer <- liftIO $ serverPeer server saddr
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> do
+ svcPrint $ "Discovery: missing stream in tunnel response"
+ liftIO $ closeStream tunnelWriter
+
+ | Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ svcPrint $ "Discovery: tunnel request failed"
+ liftIO $ closeStream tunnelWriter
+
+#ifdef ENABLE_ICE_SUPPORT
+ | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
+#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
-#else
- return ()
-#endif
- else do
- -- response to relayed request
- case M.lookup (refDigest $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ -- successful tunnel request
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ -- failed tunnel request
+ | Just ( _, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ -> do
+ liftIO $ closeStream toTarget
+ sendToPeer dpeer $ DiscoveryConnectionResponse conn
+
+ | otherwise -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
- let addrToText saddr = do
- ( addr, port ) <- IP.fromSockAddr saddr
- Just $ T.pack $ show addr <> " " <> show port
addrs <- concat <$> sequence
- [ catMaybes . map addrToText <$> liftIO (getServerAddresses server)
+ [ catMaybes . map (fmap (uncurry DiscoveryIP) . inetFromSockAddr) <$> liftIO (getServerAddresses server)
#ifdef ENABLE_ICE_SUPPORT
- , return [ T.pack "ICE" ]
+ , return [ DiscoveryICE ]
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ sendToPeer peer $ DiscoverySearch (Right dgst)
+
+#ifdef ENABLE_ICE_SUPPORT
+ serviceStopServer _ _ _ pstates = do
+ forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do
+ mapM_ iceStopThread dpsIceConfig
+#endif
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig)
+getIceConfig = do
+ dpsIceConfig <$> svcGet >>= \case
+ Just cfg -> return $ Just cfg
+ Nothing -> do
+#ifdef ENABLE_ICE_SUPPORT
+ stun <- dpsStunServer <$> svcGet
+ turn <- dpsTurnServer <$> svcGet
+ liftIO (iceCreateConfig stun turn) >>= \case
+ Just cfg -> do
+ svcModify $ \s -> s { dpsIceConfig = Just cfg }
+ return $ Just cfg
+ Nothing -> do
+ svcPrint $ "Discovery: failed to create ICE config"
+ return Nothing
+#else
+ return Nothing
+#endif
+
+
+discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m ()
+discoverySearch server dgst = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ getPeerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ dgst `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+ connectionToAddressClosed TunnelAddress {..} = do
+ closeStream tunnelWriter
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ discoverySetupTunnelResponse target
+
+discoverySetupTunnelResponse :: RefDigest -> ServiceHandler DiscoveryService ()
+discoverySetupTunnelResponse target = do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }