diff options
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r-- | src/Erebos/Network.hs | 160 |
1 files changed, 112 insertions, 48 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index ed68ed4..b341974 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -14,7 +14,12 @@ module Erebos.Network ( PeerIdentity(..), peerIdentity, WaitingRef, wrDigest, Service(..), + + PeerAddressType(..), + receivedFromCustomAddress, + serverPeer, + serverPeerCustom, #ifdef ENABLE_ICE_SUPPORT serverPeerIce, #endif @@ -37,13 +42,14 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State +import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List import Data.Map (Map) -import qualified Data.Map as M +import Data.Map qualified as M import Data.Maybe import Data.Typeable import Data.Word @@ -57,14 +63,16 @@ import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) -import qualified Network.Socket.ByteString as S +import Network.Socket.ByteString qualified as S -import Erebos.Channel +import Erebos.Error #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity +import Erebos.Network.Channel import Erebos.Network.Protocol +import Erebos.Object.Internal import Erebos.PubKey import Erebos.Service import Erebos.State @@ -93,7 +101,7 @@ data Server = Server , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) - , serverIOActions :: TQueue (ExceptT String IO ()) + , serverIOActions :: TQueue (ExceptT ErebosError IO ()) , serverServices :: [SomeService] , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) , serverPeers :: MVar (Map PeerAddress Peer) @@ -156,12 +164,19 @@ setPeerChannel Peer {..} ch = do instance Eq Peer where (==) = (==) `on` peerIdentityVar -data PeerAddress = DatagramAddress SockAddr +class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where + sendBytesToAddress :: addr -> ByteString -> IO () + +data PeerAddress + = forall addr. PeerAddressType addr => CustomPeerAddress addr + | DatagramAddress SockAddr #ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession + | PeerIceSession IceSession #endif instance Show PeerAddress where + show (CustomPeerAddress addr) = show addr + show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, port) | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 @@ -169,37 +184,48 @@ instance Show PeerAddress where Just (addr, port) -> [show addr, show port] _ -> [show saddr] + #ifdef ENABLE_ICE_SUPPORT show (PeerIceSession ice) = show ice #endif instance Eq PeerAddress where + CustomPeerAddress addr == CustomPeerAddress addr' + | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice == PeerIceSession ice' = ice == ice' - _ == _ = False #endif + _ == _ = False instance Ord PeerAddress where + compare (CustomPeerAddress addr) (CustomPeerAddress addr') + | Just addr'' <- cast addr' = compare addr addr'' + | otherwise = compare (typeOf addr) (typeOf addr') + compare (CustomPeerAddress _ ) _ = LT + compare _ (CustomPeerAddress _ ) = GT + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' #ifdef ENABLE_ICE_SUPPORT compare (DatagramAddress _ ) _ = LT compare _ (DatagramAddress _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' #endif -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) | PeerIdentityFull UnifiedIdentity peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] - | PeerConnected (Connection PeerAddress) - | PeerDropped +data PeerState + = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ] + | PeerConnected (Connection PeerAddress) + | PeerDropped lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID @@ -255,7 +281,7 @@ startServer serverOptions serverOrigHead logd' serverServices = do forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do - either (atomically . logd) return =<< runExceptT =<< + either (atomically . logd . showErebosError) return =<< runExceptT =<< atomically (readTQueue serverIOActions) let open addr = do @@ -315,8 +341,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do case paddr of + CustomPeerAddress addr -> sendBytesToAddress addr msg DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice -> iceSend ice msg @@ -384,9 +411,9 @@ startServer serverOptions serverOrigHead logd' serverServices = do bracket (open addr) close loop forkServerThread server $ forever $ do - (peer, svc, ref) <- atomically $ readTQueue chanSvc + ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server @@ -420,7 +447,7 @@ dataResponseWorker server = forever $ do Right ref -> do atomically (writeTVar tvar $ Right ref) forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case - Left err -> atomically $ writeTQueue (serverErrorLog server) err + Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () return (Nothing, []) @@ -514,9 +541,7 @@ openStream = do conn <- readTVarP peerState >>= \case PeerConnected conn -> return conn _ -> throwError "can't open stream without established connection" - (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case - Right res -> return res - Left err -> throwError err + (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) addHeader hdr @@ -536,8 +561,8 @@ appendDistinct x (y:ys) | x == y = y : ys appendDistinct x [] = [x] handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO () + -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ] + -> TransportHeader -> [ PartialRef ] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer @@ -599,7 +624,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) $ - "failed to receive object from stream: " <> err + "failed to receive object from stream: " <> showErebosError err Right objs -> do forM_ objs $ \obj -> do pref <- storeObject (peerInStorage peer) obj @@ -671,17 +696,18 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} - then do - void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show dgst + then do + streamReaders <- mapM acceptStream $ lookupNewStreams headers + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders ) + else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" _ -> return () -withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT ErebosError IO ()) -> m () withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) @@ -737,7 +763,7 @@ handleChannelAccept identity accref = do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] finalizedChannel peer ch identity - Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) + Left dgst -> throwOtherError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () @@ -799,8 +825,12 @@ notifyServicesOfPeer :: Peer -> STM () notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer + +receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () +receivedFromCustomAddress Server {..} addr msg = do + writeFlowIO serverRawPath ( CustomPeerAddress addr, msg ) mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do @@ -820,6 +850,9 @@ serverPeer server paddr = do _ -> paddr serverPeer' server (DatagramAddress paddr') +serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer +serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) + #ifdef ENABLE_ICE_SUPPORT serverPeerIce :: Server -> IceSession -> IO Peer serverPeerIce server@Server {..} ice = do @@ -868,19 +901,49 @@ sendToPeerStored peer = sendManyToPeerStored peer . (: []) sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) -sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m () sendToPeerList peer parts = do let st = peerStorage peer - srefs <- liftIO $ fmap catMaybes $ forM parts $ \case - ServiceReply (Left x) use -> Just . (,use) <$> store st x - ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) - ServiceFinally act -> act >> return Nothing - let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) - packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] - liftIO $ atomically $ sendToPeerS peer ackedBy packet + res <- runExceptT $ do + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + _ -> return Nothing + + streamHeaders <- concat <$> do + (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do + forM parts $ \case + ServiceOpenStream cb -> do + conn <- lift (readTVar (peerState peer)) >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) + + lift $ writeTQueue (serverIOActions (peerServer peer)) $ do + liftIO $ forkServerThread (peerServer peer) handler + return [ ( hdr, cb writer ) ] + _ -> return [] + liftIO $ sequence_ $ map snd streamHeaders + + liftIO $ forM_ parts $ \case + ServiceFinally act -> act + _ -> return () + + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU + header = TransportHeader $ concat + [ [ ServiceType (serviceID $ head parts) ] + , map ServiceRef dgsts + , map fst streamHeaders + ] + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + + case res of + Right () -> return () + Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $ + "failed to send packet to " <> show (peerAddress peer) <> ": " <> err sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do @@ -895,7 +958,7 @@ sendToPeerS = sendToPeerS' EncryptedOnly sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerPlain = sendToPeerS' PlaintextAllowed -sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () +sendToPeerWith :: forall s m e. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Peer -> (ServiceState s -> ExceptT ErebosError IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do let sproxy = Proxy @s sid = serviceID sproxy @@ -910,7 +973,7 @@ sendToPeerWith peer fobj = do case res of Right (Just obj) -> sendToPeer peer obj Right Nothing -> return () - Left err -> throwError err + Left err -> throwError $ fromErebosError err lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) @@ -920,10 +983,10 @@ lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing +runPeerService = runPeerServiceOn Nothing [] -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -948,6 +1011,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd + , svcNewStreams = newStreams } reloadHead (serverOrigHead server) >>= \case Nothing -> atomically $ do @@ -969,7 +1033,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" modifyServiceGlobalState - :: forall s a m proxy. (Service s, MonadIO m, MonadError String m) + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Server -> proxy s -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) -> m a @@ -987,7 +1051,7 @@ modifyServiceGlobalState server proxy f = do putTMVar (serverServiceStates server) global' return res Nothing -> do - throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) |