summaryrefslogtreecommitdiff
path: root/src/Erebos/Network/Protocol.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network/Protocol.hs')
-rw-r--r--src/Erebos/Network/Protocol.hs223
1 files changed, 176 insertions, 47 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs
index a009ad1..025f52c 100644
--- a/src/Erebos/Network/Protocol.hs
+++ b/src/Erebos/Network/Protocol.hs
@@ -3,6 +3,7 @@ module Erebos.Network.Protocol (
transportToObject,
TransportHeader(..),
TransportHeaderItem(..),
+ ServiceID(..),
SecurityRequirement(..),
WaitingRef(..),
@@ -22,7 +23,8 @@ module Erebos.Network.Protocol (
connSetChannel,
connClose,
- RawStreamReader, RawStreamWriter,
+ RawStreamReader(..), RawStreamWriter(..),
+ StreamPacket(..),
connAddWriteStream,
connAddReadStream,
readStreamToList,
@@ -36,11 +38,22 @@ import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
+import Control.Exception
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans
+import Crypto.Cipher.ChaChaPoly1305 qualified as C
+import Crypto.MAC.Poly1305 qualified as C (Auth(..), authTag)
+import Crypto.Error
+import Crypto.Random
+
+import Data.Binary
+import Data.Binary.Get
+import Data.Binary.Put
import Data.Bits
+import Data.ByteArray (Bytes, ScrubbedBytes)
+import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC
@@ -51,15 +64,16 @@ import Data.Maybe
import Data.Text (Text)
import Data.Text qualified as T
import Data.Void
-import Data.Word
import System.Clock
-import Erebos.Channel
import Erebos.Flow
import Erebos.Identity
-import Erebos.Service
+import Erebos.Network.Channel
+import Erebos.Object
+import Erebos.Storable
import Erebos.Storage
+import Erebos.UUID (UUID)
protocolVersion :: Text
@@ -68,6 +82,9 @@ protocolVersion = T.pack "0.1"
protocolVersions :: [Text]
protocolVersions = [protocolVersion]
+keepAliveInternal :: TimeSpec
+keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int)
+
data TransportPacket a = TransportPacket TransportHeader [a]
@@ -93,6 +110,9 @@ data TransportHeaderItem
| StreamOpen Word8
deriving (Eq, Show)
+newtype ServiceID = ServiceID UUID
+ deriving (Eq, Ord, Show, StorableUUID)
+
newtype Cookie = Cookie ByteString
deriving (Eq, Show)
@@ -101,6 +121,35 @@ data SecurityRequirement = PlaintextOnly
| EncryptedOnly
deriving (Eq, Ord)
+data ParsedCookie = ParsedCookie
+ { cookieNonce :: C.Nonce
+ , cookieValidity :: Word32
+ , cookieContent :: ByteString
+ , cookieMac :: C.Auth
+ }
+
+instance Eq ParsedCookie where
+ (==) = (==) `on` (\c -> ( BA.convert (cookieNonce c) :: ByteString, cookieValidity c, cookieContent c, cookieMac c ))
+
+instance Show ParsedCookie where
+ show ParsedCookie {..} = show (nonce, cookieValidity, cookieContent, mac)
+ where C.Auth mac = cookieMac
+ nonce = BA.convert cookieNonce :: ByteString
+
+instance Binary ParsedCookie where
+ put ParsedCookie {..} = do
+ putByteString $ BA.convert cookieNonce
+ putWord32be cookieValidity
+ putByteString $ BA.convert cookieMac
+ putByteString cookieContent
+
+ get = do
+ Just cookieNonce <- maybeCryptoError . C.nonce12 <$> getByteString 12
+ cookieValidity <- getWord32be
+ Just cookieMac <- maybeCryptoError . C.authTag <$> getByteString 16
+ cookieContent <- BL.toStrict <$> getRemainingLazyByteString
+ return ParsedCookie {..}
+
isHeaderItemAcknowledged :: TransportHeaderItem -> Bool
isHeaderItemAcknowledged = \case
Acknowledged {} -> False
@@ -165,9 +214,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState
, gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject))
, gLog :: String -> STM ()
, gStorage :: PartialStorage
+ , gStartTime :: TimeSpec
, gNowVar :: TVar TimeSpec
, gNextTimeout :: TVar TimeSpec
, gInitConfig :: Ref
+ , gCookieKey :: ScrubbedBytes
+ , gCookieStartTime :: Word32
}
data Connection addr = Connection
@@ -186,6 +238,7 @@ data Connection addr = Connection
, cReservedPackets :: TVar Int
, cSentPackets :: TVar [SentPacket]
, cToAcknowledge :: TVar [Integer]
+ , cNextKeepAlive :: TVar (Maybe TimeSpec)
, cInStreams :: TVar [(Word8, Stream)]
, cOutStreams :: TVar [(Word8, Stream)]
}
@@ -236,7 +289,11 @@ connAddWriteStream conn@Connection {..} = do
runExceptT $ do
((streamNumber, stream), outStreams') <- doInsert 1 outStreams
lift $ writeTVar cOutStreams outStreams'
- return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream)
+ return
+ ( StreamOpen streamNumber
+ , RawStreamWriter (fromIntegral streamNumber) (sFlowIn stream)
+ , go cGlobalState streamNumber stream
+ )
where
go gs@GlobalState {..} streamNumber stream = do
@@ -276,7 +333,7 @@ connAddWriteStream conn@Connection {..} = do
Right (ctext, counter) -> do
let isAcked = True
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
- Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err
return Nothing
Nothing | secure -> return Nothing
| otherwise -> return $ Just (plain, plainAckedBy)
@@ -309,14 +366,21 @@ connAddReadStream Connection {..} streamNumber = do
sNextSequence <- newTVar 0
sWaitingForAck <- newTVar 0
let stream = Stream {..}
- return (stream, (streamNumber, stream) : streams)
- (stream, inStreams') <- doInsert inStreams
+ return ( streamNumber, stream, (streamNumber, stream) : streams )
+ ( num, stream, inStreams' ) <- doInsert inStreams
writeTVar cInStreams inStreams'
- return $ sFlowOut stream
+ return $ RawStreamReader (fromIntegral num) (sFlowOut stream)
-type RawStreamReader = Flow StreamPacket Void
-type RawStreamWriter = Flow Void StreamPacket
+data RawStreamReader = RawStreamReader
+ { rsrNum :: Int
+ , rsrFlow :: Flow StreamPacket Void
+ }
+
+data RawStreamWriter = RawStreamWriter
+ { rswNum :: Int
+ , rswFlow :: Flow Void StreamPacket
+ }
data Stream = Stream
{ sState :: TVar StreamState
@@ -351,20 +415,20 @@ streamClosed Connection {..} snum = atomically $ do
modifyTVar' cOutStreams $ filter ((snum /=) . fst)
readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)])
-readStreamToList stream = readFlowIO stream >>= \case
+readStreamToList stream = readFlowIO (rsrFlow stream) >>= \case
StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream
StreamClosed sqEnd -> return (sqEnd, [])
-readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject])
+readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject])
readObjectsFromStream st stream = do
(seqEnd, list) <- readStreamToList stream
let validate s ((s', bytes) : rest)
| s == s' = (bytes : ) <$> validate (s + 1) rest
| s > s' = validate s rest
- | otherwise = throwError "missing object chunk"
+ | otherwise = throwOtherError "missing object chunk"
validate s []
| s == seqEnd = return []
- | otherwise = throwError "content length mismatch"
+ | otherwise = throwOtherError "content length mismatch"
return $ do
content <- BL.fromChunks <$> validate 0 list
deserializeObjects st content
@@ -373,10 +437,10 @@ writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO ()
writeByteStringToStream stream = go 0
where
go seqNum bstr
- | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum
+ | BL.null bstr = writeFlowIO (rswFlow stream) $ StreamClosed seqNum
| otherwise = do
let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU
- writeFlowIO stream $ StreamData seqNum (BL.toStrict cur)
+ writeFlowIO (rswFlow stream) $ StreamData seqNum (BL.toStrict cur)
go (seqNum + 1) rest
@@ -387,7 +451,7 @@ data WaitingRef = WaitingRef
, wrefStatus :: TVar (Either [RefDigest] Ref)
}
-type WaitingRefCallback = ExceptT String IO ()
+type WaitingRefCallback = ExceptT ErebosError IO ()
wrDigest :: WaitingRef -> RefDigest
wrDigest = refDigest . wrefPartial
@@ -440,15 +504,18 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
mStorage <- memoryStorage
gStorage <- derivePartialStorage mStorage
- startTime <- getTime MonotonicRaw
- gNowVar <- newTVarIO startTime
- gNextTimeout <- newTVarIO startTime
+ gStartTime <- getTime Monotonic
+ gNowVar <- newTVarIO gStartTime
+ gNextTimeout <- newTVarIO gStartTime
gInitConfig <- store mStorage $ (Rec [] :: Object)
+ gCookieKey <- getRandomBytes 32
+ gCookieStartTime <- runGet getWord32host . BL.pack . BA.unpack @ScrubbedBytes <$> getRandomBytes 4
+
let gs = GlobalState {..}
let signalTimeouts = forever $ do
- now <- getTime MonotonicRaw
+ now <- getTime Monotonic
next <- atomically $ do
writeTVar gNowVar now
readTVar gNextTimeout
@@ -462,8 +529,10 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
race_ (waitTill next) waitForUpdate
- race_ signalTimeouts $ forever $ join $ atomically $
- passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ race_ signalTimeouts $ forever $ do
+ io <- atomically $ do
+ passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs
+ catch io $ \(e :: SomeException) -> atomically $ gLog $ "exception during network protocol handling: " <> show e
getConnection :: GlobalState addr -> addr -> STM (Connection addr)
@@ -487,6 +556,7 @@ newConnection cGlobalState@GlobalState {..} addr = do
cReservedPackets <- newTVar 0
cSentPackets <- newTVar []
cToAcknowledge <- newTVar []
+ cNextKeepAlive <- newTVar Nothing
cInStreams <- newTVar []
cOutStreams <- newTVar []
let conn = Connection {..}
@@ -520,7 +590,7 @@ processIncoming gs@GlobalState {..} = do
let parse = case B.uncons msg of
Just (b, enc)
| b .&. 0xE0 == 0x80 -> do
- ch <- maybe (throwError "unexpected encrypted packet") return mbch
+ ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch
(dec, counter) <- channelDecrypt ch enc
case B.uncons dec of
@@ -535,19 +605,20 @@ processIncoming gs@GlobalState {..} = do
return $ Right (snum, seq8, content, counter)
Just (_, _) -> do
- throwError "unexpected stream header"
+ throwOtherError "unexpected stream header"
Nothing -> do
- throwError "empty decrypted content"
+ throwOtherError "empty decrypted content"
| b .&. 0xE0 == 0x60 -> do
objs <- deserialize msg
return $ Left (False, objs, Nothing)
- | otherwise -> throwError "invalid packet"
+ | otherwise -> throwOtherError "invalid packet"
- Nothing -> throwError "empty packet"
+ Nothing -> throwOtherError "empty packet"
+ now <- getTime Monotonic
runExceptT parse >>= \case
Right (Left (secure, objs, mbcounter))
| hobj:content <- objs
@@ -562,6 +633,7 @@ processIncoming gs@GlobalState {..} = do
case mbup of
Just up -> putTMVar gNextUp (conn, (secure, up))
Nothing -> return ()
+ updateKeepAlive conn now
processAcknowledgements gs conn items
ioAfter
Nothing -> return ()
@@ -571,8 +643,9 @@ processIncoming gs@GlobalState {..} = do
gLog $ show objs
Right (Right (snum, seq8, content, counter))
- | Just Connection {..} <- mbconn
+ | Just conn@Connection {..} <- mbconn
-> atomically $ do
+ updateKeepAlive conn now
(lookup snum <$> readTVar cInStreams) >>= \case
Nothing ->
gLog $ "unexpected stream number " ++ show snum
@@ -594,7 +667,7 @@ processIncoming gs@GlobalState {..} = do
atomically $ gLog $ show addr <> ": stream packet without connection"
Left err -> do
- atomically $ gLog $ show addr <> ": failed to parse packet: " <> err
+ atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err
processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a)))
processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if
@@ -694,11 +767,38 @@ generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader,
_ -> return Nothing
createCookie :: GlobalState addr -> addr -> IO Cookie
-createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr)
+createCookie GlobalState {..} addr = do
+ (nonceBytes :: Bytes) <- getRandomBytes 12
+ validUntil <- (fromNanoSecs (60 * 10^(9 :: Int)) +) <$> getTime Monotonic
+ let validSecondsFromStart = fromIntegral $ toNanoSecs (validUntil - gStartTime) `div` (10^(9 :: Int))
+ cookieValidity = validSecondsFromStart - gCookieStartTime
+ plainContent = BC.pack (show addr)
+ throwCryptoErrorIO $ do
+ cookieNonce <- C.nonce12 nonceBytes
+ st1 <- C.initialize gCookieKey cookieNonce
+ let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1
+ (cookieContent, st3) = C.encrypt plainContent st2
+ cookieMac = C.finalize st3
+ return $ Cookie $ BL.toStrict $ encode $ ParsedCookie {..}
verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool
-verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie
-
+verifyCookie GlobalState {..} addr (Cookie cookie) = do
+ ctime <- getTime Monotonic
+ return $ fromMaybe False $ do
+ ( _, _, ParsedCookie {..} ) <- either (const Nothing) Just $ decodeOrFail $ BL.fromStrict cookie
+ maybeCryptoError $ do
+ st1 <- C.initialize gCookieKey cookieNonce
+ let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1
+ (plainContent, st3) = C.decrypt cookieContent st2
+ mac = C.finalize st3
+
+ validSecondsFromStart = fromIntegral $ cookieValidity + gCookieStartTime
+ validUntil = gStartTime + fromNanoSecs (validSecondsFromStart * (10^(9 :: Int)))
+ return $ and
+ [ mac == cookieMac
+ , ctime <= validUntil
+ , show addr == BC.unpack plainContent
+ ]
reservePacket :: Connection addr -> STM ReservedToSend
reservePacket conn@Connection {..} = do
@@ -713,9 +813,9 @@ reservePacket conn@Connection {..} = do
return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn)
resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO ()
-resendBytes Connection {..} reserved sp = do
+resendBytes conn@Connection {..} reserved sp = do
let GlobalState {..} = cGlobalState
- now <- getTime MonotonicRaw
+ now <- getTime Monotonic
atomically $ do
when (isJust reserved) $ do
modifyTVar' cReservedPackets (subtract 1)
@@ -726,6 +826,7 @@ resendBytes Connection {..} reserved sp = do
, spRetryCount = spRetryCount sp + 1
}
writeFlow gDataFlow (cAddress, spData sp)
+ updateKeepAlive conn now
sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO ()
sendBytes conn reserved bs = resendBytes conn reserved
@@ -738,6 +839,12 @@ sendBytes conn reserved bs = resendBytes conn reserved
, spData = bs
}
+updateKeepAlive :: Connection addr -> TimeSpec -> STM ()
+updateKeepAlive Connection {..} now = do
+ let next = now + keepAliveInternal
+ writeTVar cNextKeepAlive $ Just next
+
+
processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
processOutgoing gs@GlobalState {..} = do
@@ -777,11 +884,12 @@ processOutgoing gs@GlobalState {..} = do
let onAck = sequence_ $ map (streamAccepted conn) $
catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems)
- let mkPlain extraHeaders =
- let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems
- in BL.concat $
- (serializeObject $ transportToObject gStorage header)
- : map lazyLoadBytes content
+ let mkPlain extraHeaders
+ | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems =
+ BL.concat $
+ (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems)
+ : map lazyLoadBytes content
+ | otherwise = BL.empty
let usePlaintext = do
plain <- mkPlain <$> generateCookieHeaders conn channel
@@ -793,7 +901,7 @@ processOutgoing gs@GlobalState {..} = do
Right (ctext, counter) -> do
let isAcked = any isHeaderItemAcknowledged hitems
return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else [])
- Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err
return Nothing
mbs <- case (secure, mbch) of
@@ -811,6 +919,13 @@ processOutgoing gs@GlobalState {..} = do
sendBytes conn mbReserved' bs
Nothing -> return ()
+ let waitUntil :: TimeSpec -> TimeSpec -> STM ()
+ waitUntil now till = do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || till < nextTimeout
+ then writeTVar gNextTimeout till
+ else retry
+
let retransmitPacket :: Connection addr -> STM (IO ())
retransmitPacket conn@Connection {..} = do
now <- readTVar gNowVar
@@ -819,11 +934,8 @@ processOutgoing gs@GlobalState {..} = do
_ -> retry
let nextTry = spTime sp + fromNanoSecs 1000000000
if | now < nextTry -> do
- nextTimeout <- readTVar gNextTimeout
- if nextTimeout <= now || nextTry < nextTimeout
- then do writeTVar gNextTimeout nextTry
- return $ return ()
- else retry
+ waitUntil now nextTry
+ return $ return ()
| spRetryCount sp < 2 -> do
reserved <- reservePacket conn
writeTVar cSentPackets rest
@@ -863,11 +975,28 @@ processOutgoing gs@GlobalState {..} = do
writeTVar gIdentity (nid, cur : past)
return $ return ()
+ let sendKeepAlive :: Connection addr -> STM (IO ())
+ sendKeepAlive Connection {..} = do
+ readTVar cNextKeepAlive >>= \case
+ Nothing -> retry
+ Just next -> do
+ now <- readTVar gNowVar
+ if next <= now
+ then do
+ writeTVar cNextKeepAlive Nothing
+ identity <- fst <$> readTVar gIdentity
+ let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
+ writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], [])
+ else do
+ waitUntil now next
+ return $ return ()
+
conns <- readTVar gConnections
msum $ concat $
[ map retransmitPacket conns
, map sendNextPacket conns
, [ handleControlRequests ]
+ , map sendKeepAlive conns
]
processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ())