diff options
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r-- | src/Erebos/Network.hs | 193 |
1 files changed, 145 insertions, 48 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 41b6279..54658de 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -6,6 +6,7 @@ module Erebos.Network ( stopServer, getCurrentPeerList, getNextPeerChange, + getServerAddresses, ServerOptions(..), serverIdentity, defaultServerOptions, Peer, peerServer, peerStorage, @@ -19,7 +20,9 @@ module Erebos.Network ( #endif dropPeer, isPeerDropped, - sendToPeer, sendToPeerStored, sendToPeerWith, + sendToPeer, sendManyToPeer, + sendToPeerStored, sendManyToPeerStored, + sendToPeerWith, runPeerService, discoveryPort, @@ -44,20 +47,25 @@ import Data.Maybe import Data.Typeable import Data.Word +import Foreign.C.Types +import Foreign.Marshal.Alloc +import Foreign.Marshal.Array import Foreign.Ptr -import Foreign.Storable +import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) import qualified Network.Socket.ByteString as S -import Erebos.Channel +import Erebos.Error #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity +import Erebos.Network.Channel import Erebos.Network.Protocol +import Erebos.Object.Internal import Erebos.PubKey import Erebos.Service import Erebos.State @@ -69,12 +77,16 @@ import Erebos.Storage.Merge discoveryPort :: PortNumber discoveryPort = 29665 +discoveryMulticastGroup :: HostAddress6 +discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1 + announceIntervalSeconds :: Int announceIntervalSeconds = 60 data Server = Server { serverStorage :: Storage + , serverOptions :: ServerOptions , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity , serverThreads :: MVar [ThreadId] @@ -82,7 +94,7 @@ data Server = Server , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) - , serverIOActions :: TQueue (ExceptT String IO ()) + , serverIOActions :: TQueue (ExceptT ErebosError IO ()) , serverServices :: [SomeService] , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) , serverPeers :: MVar (Map PeerAddress Peer) @@ -178,8 +190,8 @@ instance Ord PeerAddress where #endif -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) | PeerIdentityFull UnifiedIdentity peerIdentity :: MonadIO m => Peer -> m PeerIdentity @@ -221,7 +233,7 @@ forkServerThread server act = do return (t:ts) startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server -startServer opt serverOrigHead logd' serverServices = do +startServer serverOptions serverOrigHead logd' serverServices = do let serverStorage = headStorage serverOrigHead serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead serverThreads <- newMVar [] @@ -244,11 +256,9 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do - either (atomically . logd) return =<< runExceptT =<< + either (atomically . logd . showErebosError) return =<< runExceptT =<< atomically (readTQueue serverIOActions) - broadcastAddreses <- getBroadcastAddresses discoveryPort - let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) putMVar serverSocket sock @@ -259,9 +269,14 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do - atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses - threadDelay $ announceIntervalSeconds * 1000 * 1000 + when (serverLocalDiscovery serverOptions) $ forkServerThread server $ do + announceAddreses <- fmap concat $ sequence $ + [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock + , getBroadcastAddresses discoveryPort + ] + forever $ do + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses + threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage @@ -301,10 +316,11 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - case paddr of - DatagramAddress addr -> void $ S.sendTo sock msg addr + handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + case paddr of + DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg + PeerIceSession ice -> iceSend ice msg #endif forkServerThread server $ forever $ do @@ -365,7 +381,7 @@ startServer opt serverOrigHead logd' serverServices = do , addrFamily = AF_INET6 , addrSocketType = Datagram } - addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) + addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort serverOptions) bracket (open addr) close loop forkServerThread server $ forever $ do @@ -392,7 +408,7 @@ dataResponseWorker server = forever $ do Right ref -> do atomically (writeTVar tvar $ Right ref) forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case - Left err -> atomically $ writeTQueue (serverErrorLog server) err + Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () return (Nothing, []) @@ -421,12 +437,18 @@ instance MonadFail PacketHandler where runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () runPacketHandler secure peer@Peer {..} act = do let logd = writeTQueue $ serverErrorLog peerServer_ - runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case + runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case Left err -> do logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err Right ph -> do when (not $ null $ phHead ph) $ do - let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + body <- case phBodyStream ph of + Nothing -> return $ phBody ph + Just stream -> do + writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph + return [] + let packet = TransportPacket (TransportHeader $ phHead ph) body secreq = case (secure, phPlaintextReply ph) of (True, _) -> EncryptedOnly (False, False) -> PlaintextAllowed @@ -450,6 +472,7 @@ data PacketHandlerState = PacketHandlerState , phHead :: [TransportHeaderItem] , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] + , phBodyStream :: Maybe RawStreamWriter , phPlaintextReply :: Bool } @@ -462,6 +485,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +sendBodyAsStream :: PacketHandler () +sendBodyAsStream = do + gets phBodyStream >>= \case + Nothing -> do + stream <- openStream + modify $ \ph -> ph { phBodyStream = Just stream } + Just _ -> return () + keepPlaintextReply :: PacketHandler () keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } @@ -517,8 +548,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected dgst -> do - logd $ "rejected by peer: " ++ show dgst + Rejected dgst + | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers + , peerRequest < dgst + -> return () -- Our request was rejected due to lower priority + + | otherwise -> logd $ "rejected by peer: " ++ show dgst DataRequest dgst | secure || dgst `elem` plaintextRefs -> do @@ -532,15 +567,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = -- otherwise lost the channel, so keep the reply plaintext as well. when (not secure) keepPlaintextReply - let bytes = lazyLoadBytes mref -- TODO: MTU - if (secure && BL.length bytes > 500) - then do - stream <- openStream - liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do - writeByteStringToStream stream bytes - else do - addBody $ mref + when (secure && BL.length (lazyLoadBytes mref) > 500) + sendBodyAsStream + + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -556,7 +587,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) $ - "failed to receive object from stream: " <> err + "failed to receive object from stream: " <> showErebosError err Right objs -> do forM_ objs $ \obj -> do pref <- storeObject (peerInStorage peer) obj @@ -593,9 +624,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieWait {} -> return () ChannelCookieReceived {} -> process ChannelCookieConfirmed {} -> process - ChannelOurRequest our | dgst < refDigest (storedRef our) -> process - | otherwise -> reject - ChannelPeerRequest {} -> process + ChannelOurRequest our + | dgst < refDigest (storedRef our) -> process + | otherwise -> do + -- Reject peer channel request with lower priority + addHeader $ TrChannelRequest $ refDigest $ storedRef our + reject + ChannelPeerRequest prev + | dgst == wrDigest prev -> addHeader $ Acknowledged dgst + | otherwise -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process ChannelClosed {} -> return () @@ -632,7 +669,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = _ -> return () -withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT ErebosError IO ()) -> m () withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) @@ -647,12 +684,14 @@ setupChannel identity peer upid = do [ TrChannelRequest reqref , AnnounceSelf $ refDigest $ storedRef $ idData identity ] + let sendChannelRequest = do + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] + setPeerChannel peer $ ChannelOurRequest req liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed -> do - sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ - TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest req + ChannelCookieReceived -> sendChannelRequest + ChannelCookieConfirmed -> sendChannelRequest _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -686,7 +725,7 @@ handleChannelAccept identity accref = do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] finalizedChannel peer ch identity - Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) + Left dgst -> throwOtherError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () @@ -806,10 +845,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () -sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] +sendToPeer peer = sendManyToPeer peer . (: []) + +sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m () +sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True) sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () -sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] +sendToPeerStored peer = sendManyToPeerStored peer . (: []) + +sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () +sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do @@ -838,7 +883,7 @@ sendToPeerS = sendToPeerS' EncryptedOnly sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerPlain = sendToPeerS' PlaintextAllowed -sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () +sendToPeerWith :: forall s m e. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Peer -> (ServiceState s -> ExceptT ErebosError IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do let sproxy = Proxy @s sid = serviceID sproxy @@ -853,7 +898,7 @@ sendToPeerWith peer fobj = do case res of Right (Just obj) -> sendToPeer peer obj Right Nothing -> return () - Left err -> throwError err + Left err -> throwError $ fromErebosError err lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) @@ -912,8 +957,57 @@ runPeerServiceOn mbservice peer handler = liftIO $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) -foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () +foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () + +data InetAddress = InetAddress { fromInetAddress :: IP.IP } + +instance F.Storable InetAddress where + sizeOf _ = sizeOf (undefined :: CInt) + 16 + alignment _ = 8 + + peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case + AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) + AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) + _ -> fail "InetAddress: unknown family" + + poke ptr (InetAddress addr) = case addr of + IP.IPv4 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET) + pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) + IP.IPv6 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET6) + pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) + +joinMulticast :: Socket -> IO [ Word32 ] +joinMulticast sock = + withFdSocket sock $ \fd -> + alloca $ \pcount -> do + ptr <- cJoinMulticast fd pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- forM [ 0 .. count - 1 ] $ \i -> + peekElemOff ptr i + cFree ptr + return res + +getServerAddresses :: Server -> IO [ SockAddr ] +getServerAddresses Server {..} = do + alloca $ \pcount -> do + ptr <- cLocalAddresses pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- peekArray count ptr + cFree ptr + return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do @@ -922,6 +1016,9 @@ getBroadcastAddresses port = do w <- peekElemOff ptr i if w == 0 then return [] else (SockAddrInet port w:) <$> parse (i + 1) - addrs <- parse 0 - cFree ptr - return addrs + if ptr == nullPtr + then return [] + else do + addrs <- parse 0 + cFree ptr + return addrs |