diff options
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r-- | src/Erebos/Network.hs | 385 |
1 files changed, 292 insertions, 93 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 41b6279..8da4c8d 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -6,6 +6,7 @@ module Erebos.Network ( stopServer, getCurrentPeerList, getNextPeerChange, + getServerAddresses, ServerOptions(..), serverIdentity, defaultServerOptions, Peer, peerServer, peerStorage, @@ -13,14 +14,22 @@ module Erebos.Network ( PeerIdentity(..), peerIdentity, WaitingRef, wrDigest, Service(..), + + PeerAddressType(..), + receivedFromCustomAddress, + serverPeer, + serverPeerCustom, #ifdef ENABLE_ICE_SUPPORT serverPeerIce, #endif dropPeer, isPeerDropped, - sendToPeer, sendToPeerStored, sendToPeerWith, + sendToPeer, sendManyToPeer, + sendToPeerStored, sendManyToPeerStored, + sendToPeerWith, runPeerService, + modifyServiceGlobalState, discoveryPort, ) where @@ -33,31 +42,37 @@ import Control.Monad.Except import Control.Monad.Reader import Control.Monad.State +import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BC import Data.ByteString.Lazy qualified as BL import Data.Function import Data.IP qualified as IP import Data.List import Data.Map (Map) -import qualified Data.Map as M +import Data.Map qualified as M import Data.Maybe import Data.Typeable import Data.Word +import Foreign.C.Types +import Foreign.Marshal.Alloc +import Foreign.Marshal.Array import Foreign.Ptr -import Foreign.Storable +import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) -import qualified Network.Socket.ByteString as S +import Network.Socket.ByteString qualified as S -import Erebos.Channel +import Erebos.Error #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity +import Erebos.Network.Channel import Erebos.Network.Protocol +import Erebos.Object.Internal import Erebos.PubKey import Erebos.Service import Erebos.State @@ -69,12 +84,16 @@ import Erebos.Storage.Merge discoveryPort :: PortNumber discoveryPort = 29665 +discoveryMulticastGroup :: HostAddress6 +discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1 + announceIntervalSeconds :: Int announceIntervalSeconds = 60 data Server = Server { serverStorage :: Storage + , serverOptions :: ServerOptions , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity , serverThreads :: MVar [ThreadId] @@ -82,7 +101,7 @@ data Server = Server , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) - , serverIOActions :: TQueue (ExceptT String IO ()) + , serverIOActions :: TQueue (ExceptT ErebosError IO ()) , serverServices :: [SomeService] , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) , serverPeers :: MVar (Map PeerAddress Peer) @@ -145,12 +164,19 @@ setPeerChannel Peer {..} ch = do instance Eq Peer where (==) = (==) `on` peerIdentityVar -data PeerAddress = DatagramAddress SockAddr +class (Eq addr, Ord addr, Show addr, Typeable addr) => PeerAddressType addr where + sendBytesToAddress :: addr -> ByteString -> IO () + +data PeerAddress + = forall addr. PeerAddressType addr => CustomPeerAddress addr + | DatagramAddress SockAddr #ifdef ENABLE_ICE_SUPPORT - | PeerIceSession IceSession + | PeerIceSession IceSession #endif instance Show PeerAddress where + show (CustomPeerAddress addr) = show addr + show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of Just (IP.IPv6 ipv6, port) | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 @@ -158,37 +184,48 @@ instance Show PeerAddress where Just (addr, port) -> [show addr, show port] _ -> [show saddr] + #ifdef ENABLE_ICE_SUPPORT show (PeerIceSession ice) = show ice #endif instance Eq PeerAddress where + CustomPeerAddress addr == CustomPeerAddress addr' + | Just addr'' <- cast addr' = addr == addr'' DatagramAddress addr == DatagramAddress addr' = addr == addr' #ifdef ENABLE_ICE_SUPPORT PeerIceSession ice == PeerIceSession ice' = ice == ice' - _ == _ = False #endif + _ == _ = False instance Ord PeerAddress where + compare (CustomPeerAddress addr) (CustomPeerAddress addr') + | Just addr'' <- cast addr' = compare addr addr'' + | otherwise = compare (typeOf addr) (typeOf addr') + compare (CustomPeerAddress _ ) _ = LT + compare _ (CustomPeerAddress _ ) = GT + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' #ifdef ENABLE_ICE_SUPPORT compare (DatagramAddress _ ) _ = LT compare _ (DatagramAddress _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' #endif -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) | PeerIdentityFull UnifiedIdentity peerIdentity :: MonadIO m => Peer -> m PeerIdentity peerIdentity = liftIO . atomically . readTVar . peerIdentityVar -data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])] - | PeerConnected (Connection PeerAddress) - | PeerDropped +data PeerState + = PeerInit [ ( SecurityRequirement, TransportPacket Ref, [ TransportHeaderItem ] ) ] + | PeerConnected (Connection PeerAddress) + | PeerDropped lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID @@ -221,7 +258,7 @@ forkServerThread server act = do return (t:ts) startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server -startServer opt serverOrigHead logd' serverServices = do +startServer serverOptions serverOrigHead logd' serverServices = do let serverStorage = headStorage serverOrigHead serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead serverThreads <- newMVar [] @@ -244,11 +281,9 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do - either (atomically . logd) return =<< runExceptT =<< + either (atomically . logd . showErebosError) return =<< runExceptT =<< atomically (readTQueue serverIOActions) - broadcastAddreses <- getBroadcastAddresses discoveryPort - let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) putMVar serverSocket sock @@ -259,9 +294,14 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do - atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses - threadDelay $ announceIntervalSeconds * 1000 * 1000 + when (serverLocalDiscovery serverOptions) $ forkServerThread server $ do + announceAddreses <- fmap concat $ sequence $ + [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock + , getBroadcastAddresses discoveryPort + ] + forever $ do + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses + threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage @@ -287,13 +327,18 @@ startServer opt serverOrigHead logd' serverServices = do announceUpdate idt forM_ serverServices $ \(SomeService service _) -> do - forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do - watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do - withMVar serverPeers $ mapM_ $ \peer -> atomically $ do - readTVar (peerIdentityVar peer) >>= \case - PeerIdentityFull _ -> writeTQueue serverIOActions $ do - runPeerService peer $ act x - _ -> return () + forM_ (serviceStorageWatchers service) $ \case + SomeStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + GlobalStorageWatcher sel act -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + atomically $ writeTQueue serverIOActions $ do + act server x forkServerThread server $ forever $ do (msg, saddr) <- S.recvFrom sock 4096 @@ -301,10 +346,12 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - case paddr of - DatagramAddress addr -> void $ S.sendTo sock msg addr + handle (\(e :: SomeException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + case paddr of + CustomPeerAddress addr -> sendBytesToAddress addr msg + DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg + PeerIceSession ice -> iceSend ice msg #endif forkServerThread server $ forever $ do @@ -365,20 +412,33 @@ startServer opt serverOrigHead logd' serverServices = do , addrFamily = AF_INET6 , addrSocketType = Datagram } - addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) + addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort serverOptions) bracket (open addr) close loop forkServerThread server $ forever $ do - (peer, svc, ref) <- atomically $ readTQueue chanSvc + ( peer, svc, ref, streams ) <- atomically $ readTQueue chanSvc case find ((svc ==) . someServiceID) serverServices of - Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just ( service, attr )) streams peer (serviceHandler $ wrappedLoad @s ref) _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server stopServer :: Server -> IO () -stopServer Server {..} = do - mapM_ killThread =<< takeMVar serverThreads +stopServer server@Server {..} = do + withMVar serverPeers $ \peers -> do + ( global, peerStates ) <- atomically $ (,) + <$> takeTMVar serverServiceStates + <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState) + + forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do + ps <- forM peerStates $ \( peer, states ) -> + return $ ( peer, ) $ case M.lookup (serviceID proxy) states of + Just (SomeServiceState (_ :: Proxy ps) pstate) + | Just (Refl :: s :~: ps) <- eqT + -> pstate + _ -> emptyServiceState proxy + serviceStopServer proxy server gs ps + mapM_ killThread =<< takeMVar serverThreads dataResponseWorker :: Server -> IO () dataResponseWorker server = forever $ do @@ -392,7 +452,7 @@ dataResponseWorker server = forever $ do Right ref -> do atomically (writeTVar tvar $ Right ref) forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case - Left err -> atomically $ writeTQueue (serverErrorLog server) err + Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () return (Nothing, []) @@ -421,12 +481,18 @@ instance MonadFail PacketHandler where runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () runPacketHandler secure peer@Peer {..} act = do let logd = writeTQueue $ serverErrorLog peerServer_ - runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case + runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case Left err -> do logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err Right ph -> do when (not $ null $ phHead ph) $ do - let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + body <- case phBodyStream ph of + Nothing -> return $ phBody ph + Just stream -> do + writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph + return [] + let packet = TransportPacket (TransportHeader $ phHead ph) body secreq = case (secure, phPlaintextReply ph) of (True, _) -> EncryptedOnly (False, False) -> PlaintextAllowed @@ -450,6 +516,7 @@ data PacketHandlerState = PacketHandlerState , phHead :: [TransportHeaderItem] , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] + , phBodyStream :: Maybe RawStreamWriter , phPlaintextReply :: Bool } @@ -462,6 +529,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +sendBodyAsStream :: PacketHandler () +sendBodyAsStream = do + gets phBodyStream >>= \case + Nothing -> do + stream <- openStream + modify $ \ph -> ph { phBodyStream = Just stream } + Just _ -> return () + keepPlaintextReply :: PacketHandler () keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } @@ -471,9 +546,7 @@ openStream = do conn <- readTVarP peerState >>= \case PeerConnected conn -> return conn _ -> throwError "can't open stream without established connection" - (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case - Right res -> return res - Left err -> throwError err + (hdr, writer, handler) <- liftEither =<< liftSTM (connAddWriteStream conn) liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler) addHeader hdr @@ -493,8 +566,8 @@ appendDistinct x (y:ys) | x == y = y : ys appendDistinct x [] = [x] handlePacket :: UnifiedIdentity -> Bool - -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO () + -> Peer -> TQueue ( Peer, ServiceID, Ref, [ RawStreamReader ]) -> [ ServiceID ] + -> TransportHeader -> [ PartialRef ] -> IO () handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- getPeerChannel peer @@ -517,8 +590,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected dgst -> do - logd $ "rejected by peer: " ++ show dgst + Rejected dgst + | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers + , peerRequest < dgst + -> return () -- Our request was rejected due to lower priority + + | otherwise -> logd $ "rejected by peer: " ++ show dgst DataRequest dgst | secure || dgst `elem` plaintextRefs -> do @@ -532,15 +609,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = -- otherwise lost the channel, so keep the reply plaintext as well. when (not secure) keepPlaintextReply - let bytes = lazyLoadBytes mref -- TODO: MTU - if (secure && BL.length bytes > 500) - then do - stream <- openStream - liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do - writeByteStringToStream stream bytes - else do - addBody $ mref + when (secure && BL.length (lazyLoadBytes mref) > 500) + sendBodyAsStream + + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -556,7 +629,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) $ - "failed to receive object from stream: " <> err + "failed to receive object from stream: " <> showErebosError err Right objs -> do forM_ objs $ \obj -> do pref <- storeObject (peerInStorage peer) obj @@ -593,9 +666,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieWait {} -> return () ChannelCookieReceived {} -> process ChannelCookieConfirmed {} -> process - ChannelOurRequest our | dgst < refDigest (storedRef our) -> process - | otherwise -> reject - ChannelPeerRequest {} -> process + ChannelOurRequest our + | dgst < refDigest (storedRef our) -> process + | otherwise -> do + -- Reject peer channel request with lower priority + addHeader $ TrChannelRequest $ refDigest $ storedRef our + reject + ChannelPeerRequest prev + | dgst == wrDigest prev -> addHeader $ Acknowledged dgst + | otherwise -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process ChannelClosed {} -> return () @@ -622,17 +701,18 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} - then do - void $ newWaitingRef dgst $ \ref -> - liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) - else throwError $ "missing service object " ++ show dgst + then do + streamReaders <- mapM acceptStream $ lookupNewStreams headers + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc ( peer, svc, ref, streamReaders ) + else throwError $ "missing service object " ++ show dgst | otherwise -> addHeader $ Rejected dgst | otherwise -> throwError $ "service ref without type" _ -> return () -withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT ErebosError IO ()) -> m () withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) @@ -647,12 +727,14 @@ setupChannel identity peer upid = do [ TrChannelRequest reqref , AnnounceSelf $ refDigest $ storedRef $ idData identity ] + let sendChannelRequest = do + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] + setPeerChannel peer $ ChannelOurRequest req liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed -> do - sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ - TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest req + ChannelCookieReceived -> sendChannelRequest + ChannelCookieConfirmed -> sendChannelRequest _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -686,7 +768,7 @@ handleChannelAccept identity accref = do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] finalizedChannel peer ch identity - Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) + Left dgst -> throwOtherError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () @@ -748,9 +830,13 @@ notifyServicesOfPeer :: Peer -> STM () notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do writeTQueue serverIOActions $ do forM_ serverServices $ \service@(SomeService _ attrs) -> - runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + runPeerServiceOn (Just ( service, attrs )) [] peer serviceNewPeer +receivedFromCustomAddress :: PeerAddressType addr => Server -> addr -> ByteString -> IO () +receivedFromCustomAddress Server {..} addr msg = do + writeFlowIO serverRawPath ( CustomPeerAddress addr, msg ) + mkPeer :: Server -> PeerAddress -> IO Peer mkPeer peerServer_ peerAddress = do peerState <- newTVarIO (PeerInit []) @@ -769,6 +855,9 @@ serverPeer server paddr = do _ -> paddr serverPeer' server (DatagramAddress paddr') +serverPeerCustom :: PeerAddressType addr => Server -> addr -> IO Peer +serverPeerCustom server addr = serverPeer' server (CustomPeerAddress addr) + #ifdef ENABLE_ICE_SUPPORT serverPeerIce :: Server -> IceSession -> IO Peer serverPeerIce server@Server {..} ice = do @@ -806,24 +895,60 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () -sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] +sendToPeer peer = sendManyToPeer peer . (: []) + +sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m () +sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True) sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () -sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] +sendToPeerStored peer = sendManyToPeerStored peer . (: []) -sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () +sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) + +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ ServiceReply s ] -> m () sendToPeerList peer parts = do let st = peerStorage peer - srefs <- liftIO $ fmap catMaybes $ forM parts $ \case - ServiceReply (Left x) use -> Just . (,use) <$> store st x - ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) - ServiceFinally act -> act >> return Nothing - let dgsts = map (refDigest . fst) srefs - let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU - header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) - packet = TransportPacket header content - ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] - liftIO $ atomically $ sendToPeerS peer ackedBy packet + res <- runExceptT $ do + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + _ -> return Nothing + + streamHeaders <- concat <$> do + (liftEither =<<) $ liftIO $ atomically $ runExceptT $ do + forM parts $ \case + ServiceOpenStream cb -> do + conn <- lift (readTVar (peerState peer)) >>= \case + PeerConnected conn -> return conn + _ -> throwError "can't open stream without established connection" + (hdr, writer, handler) <- liftEither =<< lift (connAddWriteStream conn) + + lift $ writeTQueue (serverIOActions (peerServer peer)) $ do + liftIO $ forkServerThread (peerServer peer) handler + return [ ( hdr, cb writer ) ] + _ -> return [] + liftIO $ sequence_ $ map snd streamHeaders + + liftIO $ forM_ parts $ \case + ServiceFinally act -> act + _ -> return () + + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU + header = TransportHeader $ concat + [ [ ServiceType (serviceID $ head parts) ] + , map ServiceRef dgsts + , map fst streamHeaders + ] + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + + case res of + Right () -> return () + Left err -> liftIO $ atomically $ writeTQueue (serverErrorLog $ peerServer peer) $ + "failed to send packet to " <> show (peerAddress peer) <> ": " <> err sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerS' secure Peer {..} ackedBy packet = do @@ -838,7 +963,7 @@ sendToPeerS = sendToPeerS' EncryptedOnly sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerPlain = sendToPeerS' PlaintextAllowed -sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () +sendToPeerWith :: forall s m e. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Peer -> (ServiceState s -> ExceptT ErebosError IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do let sproxy = Proxy @s sid = serviceID sproxy @@ -853,20 +978,20 @@ sendToPeerWith peer fobj = do case res of Right (Just obj) -> sendToPeer peer obj Right Nothing -> return () - Left err -> throwError err + Left err -> throwError $ fromErebosError err -lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) | Just (Refl :: s :~: t) <- eqT = Just (service, attr) | otherwise = lookupService proxy rest lookupService _ [] = Nothing runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () -runPeerService = runPeerServiceOn Nothing +runPeerService = runPeerServiceOn Nothing [] -runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () -runPeerServiceOn mbservice peer handler = liftIO $ do +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe ( SomeService, ServiceAttributes s ) -> [ RawStreamReader ] -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice newStreams peer handler = liftIO $ do let server = peerServer peer proxy = Proxy @s svc = serviceID proxy @@ -891,6 +1016,7 @@ runPeerServiceOn mbservice peer handler = liftIO $ do , svcPeerIdentity = peerId , svcServer = server , svcPrintOp = atomically . logd + , svcNewStreams = newStreams } reloadHead (serverOrigHead server) >>= \case Nothing -> atomically $ do @@ -911,9 +1037,79 @@ runPeerServiceOn mbservice peer handler = liftIO $ do _ -> atomically $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" - +modifyServiceGlobalState + :: forall s a m e proxy. (Service s, MonadIO m, MonadError e m, FromErebosError e) + => Server -> proxy s + -> (ServiceGlobalState s -> ( ServiceGlobalState s, a )) + -> m a +modifyServiceGlobalState server proxy f = do + let svc = serviceID proxy + case lookupService proxy (serverServices server) of + Just ( service, _ ) -> do + liftIO $ atomically $ do + global <- takeTMVar (serverServiceStates server) + ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of + SomeServiceGlobalState (_ :: Proxy gs) gs -> do + (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT + let ( gs', res ) = f gs + return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res ) + putTMVar (serverServiceStates server) global' + return res + Nothing -> do + throwOtherError $ "unhandled service '" ++ show (toUUID svc) ++ "'" + + +foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) -foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () +foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () + +data InetAddress = InetAddress { fromInetAddress :: IP.IP } + +instance F.Storable InetAddress where + sizeOf _ = sizeOf (undefined :: CInt) + 16 + alignment _ = 8 + + peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case + AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) + AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) + _ -> fail "InetAddress: unknown family" + + poke ptr (InetAddress addr) = case addr of + IP.IPv4 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET) + pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) + IP.IPv6 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET6) + pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) + +joinMulticast :: Socket -> IO [ Word32 ] +joinMulticast sock = + withFdSocket sock $ \fd -> + alloca $ \pcount -> do + ptr <- cJoinMulticast fd pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- forM [ 0 .. count - 1 ] $ \i -> + peekElemOff ptr i + cFree ptr + return res + +getServerAddresses :: Server -> IO [ SockAddr ] +getServerAddresses Server {..} = do + alloca $ \pcount -> do + ptr <- cLocalAddresses pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- peekArray count ptr + cFree ptr + return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do @@ -922,6 +1118,9 @@ getBroadcastAddresses port = do w <- peekElemOff ptr i if w == 0 then return [] else (SockAddrInet port w:) <$> parse (i + 1) - addrs <- parse 0 - cFree ptr - return addrs + if ptr == nullPtr + then return [] + else do + addrs <- parse 0 + cFree ptr + return addrs |