diff options
| -rw-r--r-- | src/Main.hs | 6 | ||||
| -rw-r--r-- | src/Network.hs | 232 | 
2 files changed, 154 insertions, 84 deletions
| diff --git a/src/Main.hs b/src/Main.hs index 0398233..34c2b3b 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -96,7 +96,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do                               False -> error "Requires terminal"      extPrint <- getExternalPrint      let extPrintLn str = extPrint $ str ++ "\n"; -    chanPeer <- liftIO $ do +    server <- liftIO $ do          erebosHead <- loadLocalStateHead st          startServer erebosHead extPrintLn bhost              [ SomeService @AttachService Proxy @@ -107,7 +107,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do      peers <- liftIO $ newMVar []      void $ liftIO $ forkIO $ void $ forever $ do -        peer <- readChan chanPeer +        peer <- readChan $ serverChanPeer server          if | PeerIdentityFull pid <- peerIdentity peer -> do                   let update [] = ([peer], Nothing)                       update (p:ps) | PeerIdentityFull pid' <- peerIdentity p @@ -142,6 +142,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do              curIdentity <- liftIO $ loadLocalIdentity st              res <- liftIO $ runExceptT $ flip execStateT cstate $ runReaderT cmd CommandInput                  { ciSelf = curIdentity +                , ciServer = server                  , ciLine = line                  , ciPrint = extPrintLn                  , ciPeers = liftIO $ readMVar peers @@ -158,6 +159,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do  data CommandInput = CommandInput      { ciSelf :: UnifiedIdentity +    , ciServer :: Server      , ciLine :: String      , ciPrint :: String -> IO ()      , ciPeers :: CommandM [Peer] diff --git a/src/Network.hs b/src/Network.hs index 429dee1..f07e7ce 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,12 +1,18 @@  module Network ( +    Server, +    startServer, +    serverChanPeer, +      Peer(..),      PeerAddress(..),      PeerIdentity(..),      PeerChannel(..),      WaitingRef, wrDigest,      Service(..), -    startServer, +    serverPeer,      sendToPeer, sendToPeerStored, sendToPeerWith, + +    discoveryPort,  ) where  import Control.Concurrent @@ -19,6 +25,7 @@ import qualified Data.ByteString.Char8 as BC  import qualified Data.ByteString.Lazy as BL  import Data.Either  import Data.List +import Data.Map (Map)  import qualified Data.Map as M  import Data.Maybe  import Data.Typeable @@ -42,6 +49,18 @@ announceIntervalSeconds :: Int  announceIntervalSeconds = 60 +data Server = Server +    { serverStorage :: Storage +    , serverIdentity :: MVar UnifiedIdentity +    , serverSocket :: MVar Socket +    , serverPeers :: MVar (Map SockAddr Peer) +    , serverChanPeer' :: Chan Peer +    } + +serverChanPeer :: Server -> Chan Peer +serverChanPeer = serverChanPeer' + +  data Peer = Peer      { peerAddress :: PeerAddress      , peerIdentity :: PeerIdentity @@ -51,7 +70,8 @@ data Peer = Peer      , peerStorage :: Storage      , peerInStorage :: PartialStorage      , peerServiceState :: MVar (M.Map ServiceID SomeServiceState) -    , peerServiceQueue :: [(ServiceID, WaitingRef)] +    , peerServiceInQueue :: [(ServiceID, WaitingRef)] +    , peerServiceOutQueue :: MVar [TransportPacket]      , peerWaitingRefs :: [WaitingRef]      } @@ -69,6 +89,9 @@ data PeerChannel = ChannelWait                   | ChannelEstablished Channel +data TransportPacket = TransportPacket TransportHeader [Ref] + +  data TransportHeaderItem      = Acknowledged PartialRef      | Rejected PartialRef @@ -158,7 +181,7 @@ receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do      checkWaitingRef wr -startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO (Chan Peer) +startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO Server  startServer origHead logd bhost services = do      let storage = refStorage $ headRef origHead      chanPeer <- newChan @@ -167,9 +190,11 @@ startServer origHead logd bhost services = do      peers <- newMVar M.empty      midentity <- newMVar $ headLocalIdentity origHead      mshared <- newMVar $ lsShared $ load $ headRef origHead +    ssocket <- newEmptyMVar      let open addr = do              sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) +            putMVar ssocket sock              setSocketOption sock ReuseAddr 1              setSocketOption sock Broadcast 1              setCloseOnExecIfNeeded =<< fdSocket sock @@ -223,54 +248,42 @@ startServer origHead logd bhost services = do              forever $ do                  (msg, paddr) <- recvFrom sock 4096 -                mbpeer <- M.lookup paddr <$> readMVar peers -                (peer, content, secure) <- if -                    | Just peer <- mbpeer -                    , Just ch <- case peerChannel peer of -                                      ChannelEstablished ch  -> Just ch -                                      ChannelOurAccept _ sch -> Just $ fromStored sch -                                      _                      -> Nothing -                    , Right plain <- runExcept $ channelDecrypt ch msg -                    -> return (peer, plain, True) - -                    | Just peer <- mbpeer -                    -> return (peer, msg, False) - -                    | otherwise -> do -                          pst <- deriveEphemeralStorage storage -                          ist <- derivePartialStorage pst -                          svcs <- newMVar M.empty -                          let peer = Peer -                                  { peerAddress = DatagramAddress paddr -                                  , peerIdentity = PeerIdentityUnknown -                                  , peerIdentityUpdate = [] -                                  , peerChannel = ChannelWait -                                  , peerSocket = sock -                                  , peerStorage = pst -                                  , peerInStorage = ist -                                  , peerServiceState = svcs -                                  , peerServiceQueue = [] -                                  , peerWaitingRefs = [] -                                  } -                          return (peer, msg, False) - -                case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of -                     Right (obj:objs) -                         | Just header <- transportFromObject obj -> do -                               forM_ objs $ storeObject $ peerInStorage peer -                               identity <- readMVar midentity -                               let svcs = map someServiceID services -                               handlePacket logd identity secure peer chanSvc svcs header >>= \case -                                   Just peer' -> do -                                       modifyMVar_ peers $ return . M.insert paddr peer' -                                       writeChan chanPeer peer' -                                   Nothing -> return () - -                         | otherwise -> do -                               logd $ show paddr ++ ": invalid objects" -                               logd $ show objs - -                     _ -> logd $ show paddr ++ ": invalid objects" +                modifyMVar_ peers $ \pvalue -> do +                    let mbpeer = M.lookup paddr pvalue +                    (peer, content, secure) <- if +                        | Just peer <- mbpeer +                        , Just ch <- case peerChannel peer of +                                          ChannelEstablished ch  -> Just ch +                                          ChannelOurAccept _ sch -> Just $ fromStored sch +                                          _                      -> Nothing +                        , Right plain <- runExcept $ channelDecrypt ch msg +                        -> return (peer, plain, True) + +                        | Just peer <- mbpeer +                        -> return (peer, msg, False) + +                        | otherwise +                        -> (, msg, False) <$> mkPeer storage sock paddr + +                    case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of +                         Right (obj:objs) +                             | Just header <- transportFromObject obj -> do +                                   forM_ objs $ storeObject $ peerInStorage peer +                                   identity <- readMVar midentity +                                   let svcs = map someServiceID services +                                   handlePacket logd identity secure peer chanSvc svcs header >>= \case +                                       Just peer' -> do +                                           writeChan chanPeer peer' +                                           return $ M.insert paddr peer' pvalue +                                       Nothing -> return pvalue + +                             | otherwise -> do +                                   logd $ show paddr ++ ": invalid objects" +                                   logd $ show objs +                                   return pvalue + +                         _ -> do logd $ show paddr ++ ": invalid objects" +                                 return pvalue      void $ forkIO $ withSocketsDo $ do          let hints = defaultHints @@ -308,7 +321,13 @@ startServer origHead logd bhost services = do              | DatagramAddress paddr <- peerAddress peer -> do                  logd $ "service packet from peer with incomplete identity " ++ show paddr -    return chanPeer +    return Server +        { serverStorage = storage +        , serverIdentity = midentity +        , serverSocket = ssocket +        , serverPeers = peers +        , serverChanPeer' = chanPeer +        }  type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a @@ -354,9 +373,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) =                  gets (peerChannel . phPeer) >>= \case                      ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do                          updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } -                        sendIdentityUpdate identity -                        sendSharedState identity . lsShared . fromStored =<< -                            liftIO (loadLocalState $ storedStorage $ idData identity) +                        finalizedChannel identity                      _ -> return ()              Rejected _ -> return () @@ -444,7 +461,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) =                                  addHeader $ Acknowledged pref                                  pst <- gets $ peerStorage . phPeer                                  wref <- newWaitingRef pst pref -                                updatePeer $ \p -> p { peerServiceQueue = (svc, wref) : peerServiceQueue p } +                                updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p }                              Left _ -> throwError $ "missing service object " ++ show pref                      | otherwise -> addHeader $ Rejected pref                  | otherwise -> throwError $ "service ref without type" @@ -553,30 +570,39 @@ handleChannelAccept identity accref = do                  { peerIdentity = PeerIdentityFull pid                  , peerChannel = ChannelEstablished $ fromStored ch                  } -            sendIdentityUpdate identity -            sendSharedState identity . lsShared . fromStored =<< -                liftIO (loadLocalState $ storedStorage $ idData identity) +            finalizedChannel identity          Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -sendIdentityUpdate :: UnifiedIdentity -> PacketHandler () -sendIdentityUpdate self = do +finalizedChannel :: UnifiedIdentity -> PacketHandler () +finalizedChannel self = do +    -- Identity update      ist <- gets $ peerInStorage . phPeer      addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self      mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self - -sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler () -sendSharedState self shared = do +    -- Shared state      gets phPeer >>= \case          peer | PeerIdentityFull pid <- peerIdentity peer               , finalOwner pid `sameIdentity` finalOwner self -> do -                 ist <- gets $ peerInStorage . phPeer +                 shared <- lsShared . fromStored <$> +                     liftIO (loadLocalState $ storedStorage $ idData self)                   addHeader $ ServiceType $ serviceID @SyncService Proxy                   mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared                   mapM_ (addBody . storedRef) shared               | otherwise -> return () +    -- Outstanding service packets +    gets phPeer >>= \case +        Peer { peerChannel = ChannelEstablished ch +             , peerAddress = DatagramAddress paddr +             , peerServiceOutQueue = oqueue +             , peerSocket = sock +             } -> do +                 ps <- liftIO $ modifyMVar oqueue $ return . ([],) +                 forM_ ps $ sendPacket sock paddr ch +        _ -> return () +  handleIdentityUpdate :: PacketHandler ()  handleIdentityUpdate = do @@ -601,7 +627,7 @@ handleIdentityUpdate = do  handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler () -handleServices chan = gets (peerServiceQueue . phPeer) >>= \case +handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case      [] -> return ()      queue -> do          queue' <- flip filterM queue $ \case @@ -611,7 +637,45 @@ handleServices chan = gets (peerServiceQueue . phPeer) >>= \case                      liftIO $ writeChan chan (peer, svc, ref)                      return False                  Nothing -> return True -        updatePeer $ \p -> p { peerServiceQueue = queue' } +        updatePeer $ \p -> p { peerServiceInQueue = queue' } + + +mkPeer :: Storage -> Socket -> SockAddr -> IO Peer +mkPeer st sock paddr = do +    pst <- deriveEphemeralStorage st +    ist <- derivePartialStorage pst +    svcs <- newMVar M.empty +    oqueue <- newMVar [] +    return $ Peer +        { peerAddress = DatagramAddress paddr +        , peerIdentity = PeerIdentityUnknown +        , peerIdentityUpdate = [] +        , peerChannel = ChannelWait +        , peerSocket = sock +        , peerStorage = pst +        , peerInStorage = ist +        , peerServiceState = svcs +        , peerServiceInQueue = [] +        , peerServiceOutQueue = oqueue +        , peerWaitingRefs = [] +        } + +serverPeer :: Server -> SockAddr -> IO Peer +serverPeer server paddr = do +    sock <- readMVar $ serverSocket server +    (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do +        case M.lookup paddr pvalue of +             Just peer -> return (pvalue, (peer, False)) +             Nothing -> do +                 peer <- mkPeer (serverStorage server) sock paddr +                 return (M.insert paddr peer pvalue, (peer, True)) +    when hello $ do +        identity <- readMVar (serverIdentity server) +        void $ sendTo sock +            (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader +                [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ] +            ) paddr +    return peer  sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m () @@ -621,24 +685,28 @@ sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdenti  sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True]  sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m () -sendToPeerList _ peer@Peer { peerChannel = ChannelEstablished ch } parts = do -    let st = peerInStorage peer +sendToPeerList _ peer parts = do +    let st = peerStorage peer +        pst = peerInStorage peer      srefs <- liftIO $ forM parts $ \case ServiceReply (Left x) _ -> store st x -                                         ServiceReply (Right sx) _ -> copyRef st (storedRef sx) - -    bytes <- forM (zip parts srefs) $ -        \case (ServiceReply _ False, _) -> return BL.empty -              (ServiceReply _ True, ref) -> case lazyLoadBytes ref of -                  Right bytes -> return bytes -                  Left dgst -> throwError $ "incomplete ref " ++ show ref ++ ", missing " ++ BC.unpack (showRefDigest dgst) +                                         ServiceReply (Right sx) _ -> return $ storedRef sx +    prefs <- mapM (copyRef pst) srefs +    let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs) +        header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) +        packet = TransportPacket header content +    case peerChannel peer of +         ChannelEstablished ch -> do +             let DatagramAddress paddr = peerAddress peer +             sendPacket (peerSocket peer) paddr ch packet +         _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:) + +sendPacket :: (MonadIO m, MonadError String m) => Socket -> SockAddr -> Channel -> TransportPacket -> m () +sendPacket sock addr ch (TransportPacket header content) = do      let plain = BL.toStrict $ BL.concat $ -            (serializeObject $ transportToObject $ TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef srefs)) -            : bytes +            (serializeObject $ transportToObject header) +            : map lazyLoadBytes content      ctext <- channelEncrypt ch plain -    let DatagramAddress paddr = peerAddress peer -    void $ liftIO $ sendTo (peerSocket peer) ctext paddr - -sendToPeerList _ _ _ = throwError $ "no channel to peer" +    void $ liftIO $ sendTo sock ctext addr  sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()  sendToPeerWith identity peer fobj = do |