diff options
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 223 |
1 files changed, 176 insertions, 47 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index a009ad1..025f52c 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -3,6 +3,7 @@ module Erebos.Network.Protocol ( transportToObject, TransportHeader(..), TransportHeaderItem(..), + ServiceID(..), SecurityRequirement(..), WaitingRef(..), @@ -22,7 +23,8 @@ module Erebos.Network.Protocol ( connSetChannel, connClose, - RawStreamReader, RawStreamWriter, + RawStreamReader(..), RawStreamWriter(..), + StreamPacket(..), connAddWriteStream, connAddReadStream, readStreamToList, @@ -36,11 +38,22 @@ import Control.Applicative import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM +import Control.Exception import Control.Monad import Control.Monad.Except import Control.Monad.Trans +import Crypto.Cipher.ChaChaPoly1305 qualified as C +import Crypto.MAC.Poly1305 qualified as C (Auth(..), authTag) +import Crypto.Error +import Crypto.Random + +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put import Data.Bits +import Data.ByteArray (Bytes, ScrubbedBytes) +import Data.ByteArray qualified as BA import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC @@ -51,15 +64,16 @@ import Data.Maybe import Data.Text (Text) import Data.Text qualified as T import Data.Void -import Data.Word import System.Clock -import Erebos.Channel import Erebos.Flow import Erebos.Identity -import Erebos.Service +import Erebos.Network.Channel +import Erebos.Object +import Erebos.Storable import Erebos.Storage +import Erebos.UUID (UUID) protocolVersion :: Text @@ -68,6 +82,9 @@ protocolVersion = T.pack "0.1" protocolVersions :: [Text] protocolVersions = [protocolVersion] +keepAliveInternal :: TimeSpec +keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int) + data TransportPacket a = TransportPacket TransportHeader [a] @@ -93,6 +110,9 @@ data TransportHeaderItem | StreamOpen Word8 deriving (Eq, Show) +newtype ServiceID = ServiceID UUID + deriving (Eq, Ord, Show, StorableUUID) + newtype Cookie = Cookie ByteString deriving (Eq, Show) @@ -101,6 +121,35 @@ data SecurityRequirement = PlaintextOnly | EncryptedOnly deriving (Eq, Ord) +data ParsedCookie = ParsedCookie + { cookieNonce :: C.Nonce + , cookieValidity :: Word32 + , cookieContent :: ByteString + , cookieMac :: C.Auth + } + +instance Eq ParsedCookie where + (==) = (==) `on` (\c -> ( BA.convert (cookieNonce c) :: ByteString, cookieValidity c, cookieContent c, cookieMac c )) + +instance Show ParsedCookie where + show ParsedCookie {..} = show (nonce, cookieValidity, cookieContent, mac) + where C.Auth mac = cookieMac + nonce = BA.convert cookieNonce :: ByteString + +instance Binary ParsedCookie where + put ParsedCookie {..} = do + putByteString $ BA.convert cookieNonce + putWord32be cookieValidity + putByteString $ BA.convert cookieMac + putByteString cookieContent + + get = do + Just cookieNonce <- maybeCryptoError . C.nonce12 <$> getByteString 12 + cookieValidity <- getWord32be + Just cookieMac <- maybeCryptoError . C.authTag <$> getByteString 16 + cookieContent <- BL.toStrict <$> getRemainingLazyByteString + return ParsedCookie {..} + isHeaderItemAcknowledged :: TransportHeaderItem -> Bool isHeaderItemAcknowledged = \case Acknowledged {} -> False @@ -165,9 +214,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () , gStorage :: PartialStorage + , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec , gNextTimeout :: TVar TimeSpec , gInitConfig :: Ref + , gCookieKey :: ScrubbedBytes + , gCookieStartTime :: Word32 } data Connection addr = Connection @@ -186,6 +238,7 @@ data Connection addr = Connection , cReservedPackets :: TVar Int , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] + , cNextKeepAlive :: TVar (Maybe TimeSpec) , cInStreams :: TVar [(Word8, Stream)] , cOutStreams :: TVar [(Word8, Stream)] } @@ -236,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do runExceptT $ do ((streamNumber, stream), outStreams') <- doInsert 1 outStreams lift $ writeTVar cOutStreams outStreams' - return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + return + ( StreamOpen streamNumber + , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream) + , go cGlobalState streamNumber stream + ) where go gs@GlobalState {..} streamNumber stream = do @@ -276,7 +333,7 @@ connAddWriteStream conn@Connection {..} = do Right (ctext, counter) -> do let isAcked = True return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing Nothing | secure -> return Nothing | otherwise -> return $ Just (plain, plainAckedBy) @@ -309,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do sNextSequence <- newTVar 0 sWaitingForAck <- newTVar 0 let stream = Stream {..} - return (stream, (streamNumber, stream) : streams) - (stream, inStreams') <- doInsert inStreams + return ( streamNumber, stream, (streamNumber, stream) : streams ) + ( num, stream, inStreams' ) <- doInsert inStreams writeTVar cInStreams inStreams' - return $ sFlowOut stream + return $ RawStreamReader (fromIntegral num) (sFlowOut stream) -type RawStreamReader = Flow StreamPacket Void -type RawStreamWriter = Flow Void StreamPacket +data RawStreamReader = RawStreamReader + { rsrNum :: Int + , rsrFlow :: Flow StreamPacket Void + } + +data RawStreamWriter = RawStreamWriter + { rswNum :: Int + , rswFlow :: Flow Void StreamPacket + } data Stream = Stream { sState :: TVar StreamState @@ -351,20 +415,20 @@ streamClosed Connection {..} snum = atomically $ do modifyTVar' cOutStreams $ filter ((snum /=) . fst) readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) -readStreamToList stream = readFlowIO stream >>= \case +readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) -readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject]) +readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject]) readObjectsFromStream st stream = do (seqEnd, list) <- readStreamToList stream let validate s ((s', bytes) : rest) | s == s' = (bytes : ) <$> validate (s + 1) rest | s > s' = validate s rest - | otherwise = throwError "missing object chunk" + | otherwise = throwOtherError "missing object chunk" validate s [] | s == seqEnd = return [] - | otherwise = throwError "content length mismatch" + | otherwise = throwOtherError "content length mismatch" return $ do content <- BL.fromChunks <$> validate 0 list deserializeObjects st content @@ -373,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () writeByteStringToStream stream = go 0 where go seqNum bstr - | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum | otherwise = do let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU - writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur) go (seqNum + 1) rest @@ -387,7 +451,7 @@ data WaitingRef = WaitingRef , wrefStatus :: TVar (Either [RefDigest] Ref) } -type WaitingRefCallback = ExceptT String IO () +type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest wrDigest = refDigest . wrefPartial @@ -440,15 +504,18 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage - startTime <- getTime MonotonicRaw - gNowVar <- newTVarIO startTime - gNextTimeout <- newTVarIO startTime + gStartTime <- getTime Monotonic + gNowVar <- newTVarIO gStartTime + gNextTimeout <- newTVarIO gStartTime gInitConfig <- store mStorage $ (Rec [] :: Object) + gCookieKey <- getRandomBytes 32 + gCookieStartTime <- runGet getWord32host . BL.pack . BA.unpack @ScrubbedBytes <$> getRandomBytes 4 + let gs = GlobalState {..} let signalTimeouts = forever $ do - now <- getTime MonotonicRaw + now <- getTime Monotonic next <- atomically $ do writeTVar gNowVar now readTVar gNextTimeout @@ -462,8 +529,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do race_ (waitTill next) waitForUpdate - race_ signalTimeouts $ forever $ join $ atomically $ - passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + race_ signalTimeouts $ forever $ do + io <- atomically $ do + passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e getConnection :: GlobalState addr -> addr -> STM (Connection addr) @@ -487,6 +556,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cReservedPackets <- newTVar 0 cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] + cNextKeepAlive <- newTVar Nothing cInStreams <- newTVar [] cOutStreams <- newTVar [] let conn = Connection {..} @@ -520,7 +590,7 @@ processIncoming gs@GlobalState {..} = do let parse = case B.uncons msg of Just (b, enc) | b .&. 0xE0 == 0x80 -> do - ch <- maybe (throwError "unexpected encrypted packet") return mbch + ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch (dec, counter) <- channelDecrypt ch enc case B.uncons dec of @@ -535,19 +605,20 @@ processIncoming gs@GlobalState {..} = do return $ Right (snum, seq8, content, counter) Just (_, _) -> do - throwError "unexpected stream header" + throwOtherError "unexpected stream header" Nothing -> do - throwError "empty decrypted content" + throwOtherError "empty decrypted content" | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg return $ Left (False, objs, Nothing) - | otherwise -> throwError "invalid packet" + | otherwise -> throwOtherError "invalid packet" - Nothing -> throwError "empty packet" + Nothing -> throwOtherError "empty packet" + now <- getTime Monotonic runExceptT parse >>= \case Right (Left (secure, objs, mbcounter)) | hobj:content <- objs @@ -562,6 +633,7 @@ processIncoming gs@GlobalState {..} = do case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () + updateKeepAlive conn now processAcknowledgements gs conn items ioAfter Nothing -> return () @@ -571,8 +643,9 @@ processIncoming gs@GlobalState {..} = do gLog $ show objs Right (Right (snum, seq8, content, counter)) - | Just Connection {..} <- mbconn + | Just conn@Connection {..} <- mbconn -> atomically $ do + updateKeepAlive conn now (lookup snum <$> readTVar cInStreams) >>= \case Nothing -> gLog $ "unexpected stream number " ++ show snum @@ -594,7 +667,7 @@ processIncoming gs@GlobalState {..} = do atomically $ gLog $ show addr <> ": stream packet without connection" Left err -> do - atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if @@ -694,11 +767,38 @@ generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie -createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) +createCookie GlobalState {..} addr = do + (nonceBytes :: Bytes) <- getRandomBytes 12 + validUntil <- (fromNanoSecs (60 * 10^(9 :: Int)) +) <$> getTime Monotonic + let validSecondsFromStart = fromIntegral $ toNanoSecs (validUntil - gStartTime) `div` (10^(9 :: Int)) + cookieValidity = validSecondsFromStart - gCookieStartTime + plainContent = BC.pack (show addr) + throwCryptoErrorIO $ do + cookieNonce <- C.nonce12 nonceBytes + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (cookieContent, st3) = C.encrypt plainContent st2 + cookieMac = C.finalize st3 + return $ Cookie $ BL.toStrict $ encode $ ParsedCookie {..} verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool -verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie - +verifyCookie GlobalState {..} addr (Cookie cookie) = do + ctime <- getTime Monotonic + return $ fromMaybe False $ do + ( _, _, ParsedCookie {..} ) <- either (const Nothing) Just $ decodeOrFail $ BL.fromStrict cookie + maybeCryptoError $ do + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (plainContent, st3) = C.decrypt cookieContent st2 + mac = C.finalize st3 + + validSecondsFromStart = fromIntegral $ cookieValidity + gCookieStartTime + validUntil = gStartTime + fromNanoSecs (validSecondsFromStart * (10^(9 :: Int))) + return $ and + [ mac == cookieMac + , ctime <= validUntil + , show addr == BC.unpack plainContent + ] reservePacket :: Connection addr -> STM ReservedToSend reservePacket conn@Connection {..} = do @@ -713,9 +813,9 @@ reservePacket conn@Connection {..} = do return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () -resendBytes Connection {..} reserved sp = do +resendBytes conn@Connection {..} reserved sp = do let GlobalState {..} = cGlobalState - now <- getTime MonotonicRaw + now <- getTime Monotonic atomically $ do when (isJust reserved) $ do modifyTVar' cReservedPackets (subtract 1) @@ -726,6 +826,7 @@ resendBytes Connection {..} reserved sp = do , spRetryCount = spRetryCount sp + 1 } writeFlow gDataFlow (cAddress, spData sp) + updateKeepAlive conn now sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () sendBytes conn reserved bs = resendBytes conn reserved @@ -738,6 +839,12 @@ sendBytes conn reserved bs = resendBytes conn reserved , spData = bs } +updateKeepAlive :: Connection addr -> TimeSpec -> STM () +updateKeepAlive Connection {..} now = do + let next = now + keepAliveInternal + writeTVar cNextKeepAlive $ Just next + + processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) processOutgoing gs@GlobalState {..} = do @@ -777,11 +884,12 @@ processOutgoing gs@GlobalState {..} = do let onAck = sequence_ $ map (streamAccepted conn) $ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let mkPlain extraHeaders = - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems - in BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders + | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems = + BL.concat $ + (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems) + : map lazyLoadBytes content + | otherwise = BL.empty let usePlaintext = do plain <- mkPlain <$> generateCookieHeaders conn channel @@ -793,7 +901,7 @@ processOutgoing gs@GlobalState {..} = do Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing mbs <- case (secure, mbch) of @@ -811,6 +919,13 @@ processOutgoing gs@GlobalState {..} = do sendBytes conn mbReserved' bs Nothing -> return () + let waitUntil :: TimeSpec -> TimeSpec -> STM () + waitUntil now till = do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || till < nextTimeout + then writeTVar gNextTimeout till + else retry + let retransmitPacket :: Connection addr -> STM (IO ()) retransmitPacket conn@Connection {..} = do now <- readTVar gNowVar @@ -819,11 +934,8 @@ processOutgoing gs@GlobalState {..} = do _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 if | now < nextTry -> do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry + waitUntil now nextTry + return $ return () | spRetryCount sp < 2 -> do reserved <- reservePacket conn writeTVar cSentPackets rest @@ -863,11 +975,28 @@ processOutgoing gs@GlobalState {..} = do writeTVar gIdentity (nid, cur : past) return $ return () + let sendKeepAlive :: Connection addr -> STM (IO ()) + sendKeepAlive Connection {..} = do + readTVar cNextKeepAlive >>= \case + Nothing -> retry + Just next -> do + now <- readTVar gNowVar + if next <= now + then do + writeTVar cNextKeepAlive Nothing + identity <- fst <$> readTVar gIdentity + let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] + writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], []) + else do + waitUntil now next + return $ return () + conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns , [ handleControlRequests ] + , map sendKeepAlive conns ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) |