summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-08-01 23:01:30 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-08-12 23:08:55 +0200
commitbda62efef1ad38779f23b38b4e1436f06fb9c7c1 (patch)
treed7efc976a63f97bea3e8877c30b291225f05ded3
parent71cfee5086a6bf1c7a810d83fd67320bb9552197 (diff)
Network protocol refactoring with explicit data flows
-rw-r--r--erebos.cabal3
-rw-r--r--src/Flow.hs52
-rw-r--r--src/ICE.chs17
-rw-r--r--src/Network.hs451
-rw-r--r--src/Network/Protocol.hs325
5 files changed, 532 insertions, 316 deletions
diff --git a/erebos.cabal b/erebos.cabal
index f5d3fb9..12cb9ac 100644
--- a/erebos.cabal
+++ b/erebos.cabal
@@ -24,8 +24,10 @@ executable erebos
Channel,
Contact
Discovery
+ Flow
Message,
Network,
+ Network.Protocol
Pairing
PubKey,
Service
@@ -65,6 +67,7 @@ executable erebos
UndecidableInstances
build-depends: aeson >=1.4 && <2.1,
+ async >=2.2 && <2.3,
base >=4.13 && <4.17,
binary >=0.8 && <0.11,
bytestring >=0.10 && <0.12,
diff --git a/src/Flow.hs b/src/Flow.hs
new file mode 100644
index 0000000..349178f
--- /dev/null
+++ b/src/Flow.hs
@@ -0,0 +1,52 @@
+module Flow (
+ Flow, SymFlow,
+ newFlow, newFlowIO,
+ readFlow, writeFlow, writeFlowBulk,
+ readFlowIO, writeFlowIO,
+
+ mapPath,
+) where
+
+import Control.Concurrent.STM
+
+
+data Flow r w = Flow (TMVar [r]) (TMVar [w])
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+
+type SymFlow a = Flow a a
+
+newFlow :: STM (Flow a b, Flow b a)
+newFlow = do
+ x <- newEmptyTMVar
+ y <- newEmptyTMVar
+ return (Flow x y, Flow y x)
+
+newFlowIO :: IO (Flow a b, Flow b a)
+newFlowIO = atomically newFlow
+
+readFlow :: Flow r w -> STM r
+readFlow (Flow rvar _) = takeTMVar rvar >>= \case
+ (x:[]) -> return x
+ (x:xs) -> putTMVar rvar xs >> return x
+ [] -> error "Flow: empty list"
+readFlow (MappedFlow f _ up) = f <$> readFlow up
+
+writeFlow :: Flow r w -> w -> STM ()
+writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (MappedFlow _ f up) = writeFlow up . f
+
+writeFlowBulk :: Flow r w -> [w] -> STM ()
+writeFlowBulk _ [] = return ()
+writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
+
+readFlowIO :: Flow r w -> IO r
+readFlowIO path = atomically $ readFlow path
+
+writeFlowIO :: Flow r w -> w -> IO ()
+writeFlowIO path = atomically . writeFlow path
+
+
+mapPath :: (r -> r') -> (w' -> w) -> Flow r w -> Flow r' w'
+mapPath rf wf (MappedFlow rf' wf' up) = MappedFlow (rf . rf') (wf' . wf) up
+mapPath rf wf up = MappedFlow rf wf up
diff --git a/src/ICE.chs b/src/ICE.chs
index 98584a2..d553a88 100644
--- a/src/ICE.chs
+++ b/src/ICE.chs
@@ -17,7 +17,6 @@ module ICE (
) where
import Control.Arrow
-import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Except
@@ -31,6 +30,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Text.Read as T
+import Data.Void
import Foreign.C.String
import Foreign.C.Types
@@ -39,17 +39,16 @@ import Foreign.Marshal.Array
import Foreign.Ptr
import Foreign.StablePtr
+import Flow
import Storage
#include "pjproject.h"
data IceSession = IceSession
{ isStrans :: PjIceStrans
- , isChan :: MVar (Either [ByteString] (MappedChan ByteString))
+ , isChan :: MVar (Either [ByteString] (Flow Void ByteString))
}
-data MappedChan a = forall b. MappedChan (a -> b) (Chan b)
-
instance Eq IceSession where
(==) = (==) `on` isStrans
@@ -188,13 +187,13 @@ foreign export ccall ice_call_cb :: StablePtr (IO ()) -> IO ()
ice_call_cb :: StablePtr (IO ()) -> IO ()
ice_call_cb = join . deRefStablePtr
-iceSetChan :: IceSession -> (ByteString -> a) -> Chan a -> IO ()
-iceSetChan sess f chan = do
+iceSetChan :: IceSession -> Flow Void ByteString -> IO ()
+iceSetChan sess chan = do
modifyMVar_ (isChan sess) $ \orig -> do
case orig of
- Left buf -> writeList2Chan chan $ map f $ reverse buf
+ Left buf -> mapM_ (writeFlowIO chan) $ reverse buf
Right _ -> return ()
- return $ Right $ MappedChan f chan
+ return $ Right chan
foreign export ccall ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO ()
ice_rx_data :: StablePtr IceSession -> Ptr CChar -> Int -> IO ()
@@ -202,5 +201,5 @@ ice_rx_data sptr buf len = do
sess <- deRefStablePtr sptr
bs <- packCStringLen (buf, len)
modifyMVar_ (isChan sess) $ \case
- mc@(Right (MappedChan f chan)) -> writeChan chan (f bs) >> return mc
+ mc@(Right chan) -> writeFlowIO chan bs >> return mc
Left bss -> return $ Left (bs:bss)
diff --git a/src/Network.hs b/src/Network.hs
index da786c6..787bff9 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -8,7 +8,6 @@ module Network (
Peer, peerServer, peerStorage,
PeerAddress(..), peerAddress,
PeerIdentity(..), peerIdentity,
- PeerChannel(..),
WaitingRef, wrDigest,
Service(..),
serverPeer, serverPeerIce,
@@ -18,7 +17,6 @@ module Network (
discoveryPort,
) where
-import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
@@ -45,11 +43,10 @@ import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket
import qualified Network.Socket.ByteString as S
-import System.Clock
-
import Channel
import ICE
import Identity
+import Network.Protocol
import PubKey
import Service
import State
@@ -70,7 +67,8 @@ data Server = Server
, serverIdentity_ :: MVar UnifiedIdentity
, serverThreads :: MVar [ThreadId]
, serverSocket :: MVar Socket
- , serverChanPacket :: Chan (PeerAddress, BC.ByteString)
+ , serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
+ , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
, serverIOActions :: TQueue (ExceptT String IO ())
, serverServices :: [SomeService]
@@ -101,30 +99,29 @@ defaultServerOptions = ServerOptions
data Peer = Peer
{ peerAddress :: PeerAddress
, peerServer_ :: Server
+ , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress))
, peerIdentityVar :: TVar PeerIdentity
- , peerChannel :: TVar PeerChannel
, peerStorage_ :: Storage
, peerInStorage :: PartialStorage
- , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket)
- , peerSentPackets :: TVar [SentPacket]
, peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
- , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)]
, peerWaitingRefs :: TMVar [WaitingRef]
}
-data SentPacket = SentPacket
- { spTime :: TimeSpec
- , spRetryCount :: Int
- , spAckedBy :: [TransportHeaderItem]
- , spData :: BC.ByteString
- }
-
peerServer :: Peer -> Server
peerServer = peerServer_
peerStorage :: Peer -> Storage
peerStorage = peerStorage_
+getPeerChannel :: Peer -> STM ChannelState
+getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection
+
+setPeerChannel :: Peer -> ChannelState -> STM ()
+setPeerChannel Peer {..} ch = do
+ readTVar peerConnection >>= \case
+ Left _ -> retry
+ Right conn -> connSetChannel conn ch
+
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
@@ -157,89 +154,21 @@ data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String
| PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
| PeerIdentityFull UnifiedIdentity
-data PeerChannel = ChannelWait
- | ChannelOurRequest (Stored ChannelRequest)
- | ChannelPeerRequest WaitingRef
- | ChannelOurAccept (Stored ChannelAccept) Channel
- | ChannelEstablished Channel
-
peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data TransportPacket = TransportPacket TransportHeader [Ref]
-
-
-data TransportHeaderItem
- = Acknowledged PartialRef
- | Rejected PartialRef
- | DataRequest PartialRef
- | DataResponse PartialRef
- | AnnounceSelf PartialRef
- | AnnounceUpdate PartialRef
- | TrChannelRequest PartialRef
- | TrChannelAccept PartialRef
- | ServiceType ServiceID
- | ServiceRef PartialRef
- deriving (Eq)
-
-data TransportHeader = TransportHeader [TransportHeaderItem]
-
-transportToObject :: TransportHeader -> PartialObject
-transportToObject (TransportHeader items) = Rec $ map single items
- where single = \case
- Acknowledged ref -> (BC.pack "ACK", RecRef ref)
- Rejected ref -> (BC.pack "REJ", RecRef ref)
- DataRequest ref -> (BC.pack "REQ", RecRef ref)
- DataResponse ref -> (BC.pack "RSP", RecRef ref)
- AnnounceSelf ref -> (BC.pack "ANN", RecRef ref)
- AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref)
- TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref)
- TrChannelAccept ref -> (BC.pack "CAC", RecRef ref)
- ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype)
- ServiceRef ref -> (BC.pack "SRF", RecRef ref)
-
-transportFromObject :: PartialObject -> Maybe TransportHeader
-transportFromObject (Rec items) = case catMaybes $ map single items of
- [] -> Nothing
- titems -> Just $ TransportHeader titems
- where single (name, content) = if
- | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref
- | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected ref
- | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref
- | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref
- | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref
- | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref
- | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref
- | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref
- | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid
- | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref
- | otherwise -> Nothing
-transportFromObject _ = Nothing
-
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
lookupServiceType (ServiceType stype : _) = Just stype
lookupServiceType (_ : hs) = lookupServiceType hs
lookupServiceType [] = Nothing
-data WaitingRef = WaitingRef
- { wrefStorage :: Storage
- , wrefPartial :: PartialRef
- , wrefAction :: Ref -> WaitingRefCallback
- , wrefStatus :: TVar (Either [RefDigest] Ref)
- }
-
-type WaitingRefCallback = ExceptT String IO ()
-
-wrDigest :: WaitingRef -> RefDigest
-wrDigest = refDigest . wrefPartial
-
-newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
-newWaitingRef pref act = do
- peer <- gets phPeer
- wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left []))
- modifyTMVarP (peerWaitingRefs peer) (wref:)
+newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
+newWaitingRef dgst act = do
+ peer@Peer {..} <- gets phPeer
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
+ modifyTMVarP peerWaitingRefs (wref:)
liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
return wref
@@ -254,7 +183,8 @@ startServer opt serverOrigHead logd' serverServices = do
serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
serverThreads <- newMVar []
serverSocket <- newEmptyMVar
- serverChanPacket <- newChan
+ (serverRawPath, protocolRawPath) <- newFlowIO
+ (serverNewConnection, protocolNewConnection) <- newFlowIO
serverDataResponse <- newTQueueIO
serverIOActions <- newTQueueIO
serverServiceStates <- newTMVarIO M.empty
@@ -289,23 +219,23 @@ startServer opt serverOrigHead logd' serverServices = do
when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
readMVar serverIdentity_ >>= \identity -> do
st <- derivePartialStorage serverStorage
- let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]
+ let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
mapM_ (void . S.sendTo sock packet) broadcastAddreses
threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
st <- derivePartialStorage serverStorage
let selfRef = partialRef st $ storedRef $ idData identity
- updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity)
+ updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity)
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
hitems = map AnnounceUpdate updateRefs
packet = TransportPacket (TransportHeader $ hitems) []
ps <- readMVar serverPeers
forM_ ps $ \peer -> atomically $ do
- ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case
+ ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case
(PeerIdentityFull _, ChannelEstablished _) ->
- writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
+ sendToPeerS peer ackedBy packet
_ -> return ()
void $ watchHead serverOrigHead $ \h -> do
@@ -325,42 +255,38 @@ startServer opt serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
- writeChan serverChanPacket (DatagramAddress sock saddr, msg)
+ writeFlowIO serverRawPath (DatagramAddress sock saddr, msg)
- forever $ do
- (paddr, msg) <- readChan serverChanPacket
- (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do
- case M.lookup paddr pvalue of
- Just peer -> do
- mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ ch -> Just ch
- _ -> Nothing
-
- if | Just ch <- mbch
- , Right plain <- runExcept $ channelDecrypt ch msg
- -> return (pvalue, (peer, plain, True))
-
- | otherwise
- -> return (pvalue, (peer, msg, False))
+ forkServerThread server $ forever $ do
+ (paddr, msg) <- readFlowIO serverRawPath
+ case paddr of
+ DatagramAddress _ addr -> void $ S.sendTo sock msg addr
+ PeerIceSession ice -> iceSend ice msg
+ forkServerThread server $ forever $ do
+ conn <- readFlowIO serverNewConnection
+ let paddr = connAddress conn
+ peer <- modifyMVar serverPeers $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> return (pvalue, peer)
Nothing -> do
peer <- mkPeer server paddr
- return (M.insert paddr peer pvalue, (peer, msg, False))
+ return (M.insert paddr peer pvalue, peer)
- case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
- Right (obj:objs)
- | Just header <- transportFromObject obj -> do
- prefs <- forM objs $ storeObject $ peerInStorage peer
- identity <- readMVar serverIdentity_
- let svcs = map someServiceID serverServices
- handlePacket identity secure peer chanSvc svcs header prefs
+ atomically $ do
+ readTVar (peerConnection peer) >>= \case
+ Left packets -> writeFlowBulk (connData conn) $ reverse packets
+ Right _ -> return ()
+ writeTVar (peerConnection peer) (Right conn)
- | otherwise -> atomically $ do
- logd $ show paddr ++ ": invalid objects"
- logd $ show objs
+ forkServerThread server $ forever $ do
+ (secure, TransportPacket header objs) <- readFlowIO $ connData conn
+ prefs <- forM objs $ storeObject $ peerInStorage peer
+ identity <- readMVar serverIdentity_
+ let svcs = map someServiceID serverServices
+ handlePacket identity secure peer chanSvc svcs header prefs
- _ -> do atomically $ logd $ show paddr ++ ": invalid objects"
+ erebosNetworkProtocol logd protocolRawPath protocolNewConnection
forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
@@ -383,87 +309,6 @@ stopServer :: Server -> IO ()
stopServer Server {..} = do
mapM_ killThread =<< takeMVar serverThreads
-sendWorker :: Peer -> IO ()
-sendWorker peer = do
- startTime <- getTime MonotonicRaw
- nowVar <- newTVarIO startTime
- waitVar <- newTVarIO startTime
-
- let waitTill time = forkServerThread (peerServer peer) $ do
- now <- getTime MonotonicRaw
- when (time > now) $
- threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
- atomically . writeTVar nowVar =<< getTime MonotonicRaw
-
- let sendBytes sp = do
- when (not $ null $ spAckedBy sp) $ do
- now <- getTime MonotonicRaw
- atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp
- { spTime = now
- , spRetryCount = spRetryCount sp + 1
- }
- case peerAddress peer of
- DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr
- PeerIceSession ice -> iceSend ice (spData sp)
-
- let sendNextPacket = do
- (secure, ackedBy, packet@(TransportPacket header content)) <-
- readTQueue (peerOutQueue peer)
-
- let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer)
- let plain = BL.toStrict $ BL.concat $
- (serializeObject $ transportToObject header)
- : map lazyLoadBytes content
-
- mbch <- readTVar (peerChannel peer) >>= \case
- ChannelEstablished ch -> return (Just ch)
- _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):)
- return Nothing
-
- return $ do
- mbs <- case mbch of
- Just ch -> do
- runExceptT (channelEncrypt ch plain) >>= \case
- Right ctext -> return $ Just ctext
- Left err -> do logd $ "Failed to encrypt data: " ++ err
- return Nothing
- Nothing | secure -> return Nothing
- | otherwise -> return $ Just plain
-
- case mbs of
- Just bs -> do
- sendBytes $ SentPacket
- { spTime = undefined
- , spRetryCount = -1
- , spAckedBy = ackedBy
- , spData = bs
- }
- Nothing -> return ()
-
- let retransmitPacket = do
- now <- readTVar nowVar
- (sp, rest) <- readTVar (peerSentPackets peer) >>= \case
- sps@(_:_) -> return (last sps, init sps)
- _ -> retry
- let nextTry = spTime sp + fromNanoSecs 1000000000
- if now < nextTry
- then do
- wait <- readTVar waitVar
- if wait <= now || nextTry < wait
- then do writeTVar waitVar nextTry
- return $ waitTill nextTry
- else retry
- else do
- writeTVar (peerSentPackets peer) rest
- return $ sendBytes sp
-
- forever $ join $ atomically $ do
- retransmitPacket <|> sendNextPacket
-
-processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM ()
-processAcknowledgements peer = mapM_ $ \hitem -> do
- modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy
-
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
(peer, npref) <- atomically (readTQueue $ serverDataResponse server)
@@ -489,7 +334,7 @@ dataResponseWorker server = forever $ do
Right _ -> return (Nothing, [])
atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list
- let reqs = concat $ map snd list
+ let reqs = map refDigest $ concat $ map snd list
when (not $ null reqs) $ do
let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []
ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ]
@@ -540,8 +385,7 @@ handlePacket :: UnifiedIdentity -> Bool
-> TransportHeader -> [PartialRef] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
- processAcknowledgements peer headers
- ochannel <- readTVar $ peerChannel peer
+ ochannel <- getPeerChannel peer
let sidentity = idData identity
plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
[ [ storedRef sidentity ]
@@ -555,89 +399,89 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do
let logd = liftSTM . writeTQueue (serverErrorLog server)
forM_ headers $ \case
- Acknowledged ref -> do
- readTVarP (peerChannel peer) >>= \case
- ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
- writeTVarP (peerChannel peer) $ ChannelEstablished ch
- liftSTM $ finalizedChannel peer identity
+ Acknowledged dgst -> do
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do
+ liftSTM $ finalizedChannel peer ch identity
_ -> return ()
- Rejected ref -> do
- logd $ "rejected by peer: " ++ show (refDigest ref)
+ Rejected dgst -> do
+ logd $ "rejected by peer: " ++ show dgst
- DataRequest ref
- | secure || refDigest ref `elem` plaintextRefs -> do
- Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref
- addHeader $ DataResponse ref
- addAckedBy [ Acknowledged ref, Rejected ref ]
+ DataRequest dgst
+ | secure || dgst `elem` plaintextRefs -> do
+ Right mref <- liftSTM $ unsafeIOToSTM $
+ copyRef (peerStorage peer) $
+ partialRefFromDigest (peerInStorage peer) dgst
+ addHeader $ DataResponse dgst
+ addAckedBy [ Acknowledged dgst, Rejected dgst ]
addBody $ mref
| otherwise -> do
- logd $ "unauthorized data request for " ++ show ref
- addHeader $ Rejected ref
+ logd $ "unauthorized data request for " ++ show dgst
+ addHeader $ Rejected dgst
- DataResponse ref -> if
- | ref `elem` prefs -> do
- addHeader $ Acknowledged ref
- liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref)
- | otherwise -> throwError $ "mismatched data response " ++ show ref
+ DataResponse dgst -> if
+ | Just pref <- find ((==dgst) . refDigest) prefs -> do
+ addHeader $ Acknowledged dgst
+ liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)
+ | otherwise -> throwError $ "mismatched data response " ++ show dgst
- AnnounceSelf pref
- | refDigest pref == refDigest (storedRef sidentity) -> return ()
+ AnnounceSelf dgst
+ | dgst == refDigest (storedRef sidentity) -> return ()
| otherwise -> do
- wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
+ wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityUnknown idwait -> do
- let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity
- addHeader $ AnnounceSelf ref
+ addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity
writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer
_ -> return ()
- AnnounceUpdate ref -> do
+ AnnounceUpdate dgst -> do
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityFull _ -> do
- void $ newWaitingRef ref $ handleIdentityUpdate peer
- addHeader $ Acknowledged ref
+ void $ newWaitingRef dgst $ handleIdentityUpdate peer
+ addHeader $ Acknowledged dgst
_ -> return ()
- TrChannelRequest reqref -> do
+ TrChannelRequest dgst -> do
let process = do
- addHeader $ Acknowledged reqref
- wref <- newWaitingRef reqref $ handleChannelRequest peer identity
- writeTVarP (peerChannel peer) $ ChannelPeerRequest wref
- reject = addHeader $ Rejected reqref
-
- readTVarP (peerChannel peer) >>= \case
- ChannelWait {} -> process
- ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process
+ addHeader $ Acknowledged dgst
+ wref <- newWaitingRef dgst $ handleChannelRequest peer identity
+ liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref
+ reject = addHeader $ Rejected dgst
+
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> process
+ ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
| otherwise -> reject
ChannelPeerRequest {} -> process
ChannelOurAccept {} -> reject
ChannelEstablished {} -> process
- TrChannelAccept accref -> do
+ TrChannelAccept dgst -> do
let process = do
- handleChannelAccept identity accref
- readTVarP (peerChannel peer) >>= \case
- ChannelWait {} -> process
+ handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> process
ChannelOurRequest {} -> process
ChannelPeerRequest {} -> process
- ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process
- | otherwise -> addHeader $ Rejected accref
+ ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> addHeader $ Rejected dgst
ChannelEstablished {} -> process
ServiceType _ -> return ()
- ServiceRef pref
+ ServiceRef dgst
| not secure -> throwError $ "service packet without secure channel"
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
- if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -}
+ if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
then do
- addHeader $ Acknowledged pref
- void $ newWaitingRef pref $ \ref ->
+ addHeader $ Acknowledged dgst
+ void $ newWaitingRef dgst $ \ref ->
liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show pref
- | otherwise -> addHeader $ Rejected pref
+ else throwError $ "missing service object " ++ show dgst
+ | otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
let logd = writeTQueue (serverErrorLog server)
@@ -647,7 +491,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
Right ph -> do
when (not $ null $ phHead ph) $ do
let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
- writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet)
+ sendToPeerS' secure peer (phAckedBy ph) packet
withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
@@ -660,18 +504,17 @@ withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer
setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
setupChannel identity peer upid = do
req <- createChannelRequest (peerStorage peer) identity upid
- let ist = peerInStorage peer
- let reqref = partialRef ist $ storedRef req
+ let reqref = refDigest $ storedRef req
let hitems =
[ TrChannelRequest reqref
- , AnnounceSelf $ partialRef ist $ storedRef $ idData identity
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
]
liftIO $ atomically $ do
- readTVar (peerChannel peer) >>= \case
- ChannelWait -> do
+ getPeerChannel peer >>= \case
+ ChannelNone -> do
sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
TransportPacket (TransportHeader hitems) [storedRef req]
- writeTVar (peerChannel peer) $ ChannelOurRequest req
+ setPeerChannel peer $ ChannelOurRequest req
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -679,10 +522,10 @@ handleChannelRequest peer identity req = do
withPeerIdentity peer $ \upid -> do
(acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req)
liftIO $ atomically $ do
- readTVar (peerChannel peer) >>= \case
+ getPeerChannel peer >>= \case
ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
- writeTVar (peerChannel peer) $ ChannelOurAccept acc ch
- let accref = (partialRef (peerInStorage peer) $ storedRef acc)
+ setPeerChannel peer $ ChannelOurAccept acc ch
+ let accref = refDigest $ storedRef acc
header = TrChannelAccept accref
ackedBy = [ Acknowledged accref, Rejected accref ]
sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat
@@ -702,32 +545,28 @@ handleChannelAccept identity accref = do
Right acc -> do
ch <- acceptedChannel identity upid (wrappedLoad acc)
liftIO $ atomically $ do
- sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []
- writeTVar (peerChannel peer) $ ChannelEstablished ch
- finalizedChannel peer identity
+ sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) []
+ finalizedChannel peer ch identity
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-finalizedChannel :: Peer -> UnifiedIdentity -> STM ()
-finalizedChannel peer self = do
- let ist = peerInStorage peer
+finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM ()
+finalizedChannel peer@Peer {..} ch self = do
+ setPeerChannel peer $ ChannelEstablished ch
-- Identity update
- do
- let selfRef = partialRef ist $ storedRef $ idData $ self
- updateRefs = selfRef : map (partialRef ist . storedRef) (idUpdates self)
+ writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do
+ let selfRef = refDigest $ storedRef $ idData $ self
+ updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self)
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs
-- Notify services about new peer
- readTVar (peerIdentityVar peer) >>= \case
+ readTVar peerIdentityVar >>= \case
PeerIdentityFull _ -> notifyServicesOfPeer peer
_ -> return ()
- -- Outstanding service packets
- mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []
-
handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
@@ -775,22 +614,14 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
mkPeer :: Server -> PeerAddress -> IO Peer
-mkPeer server paddr = do
- pst <- deriveEphemeralStorage $ serverStorage server
- peer <- Peer
- <$> pure paddr
- <*> pure server
- <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO [])
- <*> newTVarIO ChannelWait
- <*> pure pst
- <*> derivePartialStorage pst
- <*> newTQueueIO
- <*> newTVarIO []
- <*> newTMVarIO M.empty
- <*> newTVarIO []
- <*> newTMVarIO []
- forkServerThread server $ sendWorker peer
- return peer
+mkPeer peerServer_ peerAddress = do
+ peerConnection <- newTVarIO (Left [])
+ peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO []
+ peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_
+ peerInStorage <- derivePartialStorage peerStorage_
+ peerServiceState <- newTMVarIO M.empty
+ peerWaitingRefs <- newTMVarIO []
+ return Peer {..}
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server paddr = do
@@ -798,10 +629,10 @@ serverPeer server paddr = do
serverPeer' server (DatagramAddress sock paddr)
serverPeerIce :: Server -> IceSession -> IO Peer
-serverPeerIce server ice = do
+serverPeerIce server@Server {..} ice = do
let paddr = PeerIceSession ice
peer <- serverPeer' server paddr
- iceSetChan ice (paddr,) $ serverChanPacket server
+ iceSetChan ice $ mapPath undefined (paddr,) serverRawPath
return peer
serverPeer' :: Server -> PeerAddress -> IO Peer
@@ -814,9 +645,10 @@ serverPeer' server paddr = do
return (M.insert paddr peer pvalue, (peer, True))
when hello $ do
identity <- serverIdentity server
- atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $
- TransportPacket
- (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ])
+ atomically $ do
+ writeFlow (serverNewConnection server) paddr
+ sendToPeerPlain peer [] $ TransportPacket
+ (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ])
[]
return peer
@@ -830,23 +662,28 @@ sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- pst = peerInStorage peer
srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
ServiceReply (Left x) use -> Just . (,use) <$> store st x
ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
ServiceFinally act -> act >> return Nothing
- prefs <- mapM (copyRef pst . fst) srefs
+ let dgsts = map (refDigest . fst) srefs
let content = map fst $ filter snd srefs
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)
+ header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ]
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
liftIO $ atomically $ sendToPeerS peer ackedBy packet
-sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
-sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
+sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS' secure Peer {..} ackedBy packet = do
+ readTVar peerConnection >>= \case
+ Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs
+ Right conn -> writeFlow (connData conn) (secure, packet, ackedBy)
+
+sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS = sendToPeerS' True
-sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
-sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet)
+sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerPlain = sendToPeerS' False
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith peer fobj = do
diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs
new file mode 100644
index 0000000..adc9471
--- /dev/null
+++ b/src/Network/Protocol.hs
@@ -0,0 +1,325 @@
+module Network.Protocol (
+ TransportPacket(..),
+ transportToObject,
+ TransportHeader(..),
+ TransportHeaderItem(..),
+
+ WaitingRef(..),
+ WaitingRefCallback,
+ wrDigest,
+
+ ChannelState(..),
+
+ erebosNetworkProtocol,
+
+ Connection,
+ connAddress,
+ connData,
+ connGetChannel,
+ connSetChannel,
+
+ module Flow,
+) where
+
+import Control.Applicative
+import Control.Concurrent
+import Control.Concurrent.Async
+import Control.Concurrent.STM
+import Control.Monad
+import Control.Monad.Except
+
+import Data.ByteString (ByteString)
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
+import Data.List
+import Data.Maybe
+
+import System.Clock
+
+import Channel
+import Flow
+import Service
+import Storage
+
+
+data TransportPacket a = TransportPacket TransportHeader [a]
+
+data TransportHeader = TransportHeader [TransportHeaderItem]
+
+data TransportHeaderItem
+ = Acknowledged RefDigest
+ | Rejected RefDigest
+ | DataRequest RefDigest
+ | DataResponse RefDigest
+ | AnnounceSelf RefDigest
+ | AnnounceUpdate RefDigest
+ | TrChannelRequest RefDigest
+ | TrChannelAccept RefDigest
+ | ServiceType ServiceID
+ | ServiceRef RefDigest
+ deriving (Eq)
+
+transportToObject :: PartialStorage -> TransportHeader -> PartialObject
+transportToObject st (TransportHeader items) = Rec $ map single items
+ where single = \case
+ Acknowledged dgst -> (BC.pack "ACK", RecRef $ partialRefFromDigest st dgst)
+ Rejected dgst -> (BC.pack "REJ", RecRef $ partialRefFromDigest st dgst)
+ DataRequest dgst -> (BC.pack "REQ", RecRef $ partialRefFromDigest st dgst)
+ DataResponse dgst -> (BC.pack "RSP", RecRef $ partialRefFromDigest st dgst)
+ AnnounceSelf dgst -> (BC.pack "ANN", RecRef $ partialRefFromDigest st dgst)
+ AnnounceUpdate dgst -> (BC.pack "ANU", RecRef $ partialRefFromDigest st dgst)
+ TrChannelRequest dgst -> (BC.pack "CRQ", RecRef $ partialRefFromDigest st dgst)
+ TrChannelAccept dgst -> (BC.pack "CAC", RecRef $ partialRefFromDigest st dgst)
+ ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype)
+ ServiceRef dgst -> (BC.pack "SRF", RecRef $ partialRefFromDigest st dgst)
+
+transportFromObject :: PartialObject -> Maybe TransportHeader
+transportFromObject (Rec items) = case catMaybes $ map single items of
+ [] -> Nothing
+ titems -> Just $ TransportHeader titems
+ where single (name, content) = if
+ | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged $ refDigest ref
+ | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected $ refDigest ref
+ | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest $ refDigest ref
+ | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse $ refDigest ref
+ | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf $ refDigest ref
+ | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate $ refDigest ref
+ | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest $ refDigest ref
+ | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept $ refDigest ref
+ | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid
+ | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef $ refDigest ref
+ | otherwise -> Nothing
+transportFromObject _ = Nothing
+
+
+data GlobalState addr = (Eq addr, Show addr) => GlobalState
+ { gConnections :: TVar [Connection addr]
+ , gDataFlow :: SymFlow (addr, ByteString)
+ , gConnectionFlow :: Flow addr (Connection addr)
+ , gLog :: String -> STM ()
+ , gStorage :: PartialStorage
+ , gNowVar :: TVar TimeSpec
+ , gNextTimeout :: TVar TimeSpec
+ }
+
+data Connection addr = Connection
+ { cAddress :: addr
+ , cDataUp :: Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem])
+ , cDataInternal :: Flow (Bool, TransportPacket Ref, [TransportHeaderItem]) (Bool, TransportPacket PartialObject)
+ , cChannel :: TVar ChannelState
+ , cSecureOutQueue :: TQueue (Bool, TransportPacket Ref, [TransportHeaderItem])
+ , cSentPackets :: TVar [SentPacket]
+ }
+
+connAddress :: Connection addr -> addr
+connAddress = cAddress
+
+connData :: Connection addr -> Flow (Bool, TransportPacket PartialObject) (Bool, TransportPacket Ref, [TransportHeaderItem])
+connData = cDataUp
+
+connGetChannel :: Connection addr -> STM ChannelState
+connGetChannel Connection {..} = readTVar cChannel
+
+connSetChannel :: Connection addr -> ChannelState -> STM ()
+connSetChannel Connection {..} ch = do
+ writeTVar cChannel ch
+
+
+data WaitingRef = WaitingRef
+ { wrefStorage :: Storage
+ , wrefPartial :: PartialRef
+ , wrefAction :: Ref -> WaitingRefCallback
+ , wrefStatus :: TVar (Either [RefDigest] Ref)
+ }
+
+type WaitingRefCallback = ExceptT String IO ()
+
+wrDigest :: WaitingRef -> RefDigest
+wrDigest = refDigest . wrefPartial
+
+
+data ChannelState = ChannelNone
+ | ChannelOurRequest (Stored ChannelRequest)
+ | ChannelPeerRequest WaitingRef
+ | ChannelOurAccept (Stored ChannelAccept) Channel
+ | ChannelEstablished Channel
+
+
+data SentPacket = SentPacket
+ { spTime :: TimeSpec
+ , spRetryCount :: Int
+ , spAckedBy :: [TransportHeaderItem]
+ , spData :: BC.ByteString
+ }
+
+
+erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
+ => (String -> STM ())
+ -> SymFlow (addr, ByteString)
+ -> Flow addr (Connection addr)
+ -> IO ()
+erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do
+ gConnections <- newTVarIO []
+ gStorage <- derivePartialStorage =<< memoryStorage
+
+ startTime <- getTime MonotonicRaw
+ gNowVar <- newTVarIO startTime
+ gNextTimeout <- newTVarIO startTime
+
+ let gs = GlobalState {..}
+
+ let signalTimeouts = forever $ do
+ now <- getTime MonotonicRaw
+ next <- atomically $ do
+ writeTVar gNowVar now
+ readTVar gNextTimeout
+
+ let waitTill time
+ | time > now = threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
+ | otherwise = threadDelay maxBound
+ waitForUpdate = atomically $ do
+ next' <- readTVar gNextTimeout
+ when (next' == next) retry
+
+ race_ (waitTill next) waitForUpdate
+
+ race_ signalTimeouts $ forever $ join $ atomically $
+ processIncomming gs <|> processOutgoing gs
+
+
+
+getConnection :: GlobalState addr -> addr -> STM (Connection addr)
+getConnection GlobalState {..} addr = do
+ conns <- readTVar gConnections
+ case find ((addr==) . cAddress) conns of
+ Just conn -> return conn
+ Nothing -> do
+ let cAddress = addr
+ (cDataUp, cDataInternal) <- newFlow
+ cChannel <- newTVar ChannelNone
+ cSecureOutQueue <- newTQueue
+ cSentPackets <- newTVar []
+ let conn = Connection {..}
+
+ writeTVar gConnections (conn : conns)
+ writeFlow gConnectionFlow conn
+ return conn
+
+processIncomming :: GlobalState addr -> STM (IO ())
+processIncomming gs@GlobalState {..} = do
+ (addr, msg) <- readFlow gDataFlow
+ conn@Connection {..} <- getConnection gs addr
+
+ mbch <- readTVar cChannel >>= return . \case
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ ch -> Just ch
+ _ -> Nothing
+
+ return $ do
+ (content, secure) <- do
+ if | Just ch <- mbch
+ -> runExceptT (channelDecrypt ch msg) >>= \case
+ Right plain -> return (plain, True)
+ _ -> return (msg, False)
+
+ | otherwise
+ -> return (msg, False)
+
+ case runExcept $ deserializeObjects gStorage $ BL.fromStrict content of
+ Right (obj:objs)
+ | Just header@(TransportHeader items) <- transportFromObject obj -> atomically $ do
+ processAcknowledgements gs conn items
+ writeFlow cDataInternal (secure, TransportPacket header objs)
+
+ | otherwise -> atomically $ do
+ gLog $ show cAddress ++ ": invalid objects"
+ gLog $ show objs
+
+ _ -> do atomically $ gLog $ show cAddress ++ ": invalid objects"
+
+
+processOutgoing :: forall addr. GlobalState addr -> STM (IO ())
+processOutgoing gs@GlobalState {..} = do
+ let sendBytes :: Connection addr -> SentPacket -> IO ()
+ sendBytes Connection {..} sp = do
+ now <- getTime MonotonicRaw
+ atomically $ do
+ when (not $ null $ spAckedBy sp) $ do
+ modifyTVar' cSentPackets $ (:) sp
+ { spTime = now
+ , spRetryCount = spRetryCount sp + 1
+ }
+ writeFlow gDataFlow (cAddress, spData sp)
+
+ let sendNextPacket :: Connection addr -> STM (IO ())
+ sendNextPacket conn@Connection {..} = do
+ mbch <- readTVar cChannel >>= return . \case
+ ChannelEstablished ch -> Just ch
+ _ -> Nothing
+
+ let checkOutstanding
+ | isJust mbch = readTQueue cSecureOutQueue
+ | otherwise = retry
+
+ (secure, packet@(TransportPacket header content), ackedBy) <-
+ checkOutstanding <|> readFlow cDataInternal
+
+ let plain = BL.toStrict $ BL.concat $
+ (serializeObject $ transportToObject gStorage header)
+ : map lazyLoadBytes content
+
+ when (isNothing mbch && secure) $ do
+ writeTQueue cSecureOutQueue (secure, packet, ackedBy)
+
+ return $ do
+ mbs <- case mbch of
+ Just ch -> do
+ runExceptT (channelEncrypt ch plain) >>= \case
+ Right ctext -> return $ Just ctext
+ Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err
+ return Nothing
+ Nothing | secure -> return Nothing
+ | otherwise -> return $ Just plain
+
+ case mbs of
+ Just bs -> do
+ sendBytes conn $ SentPacket
+ { spTime = undefined
+ , spRetryCount = -1
+ , spAckedBy = ackedBy
+ , spData = bs
+ }
+ Nothing -> return ()
+
+ let retransmitPacket :: Connection addr -> STM (IO ())
+ retransmitPacket conn@Connection {..} = do
+ now <- readTVar gNowVar
+ (sp, rest) <- readTVar cSentPackets >>= \case
+ sps@(_:_) -> return (last sps, init sps)
+ _ -> retry
+ let nextTry = spTime sp + fromNanoSecs 1000000000
+ if now < nextTry
+ then do
+ nextTimeout <- readTVar gNextTimeout
+ if nextTimeout <= now || nextTry < nextTimeout
+ then do writeTVar gNextTimeout nextTry
+ return $ return ()
+ else retry
+ else do
+ writeTVar cSentPackets rest
+ return $ sendBytes conn sp
+
+ let establishNewConnection = do
+ _ <- getConnection gs =<< readFlow gConnectionFlow
+ return $ return ()
+
+ conns <- readTVar gConnections
+ msum $ concat $
+ [ map retransmitPacket conns
+ , map sendNextPacket conns
+ , [ establishNewConnection ]
+ ]
+
+processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM ()
+processAcknowledgements GlobalState {} Connection {..} = mapM_ $ \hitem -> do
+ modifyTVar' cSentPackets $ filter $ (hitem `notElem`) . spAckedBy