summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs160
1 files changed, 112 insertions, 48 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index ed68ed4..b341974 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -14,7 +14,12 @@ module Erebos.Network (
PeerIdentity(..), peerIdentity,
WaitingRef, wrDigest,
Service(..),
+
+ PeerAddressType(..),
+ receivedFromCustomAddress,
+
serverPeer,
+ serverPeerCustom,
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce,
#endif
@@ -37,13 +42,14 @@ import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
+import Data.ByteString (ByteString)
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
-import qualified Data.Map as M
+import Data.Map qualified as M
import Data.Maybe
import Data.Typeable
import Data.Word
@@ -57,14 +63,16 @@ import Foreign.Storable as F
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
-import qualified Network.Socket.ByteString as S
+import Network.Socket.ByteString qualified as S
-import Erebos.Channel
+import Erebos.Error
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
#endif
import Erebos.Identity
+import Erebos.Network.Channel
import Erebos.Network.Protocol
+import Erebos.Object.Internal
import Erebos.PubKey
import Erebos.Service
import Erebos.State
@@ -93,7 +101,7 @@ data Server = Server
, serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
, serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
- , serverIOActions :: TQueue (ExceptT String IO ())
+ , serverIOActions :: TQueue (ExceptT ErebosError IO ())
, serverServices :: [SomeService]
, serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState)
, serverPeers :: MVar (Map PeerAddress Peer)
@@ -156,12 +164,19 @@ setPeerChannel Peer {..} ch = do
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
-data PeerAddress = DatagramAddress SockAddr
+class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where
+ sendBytesToAddress :: addr -> ByteString -> IO ()
+
+data PeerAddress
+ = forall addr. PeerAddressType addr => CustomPeerAddress addr
+ | DatagramAddress SockAddr
#ifdef ENABLE_ICE_SUPPORT
- | PeerIceSession IceSession
+ | PeerIceSession IceSession
#endif
instance Show PeerAddress where
+ show (CustomPeerAddress addr) = show addr
+
show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, port)
| (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
@@ -169,37 +184,48 @@ instance Show PeerAddress where
Just (addr, port)
-> [show addr, show port]
_ -> [show saddr]
+
#ifdef ENABLE_ICE_SUPPORT
show (PeerIceSession ice) = show ice
#endif
instance Eq PeerAddress where
+ CustomPeerAddress addr == CustomPeerAddress addr'
+ | Just addr'' <- cast addr' = addr == addr''
DatagramAddress addr == DatagramAddress addr' = addr == addr'
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice == PeerIceSession ice' = ice == ice'
- _ == _ = False
#endif
+ _ == _ = False
instance Ord PeerAddress where
+ compare (CustomPeerAddress addr) (CustomPeerAddress addr')
+ | Just addr'' <- cast addr' = compare addr addr''
+ | otherwise = compare (typeOf addr) (typeOf addr')
+ compare (CustomPeerAddress _ ) _ = LT
+ compare _ (CustomPeerAddress _ ) = GT
+
compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
#ifdef ENABLE_ICE_SUPPORT
compare (DatagramAddress _ ) _ = LT
compare _ (DatagramAddress _ ) = GT
+
compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
#endif
-data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()])
- | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
+data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
+ | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()])
| PeerIdentityFull UnifiedIdentity
peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
- | PeerConnected (Connection PeerAddress)
- | PeerDropped
+data PeerState
+ = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
@@ -255,7 +281,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ dataResponseWorker server
forkServerThread server $ forever $ do
- either (atomically . logd) return =<< runExceptT =<<
+ either (atomically . logd . showErebosError) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
let open addr = do
@@ -315,8 +341,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
case paddr of
+ CustomPeerAddress addr -> sendBytesToAddress addr msg
DatagramAddress addr -> void $ S.sendTo sock msg addr
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice -> iceSend ice msg
@@ -384,9 +411,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do
bracket (open addr) close loop
forkServerThread server $ forever $ do
- (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc
case find ((svc ==) . someServiceID) serverServices of
- Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref)
_ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
@@ -420,7 +447,7 @@ dataResponseWorker server = forever $ do
Right ref -> do
atomically (writeTVar tvar $ Right ref)
forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
- Left err -> atomically $ writeTQueue (serverErrorLog server) err
+ Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err)
Right () -> return ()
return (Nothing, [])
@@ -514,9 +541,7 @@ openStream = do
conn <- readTVarP peerState >>= \case
PeerConnected conn -> return conn
_ -> throwError "can't open stream without established connection"
- (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
- Right res -> return res
- Left err -> throwError err
+ (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn)
liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
addHeader hdr
@@ -536,8 +561,8 @@ appendDistinct x (y:ys) | x == y = y : ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
- -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO ()
+ -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ]
+ -> TransportHeader -> [ PartialRef ] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- getPeerChannel peer
@@ -599,7 +624,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
(runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case
Left err -> atomically $ writeTQueue (serverErrorLog server) $
- "failed to receive object from stream: " <> err
+ "failed to receive object from stream: " <> showErebosError err
Right objs -> do
forM_ objs $ \obj -> do
pref <- storeObject (peerInStorage peer) obj
@@ -671,17 +696,18 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
- then do
- void $ newWaitingRef dgst $ \ref ->
- liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show dgst
+ then do
+ streamReaders <- mapM acceptStream $ lookupNewStreams headers
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders )
+ else throwError $ "missing service object " ++ show dgst
| otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
_ -> return ()
-withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
+withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT ErebosError IO ()) -> m ()
withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case
PeerIdentityUnknown tvar -> modifyTVar' tvar (act:)
PeerIdentityRef _ tvar -> modifyTVar' tvar (act:)
@@ -737,7 +763,7 @@ handleChannelAccept identity accref = do
sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) []
finalizedChannel peer ch identity
- Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
+ Left dgst -> throwOtherError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM ()
@@ -799,8 +825,12 @@ notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
writeTQueue serverIOActions $ do
forM_ serverServices $ \service@(SomeService _ attrs) ->
- runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+ runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer
+
+receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO ()
+receivedFromCustomAddress Server {..} addr msg = do
+ writeFlowIO serverRawPath ( CustomPeerAddress addr, msg )
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer peerServer_ peerAddress = do
@@ -820,6 +850,9 @@ serverPeer server paddr = do
_ -> paddr
serverPeer' server (DatagramAddress paddr')
+serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer
+serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr)
+
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce :: Server -> IceSession -> IO Peer
serverPeerIce server@Server {..} ice = do
@@ -868,19 +901,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: [])
sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
-sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
- ServiceReply (Left x) use -> Just . (,use) <$> store st x
- ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
- ServiceFinally act -> act >> return Nothing
- let dgsts = map (refDigest . fst) srefs
- let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
- packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
- liftIO $ atomically $ sendToPeerS peer ackedBy packet
+ res <- runExceptT $ do
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ _ -> return Nothing
+
+ streamHeaders <- concat <$> do
+ (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do
+ forM parts $ \case
+ ServiceOpenStream cb -> do
+ conn <- lift (readTVar (peerState peer)) >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn)
+
+ lift $ writeTQueue (serverIOActions (peerServer peer)) $ do
+ liftIO $ forkServerThread (peerServer peer) handler
+ return [ ( hdr, cb writer ) ]
+ _ -> return []
+ liftIO $ sequence_ $ map snd streamHeaders
+
+ liftIO $ forM_ parts $ \case
+ ServiceFinally act -> act
+ _ -> return ()
+
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader $ concat
+ [ [ ServiceType (serviceID $ head parts) ]
+ , map ServiceRef dgsts
+ , map fst streamHeaders
+ ]
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+ case res of
+ Right () -> return ()
+ Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $
+ "failed to send packet to " <> show (peerAddress peer) <> ": " <> err
sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure Peer {..} ackedBy packet = do
@@ -895,7 +958,7 @@ sendToPeerS = sendToPeerS' EncryptedOnly
sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerPlain = sendToPeerS' PlaintextAllowed
-sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
+sendToPeerWith :: forall s m e. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Peer -> (ServiceState s -> ExceptT ErebosError IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith peer fobj = do
let sproxy = Proxy @s
sid = serviceID sproxy
@@ -910,7 +973,7 @@ sendToPeerWith peer fobj = do
case res of
Right (Just obj) -> sendToPeer peer obj
Right Nothing -> return ()
- Left err -> throwError err
+ Left err -> throwError $ fromErebosError err
lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
@@ -920,10 +983,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
lookupService _ [] = Nothing
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
-runPeerService = runPeerServiceOn Nothing
+runPeerService = runPeerServiceOn Nothing []
-runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
-runPeerServiceOn mbservice peer handler = liftIO $ do
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice newStreams peer handler = liftIO $ do
let server = peerServer peer
proxy = Proxy @s
svc = serviceID proxy
@@ -948,6 +1011,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
, svcPeerIdentity = peerId
, svcServer = server
, svcPrintOp = atomically . logd
+ , svcNewStreams = newStreams
}
reloadHead (serverOrigHead server) >>= \case
Nothing -> atomically $ do
@@ -969,7 +1033,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
modifyServiceGlobalState
- :: forall s a m proxy. (Service s, MonadIO m, MonadError String m)
+ :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e)
=> Server -> proxy s
-> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
-> m a
@@ -987,7 +1051,7 @@ modifyServiceGlobalState server proxy f = do
putTMVar (serverServiceStates server) global'
return res
Nothing -> do
- throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+ throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)