summaryrefslogtreecommitdiff
path: root/src/Erebos/Discovery.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Discovery.hs')
-rw-r--r--src/Erebos/Discovery.hs259
1 files changed, 211 insertions, 48 deletions
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index ef289cb..ede9cc9 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -6,6 +6,7 @@ module Erebos.Discovery (
DiscoveryConnection(..),
discoverySearch,
+ discoverySetupTunnel,
) where
import Control.Concurrent
@@ -32,8 +33,10 @@ import Erebos.ICE
#endif
import Erebos.Identity
import Erebos.Network
+import Erebos.Object
import Erebos.Service
-import Erebos.Storage
+import Erebos.Service.Stream
+import Erebos.Storable
#ifndef ENABLE_ICE_SUPPORT
@@ -46,8 +49,8 @@ type IceRemoteInfo = Stored Object
data DiscoveryService
= DiscoverySelf [ Text ] (Maybe Int)
| DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
- | DiscoverySearch Ref
- | DiscoveryResult Ref [ Text ]
+ | DiscoverySearch (Either Ref RefDigest)
+ | DiscoveryResult (Either Ref RefDigest) [ Text ]
| DiscoveryConnectionRequest DiscoveryConnection
| DiscoveryConnectionResponse DiscoveryConnection
@@ -56,6 +59,7 @@ data DiscoveryAttributes = DiscoveryAttributes
, discoveryStunServer :: Maybe Text
, discoveryTurnPort :: Maybe Word16
, discoveryTurnServer :: Maybe Text
+ , discoveryProvideTunnel :: Peer -> Bool
}
defaultDiscoveryAttributes :: DiscoveryAttributes
@@ -64,19 +68,22 @@ defaultDiscoveryAttributes = DiscoveryAttributes
, discoveryStunServer = Nothing
, discoveryTurnPort = Nothing
, discoveryTurnServer = Nothing
+ , discoveryProvideTunnel = const False
}
data DiscoveryConnection = DiscoveryConnection
- { dconnSource :: Ref
- , dconnTarget :: Ref
+ { dconnSource :: Either Ref RefDigest
+ , dconnTarget :: Either Ref RefDigest
, dconnAddress :: Maybe Text
+ , dconnTunnel :: Bool
, dconnIceInfo :: Maybe IceRemoteInfo
}
-emptyConnection :: Ref -> Ref -> DiscoveryConnection
+emptyConnection :: Either Ref RefDigest -> Either Ref RefDigest -> DiscoveryConnection
emptyConnection dconnSource dconnTarget = DiscoveryConnection {..}
where
dconnAddress = Nothing
+ dconnTunnel = False
dconnIceInfo = Nothing
instance Storable DiscoveryService where
@@ -92,9 +99,9 @@ instance Storable DiscoveryService where
storeMbInt "stun-port" stunPort
storeMbText "turn-server" turnServer
storeMbInt "turn-port" turnPort
- DiscoverySearch ref -> storeRawRef "search" ref
- DiscoveryResult ref addr -> do
- storeRawRef "result" ref
+ DiscoverySearch edgst -> either (storeRawRef "search") (storeRawWeak "search") edgst
+ DiscoveryResult edgst addr -> do
+ either (storeRawRef "result") (storeRawWeak "result") edgst
mapM_ (storeText "address") addr
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
@@ -102,9 +109,10 @@ instance Storable DiscoveryService where
where
storeConnection ctype DiscoveryConnection {..} = do
storeText "connection" $ ctype
- storeRawRef "source" dconnSource
- storeRawRef "target" dconnTarget
+ either (storeRawRef "source") (storeRawWeak "source") dconnSource
+ either (storeRawRef "target") (storeRawWeak "target") dconnTarget
storeMbText "address" dconnAddress
+ when dconnTunnel $ storeEmpty "tunnel"
storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
@@ -123,9 +131,15 @@ instance Storable DiscoveryService where
<*> loadMbInt "stun-port"
<*> loadMbText "turn-server"
<*> loadMbInt "turn-port"
- , DiscoverySearch <$> loadRawRef "search"
+ , DiscoverySearch <$> msum
+ [ Left <$> loadRawRef "search"
+ , Right <$> loadRawWeak "search"
+ ]
, DiscoveryResult
- <$> loadRawRef "result"
+ <$> msum
+ [ Left <$> loadRawRef "result"
+ , Right <$> loadRawWeak "result"
+ ]
<*> loadTexts "address"
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
@@ -134,9 +148,16 @@ instance Storable DiscoveryService where
loadConnection ctype ctor = do
ctype' <- loadText "connection"
guard $ ctype == ctype'
- dconnSource <- loadRawRef "source"
- dconnTarget <- loadRawRef "target"
+ dconnSource <- msum
+ [ Left <$> loadRawRef "source"
+ , Right <$> loadRawWeak "source"
+ ]
+ dconnTarget <- msum
+ [ Left <$> loadRawRef "target"
+ , Right <$> loadRawWeak "target"
+ ]
dconnAddress <- loadMbText "address"
+ dconnTunnel <- isJust <$> loadMbEmpty "tunnel"
dconnIceInfo <- loadMbRef "ice-info"
return $ ctor DiscoveryConnection {..}
@@ -156,7 +177,11 @@ emptyPeer = DiscoveryPeer
}
data DiscoveryPeerState = DiscoveryPeerState
- { dpsStunServer :: Maybe ( Text, Word16 )
+ { dpsOurTunnelRequests :: [ ( RefDigest, StreamWriter ) ]
+ -- ( original target, our write stream )
+ , dpsRelayedTunnelRequests :: [ ( RefDigest, ( StreamReader, StreamWriter )) ]
+ -- ( original source, ( from source, to target ))
+ , dpsStunServer :: Maybe ( Text, Word16 )
, dpsTurnServer :: Maybe ( Text, Word16 )
, dpsIceConfig :: Maybe IceConfig
}
@@ -174,7 +199,9 @@ instance Service DiscoveryService where
type ServiceState DiscoveryService = DiscoveryPeerState
emptyServiceState _ = DiscoveryPeerState
- { dpsStunServer = Nothing
+ { dpsOurTunnelRequests = []
+ , dpsRelayedTunnelRequests = []
+ , dpsStunServer = Nothing
, dpsTurnServer = Nothing
, dpsIceConfig = Nothing
}
@@ -240,18 +267,19 @@ instance Service DiscoveryService where
, dpsTurnServer = toIceServer turnServer turnPort
}
- DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal
- replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
+ DiscoverySearch edgst -> do
+ dpeer <- M.lookup (either refDigest id edgst) . dgsPeers <$> svcGetGlobal
+ replyPacket $ DiscoveryResult edgst $ maybe [] dpAddress dpeer
DiscoveryResult _ [] -> do
-- not found
return ()
- DiscoveryResult ref addrs -> do
- let dgst = refDigest ref
+ DiscoveryResult edgst addrs -> do
+ let dgst = either refDigest id edgst
-- TODO: check if we really requested that
server <- asks svcServer
+ st <- getStorage
self <- svcSelf
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
@@ -265,8 +293,15 @@ instance Service DiscoveryService where
ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
rinfo <- iceRemoteInfo ice
+ -- Try to promote weak ref to normal one for older peers:
+ edgst' <- case edgst of
+ Left r -> return (Left r)
+ Right d -> refFromDigest st d >>= \case
+ Just r -> return (Left r)
+ Nothing -> return (Right d)
+
res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
+ DiscoveryConnectionRequest (emptyConnection (Left $ storedRef $ idData self) edgst') { dconnIceInfo = Just rinfo }
case res of
Right _ -> return ()
Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
@@ -294,11 +329,31 @@ instance Service DiscoveryService where
DiscoveryConnectionRequest conn -> do
self <- svcSelf
+ attrs <- asks svcAttributes
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` identityDigests self
+ if either refDigest id (dconnTarget conn) `elem` identityDigests self
then if
+ -- request for us, create ICE sesssion or tunnel
+ | dconnTunnel conn -> do
+ receivedStreams >>= \case
+ (tunnelReader : _) -> do
+ tunnelWriter <- openStream
+ replyPacket $ DiscoveryConnectionResponse rconn
+ { dconnTunnel = True
+ }
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ server <- asks svcServer
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+
+ [] -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (endpoint)"
+
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
| Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
@@ -318,20 +373,36 @@ instance Service DiscoveryService where
svcPrint $ "Discovery: unsupported connection request"
else do
- -- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
- case mbdp of
+ -- request to some of our peers, relay
+ peer <- asks svcPeer
+ mbdp <- M.lookup (either refDigest id $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
+ streams <- receivedStreams
+ case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
- | Just dpeer <- dpPeer dp -> do
- sendToPeer dpeer $ DiscoveryConnectionRequest conn
+ | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn -> if
+ | not (discoveryProvideTunnel attrs peer) -> do
+ replyPacket $ DiscoveryConnectionResponse rconn
+ | fromSource : _ <- streams -> do
+ void $ liftIO $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ toTarget <- openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest conn
+ | otherwise -> do
+ svcPrint $ "Discovery: missing stream on tunnel request (relay)"
+ | otherwise -> do
+ sendToPeer dpeer $ DiscoveryConnectionRequest conn
| otherwise -> svcPrint $ "Discovery: failed to relay connection request"
DiscoveryConnectionResponse conn -> do
self <- svcSelf
+ dps <- svcGet
dpeers <- dgsPeers <$> svcGetGlobal
- if refDigest (dconnSource conn) `elem` identityDigests self
- then do
+
+ if either refDigest id (dconnSource conn) `elem` identityDigests self
+ then do
-- response to our request, try to connect to the peer
server <- asks svcServer
if | Just addr <- dconnAddress conn
@@ -341,27 +412,61 @@ instance Service DiscoveryService where
peer <- liftIO $ serverPeer server (addrAddress saddr)
let upd dp = dp { dpPeer = Just peer }
svcModifyGlobal $ \s -> s
- { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s }
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (either refDigest id $ dconnTarget conn) $ dgsPeers s }
+
+ | dconnTunnel conn
+ , Just tunnelWriter <- lookup (either refDigest id (dconnTarget conn)) (dpsOurTunnelRequests dps)
+ -> do
+ receivedStreams >>= \case
+ tunnelReader : _ -> do
+ tunnelVia <- asks svcPeer
+ tunnelIdentity <- asks svcPeerIdentity
+ void $ liftIO $ forkIO $ do
+ tunnelStreamNumber <- getStreamWriterNumber tunnelWriter
+ let addr = TunnelAddress {..}
+ void $ serverPeerCustom server addr
+ receiveFromTunnel server addr
+ [] -> svcPrint $ "Discovery: missing stream in tunnel response"
#ifdef ENABLE_ICE_SUPPORT
- | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
+ | Just dp <- M.lookup (either refDigest id $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
- else do
- -- response to relayed request
- case M.lookup (refDigest $ dconnSource conn) dpeers of
- Just dp | Just dpeer <- dpPeer dp -> do
+ else do
+ -- response to relayed request
+ streams <- receivedStreams
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ filter ((either refDigest id (dconnSource conn) /=) . fst) (dpsRelayedTunnelRequests s) }
+
+ case M.lookup (either refDigest id $ dconnSource conn) dpeers of
+ Just dp | Just dpeer <- dpPeer dp -> if
+ | dconnTunnel conn
+ , Just ( fromSource, toTarget ) <- lookup (either refDigest id (dconnSource conn)) (dpsRelayedTunnelRequests dps)
+ , fromTarget : _ <- streams
+ -> liftIO $ do
+ toSourceVar <- newEmptyMVar
+ void $ forkIO $ runPeerService @DiscoveryService dpeer $ do
+ liftIO . putMVar toSourceVar =<< openStream
+ svcModify $ \s -> s { dpsRelayedTunnelRequests =
+ ( either refDigest id $ dconnSource conn, ( fromSource, toTarget )) : dpsRelayedTunnelRequests s }
+ replyPacket $ DiscoveryConnectionResponse conn
+ void $ forkIO $ do
+ relayStream fromSource toTarget
+ void $ forkIO $ do
+ toSource <- readMVar toSourceVar
+ relayStream fromTarget toSource
+
+ | otherwise -> do
sendToPeer dpeer $ DiscoveryConnectionResponse conn
- _ -> svcPrint $ "Discovery: failed to relay connection response"
+ _ -> svcPrint $ "Discovery: failed to relay connection response"
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
- st <- getStorage
let addrToText saddr = do
( addr, port ) <- IP.fromSockAddr saddr
@@ -381,9 +486,7 @@ instance Service DiscoveryService where
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
forM_ searchingFor $ \dgst -> do
- liftIO (refFromDigest st dgst) >>= \case
- Just ref -> sendToPeer peer $ DiscoverySearch ref
- Nothing -> return ()
+ sendToPeer peer $ DiscoverySearch (Right dgst)
#ifdef ENABLE_ICE_SUPPORT
serviceStopServer _ _ _ pstates = do
@@ -416,17 +519,77 @@ getIceConfig = do
#endif
-discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m ()
-discoverySearch server ref = do
+discoverySearch :: (MonadIO m, MonadError e m, FromErebosError e) => Server -> RefDigest -> m ()
+discoverySearch server dgst = do
peers <- liftIO $ getCurrentPeerList server
match <- forM peers $ \peer -> do
peerIdentity peer >>= \case
PeerIdentityFull pid -> do
- return $ refDigest ref `elem` identityDigests pid
+ return $ dgst `elem` identityDigests pid
_ -> return False
when (not $ or match) $ do
modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
- { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s
+ { dgsSearchingFor = S.insert dgst $ dgsSearchingFor s
}
forM_ peers $ \peer -> do
- sendToPeer peer $ DiscoverySearch ref
+ sendToPeer peer $ DiscoverySearch $ Right dgst
+
+
+data TunnelAddress = TunnelAddress
+ { tunnelVia :: Peer
+ , tunnelIdentity :: UnifiedIdentity
+ , tunnelStreamNumber :: Int
+ , tunnelReader :: StreamReader
+ , tunnelWriter :: StreamWriter
+ }
+
+instance Eq TunnelAddress where
+ x == y = (==)
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Ord TunnelAddress where
+ compare x y = compare
+ (idData (tunnelIdentity x), tunnelStreamNumber x)
+ (idData (tunnelIdentity y), tunnelStreamNumber y)
+
+instance Show TunnelAddress where
+ show tunnel = concat
+ [ "tunnel@"
+ , show $ refDigest $ storedRef $ idData $ tunnelIdentity tunnel
+ , "/" <> show (tunnelStreamNumber tunnel)
+ ]
+
+instance PeerAddressType TunnelAddress where
+ sendBytesToAddress TunnelAddress {..} bytes = do
+ writeStream tunnelWriter bytes
+
+relayStream :: StreamReader -> StreamWriter -> IO ()
+relayStream r w = do
+ p <- readStreamPacket r
+ writeStreamPacket w p
+ case p of
+ StreamClosed {} -> return ()
+ _ -> relayStream r w
+
+receiveFromTunnel :: Server -> TunnelAddress -> IO ()
+receiveFromTunnel server taddr = do
+ p <- readStreamPacket (tunnelReader taddr)
+ case p of
+ StreamData {..} -> do
+ receivedFromCustomAddress server taddr stpData
+ receiveFromTunnel server taddr
+ StreamClosed {} -> do
+ return ()
+
+
+discoverySetupTunnel :: Peer -> RefDigest -> IO ()
+discoverySetupTunnel via target = do
+ runPeerService via $ do
+ self <- refDigest . storedRef . idData <$> svcSelf
+ stream <- openStream
+ svcModify $ \s -> s { dpsOurTunnelRequests = ( target, stream ) : dpsOurTunnelRequests s }
+ replyPacket $ DiscoveryConnectionRequest
+ (emptyConnection (Right self) (Right target))
+ { dconnTunnel = True
+ }