diff options
Diffstat (limited to 'src/Network.hs')
-rw-r--r-- | src/Network.hs | 106 |
1 files changed, 69 insertions, 37 deletions
diff --git a/src/Network.hs b/src/Network.hs index 5685627..cbc68b6 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -9,7 +9,7 @@ module Network ( PeerChannel(..), WaitingRef, wrDigest, Service(..), - serverPeer, + serverPeer, serverPeerIce, sendToPeer, sendToPeerStored, sendToPeerWith, discoveryPort, @@ -31,9 +31,10 @@ import Data.Maybe import Data.Typeable import Network.Socket -import Network.Socket.ByteString (recvFrom, sendTo) +import qualified Network.Socket.ByteString as S import Channel +import ICE import Identity import PubKey import Service @@ -53,7 +54,8 @@ data Server = Server { serverStorage :: Storage , serverIdentity :: MVar UnifiedIdentity , serverSocket :: MVar Socket - , serverPeers :: MVar (Map SockAddr Peer) + , serverChanPacket :: Chan (PeerAddress, BC.ByteString) + , serverPeers :: MVar (Map PeerAddress Peer) , serverChanPeer' :: Chan Peer } @@ -66,7 +68,6 @@ data Peer = Peer , peerIdentity :: PeerIdentity , peerIdentityUpdate :: [WaitingRef] , peerChannel :: PeerChannel - , peerSocket :: Socket , peerStorage :: Storage , peerInStorage :: PartialStorage , peerServiceState :: MVar (M.Map ServiceID SomeServiceState) @@ -75,8 +76,24 @@ data Peer = Peer , peerWaitingRefs :: [WaitingRef] } -data PeerAddress = DatagramAddress SockAddr - deriving (Show) +data PeerAddress = DatagramAddress Socket SockAddr + | PeerIceSession IceSession + +instance Show PeerAddress where + show (DatagramAddress _ addr) = show addr + show (PeerIceSession ice) = show ice + +instance Eq PeerAddress where + DatagramAddress _ addr == DatagramAddress _ addr' = addr == addr' + PeerIceSession ice == PeerIceSession ice' = ice == ice' + _ == _ = False + +instance Ord PeerAddress where + compare (DatagramAddress _ addr) (DatagramAddress _ addr') = compare addr addr' + compare (DatagramAddress _ _ ) _ = LT + compare _ (DatagramAddress _ _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' + data PeerIdentity = PeerIdentityUnknown | PeerIdentityRef WaitingRef @@ -184,6 +201,7 @@ receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do startServer :: Head LocalState -> (String -> IO ()) -> String -> [SomeService] -> IO Server startServer origHead logd bhost services = do let storage = refStorage $ headRef origHead + chanPacket <- newChan chanPeer <- newChan chanSvc <- newChan svcStates <- newMVar M.empty @@ -206,7 +224,7 @@ startServer origHead logd bhost services = do readMVar midentity >>= \identity -> do st <- derivePartialStorage storage baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort) - void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr) + void $ S.sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]) (addrAddress baddr) threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do @@ -220,9 +238,8 @@ startServer origHead logd bhost services = do peer | PeerIdentityFull _ <- peerIdentity peer , ChannelEstablished ch <- peerChannel peer - , DatagramAddress paddr <- peerAddress peer -> runExceptT (channelEncrypt ch plaintext) >>= \case - Right ctext -> void $ sendTo (peerSocket peer) ctext paddr + Right ctext -> void $ sendTo peer ctext Left err -> logd $ "Failed to encrypt data: " ++ err | otherwise -> return () @@ -246,8 +263,12 @@ startServer origHead logd bhost services = do when changedShared $ do mapM_ (shareState idt shared) =<< readMVar peers + void $ forkIO $ forever $ do + (msg, saddr) <- S.recvFrom sock 4096 + writeChan chanPacket (DatagramAddress sock saddr, msg) + forever $ do - (msg, paddr) <- recvFrom sock 4096 + (paddr, msg) <- readChan chanPacket modifyMVar_ peers $ \pvalue -> do let mbpeer = M.lookup paddr pvalue (peer, content, secure) <- if @@ -263,7 +284,7 @@ startServer origHead logd bhost services = do -> return (peer, msg, False) | otherwise - -> (, msg, False) <$> mkPeer storage sock paddr + -> (, msg, False) <$> mkPeer storage paddr case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of Right (obj:objs) @@ -323,13 +344,14 @@ startServer origHead logd bhost services = do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return (svcs, global) - | DatagramAddress paddr <- peerAddress peer -> do - logd $ "service packet from peer with incomplete identity " ++ show paddr + | otherwise -> do + logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) return Server { serverStorage = storage , serverIdentity = midentity , serverSocket = ssocket + , serverChanPacket = chanPacket , serverPeers = peers , serverChanPeer' = chanPeer } @@ -362,7 +384,6 @@ handlePacket :: (String -> IO ()) -> Head LocalState -> UnifiedIdentity -> Bool -> TransportHeader -> IO (Maybe Peer) handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader headers) = do let sidentity = idData identity - DatagramAddress paddr = peerAddress opeer plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat [ [ storedRef sidentity ] , map storedRef $ idUpdates identity @@ -477,7 +498,7 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h case res of Left err -> do - logd $ "Error in handling packet from " ++ show paddr ++ ": " ++ err + logd $ "Error in handling packet from " ++ show (peerAddress opeer) ++ ": " ++ err return Nothing Right ph -> do when (not $ null $ phHead ph) $ do @@ -488,9 +509,9 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h case peerChannel $ phPeer ph of ChannelEstablished ch -> do x <- runExceptT (channelEncrypt ch plain) - case x of Right ctext -> void $ sendTo (peerSocket $ phPeer ph) ctext paddr + case x of Right ctext -> void $ sendTo (phPeer ph) ctext Left err -> logd $ "Failed to encrypt data: " ++ err - _ -> void $ sendTo (peerSocket $ phPeer ph) plain paddr + _ -> void $ sendTo (phPeer ph) plain return $ if phPeerChanged ph then Just $ phPeer ph else Nothing @@ -599,13 +620,12 @@ finalizedChannel oh self = do -- Outstanding service packets gets phPeer >>= \case - Peer { peerChannel = ChannelEstablished ch - , peerAddress = DatagramAddress paddr - , peerServiceOutQueue = oqueue - , peerSocket = sock - } -> do - ps <- liftIO $ modifyMVar oqueue $ return . ([],) - forM_ ps $ sendPacket sock paddr ch + peer@Peer + { peerChannel = ChannelEstablished ch + , peerServiceOutQueue = oqueue + } -> do + ps <- liftIO $ modifyMVar oqueue $ return . ([],) + forM_ ps $ sendPacket peer ch _ -> return () @@ -645,18 +665,17 @@ handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case updatePeer $ \p -> p { peerServiceInQueue = queue' } -mkPeer :: Storage -> Socket -> SockAddr -> IO Peer -mkPeer st sock paddr = do +mkPeer :: Storage -> PeerAddress -> IO Peer +mkPeer st paddr = do pst <- deriveEphemeralStorage st ist <- derivePartialStorage pst svcs <- newMVar M.empty oqueue <- newMVar [] return $ Peer - { peerAddress = DatagramAddress paddr + { peerAddress = paddr , peerIdentity = PeerIdentityUnknown , peerIdentityUpdate = [] , peerChannel = ChannelWait - , peerSocket = sock , peerStorage = pst , peerInStorage = ist , peerServiceState = svcs @@ -668,21 +687,35 @@ mkPeer st sock paddr = do serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do sock <- readMVar $ serverSocket server + serverPeer' server (DatagramAddress sock paddr) + +serverPeerIce :: Server -> IceSession -> IO Peer +serverPeerIce server ice = do + let paddr = PeerIceSession ice + peer <- serverPeer' server paddr + iceSetChan ice (paddr,) $ serverChanPacket server + return peer + +serverPeer' :: Server -> PeerAddress -> IO Peer +serverPeer' server paddr = do (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do case M.lookup paddr pvalue of Just peer -> return (pvalue, (peer, False)) Nothing -> do - peer <- mkPeer (serverStorage server) sock paddr + peer <- mkPeer (serverStorage server) paddr return (M.insert paddr peer pvalue, (peer, True)) when hello $ do identity <- readMVar (serverIdentity server) - void $ sendTo sock - (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader + void $ sendTo peer $ + BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ] - ) paddr return peer +sendTo :: Peer -> BC.ByteString -> IO () +sendTo Peer { peerAddress = DatagramAddress sock addr } msg = void $ S.sendTo sock msg addr +sendTo Peer { peerAddress = PeerIceSession ice } msg = iceSend ice msg + sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m () sendToPeer self peer packet = sendToPeerList self peer [ServiceReply (Left packet) True] @@ -701,17 +734,16 @@ sendToPeerList _ peer parts = do packet = TransportPacket header content case peerChannel peer of ChannelEstablished ch -> do - let DatagramAddress paddr = peerAddress peer - sendPacket (peerSocket peer) paddr ch packet + sendPacket peer ch packet _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:) -sendPacket :: (MonadIO m, MonadError String m) => Socket -> SockAddr -> Channel -> TransportPacket -> m () -sendPacket sock addr ch (TransportPacket header content) = do +sendPacket :: (MonadIO m, MonadError String m) => Peer -> Channel -> TransportPacket -> m () +sendPacket peer ch (TransportPacket header content) = do let plain = BL.toStrict $ BL.concat $ (serializeObject $ transportToObject header) : map lazyLoadBytes content ctext <- channelEncrypt ch plain - void $ liftIO $ sendTo sock ctext addr + void $ liftIO $ sendTo peer ctext sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith identity peer fobj = do |