diff options
Diffstat (limited to 'src/Erebos/Network')
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 877 | ||||
-rw-r--r-- | src/Erebos/Network/ifaddrs.c | 41 | ||||
-rw-r--r-- | src/Erebos/Network/ifaddrs.h | 3 |
3 files changed, 921 insertions, 0 deletions
diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs new file mode 100644 index 0000000..a009ad1 --- /dev/null +++ b/src/Erebos/Network/Protocol.hs @@ -0,0 +1,877 @@ +module Erebos.Network.Protocol ( + TransportPacket(..), + transportToObject, + TransportHeader(..), + TransportHeaderItem(..), + SecurityRequirement(..), + + WaitingRef(..), + WaitingRefCallback, + wrDigest, + + ChannelState(..), + + ControlRequest(..), + ControlMessage(..), + erebosNetworkProtocol, + + Connection, + connAddress, + connData, + connGetChannel, + connSetChannel, + connClose, + + RawStreamReader, RawStreamWriter, + connAddWriteStream, + connAddReadStream, + readStreamToList, + readObjectsFromStream, + writeByteStringToStream, + + module Erebos.Flow, +) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.Except +import Control.Monad.Trans + +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString qualified as B +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL +import Data.Function +import Data.List +import Data.Maybe +import Data.Text (Text) +import Data.Text qualified as T +import Data.Void +import Data.Word + +import System.Clock + +import Erebos.Channel +import Erebos.Flow +import Erebos.Identity +import Erebos.Service +import Erebos.Storage + + +protocolVersion :: Text +protocolVersion = T.pack "0.1" + +protocolVersions :: [Text] +protocolVersions = [protocolVersion] + + +data TransportPacket a = TransportPacket TransportHeader [a] + +data TransportHeader = TransportHeader [TransportHeaderItem] + deriving (Show) + +data TransportHeaderItem + = Acknowledged RefDigest + | AcknowledgedSingle Integer + | Rejected RefDigest + | ProtocolVersion Text + | Initiation RefDigest + | CookieSet Cookie + | CookieEcho Cookie + | DataRequest RefDigest + | DataResponse RefDigest + | AnnounceSelf RefDigest + | AnnounceUpdate RefDigest + | TrChannelRequest RefDigest + | TrChannelAccept RefDigest + | ServiceType ServiceID + | ServiceRef RefDigest + | StreamOpen Word8 + deriving (Eq, Show) + +newtype Cookie = Cookie ByteString + deriving (Eq, Show) + +data SecurityRequirement = PlaintextOnly + | PlaintextAllowed + | EncryptedOnly + deriving (Eq, Ord) + +isHeaderItemAcknowledged :: TransportHeaderItem -> Bool +isHeaderItemAcknowledged = \case + Acknowledged {} -> False + AcknowledgedSingle {} -> False + Rejected {} -> False + ProtocolVersion {} -> False + Initiation {} -> False + CookieSet {} -> False + CookieEcho {} -> False + _ -> True + +transportToObject :: PartialStorage -> TransportHeader -> PartialObject +transportToObject st (TransportHeader items) = Rec $ map single items + where single = \case + Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst) + AcknowledgedSingle num -> (BC.pack "ACK", RecInt num) + Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst) + ProtocolVersion ver -> (BC.pack "VER", RecText ver) + Initiation dgst -> (BC.pack "INI", RecRef $ partialRefFromDigest st dgst) + CookieSet (Cookie bytes) -> (BC.pack "CKS", RecBinary bytes) + CookieEcho (Cookie bytes) -> (BC.pack "CKE", RecBinary bytes) + DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst) + DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst) + AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst) + AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst) + TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst) + TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst) + ServiceType stype -> (BC.pack "SVT", RecUUID $ toUUID stype) + ServiceRef dgst -> (BC.pack "SVR", RecRef $ partialRefFromDigest st dgst) + StreamOpen num -> (BC.pack "STO", RecInt $ fromIntegral num) + +transportFromObject :: PartialObject -> Maybe TransportHeader +transportFromObject (Rec items) = case catMaybes $ map single items of + [] -> Nothing + titems -> Just $ TransportHeader titems + where single (name, content) = if + | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref + | name == BC.pack "ACK", RecInt num <- content -> Just $ AcknowledgedSingle num + | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref + | name == BC.pack "VER", RecText ver <- content -> Just $ ProtocolVersion ver + | name == BC.pack "INI", RecRef ref <- content -> Just $ Initiation $ refDigest ref + | name == BC.pack "CKS", RecBinary bytes <- content -> Just $ CookieSet (Cookie bytes) + | name == BC.pack "CKE", RecBinary bytes <- content -> Just $ CookieEcho (Cookie bytes) + | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref + | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref + | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref + | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref + | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref + | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref + | name == BC.pack "SVT", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid + | name == BC.pack "SVR", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref + | name == BC.pack "STO", RecInt num <- content -> Just $ StreamOpen $ fromIntegral num + | otherwise -> Nothing +transportFromObject _ = Nothing + + +data GlobalState addr = (Eq addr, Show addr) => GlobalState + { gIdentity :: TVar (UnifiedIdentity, [UnifiedIdentity]) + , gConnections :: TVar [Connection addr] + , gDataFlow :: SymFlow (addr, ByteString) + , gControlFlow :: Flow (ControlRequest addr) (ControlMessage addr) + , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) + , gLog :: String -> STM () + , gStorage :: PartialStorage + , gNowVar :: TVar TimeSpec + , gNextTimeout :: TVar TimeSpec + , gInitConfig :: Ref + } + +data Connection addr = Connection + { cGlobalState :: GlobalState addr + , cAddress :: addr + , cDataUp :: Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + , cDataInternal :: Flow + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + (Maybe (Bool, TransportPacket PartialObject)) + , cChannel :: TVar ChannelState + , cCookie :: TVar (Maybe Cookie) + , cSecureOutQueue :: TQueue (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) + , cMaxInFlightPackets :: TVar Int + , cReservedPackets :: TVar Int + , cSentPackets :: TVar [SentPacket] + , cToAcknowledge :: TVar [Integer] + , cInStreams :: TVar [(Word8, Stream)] + , cOutStreams :: TVar [(Word8, Stream)] + } + +instance Eq (Connection addr) where + (==) = (==) `on` cChannel + +connAddress :: Connection addr -> addr +connAddress = cAddress + +connData :: Connection addr -> Flow + (Maybe (Bool, TransportPacket PartialObject)) + (SecurityRequirement, TransportPacket Ref, [TransportHeaderItem]) +connData = cDataUp + +connGetChannel :: Connection addr -> STM ChannelState +connGetChannel Connection {..} = readTVar cChannel + +connSetChannel :: Connection addr -> ChannelState -> STM () +connSetChannel Connection {..} ch = do + writeTVar cChannel ch + +connClose :: Connection addr -> STM () +connClose conn@Connection {..} = do + let GlobalState {..} = cGlobalState + readTVar cChannel >>= \case + ChannelClosed -> return () + _ -> do + writeTVar cChannel ChannelClosed + writeTVar gConnections . filter (/=conn) =<< readTVar gConnections + writeFlow cDataInternal Nothing + +connAddWriteStream :: Connection addr -> STM (Either String (TransportHeaderItem, RawStreamWriter, IO ())) +connAddWriteStream conn@Connection {..} = do + outStreams <- readTVar cOutStreams + let doInsert :: Word8 -> [(Word8, Stream)] -> ExceptT String STM ((Word8, Stream), [(Word8, Stream)]) + doInsert n (s@(n', _) : rest) | n == n' = + fmap (s:) <$> doInsert (n + 1) rest + doInsert n streams | n < 63 = lift $ do + sState <- newTVar StreamOpening + (sFlowIn, sFlowOut) <- newFlow + sNextSequence <- newTVar 0 + sWaitingForAck <- newTVar 0 + let info = (n, Stream {..}) + return (info, info : streams) + doInsert _ _ = throwError "all outbound streams in use" + + runExceptT $ do + ((streamNumber, stream), outStreams') <- doInsert 1 outStreams + lift $ writeTVar cOutStreams outStreams' + return (StreamOpen streamNumber, sFlowIn stream, go cGlobalState streamNumber stream) + + where + go gs@GlobalState {..} streamNumber stream = do + (reserved, msg) <- atomically $ do + readTVar (sState stream) >>= \case + StreamRunning -> return () + _ -> retry + (,) <$> reservePacket conn + <*> readFlow (sFlowOut stream) + + (plain, cont, onAck) <- case msg of + StreamData {..} -> do + return (stpData, True, return ()) + StreamClosed {} -> do + atomically $ do + -- wait for ack on all sent stream data + waits <- readTVar (sWaitingForAck stream) + when (waits > 0) retry + return (BC.empty, False, streamClosed conn streamNumber) + + let secure = True + plainAckedBy = [] + mbReserved = Just reserved + + mbch <- atomically (readTVar cChannel) >>= return . \case + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing + + mbs <- case mbch of + Just ch -> do + runExceptT (channelEncrypt ch $ B.concat + [ B.singleton streamNumber + , B.singleton (fromIntegral (stpSequence msg) :: Word8) + , plain + ] ) >>= \case + Right (ctext, counter) -> do + let isAcked = True + return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + return Nothing + Nothing | secure -> return Nothing + | otherwise -> return $ Just (plain, plainAckedBy) + + case mbs of + Just (bs, ackedBy) -> do + atomically $ do + modifyTVar' (sWaitingForAck stream) (+ 1) + let mbReserved' = (\rs -> rs + { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) + , rsOnAck = do + rsOnAck rs + onAck + atomically $ modifyTVar' (sWaitingForAck stream) (subtract 1) + }) <$> mbReserved + sendBytes conn mbReserved' bs + Nothing -> return () + + when cont $ go gs streamNumber stream + +connAddReadStream :: Connection addr -> Word8 -> STM RawStreamReader +connAddReadStream Connection {..} streamNumber = do + inStreams <- readTVar cInStreams + let doInsert (s@(n, _) : rest) + | streamNumber < n = fmap (s:) <$> doInsert rest + | streamNumber == n = doInsert rest + doInsert streams = do + sState <- newTVar StreamRunning + (sFlowIn, sFlowOut) <- newFlow + sNextSequence <- newTVar 0 + sWaitingForAck <- newTVar 0 + let stream = Stream {..} + return (stream, (streamNumber, stream) : streams) + (stream, inStreams') <- doInsert inStreams + writeTVar cInStreams inStreams' + return $ sFlowOut stream + + +type RawStreamReader = Flow StreamPacket Void +type RawStreamWriter = Flow Void StreamPacket + +data Stream = Stream + { sState :: TVar StreamState + , sFlowIn :: Flow Void StreamPacket + , sFlowOut :: Flow StreamPacket Void + , sNextSequence :: TVar Word64 + , sWaitingForAck :: TVar Word64 + } + +data StreamState = StreamOpening | StreamRunning + +data StreamPacket + = StreamData + { stpSequence :: Word64 + , stpData :: BC.ByteString + } + | StreamClosed + { stpSequence :: Word64 + } + +streamAccepted :: Connection addr -> Word8 -> IO () +streamAccepted Connection {..} snum = atomically $ do + (lookup snum <$> readTVar cOutStreams) >>= \case + Just Stream {..} -> do + modifyTVar' sState $ \case + StreamOpening -> StreamRunning + x -> x + Nothing -> return () + +streamClosed :: Connection addr -> Word8 -> IO () +streamClosed Connection {..} snum = atomically $ do + modifyTVar' cOutStreams $ filter ((snum /=) . fst) + +readStreamToList :: RawStreamReader -> IO (Word64, [(Word64, BC.ByteString)]) +readStreamToList stream = readFlowIO stream >>= \case + StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream + StreamClosed sqEnd -> return (sqEnd, []) + +readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject]) +readObjectsFromStream st stream = do + (seqEnd, list) <- readStreamToList stream + let validate s ((s', bytes) : rest) + | s == s' = (bytes : ) <$> validate (s + 1) rest + | s > s' = validate s rest + | otherwise = throwError "missing object chunk" + validate s [] + | s == seqEnd = return [] + | otherwise = throwError "content length mismatch" + return $ do + content <- BL.fromChunks <$> validate 0 list + deserializeObjects st content + +writeByteStringToStream :: RawStreamWriter -> BL.ByteString -> IO () +writeByteStringToStream stream = go 0 + where + go seqNum bstr + | BL.null bstr = writeFlowIO stream $ StreamClosed seqNum + | otherwise = do + let (cur, rest) = BL.splitAt 500 bstr -- TODO: MTU + writeFlowIO stream $ StreamData seqNum (BL.toStrict cur) + go (seqNum + 1) rest + + +data WaitingRef = WaitingRef + { wrefStorage :: Storage + , wrefPartial :: PartialRef + , wrefAction :: Ref -> WaitingRefCallback + , wrefStatus :: TVar (Either [RefDigest] Ref) + } + +type WaitingRefCallback = ExceptT String IO () + +wrDigest :: WaitingRef -> RefDigest +wrDigest = refDigest . wrefPartial + + +data ChannelState = ChannelNone + | ChannelCookieWait -- sent initiation, waiting for response + | ChannelCookieReceived -- received cookie, but no cookie echo yet + | ChannelCookieConfirmed -- received cookie echo, no need to send from our side + | ChannelOurRequest (Stored ChannelRequest) + | ChannelPeerRequest WaitingRef + | ChannelOurAccept (Stored ChannelAccept) Channel + | ChannelEstablished Channel + | ChannelClosed + +data ReservedToSend = ReservedToSend + { rsAckedBy :: Maybe (TransportHeaderItem -> Bool) + , rsOnAck :: IO () + , rsOnFail :: IO () + } + +data SentPacket = SentPacket + { spTime :: TimeSpec + , spRetryCount :: Int + , spAckedBy :: Maybe (TransportHeaderItem -> Bool) + , spOnAck :: IO () + , spOnFail :: IO () + , spData :: BC.ByteString + } + + +data ControlRequest addr = RequestConnection addr + | SendAnnounce addr + | UpdateSelfIdentity UnifiedIdentity + +data ControlMessage addr = NewConnection (Connection addr) (Maybe RefDigest) + | ReceivedAnnounce addr RefDigest + + +erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) + => UnifiedIdentity + -> (String -> STM ()) + -> SymFlow (addr, ByteString) + -> Flow (ControlRequest addr) (ControlMessage addr) + -> IO () +erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do + gIdentity <- newTVarIO (initialIdentity, []) + gConnections <- newTVarIO [] + gNextUp <- newEmptyTMVarIO + mStorage <- memoryStorage + gStorage <- derivePartialStorage mStorage + + startTime <- getTime MonotonicRaw + gNowVar <- newTVarIO startTime + gNextTimeout <- newTVarIO startTime + gInitConfig <- store mStorage $ (Rec [] :: Object) + + let gs = GlobalState {..} + + let signalTimeouts = forever $ do + now <- getTime MonotonicRaw + next <- atomically $ do + writeTVar gNowVar now + readTVar gNextTimeout + + let waitTill time + | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 + | otherwise = threadDelay maxBound + waitForUpdate = atomically $ do + next' <- readTVar gNextTimeout + when (next' == next) retry + + race_ (waitTill next) waitForUpdate + + race_ signalTimeouts $ forever $ join $ atomically $ + passUpIncoming gs <|> processIncoming gs <|> processOutgoing gs + + +getConnection :: GlobalState addr -> addr -> STM (Connection addr) +getConnection gs addr = do + maybe (newConnection gs addr) return =<< findConnection gs addr + +findConnection :: GlobalState addr -> addr -> STM (Maybe (Connection addr)) +findConnection GlobalState {..} addr = do + find ((addr==) . cAddress) <$> readTVar gConnections + +newConnection :: GlobalState addr -> addr -> STM (Connection addr) +newConnection cGlobalState@GlobalState {..} addr = do + conns <- readTVar gConnections + + let cAddress = addr + (cDataUp, cDataInternal) <- newFlow + cChannel <- newTVar ChannelNone + cCookie <- newTVar Nothing + cSecureOutQueue <- newTQueue + cMaxInFlightPackets <- newTVar 4 + cReservedPackets <- newTVar 0 + cSentPackets <- newTVar [] + cToAcknowledge <- newTVar [] + cInStreams <- newTVar [] + cOutStreams <- newTVar [] + let conn = Connection {..} + + writeTVar gConnections (conn : conns) + return conn + +passUpIncoming :: GlobalState addr -> STM (IO ()) +passUpIncoming GlobalState {..} = do + (Connection {..}, up) <- takeTMVar gNextUp + writeFlow cDataInternal (Just up) + return $ return () + +processIncoming :: GlobalState addr -> STM (IO ()) +processIncoming gs@GlobalState {..} = do + guard =<< isEmptyTMVar gNextUp + guard =<< canWriteFlow gControlFlow + + (addr, msg) <- readFlow gDataFlow + mbconn <- findConnection gs addr + + mbch <- case mbconn of + Nothing -> return Nothing + Just conn -> readTVar (cChannel conn) >>= return . \case + ChannelEstablished ch -> Just ch + ChannelOurAccept _ ch -> Just ch + _ -> Nothing + + return $ do + let deserialize = liftEither . runExcept . deserializeObjects gStorage . BL.fromStrict + let parse = case B.uncons msg of + Just (b, enc) + | b .&. 0xE0 == 0x80 -> do + ch <- maybe (throwError "unexpected encrypted packet") return mbch + (dec, counter) <- channelDecrypt ch enc + + case B.uncons dec of + Just (0x00, content) -> do + objs <- deserialize content + return $ Left (True, objs, Just counter) + + Just (snum, dec') + | snum < 64 + , Just (seq8, content) <- B.uncons dec' + -> do + return $ Right (snum, seq8, content, counter) + + Just (_, _) -> do + throwError "unexpected stream header" + + Nothing -> do + throwError "empty decrypted content" + + | b .&. 0xE0 == 0x60 -> do + objs <- deserialize msg + return $ Left (False, objs, Nothing) + + | otherwise -> throwError "invalid packet" + + Nothing -> throwError "empty packet" + + runExceptT parse >>= \case + Right (Left (secure, objs, mbcounter)) + | hobj:content <- objs + , Just header@(TransportHeader items) <- transportFromObject hobj + -> processPacket gs (maybe (Left addr) Right mbconn) secure (TransportPacket header content) >>= \case + Just (conn@Connection {..}, mbup) -> do + ioAfter <- atomically $ do + case mbcounter of + Just counter | any isHeaderItemAcknowledged items -> + modifyTVar' cToAcknowledge (fromIntegral counter :) + _ -> return () + case mbup of + Just up -> putTMVar gNextUp (conn, (secure, up)) + Nothing -> return () + processAcknowledgements gs conn items + ioAfter + Nothing -> return () + + | otherwise -> atomically $ do + gLog $ show addr ++ ": invalid objects" + gLog $ show objs + + Right (Right (snum, seq8, content, counter)) + | Just Connection {..} <- mbconn + -> atomically $ do + (lookup snum <$> readTVar cInStreams) >>= \case + Nothing -> + gLog $ "unexpected stream number " ++ show snum + + Just Stream {..} -> do + expectedSequence <- readTVar sNextSequence + let seqFull = expectedSequence - 0x80 + fromIntegral (seq8 - fromIntegral expectedSequence + 0x80 :: Word8) + sdata <- if + | B.null content -> do + modifyTVar' cInStreams $ filter ((/=snum) . fst) + return $ StreamClosed seqFull + | otherwise -> do + writeTVar sNextSequence $ max expectedSequence (seqFull + 1) + return $ StreamData seqFull content + writeFlow sFlowIn sdata + modifyTVar' cToAcknowledge (fromIntegral counter :) + + | otherwise -> do + atomically $ gLog $ show addr <> ": stream packet without connection" + + Left err -> do + atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + +processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) +processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if + -- Established secure communication + | Right conn <- econn, secure + -> return $ Just (conn, Just packet) + + -- Plaintext communication with cookies to prove origin + | cookie:_ <- mapMaybe (\case CookieEcho x -> Just x; _ -> Nothing) header + -> verifyCookie gs addr cookie >>= \case + True -> do + atomically $ do + conn@Connection {..} <- getConnection gs addr + oldCookie <- readTVar cCookie + let received = listToMaybe $ mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header + case received `mplus` oldCookie of + Just current -> do + writeTVar cCookie (Just current) + cookieEchoReceived gs conn mbpid + return $ Just (conn, Just packet) + Nothing -> do + gLog $ show addr <> ": missing cookie set, dropping " <> show header + return $ Nothing + + False -> do + atomically $ gLog $ show addr <> ": cookie verification failed, dropping " <> show header + return Nothing + + -- Response to initiation packet + | cookie:_ <- mapMaybe (\case CookieSet x -> Just x; _ -> Nothing) header + , Just _ <- version + , Right conn@Connection {..} <- econn + -> do + atomically $ readTVar cChannel >>= \case + ChannelCookieWait -> do + writeTVar cChannel $ ChannelCookieReceived + writeTVar cCookie $ Just cookie + writeFlow gControlFlow (NewConnection conn mbpid) + return $ Just (conn, Nothing) + _ -> return Nothing + + -- Initiation packet + | _:_ <- mapMaybe (\case Initiation x -> Just x; _ -> Nothing) header + , Just ver <- version + -> do + cookie <- createCookie gs addr + atomically $ do + identity <- fst <$> readTVar gIdentity + let reply = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader + [ CookieSet cookie + , AnnounceSelf $ refDigest $ storedRef $ idData identity + , ProtocolVersion ver + ] + writeFlow gDataFlow (addr, reply) + return Nothing + + -- Announce packet outside any connection + | dgst:_ <- mapMaybe (\case AnnounceSelf x -> Just x; _ -> Nothing) header + , Just _ <- version + -> do + atomically $ do + (cur, past) <- readTVar gIdentity + when (not $ dgst `elem` map (refDigest . storedRef . idData) (cur : past)) $ do + writeFlow gControlFlow $ ReceivedAnnounce addr dgst + return Nothing + + | otherwise -> do + atomically $ gLog $ show addr <> ": dropping packet " <> show header + return Nothing + + where + addr = either id cAddress econn + mbpid = listToMaybe $ mapMaybe (\case AnnounceSelf dgst -> Just dgst; _ -> Nothing) header + version = listToMaybe $ filter (\v -> ProtocolVersion v `elem` header) protocolVersions + +cookieEchoReceived :: GlobalState addr -> Connection addr -> Maybe RefDigest -> STM () +cookieEchoReceived GlobalState {..} conn@Connection {..} mbpid = do + readTVar cChannel >>= \case + ChannelNone -> newConn + ChannelCookieWait -> newConn + ChannelCookieReceived {} -> update + _ -> return () + where + update = do + writeTVar cChannel ChannelCookieConfirmed + newConn = do + update + writeFlow gControlFlow (NewConnection conn mbpid) + +generateCookieHeaders :: Connection addr -> ChannelState -> IO [TransportHeaderItem] +generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, setHeader ] + where + echoHeader = fmap CookieEcho <$> atomically (readTVar cCookie) + setHeader = case ch of + ChannelCookieWait {} -> Just . CookieSet <$> createCookie cGlobalState cAddress + ChannelCookieReceived {} -> Just . CookieSet <$> createCookie cGlobalState cAddress + _ -> return Nothing + +createCookie :: GlobalState addr -> addr -> IO Cookie +createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) + +verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool +verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie + + +reservePacket :: Connection addr -> STM ReservedToSend +reservePacket conn@Connection {..} = do + maxPackets <- readTVar cMaxInFlightPackets + reserved <- readTVar cReservedPackets + sent <- length <$> readTVar cSentPackets + + when (sent + reserved >= maxPackets) $ do + retry + + writeTVar cReservedPackets $ reserved + 1 + return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) + +resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () +resendBytes Connection {..} reserved sp = do + let GlobalState {..} = cGlobalState + now <- getTime MonotonicRaw + atomically $ do + when (isJust reserved) $ do + modifyTVar' cReservedPackets (subtract 1) + + when (isJust $ spAckedBy sp) $ do + modifyTVar' cSentPackets $ (:) sp + { spTime = now + , spRetryCount = spRetryCount sp + 1 + } + writeFlow gDataFlow (cAddress, spData sp) + +sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () +sendBytes conn reserved bs = resendBytes conn reserved + SentPacket + { spTime = undefined + , spRetryCount = -1 + , spAckedBy = rsAckedBy =<< reserved + , spOnAck = maybe (return ()) rsOnAck reserved + , spOnFail = maybe (return ()) rsOnFail reserved + , spData = bs + } + +processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) +processOutgoing gs@GlobalState {..} = do + + let sendNextPacket :: Connection addr -> STM (IO ()) + sendNextPacket conn@Connection {..} = do + channel <- readTVar cChannel + let mbch = case channel of + ChannelEstablished ch -> Just ch + _ -> Nothing + + let checkOutstanding + | isJust mbch = do + (,) <$> readTQueue cSecureOutQueue <*> (Just <$> reservePacket conn) + | otherwise = retry + + checkDataInternal = do + (,) <$> readFlow cDataInternal <*> (Just <$> reservePacket conn) + + checkAcknowledgements + | isJust mbch = do + acks <- readTVar cToAcknowledge + if null acks then retry + else return ((EncryptedOnly, TransportPacket (TransportHeader []) [], []), Nothing) + | otherwise = retry + + ((secure, packet@(TransportPacket (TransportHeader hitems) content), plainAckedBy), mbReserved) <- + checkOutstanding <|> checkDataInternal <|> checkAcknowledgements + + when (isNothing mbch && secure >= EncryptedOnly) $ do + writeTQueue cSecureOutQueue (secure, packet, plainAckedBy) + + acknowledge <- case mbch of + Nothing -> return [] + Just _ -> swapTVar cToAcknowledge [] + + return $ do + let onAck = sequence_ $ map (streamAccepted conn) $ + catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) + + let mkPlain extraHeaders = + let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems + in BL.concat $ + (serializeObject $ transportToObject gStorage header) + : map lazyLoadBytes content + + let usePlaintext = do + plain <- mkPlain <$> generateCookieHeaders conn channel + return $ Just (BL.toStrict plain, plainAckedBy) + + let useEncryption ch = do + plain <- mkPlain <$> return [] + runExceptT (channelEncrypt ch $ BL.toStrict $ 0x00 `BL.cons` plain) >>= \case + Right (ctext, counter) -> do + let isAcked = any isHeaderItemAcknowledged hitems + return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + return Nothing + + mbs <- case (secure, mbch) of + (PlaintextOnly, _) -> usePlaintext + (PlaintextAllowed, Nothing) -> usePlaintext + (_, Just ch) -> useEncryption ch + (EncryptedOnly, Nothing) -> return Nothing + + case mbs of + Just (bs, ackedBy) -> do + let mbReserved' = (\rs -> rs + { rsAckedBy = guard (not $ null ackedBy) >> Just (`elem` ackedBy) + , rsOnAck = rsOnAck rs >> onAck + }) <$> mbReserved + sendBytes conn mbReserved' bs + Nothing -> return () + + let retransmitPacket :: Connection addr -> STM (IO ()) + retransmitPacket conn@Connection {..} = do + now <- readTVar gNowVar + (sp, rest) <- readTVar cSentPackets >>= \case + sps@(_:_) -> return (last sps, init sps) + _ -> retry + let nextTry = spTime sp + fromNanoSecs 1000000000 + if | now < nextTry -> do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || nextTry < nextTimeout + then do writeTVar gNextTimeout nextTry + return $ return () + else retry + | spRetryCount sp < 2 -> do + reserved <- reservePacket conn + writeTVar cSentPackets rest + return $ resendBytes conn (Just reserved) sp + | otherwise -> do + return $ spOnFail sp + + let handleControlRequests = readFlow gControlFlow >>= \case + RequestConnection addr -> do + conn@Connection {..} <- getConnection gs addr + identity <- fst <$> readTVar gIdentity + readTVar cChannel >>= \case + ChannelNone -> do + reserved <- reservePacket conn + let packet = BL.toStrict $ BL.concat + [ serializeObject $ transportToObject gStorage $ TransportHeader $ + [ Initiation $ refDigest gInitConfig + , AnnounceSelf $ refDigest $ storedRef $ idData identity + ] ++ map ProtocolVersion protocolVersions + , lazyLoadBytes gInitConfig + ] + writeTVar cChannel ChannelCookieWait + let reserved' = reserved { rsAckedBy = Just $ \case CookieSet {} -> True; _ -> False } + return $ sendBytes conn (Just reserved') packet + _ -> return $ return () + + SendAnnounce addr -> do + identity <- fst <$> readTVar gIdentity + let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader $ + [ AnnounceSelf $ refDigest $ storedRef $ idData identity + ] ++ map ProtocolVersion protocolVersions + writeFlow gDataFlow (addr, packet) + return $ return () + + UpdateSelfIdentity nid -> do + (cur, past) <- readTVar gIdentity + writeTVar gIdentity (nid, cur : past) + return $ return () + + conns <- readTVar gConnections + msum $ concat $ + [ map retransmitPacket conns + , map sendNextPacket conns + , [ handleControlRequests ] + ] + +processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) +processAcknowledgements GlobalState {} Connection {..} header = do + (acked, notAcked) <- partition (\sp -> any (fromJust (spAckedBy sp)) header) <$> readTVar cSentPackets + writeTVar cSentPackets notAcked + return $ sequence_ $ map spOnAck acked diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c new file mode 100644 index 0000000..37c3e00 --- /dev/null +++ b/src/Erebos/Network/ifaddrs.c @@ -0,0 +1,41 @@ +#include "ifaddrs.h" + +#include <arpa/inet.h> +#include <ifaddrs.h> +#include <net/if.h> +#include <stdlib.h> +#include <sys/types.h> +#include <endian.h> + +uint32_t * broadcast_addresses(void) +{ + struct ifaddrs * addrs; + if (getifaddrs(&addrs) < 0) + return 0; + + size_t capacity = 16, count = 0; + uint32_t * ret = malloc(sizeof(uint32_t) * capacity); + + for (struct ifaddrs * ifa = addrs; ifa; ifa = ifa->ifa_next) { + if (ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET && + ifa->ifa_flags & IFF_BROADCAST) { + if (count + 2 >= capacity) { + capacity *= 2; + uint32_t * nret = realloc(ret, sizeof(uint32_t) * capacity); + if (nret) { + ret = nret; + } else { + free(ret); + return 0; + } + } + + ret[count] = ((struct sockaddr_in*)ifa->ifa_broadaddr)->sin_addr.s_addr; + count++; + } + } + + freeifaddrs(addrs); + ret[count] = 0; + return ret; +} diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h new file mode 100644 index 0000000..06d26ec --- /dev/null +++ b/src/Erebos/Network/ifaddrs.h @@ -0,0 +1,3 @@ +#include <stdint.h> + +uint32_t * broadcast_addresses(void); |