diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-19 12:44:35 +0200 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2023-08-27 12:01:16 +0200 | 
| commit | 558ea4d565799aa2000af0b1fc6d159447c9868b (patch) | |
| tree | d6b25479e5c466cc26931d0f021c1eb628a9a0cd /src/Network | |
| parent | e4fd563f40bacce33712a84e0b275feb1881ea28 (diff) | |
Network: connection initiation with cookie
Diffstat (limited to 'src/Network')
| -rw-r--r-- | src/Network/Protocol.hs | 245 | 
1 files changed, 183 insertions, 62 deletions
| diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index db5e767..bd93386 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -11,6 +11,7 @@ module Network.Protocol (      ChannelState(..),      ControlRequest(..), +    ControlMessage(..),      erebosNetworkProtocol,      Connection, @@ -63,6 +64,9 @@ data TransportHeaderItem      = Acknowledged RefDigest      | Rejected RefDigest      | ProtocolVersion Text +    | Initiation RefDigest +    | CookieSet Cookie +    | CookieEcho Cookie      | DataRequest RefDigest      | DataResponse RefDigest      | AnnounceSelf RefDigest @@ -73,12 +77,18 @@ data TransportHeaderItem      | ServiceRef RefDigest      deriving (Eq) +newtype Cookie = Cookie ByteString +    deriving (Eq) +  transportToObject :: PartialStorage -> TransportHeader -> PartialObject  transportToObject st (TransportHeader items) = Rec $ map single items      where single = \case                Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst)                Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)                ProtocolVersion ver -> (BC.pack "VER", RecText ver) +              Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst) +              CookieSet (Cookie bytes) -> (BC.pack "CKS", RecBinary bytes) +              CookieEcho (Cookie bytes) -> (BC.pack "CKE", RecBinary bytes)                DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst)                DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst)                AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst) @@ -96,6 +106,9 @@ transportFromObject (Rec items) = case catMaybes $ map single items of                | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref                | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref                | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver +              | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref +              | name == BC.pack "CKS", RecBinary bytes <- content -> Just $ CookieSet (Cookie bytes) +              | name == BC.pack "CKE", RecBinary bytes <- content -> Just $ CookieEcho (Cookie bytes)                | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref                | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref                | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref @@ -109,14 +122,15 @@ transportFromObject _ = Nothing  data GlobalState addr = (Eq addr, Show addr) => GlobalState -    { gIdentity :: TVar UnifiedIdentity +    { gIdentity :: TVar (UnifiedIdentity, [UnifiedIdentity])      , gConnections :: TVar [Connection addr]      , gDataFlow :: SymFlow (addr, ByteString) -    , gControlFlow :: Flow (ControlRequest addr) (Connection addr) +    , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)      , gLog :: String -> STM ()      , gStorage :: PartialStorage      , gNowVar :: TVar TimeSpec      , gNextTimeout :: TVar TimeSpec +    , gInitConfig :: Ref      }  data Connection addr = Connection @@ -156,6 +170,8 @@ wrDigest = refDigest . wrefPartial  data ChannelState = ChannelNone +                  | ChannelCookieWait +                  | ChannelCookieReceived Cookie                    | ChannelOurRequest (Stored ChannelRequest)                    | ChannelPeerRequest WaitingRef                    | ChannelOurAccept (Stored ChannelAccept) Channel @@ -165,29 +181,35 @@ data ChannelState = ChannelNone  data SentPacket = SentPacket      { spTime :: TimeSpec      , spRetryCount :: Int -    , spAckedBy :: [TransportHeaderItem] +    , spAckedBy :: Maybe (TransportHeaderItem -> Bool)      , spData :: BC.ByteString      }  data ControlRequest addr = RequestConnection addr                           | SendAnnounce addr +                         | UpdateSelfIdentity UnifiedIdentity + +data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) +                         | ReceivedAnnounce addr RefDigest  erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)                        => UnifiedIdentity                        -> (String -> STM ())                        -> SymFlow (addr, ByteString) -                      -> Flow (ControlRequest addr) (Connection addr) +                      -> Flow (ControlRequest addr) (ControlMessage addr)                        -> IO ()  erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do -    gIdentity <- newTVarIO initialIdentity +    gIdentity <- newTVarIO (initialIdentity, [])      gConnections <- newTVarIO [] -    gStorage <- derivePartialStorage =<< memoryStorage +    mStorage <- memoryStorage +    gStorage <- derivePartialStorage mStorage      startTime <- getTime MonotonicRaw      gNowVar <- newTVarIO startTime      gNextTimeout <- newTVarIO startTime +    gInitConfig <- store mStorage $ (Rec [] :: Object)      let gs = GlobalState {..} @@ -212,31 +234,38 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do  getConnection :: GlobalState addr -> addr -> STM (Connection addr) -getConnection GlobalState {..} addr = do +getConnection gs addr = do +    maybe (newConnection gs addr) return =<< findConnection gs addr + +findConnection :: GlobalState addr -> addr -> STM (Maybe (Connection addr)) +findConnection GlobalState {..} addr = do +    find ((addr==) . cAddress) <$> readTVar gConnections + +newConnection :: GlobalState addr -> addr -> STM (Connection addr) +newConnection GlobalState {..} addr = do      conns <- readTVar gConnections -    case find ((addr==) . cAddress) conns of -        Just conn -> return conn -        Nothing -> do -            let cAddress = addr -            (cDataUp, cDataInternal) <- newFlow -            cChannel <- newTVar ChannelNone -            cSecureOutQueue <- newTQueue -            cSentPackets <- newTVar [] -            let conn = Connection {..} - -            writeTVar gConnections (conn : conns) -            writeFlow gControlFlow conn -            return conn + +    let cAddress = addr +    (cDataUp, cDataInternal) <- newFlow +    cChannel <- newTVar ChannelNone +    cSecureOutQueue <- newTQueue +    cSentPackets <- newTVar [] +    let conn = Connection {..} + +    writeTVar gConnections (conn : conns) +    return conn  processIncomming :: GlobalState addr -> STM (IO ())  processIncomming gs@GlobalState {..} = do      (addr, msg) <- readFlow gDataFlow -    conn@Connection {..} <- getConnection gs addr +    mbconn <- findConnection gs addr -    mbch <- readTVar cChannel >>= return . \case -        ChannelEstablished ch -> Just ch -        ChannelOurAccept _ ch -> Just ch -        _                     -> Nothing +    mbch <- case mbconn of +        Nothing -> return Nothing +        Just conn -> readTVar (cChannel conn) >>= return . \case +            ChannelEstablished ch -> Just ch +            ChannelOurAccept _ ch -> Just ch +            _                     -> Nothing      return $ do          let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict @@ -269,30 +298,112 @@ processIncomming gs@GlobalState {..} = do              Right (secure, objs)                  | hobj:content <- objs                  , Just header@(TransportHeader items) <- transportFromObject hobj -                -> atomically $ do -                    processAcknowledgements gs conn items -                    writeFlow cDataInternal (secure, TransportPacket header content) +                -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case +                    Just (conn@Connection {..}, mbup) -> atomically $ do +                        processAcknowledgements gs conn items +                        case mbup of +                            Just up -> writeFlow cDataInternal (secure, up) +                            Nothing -> return () +                    Nothing -> return ()                  | otherwise -> atomically $ do -                      gLog $ show cAddress ++ ": invalid objects" +                      gLog $ show addr ++ ": invalid objects"                        gLog $ show objs              Left err -> do -                atomically $ gLog $ show cAddress <> ": failed to parse packet: " <> err - +                atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + +processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) +processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if +    | Right conn <- econn, secure +    -> return $ Just (conn, Just packet) + +    | _:_ <- mapMaybe (\case Initiation x -> Just x; _ -> Nothing) header +    , Just ver <- version +    -> do +        cookie <- createCookie gs addr +        atomically $ do +            identity <- fst <$> readTVar gIdentity +            let reply = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader +                    [ CookieSet cookie +                    , AnnounceSelf $ refDigest $ storedRef $ idData identity +                    , ProtocolVersion ver +                    ] +            writeFlow gDataFlow (addr, reply) +        return Nothing + +    | cookie:_ <- mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header +    , Just _ <- version +    , Right conn@Connection {..} <- econn +    -> do +        atomically $ readTVar cChannel >>= \case +            ChannelCookieWait -> do +                writeTVar cChannel $ ChannelCookieReceived cookie +                writeFlow gControlFlow (NewConnection conn mbpid) +                return $ Just (conn, Nothing) +            _ -> return Nothing + +    | Right conn <- econn +    -> return $ Just (conn, Just packet) + +    | cookie:_ <- mapMaybe (\case CookieEcho x -> Just x; _ -> Nothing) header +    , Just _ <- version +    -> verifyCookie gs addr cookie >>= \case +        True -> do +            conn <- atomically $ findConnection gs addr >>= \case +                Just conn -> return conn +                Nothing -> do +                    conn <- newConnection gs addr +                    writeFlow gControlFlow (NewConnection conn mbpid) +                    return conn +            return $ Just (conn, Just packet) +        False -> return Nothing + +    | dgst:_ <- mapMaybe (\case AnnounceSelf x -> Just x; _ -> Nothing) header +    , Just _ <- version +    -> do +        atomically $ do +            (cur, past) <- readTVar gIdentity +            when (not $ dgst `elem` map (refDigest . storedRef . idData) (cur : past)) $ do +                writeFlow gControlFlow $ ReceivedAnnounce addr dgst +        return Nothing + +    | otherwise -> return Nothing + +  where +    addr = either id cAddress econn +    mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header +    version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions + + +createCookie :: GlobalState addr -> addr -> IO Cookie +createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) + +verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool +verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie + +resendBytes :: GlobalState addr -> Connection addr -> SentPacket -> IO () +resendBytes GlobalState {..} Connection {..} sp = do +    now <- getTime MonotonicRaw +    atomically $ do +        when (isJust $ spAckedBy sp) $ do +            modifyTVar' cSentPackets $ (:) sp +                { spTime = now +                , spRetryCount = spRetryCount sp + 1 +                } +        writeFlow gDataFlow (cAddress, spData sp) + +sendBytes :: GlobalState addr -> Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO () +sendBytes gs conn bs ackedBy = resendBytes gs conn +    SentPacket +        { spTime = undefined +        , spRetryCount = -1 +        , spAckedBy = ackedBy +        , spData = bs +        }  processOutgoing :: forall addr. GlobalState addr -> STM (IO ())  processOutgoing gs@GlobalState {..} = do -    let sendBytes :: Connection addr -> SentPacket -> IO () -        sendBytes Connection {..} sp = do -            now <- getTime MonotonicRaw -            atomically $ do -                when (not $ null $ spAckedBy sp) $ do -                    modifyTVar' cSentPackets $ (:) sp -                        { spTime = now -                        , spRetryCount = spRetryCount sp + 1 -                        } -                writeFlow gDataFlow (cAddress, spData sp)      let sendNextPacket :: Connection addr -> STM (IO ())          sendNextPacket conn@Connection {..} = do @@ -304,16 +415,20 @@ processOutgoing gs@GlobalState {..} = do                      | isJust mbch = readTQueue cSecureOutQueue                      | otherwise = retry -            (secure, packet@(TransportPacket header content), ackedBy) <- +            (secure, packet@(TransportPacket (TransportHeader hitems) content), ackedBy) <-                  checkOutstanding <|> readFlow cDataInternal +            when (isNothing mbch && secure) $ do +                writeTQueue cSecureOutQueue (secure, packet, ackedBy) + +            header <- readTVar cChannel >>= return . TransportHeader . \case +                ChannelCookieReceived cookie -> CookieEcho cookie : ProtocolVersion protocolVersion : hitems +                _                            -> hitems +              let plain = BL.concat $                      (serializeObject $ transportToObject gStorage header)                      : map lazyLoadBytes content -            when (isNothing mbch && secure) $ do -                writeTQueue cSecureOutQueue (secure, packet, ackedBy) -              return $ do                  mbs <- case mbch of                      Just ch -> do @@ -325,13 +440,7 @@ processOutgoing gs@GlobalState {..} = do                              | otherwise -> return $ Just $ BL.toStrict plain                  case mbs of -                    Just bs -> do -                        sendBytes conn $ SentPacket -                            { spTime = undefined -                            , spRetryCount = -1 -                            , spAckedBy = ackedBy -                            , spData = bs -                            } +                    Just bs -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)                      Nothing -> return ()      let retransmitPacket :: Connection addr -> STM (IO ()) @@ -350,26 +459,38 @@ processOutgoing gs@GlobalState {..} = do                     else retry                else do                  writeTVar cSentPackets rest -                return $ sendBytes conn sp +                return $ resendBytes gs conn sp      let handleControlRequests = readFlow gControlFlow >>= \case              RequestConnection addr -> do -                _ <- getConnection gs addr -                identity <- readTVar gIdentity -                let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $ -                        [ AnnounceSelf $ refDigest $ storedRef $ idData identity -                        ] ++ map ProtocolVersion protocolVersions -                writeFlow gDataFlow (addr, packet) -                return $ return () +                conn@Connection {..} <- getConnection gs addr +                identity <- fst <$> readTVar gIdentity +                readTVar cChannel >>= \case +                    ChannelNone -> do +                        let packet = BL.toStrict $ BL.concat +                                [ serializeObject $ transportToObject gStorage $ TransportHeader $ +                                    [ Initiation $ refDigest gInitConfig +                                    , AnnounceSelf $ refDigest $ storedRef $ idData identity +                                    ] ++ map ProtocolVersion protocolVersions +                                , lazyLoadBytes gInitConfig +                                ] +                        writeTVar cChannel ChannelCookieWait +                        return $ sendBytes gs conn packet $ Just $ \case CookieSet {} -> True; _ -> False +                    _ -> return $ return ()              SendAnnounce addr -> do -                identity <- readTVar gIdentity +                identity <- fst <$> readTVar gIdentity                  let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $                          [ AnnounceSelf $ refDigest $ storedRef $ idData identity                          ] ++ map ProtocolVersion protocolVersions                  writeFlow gDataFlow (addr, packet)                  return $ return () +            UpdateSelfIdentity nid -> do +                (cur, past) <- readTVar gIdentity +                writeTVar gIdentity (nid, cur : past) +                return $ return () +      conns <- readTVar gConnections      msum $ concat $          [ map retransmitPacket conns @@ -379,4 +500,4 @@ processOutgoing gs@GlobalState {..} = do  processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM ()  processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do -    modifyTVar' cSentPackets $ filter $ (hitem `notElem`) . spAckedBy +    modifyTVar' cSentPackets $ filter $ \sp -> not (fromJust (spAckedBy sp) hitem) |