From b08e5a3e6d82ca5e5a2e29e791a2e61bf08964a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 10 May 2020 23:30:59 +0200 Subject: Network: support adding custom peers --- src/Main.hs | 6 +- src/Network.hs | 232 +++++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 154 insertions(+), 84 deletions(-) diff --git a/src/Main.hs b/src/Main.hs index 0398233..34c2b3b 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -96,7 +96,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do False -> error "Requires terminal" extPrint <- getExternalPrint let extPrintLn str = extPrint $ str ++ "\n"; - chanPeer <- liftIO $ do + server <- liftIO $ do erebosHead <- loadLocalStateHead st startServer erebosHead extPrintLn bhost [ SomeService @AttachService Proxy @@ -107,7 +107,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do peers <- liftIO $ newMVar [] void $ liftIO $ forkIO $ void $ forever $ do - peer <- readChan chanPeer + peer <- readChan $ serverChanPeer server if | PeerIdentityFull pid <- peerIdentity peer -> do let update [] = ([peer], Nothing) update (p:ps) | PeerIdentityFull pid' <- peerIdentity p @@ -142,6 +142,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do curIdentity <- liftIO $ loadLocalIdentity st res <- liftIO $ runExceptT $ flip execStateT cstate $ runReaderT cmd CommandInput { ciSelf = curIdentity + , ciServer = server , ciLine = line , ciPrint = extPrintLn , ciPeers = liftIO $ readMVar peers @@ -158,6 +159,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do data CommandInput = CommandInput { ciSelf :: UnifiedIdentity + , ciServer :: Server , ciLine :: String , ciPrint :: String -> IO () , ciPeers :: CommandM [Peer] diff --git a/src/Network.hs b/src/Network.hs index 429dee1..f07e7ce 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,12 +1,18 @@ module Network ( + Server, + startServer, + serverChanPeer, + Peer(..), PeerAddress(..), PeerIdentity(..), PeerChannel(..), WaitingRef, wrDigest, Service(..), - startServer, + serverPeer, sendToPeer, sendToPeerStored, sendToPeerWith, + + discoveryPort, ) where import Control.Concurrent @@ -19,6 +25,7 @@ import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy as BL import Data.Either import Data.List +import Data.Map (Map) import qualified Data.Map as M import Data.Maybe import Data.Typeable @@ -42,6 +49,18 @@ announceIntervalSeconds :: Int announceIntervalSeconds = 60 +data Server = Server + { serverStorage :: Storage + , serverIdentity :: MVar UnifiedIdentity + , serverSocket :: MVar Socket + , serverPeers :: MVar (Map SockAddr Peer) + , serverChanPeer' :: Chan Peer + } + +serverChanPeer :: Server -> Chan Peer +serverChanPeer = serverChanPeer' + + data Peer = Peer { peerAddress :: PeerAddress , peerIdentity :: PeerIdentity @@ -51,7 +70,8 @@ data Peer = Peer , peerStorage :: Storage , peerInStorage :: PartialStorage , peerServiceState :: MVar (M.Map ServiceID SomeServiceState) - , peerServiceQueue :: [(ServiceID, WaitingRef)] + , peerServiceInQueue :: [(ServiceID, WaitingRef)] + , peerServiceOutQueue :: MVar [TransportPacket] , peerWaitingRefs :: [WaitingRef] } @@ -69,6 +89,9 @@ data PeerChannel = ChannelWait | ChannelEstablished Channel +data TransportPacket = TransportPacket TransportHeader [Ref] + + data TransportHeaderItem = Acknowledged PartialRef | Rejected PartialRef @@ -158,7 +181,7 @@ receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do checkWaitingRef wr -startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO (Chan Peer) +startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO Server startServer origHead logd bhost services = do let storage = refStorage $ headRef origHead chanPeer <- newChan @@ -167,9 +190,11 @@ startServer origHead logd bhost services = do peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead mshared <- newMVar $ lsShared $ load $ headRef origHead + ssocket <- newEmptyMVar let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) + putMVar ssocket sock setSocketOption sock ReuseAddr 1 setSocketOption sock Broadcast 1 setCloseOnExecIfNeeded =<< fdSocket sock @@ -223,54 +248,42 @@ startServer origHead logd bhost services = do forever $ do (msg, paddr) <- recvFrom sock 4096 - mbpeer <- M.lookup paddr <$> readMVar peers - (peer, content, secure) <- if - | Just peer <- mbpeer - , Just ch <- case peerChannel peer of - ChannelEstablished ch -> Just ch - ChannelOurAccept _ sch -> Just $ fromStored sch - _ -> Nothing - , Right plain <- runExcept $ channelDecrypt ch msg - -> return (peer, plain, True) - - | Just peer <- mbpeer - -> return (peer, msg, False) - - | otherwise -> do - pst <- deriveEphemeralStorage storage - ist <- derivePartialStorage pst - svcs <- newMVar M.empty - let peer = Peer - { peerAddress = DatagramAddress paddr - , peerIdentity = PeerIdentityUnknown - , peerIdentityUpdate = [] - , peerChannel = ChannelWait - , peerSocket = sock - , peerStorage = pst - , peerInStorage = ist - , peerServiceState = svcs - , peerServiceQueue = [] - , peerWaitingRefs = [] - } - return (peer, msg, False) - - case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of - Right (obj:objs) - | Just header <- transportFromObject obj -> do - forM_ objs $ storeObject $ peerInStorage peer - identity <- readMVar midentity - let svcs = map someServiceID services - handlePacket logd identity secure peer chanSvc svcs header >>= \case - Just peer' -> do - modifyMVar_ peers $ return . M.insert paddr peer' - writeChan chanPeer peer' - Nothing -> return () - - | otherwise -> do - logd $ show paddr ++ ": invalid objects" - logd $ show objs - - _ -> logd $ show paddr ++ ": invalid objects" + modifyMVar_ peers $ \pvalue -> do + let mbpeer = M.lookup paddr pvalue + (peer, content, secure) <- if + | Just peer <- mbpeer + , Just ch <- case peerChannel peer of + ChannelEstablished ch -> Just ch + ChannelOurAccept _ sch -> Just $ fromStored sch + _ -> Nothing + , Right plain <- runExcept $ channelDecrypt ch msg + -> return (peer, plain, True) + + | Just peer <- mbpeer + -> return (peer, msg, False) + + | otherwise + -> (, msg, False) <$> mkPeer storage sock paddr + + case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of + Right (obj:objs) + | Just header <- transportFromObject obj -> do + forM_ objs $ storeObject $ peerInStorage peer + identity <- readMVar midentity + let svcs = map someServiceID services + handlePacket logd identity secure peer chanSvc svcs header >>= \case + Just peer' -> do + writeChan chanPeer peer' + return $ M.insert paddr peer' pvalue + Nothing -> return pvalue + + | otherwise -> do + logd $ show paddr ++ ": invalid objects" + logd $ show objs + return pvalue + + _ -> do logd $ show paddr ++ ": invalid objects" + return pvalue void $ forkIO $ withSocketsDo $ do let hints = defaultHints @@ -308,7 +321,13 @@ startServer origHead logd bhost services = do | DatagramAddress paddr <- peerAddress peer -> do logd $ "service packet from peer with incomplete identity " ++ show paddr - return chanPeer + return Server + { serverStorage = storage + , serverIdentity = midentity + , serverSocket = ssocket + , serverPeers = peers + , serverChanPeer' = chanPeer + } type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a @@ -354,9 +373,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) = gets (peerChannel . phPeer) >>= \case ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } - sendIdentityUpdate identity - sendSharedState identity . lsShared . fromStored =<< - liftIO (loadLocalState $ storedStorage $ idData identity) + finalizedChannel identity _ -> return () Rejected _ -> return () @@ -444,7 +461,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) = addHeader $ Acknowledged pref pst <- gets $ peerStorage . phPeer wref <- newWaitingRef pst pref - updatePeer $ \p -> p { peerServiceQueue = (svc, wref) : peerServiceQueue p } + updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p } Left _ -> throwError $ "missing service object " ++ show pref | otherwise -> addHeader $ Rejected pref | otherwise -> throwError $ "service ref without type" @@ -553,30 +570,39 @@ handleChannelAccept identity accref = do { peerIdentity = PeerIdentityFull pid , peerChannel = ChannelEstablished $ fromStored ch } - sendIdentityUpdate identity - sendSharedState identity . lsShared . fromStored =<< - liftIO (loadLocalState $ storedStorage $ idData identity) + finalizedChannel identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -sendIdentityUpdate :: UnifiedIdentity -> PacketHandler () -sendIdentityUpdate self = do +finalizedChannel :: UnifiedIdentity -> PacketHandler () +finalizedChannel self = do + -- Identity update ist <- gets $ peerInStorage . phPeer addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self - -sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler () -sendSharedState self shared = do + -- Shared state gets phPeer >>= \case peer | PeerIdentityFull pid <- peerIdentity peer , finalOwner pid `sameIdentity` finalOwner self -> do - ist <- gets $ peerInStorage . phPeer + shared <- lsShared . fromStored <$> + liftIO (loadLocalState $ storedStorage $ idData self) addHeader $ ServiceType $ serviceID @SyncService Proxy mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared mapM_ (addBody . storedRef) shared | otherwise -> return () + -- Outstanding service packets + gets phPeer >>= \case + Peer { peerChannel = ChannelEstablished ch + , peerAddress = DatagramAddress paddr + , peerServiceOutQueue = oqueue + , peerSocket = sock + } -> do + ps <- liftIO $ modifyMVar oqueue $ return . ([],) + forM_ ps $ sendPacket sock paddr ch + _ -> return () + handleIdentityUpdate :: PacketHandler () handleIdentityUpdate = do @@ -601,7 +627,7 @@ handleIdentityUpdate = do handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler () -handleServices chan = gets (peerServiceQueue . phPeer) >>= \case +handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case [] -> return () queue -> do queue' <- flip filterM queue $ \case @@ -611,7 +637,45 @@ handleServices chan = gets (peerServiceQueue . phPeer) >>= \case liftIO $ writeChan chan (peer, svc, ref) return False Nothing -> return True - updatePeer $ \p -> p { peerServiceQueue = queue' } + updatePeer $ \p -> p { peerServiceInQueue = queue' } + + +mkPeer :: Storage -> Socket -> SockAddr -> IO Peer +mkPeer st sock paddr = do + pst <- deriveEphemeralStorage st + ist <- derivePartialStorage pst + svcs <- newMVar M.empty + oqueue <- newMVar [] + return $ Peer + { peerAddress = DatagramAddress paddr + , peerIdentity = PeerIdentityUnknown + , peerIdentityUpdate = [] + , peerChannel = ChannelWait + , peerSocket = sock + , peerStorage = pst + , peerInStorage = ist + , peerServiceState = svcs + , peerServiceInQueue = [] + , peerServiceOutQueue = oqueue + , peerWaitingRefs = [] + } + +serverPeer :: Server -> SockAddr -> IO Peer +serverPeer server paddr = do + sock <- readMVar $ serverSocket server + (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> return (pvalue, (peer, False)) + Nothing -> do + peer <- mkPeer (serverStorage server) sock paddr + return (M.insert paddr peer pvalue, (peer, True)) + when hello $ do + identity <- readMVar (serverIdentity server) + void $ sendTo sock + (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader + [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ] + ) paddr + return peer sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m () @@ -621,24 +685,28 @@ sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdenti sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True] sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m () -sendToPeerList _ peer@Peer { peerChannel = ChannelEstablished ch } parts = do - let st = peerInStorage peer +sendToPeerList _ peer parts = do + let st = peerStorage peer + pst = peerInStorage peer srefs <- liftIO $ forM parts $ \case ServiceReply (Left x) _ -> store st x - ServiceReply (Right sx) _ -> copyRef st (storedRef sx) - - bytes <- forM (zip parts srefs) $ - \case (ServiceReply _ False, _) -> return BL.empty - (ServiceReply _ True, ref) -> case lazyLoadBytes ref of - Right bytes -> return bytes - Left dgst -> throwError $ "incomplete ref " ++ show ref ++ ", missing " ++ BC.unpack (showRefDigest dgst) + ServiceReply (Right sx) _ -> return $ storedRef sx + prefs <- mapM (copyRef pst) srefs + let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs) + header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) + packet = TransportPacket header content + case peerChannel peer of + ChannelEstablished ch -> do + let DatagramAddress paddr = peerAddress peer + sendPacket (peerSocket peer) paddr ch packet + _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:) + +sendPacket :: (MonadIO m, MonadError String m) => Socket -> SockAddr -> Channel -> TransportPacket -> m () +sendPacket sock addr ch (TransportPacket header content) = do let plain = BL.toStrict $ BL.concat $ - (serializeObject $ transportToObject $ TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef srefs)) - : bytes + (serializeObject $ transportToObject header) + : map lazyLoadBytes content ctext <- channelEncrypt ch plain - let DatagramAddress paddr = peerAddress peer - void $ liftIO $ sendTo (peerSocket peer) ctext paddr - -sendToPeerList _ _ _ = throwError $ "no channel to peer" + void $ liftIO $ sendTo sock ctext addr sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith identity peer fobj = do -- cgit v1.2.3