summaryrefslogtreecommitdiff
path: root/src/Network/Protocol.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-08-19 12:44:35 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-08-27 12:01:16 +0200
commit558ea4d565799aa2000af0b1fc6d159447c9868b (patch)
treed6b25479e5c466cc26931d0f021c1eb628a9a0cd /src/Network/Protocol.hs
parente4fd563f40bacce33712a84e0b275feb1881ea28 (diff)
Network: connection initiation with cookie
Diffstat (limited to 'src/Network/Protocol.hs')
-rw-r--r--src/Network/Protocol.hs245
1 files changed, 183 insertions, 62 deletions
diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs
index db5e767..bd93386 100644
--- a/src/Network/Protocol.hs
+++ b/src/Network/Protocol.hs
@@ -11,6 +11,7 @@ module Network.Protocol (
ChannelState(..),
ControlRequest(..),
+ ControlMessage(..),
erebosNetworkProtocol,
Connection,
@@ -63,6 +64,9 @@ data TransportHeaderItem
= Acknowledged RefDigest
| Rejected RefDigest
| ProtocolVersion Text
+ | Initiation RefDigest
+ | CookieSet Cookie
+ | CookieEcho Cookie
| DataRequest RefDigest
| DataResponse RefDigest
| AnnounceSelf RefDigest
@@ -73,12 +77,18 @@ data TransportHeaderItem
| ServiceRef RefDigest
deriving (Eq)
+newtype Cookie = Cookie ByteString
+ deriving (Eq)
+
transportToObject :: PartialStorage -> TransportHeader -> PartialObject
transportToObject st (TransportHeader items) = Rec $ map single items
where single = \case
Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst)
Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)
ProtocolVersion ver -> (BC.pack "VER", RecText ver)
+ Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst)
+ CookieSet (Cookie bytes) -> (BC.pack "CKS", RecBinary bytes)
+ CookieEcho (Cookie bytes) -> (BC.pack "CKE", RecBinary bytes)
DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst)
DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst)
AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst)
@@ -96,6 +106,9 @@ transportFromObject (Rec items) = case catMaybes $ map single items of
| name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref
| name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref
| name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver
+ | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref
+ | name == BC.pack "CKS", RecBinary bytes <- content -> Just $ CookieSet (Cookie bytes)
+ | name == BC.pack "CKE", RecBinary bytes <- content -> Just $ CookieEcho (Cookie bytes)
| name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref
| name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref
| name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref
@@ -109,14 +122,15 @@ transportFromObject _ = Nothing
data GlobalState addr = (Eq addr, Show addr) => GlobalState
- { gIdentity :: TVar UnifiedIdentity
+ { gIdentity :: TVar (UnifiedIdentity, [UnifiedIdentity])
, gConnections :: TVar [Connection addr]
, gDataFlow :: SymFlow (addr, ByteString)
- , gControlFlow :: Flow (ControlRequest addr) (Connection addr)
+ , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr)
, gLog :: String -> STM ()
, gStorage :: PartialStorage
, gNowVar :: TVar TimeSpec
, gNextTimeout :: TVar TimeSpec
+ , gInitConfig :: Ref
}
data Connection addr = Connection
@@ -156,6 +170,8 @@ wrDigest = refDigest . wrefPartial
data ChannelState = ChannelNone
+ | ChannelCookieWait
+ | ChannelCookieReceived Cookie
| ChannelOurRequest (Stored ChannelRequest)
| ChannelPeerRequest WaitingRef
| ChannelOurAccept (Stored ChannelAccept) Channel
@@ -165,29 +181,35 @@ data ChannelState = ChannelNone
data SentPacket = SentPacket
{ spTime :: TimeSpec
, spRetryCount :: Int
- , spAckedBy :: [TransportHeaderItem]
+ , spAckedBy :: Maybe (TransportHeaderItem -> Bool)
, spData :: BC.ByteString
}
data ControlRequest addr = RequestConnection addr
| SendAnnounce addr
+ | UpdateSelfIdentity UnifiedIdentity
+
+data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest)
+ | ReceivedAnnounce addr RefDigest
erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
=> UnifiedIdentity
-> (String -> STM ())
-> SymFlow (addr, ByteString)
- -> Flow (ControlRequest addr) (Connection addr)
+ -> Flow (ControlRequest addr) (ControlMessage addr)
-> IO ()
erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
- gIdentity <- newTVarIO initialIdentity
+ gIdentity <- newTVarIO (initialIdentity, [])
gConnections <- newTVarIO []
- gStorage <- derivePartialStorage =<< memoryStorage
+ mStorage <- memoryStorage
+ gStorage <- derivePartialStorage mStorage
startTime <- getTime MonotonicRaw
gNowVar <- newTVarIO startTime
gNextTimeout <- newTVarIO startTime
+ gInitConfig <- store mStorage $ (Rec [] :: Object)
let gs = GlobalState {..}
@@ -212,31 +234,38 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
getConnection :: GlobalState addr -> addr -> STM (Connection addr)
-getConnection GlobalState {..} addr = do
+getConnection gs addr = do
+ maybe (newConnection gs addr) return =<< findConnection gs addr
+
+findConnection :: GlobalState addr -> addr -> STM (Maybe (Connection addr))
+findConnection GlobalState {..} addr = do
+ find ((addr==) . cAddress) <$> readTVar gConnections
+
+newConnection :: GlobalState addr -> addr -> STM (Connection addr)
+newConnection GlobalState {..} addr = do
conns <- readTVar gConnections
- case find ((addr==) . cAddress) conns of
- Just conn -> return conn
- Nothing -> do
- let cAddress = addr
- (cDataUp, cDataInternal) <- newFlow
- cChannel <- newTVar ChannelNone
- cSecureOutQueue <- newTQueue
- cSentPackets <- newTVar []
- let conn = Connection {..}
-
- writeTVar gConnections (conn : conns)
- writeFlow gControlFlow conn
- return conn
+
+ let cAddress = addr
+ (cDataUp, cDataInternal) <- newFlow
+ cChannel <- newTVar ChannelNone
+ cSecureOutQueue <- newTQueue
+ cSentPackets <- newTVar []
+ let conn = Connection {..}
+
+ writeTVar gConnections (conn : conns)
+ return conn
processIncomming :: GlobalState addr -> STM (IO ())
processIncomming gs@GlobalState {..} = do
(addr, msg) <- readFlow gDataFlow
- conn@Connection {..} <- getConnection gs addr
+ mbconn <- findConnection gs addr
- mbch <- readTVar cChannel >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ ch -> Just ch
- _ -> Nothing
+ mbch <- case mbconn of
+ Nothing -> return Nothing
+ Just conn -> readTVar (cChannel conn) >>= return . \case
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
return $ do
let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict
@@ -269,30 +298,112 @@ processIncomming gs@GlobalState {..} = do
Right (secure, objs)
| hobj:content <- objs
, Just header@(TransportHeader items) <- transportFromObject hobj
- -> atomically $ do
- processAcknowledgements gs conn items
- writeFlow cDataInternal (secure, TransportPacket header content)
+ -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case
+ Just (conn@Connection {..}, mbup) -> atomically $ do
+ processAcknowledgements gs conn items
+ case mbup of
+ Just up -> writeFlow cDataInternal (secure, up)
+ Nothing -> return ()
+ Nothing -> return ()
| otherwise -> atomically $ do
- gLog $ show cAddress ++ ": invalid objects"
+ gLog $ show addr ++ ": invalid objects"
gLog $ show objs
Left err -> do
- atomically $ gLog $ show cAddress <> ": failed to parse packet: " <> err
-
+ atomically $ gLog $ show addr <> ": failed to parse packet: " <> err
+
+processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a)))
+processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if
+ | Right conn <- econn, secure
+ -> return $ Just (conn, Just packet)
+
+ | _:_ <- mapMaybe (\case Initiation x -> Just x; _ -> Nothing) header
+ , Just ver <- version
+ -> do
+ cookie <- createCookie gs addr
+ atomically $ do
+ identity <- fst <$> readTVar gIdentity
+ let reply = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader
+ [ CookieSet cookie
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
+ , ProtocolVersion ver
+ ]
+ writeFlow gDataFlow (addr, reply)
+ return Nothing
+
+ | cookie:_ <- mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header
+ , Just _ <- version
+ , Right conn@Connection {..} <- econn
+ -> do
+ atomically $ readTVar cChannel >>= \case
+ ChannelCookieWait -> do
+ writeTVar cChannel $ ChannelCookieReceived cookie
+ writeFlow gControlFlow (NewConnection conn mbpid)
+ return $ Just (conn, Nothing)
+ _ -> return Nothing
+
+ | Right conn <- econn
+ -> return $ Just (conn, Just packet)
+
+ | cookie:_ <- mapMaybe (\case CookieEcho x -> Just x; _ -> Nothing) header
+ , Just _ <- version
+ -> verifyCookie gs addr cookie >>= \case
+ True -> do
+ conn <- atomically $ findConnection gs addr >>= \case
+ Just conn -> return conn
+ Nothing -> do
+ conn <- newConnection gs addr
+ writeFlow gControlFlow (NewConnection conn mbpid)
+ return conn
+ return $ Just (conn, Just packet)
+ False -> return Nothing
+
+ | dgst:_ <- mapMaybe (\case AnnounceSelf x -> Just x; _ -> Nothing) header
+ , Just _ <- version
+ -> do
+ atomically $ do
+ (cur, past) <- readTVar gIdentity
+ when (not $ dgst `elem` map (refDigest . storedRef . idData) (cur : past)) $ do
+ writeFlow gControlFlow $ ReceivedAnnounce addr dgst
+ return Nothing
+
+ | otherwise -> return Nothing
+
+ where
+ addr = either id cAddress econn
+ mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header
+ version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions
+
+
+createCookie :: GlobalState addr -> addr -> IO Cookie
+createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)
+
+verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool
+verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie
+
+resendBytes :: GlobalState addr -> Connection addr -> SentPacket -> IO ()
+resendBytes GlobalState {..} Connection {..} sp = do
+ now <- getTime MonotonicRaw
+ atomically $ do
+ when (isJust $ spAckedBy sp) $ do
+ modifyTVar' cSentPackets $ (:) sp
+ { spTime = now
+ , spRetryCount = spRetryCount sp + 1
+ }
+ writeFlow gDataFlow (cAddress, spData sp)
+
+sendBytes :: GlobalState addr -> Connection addr -> ByteString -> Maybe (TransportHeaderItem -> Bool) -> IO ()
+sendBytes gs conn bs ackedBy = resendBytes gs conn
+ SentPacket
+ { spTime = undefined
+ , spRetryCount = -1
+ , spAckedBy = ackedBy
+ , spData = bs
+ }
processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
processOutgoing gs@GlobalState {..} = do
- let sendBytes :: Connection addr -> SentPacket -> IO ()
- sendBytes Connection {..} sp = do
- now <- getTime MonotonicRaw
- atomically $ do
- when (not $ null $ spAckedBy sp) $ do
- modifyTVar' cSentPackets $ (:) sp
- { spTime = now
- , spRetryCount = spRetryCount sp + 1
- }
- writeFlow gDataFlow (cAddress, spData sp)
let sendNextPacket :: Connection addr -> STM (IO ())
sendNextPacket conn@Connection {..} = do
@@ -304,16 +415,20 @@ processOutgoing gs@GlobalState {..} = do
| isJust mbch = readTQueue cSecureOutQueue
| otherwise = retry
- (secure, packet@(TransportPacket header content), ackedBy) <-
+ (secure, packet@(TransportPacket (TransportHeader hitems) content), ackedBy) <-
checkOutstanding <|> readFlow cDataInternal
+ when (isNothing mbch && secure) $ do
+ writeTQueue cSecureOutQueue (secure, packet, ackedBy)
+
+ header <- readTVar cChannel >>= return . TransportHeader . \case
+ ChannelCookieReceived cookie -> CookieEcho cookie : ProtocolVersion protocolVersion : hitems
+ _ -> hitems
+
let plain = BL.concat $
(serializeObject $ transportToObject gStorage header)
: map lazyLoadBytes content
- when (isNothing mbch && secure) $ do
- writeTQueue cSecureOutQueue (secure, packet, ackedBy)
-
return $ do
mbs <- case mbch of
Just ch -> do
@@ -325,13 +440,7 @@ processOutgoing gs@GlobalState {..} = do
| otherwise -> return $ Just $ BL.toStrict plain
case mbs of
- Just bs -> do
- sendBytes conn $ SentPacket
- { spTime = undefined
- , spRetryCount = -1
- , spAckedBy = ackedBy
- , spData = bs
- }
+ Just bs -> sendBytes gs conn bs $ guard (not $ null ackedBy) >> Just (`elem` ackedBy)
Nothing -> return ()
let retransmitPacket :: Connection addr -> STM (IO ())
@@ -350,26 +459,38 @@ processOutgoing gs@GlobalState {..} = do
else retry
else do
writeTVar cSentPackets rest
- return $ sendBytes conn sp
+ return $ resendBytes gs conn sp
let handleControlRequests = readFlow gControlFlow >>= \case
RequestConnection addr -> do
- _ <- getConnection gs addr
- identity <- readTVar gIdentity
- let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $
- [ AnnounceSelf $ refDigest $ storedRef $ idData identity
- ] ++ map ProtocolVersion protocolVersions
- writeFlow gDataFlow (addr, packet)
- return $ return ()
+ conn@Connection {..} <- getConnection gs addr
+ identity <- fst <$> readTVar gIdentity
+ readTVar cChannel >>= \case
+ ChannelNone -> do
+ let packet = BL.toStrict $ BL.concat
+ [ serializeObject $ transportToObject gStorage $ TransportHeader $
+ [ Initiation $ refDigest gInitConfig
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ] ++ map ProtocolVersion protocolVersions
+ , lazyLoadBytes gInitConfig
+ ]
+ writeTVar cChannel ChannelCookieWait
+ return $ sendBytes gs conn packet $ Just $ \case CookieSet {} -> True; _ -> False
+ _ -> return $ return ()
SendAnnounce addr -> do
- identity <- readTVar gIdentity
+ identity <- fst <$> readTVar gIdentity
let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $
[ AnnounceSelf $ refDigest $ storedRef $ idData identity
] ++ map ProtocolVersion protocolVersions
writeFlow gDataFlow (addr, packet)
return $ return ()
+ UpdateSelfIdentity nid -> do
+ (cur, past) <- readTVar gIdentity
+ writeTVar gIdentity (nid, cur : past)
+ return $ return ()
+
conns <- readTVar gConnections
msum $ concat $
[ map retransmitPacket conns
@@ -379,4 +500,4 @@ processOutgoing gs@GlobalState {..} = do
processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM ()
processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do
- modifyTVar' cSentPackets $ filter $ (hitem `notElem`) . spAckedBy
+ modifyTVar' cSentPackets $ filter $ \sp -> not (fromJust (spAckedBy sp) hitem)