summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs451
1 files changed, 144 insertions, 307 deletions
diff --git a/src/Network.hs b/src/Network.hs
index da786c6..787bff9 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -8,7 +8,6 @@ module Network (
Peer, peerServer, peerStorage,
PeerAddress(..), peerAddress,
PeerIdentity(..), peerIdentity,
- PeerChannel(..),
WaitingRef, wrDigest,
Service(..),
serverPeer, serverPeerIce,
@@ -18,7 +17,6 @@ module Network (
discoveryPort,
) where
-import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
@@ -45,11 +43,10 @@ import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket
import qualified Network.Socket.ByteString as S
-import System.Clock
-
import Channel
import ICE
import Identity
+import Network.Protocol
import PubKey
import Service
import State
@@ -70,7 +67,8 @@ data Server = Server
, serverIdentity_ :: MVar UnifiedIdentity
, serverThreads :: MVar [ThreadId]
, serverSocket :: MVar Socket
- , serverChanPacket :: Chan (PeerAddress, BC.ByteString)
+ , serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
+ , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
, serverIOActions :: TQueue (ExceptT String IO ())
, serverServices :: [SomeService]
@@ -101,30 +99,29 @@ defaultServerOptions = ServerOptions
data Peer = Peer
{ peerAddress :: PeerAddress
, peerServer_ :: Server
+ , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress))
, peerIdentityVar :: TVar PeerIdentity
- , peerChannel :: TVar PeerChannel
, peerStorage_ :: Storage
, peerInStorage :: PartialStorage
- , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket)
- , peerSentPackets :: TVar [SentPacket]
, peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
- , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)]
, peerWaitingRefs :: TMVar [WaitingRef]
}
-data SentPacket = SentPacket
- { spTime :: TimeSpec
- , spRetryCount :: Int
- , spAckedBy :: [TransportHeaderItem]
- , spData :: BC.ByteString
- }
-
peerServer :: Peer -> Server
peerServer = peerServer_
peerStorage :: Peer -> Storage
peerStorage = peerStorage_
+getPeerChannel :: Peer -> STM ChannelState
+getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection
+
+setPeerChannel :: Peer -> ChannelState -> STM ()
+setPeerChannel Peer {..} ch = do
+ readTVar peerConnection >>= \case
+ Left _ -> retry
+ Right conn -> connSetChannel conn ch
+
instance Eq Peer where
(==) = (==) `on` peerIdentityVar
@@ -157,89 +154,21 @@ data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String
| PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
| PeerIdentityFull UnifiedIdentity
-data PeerChannel = ChannelWait
- | ChannelOurRequest (Stored ChannelRequest)
- | ChannelPeerRequest WaitingRef
- | ChannelOurAccept (Stored ChannelAccept) Channel
- | ChannelEstablished Channel
-
peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-data TransportPacket = TransportPacket TransportHeader [Ref]
-
-
-data TransportHeaderItem
- = Acknowledged PartialRef
- | Rejected PartialRef
- | DataRequest PartialRef
- | DataResponse PartialRef
- | AnnounceSelf PartialRef
- | AnnounceUpdate PartialRef
- | TrChannelRequest PartialRef
- | TrChannelAccept PartialRef
- | ServiceType ServiceID
- | ServiceRef PartialRef
- deriving (Eq)
-
-data TransportHeader = TransportHeader [TransportHeaderItem]
-
-transportToObject :: TransportHeader -> PartialObject
-transportToObject (TransportHeader items) = Rec $ map single items
- where single = \case
- Acknowledged ref -> (BC.pack "ACK", RecRef ref)
- Rejected ref -> (BC.pack "REJ", RecRef ref)
- DataRequest ref -> (BC.pack "REQ", RecRef ref)
- DataResponse ref -> (BC.pack "RSP", RecRef ref)
- AnnounceSelf ref -> (BC.pack "ANN", RecRef ref)
- AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref)
- TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref)
- TrChannelAccept ref -> (BC.pack "CAC", RecRef ref)
- ServiceType stype -> (BC.pack "STP", RecUUID $ toUUID stype)
- ServiceRef ref -> (BC.pack "SRF", RecRef ref)
-
-transportFromObject :: PartialObject -> Maybe TransportHeader
-transportFromObject (Rec items) = case catMaybes $ map single items of
- [] -> Nothing
- titems -> Just $ TransportHeader titems
- where single (name, content) = if
- | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref
- | name == BC.pack "REJ", RecRef ref <- content -> Just $ Rejected ref
- | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref
- | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref
- | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref
- | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref
- | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref
- | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref
- | name == BC.pack "STP", RecUUID uuid <- content -> Just $ ServiceType $ fromUUID uuid
- | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref
- | otherwise -> Nothing
-transportFromObject _ = Nothing
-
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
lookupServiceType (ServiceType stype : _) = Just stype
lookupServiceType (_ : hs) = lookupServiceType hs
lookupServiceType [] = Nothing
-data WaitingRef = WaitingRef
- { wrefStorage :: Storage
- , wrefPartial :: PartialRef
- , wrefAction :: Ref -> WaitingRefCallback
- , wrefStatus :: TVar (Either [RefDigest] Ref)
- }
-
-type WaitingRefCallback = ExceptT String IO ()
-
-wrDigest :: WaitingRef -> RefDigest
-wrDigest = refDigest . wrefPartial
-
-newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
-newWaitingRef pref act = do
- peer <- gets phPeer
- wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left []))
- modifyTMVarP (peerWaitingRefs peer) (wref:)
+newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
+newWaitingRef dgst act = do
+ peer@Peer {..} <- gets phPeer
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
+ modifyTMVarP peerWaitingRefs (wref:)
liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
return wref
@@ -254,7 +183,8 @@ startServer opt serverOrigHead logd' serverServices = do
serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
serverThreads <- newMVar []
serverSocket <- newEmptyMVar
- serverChanPacket <- newChan
+ (serverRawPath, protocolRawPath) <- newFlowIO
+ (serverNewConnection, protocolNewConnection) <- newFlowIO
serverDataResponse <- newTQueueIO
serverIOActions <- newTQueueIO
serverServiceStates <- newTMVarIO M.empty
@@ -289,23 +219,23 @@ startServer opt serverOrigHead logd' serverServices = do
when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
readMVar serverIdentity_ >>= \identity -> do
st <- derivePartialStorage serverStorage
- let packet = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef $ idData identity ]
+ let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
mapM_ (void . S.sendTo sock packet) broadcastAddreses
threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
st <- derivePartialStorage serverStorage
let selfRef = partialRef st $ storedRef $ idData identity
- updateRefs = selfRef : map (partialRef st . storedRef) (idUpdates identity)
+ updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity)
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
hitems = map AnnounceUpdate updateRefs
packet = TransportPacket (TransportHeader $ hitems) []
ps <- readMVar serverPeers
forM_ ps $ \peer -> atomically $ do
- ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case
+ ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case
(PeerIdentityFull _, ChannelEstablished _) ->
- writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
+ sendToPeerS peer ackedBy packet
_ -> return ()
void $ watchHead serverOrigHead $ \h -> do
@@ -325,42 +255,38 @@ startServer opt serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
- writeChan serverChanPacket (DatagramAddress sock saddr, msg)
+ writeFlowIO serverRawPath (DatagramAddress sock saddr, msg)
- forever $ do
- (paddr, msg) <- readChan serverChanPacket
- (peer, content, secure) <- modifyMVar serverPeers $ \pvalue -> do
- case M.lookup paddr pvalue of
- Just peer -> do
- mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ ch -> Just ch
- _ -> Nothing
-
- if | Just ch <- mbch
- , Right plain <- runExcept $ channelDecrypt ch msg
- -> return (pvalue, (peer, plain, True))
-
- | otherwise
- -> return (pvalue, (peer, msg, False))
+ forkServerThread server $ forever $ do
+ (paddr, msg) <- readFlowIO serverRawPath
+ case paddr of
+ DatagramAddress _ addr -> void $ S.sendTo sock msg addr
+ PeerIceSession ice -> iceSend ice msg
+ forkServerThread server $ forever $ do
+ conn <- readFlowIO serverNewConnection
+ let paddr = connAddress conn
+ peer <- modifyMVar serverPeers $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> return (pvalue, peer)
Nothing -> do
peer <- mkPeer server paddr
- return (M.insert paddr peer pvalue, (peer, msg, False))
+ return (M.insert paddr peer pvalue, peer)
- case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
- Right (obj:objs)
- | Just header <- transportFromObject obj -> do
- prefs <- forM objs $ storeObject $ peerInStorage peer
- identity <- readMVar serverIdentity_
- let svcs = map someServiceID serverServices
- handlePacket identity secure peer chanSvc svcs header prefs
+ atomically $ do
+ readTVar (peerConnection peer) >>= \case
+ Left packets -> writeFlowBulk (connData conn) $ reverse packets
+ Right _ -> return ()
+ writeTVar (peerConnection peer) (Right conn)
- | otherwise -> atomically $ do
- logd $ show paddr ++ ": invalid objects"
- logd $ show objs
+ forkServerThread server $ forever $ do
+ (secure, TransportPacket header objs) <- readFlowIO $ connData conn
+ prefs <- forM objs $ storeObject $ peerInStorage peer
+ identity <- readMVar serverIdentity_
+ let svcs = map someServiceID serverServices
+ handlePacket identity secure peer chanSvc svcs header prefs
- _ -> do atomically $ logd $ show paddr ++ ": invalid objects"
+ erebosNetworkProtocol logd protocolRawPath protocolNewConnection
forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
@@ -383,87 +309,6 @@ stopServer :: Server -> IO ()
stopServer Server {..} = do
mapM_ killThread =<< takeMVar serverThreads
-sendWorker :: Peer -> IO ()
-sendWorker peer = do
- startTime <- getTime MonotonicRaw
- nowVar <- newTVarIO startTime
- waitVar <- newTVarIO startTime
-
- let waitTill time = forkServerThread (peerServer peer) $ do
- now <- getTime MonotonicRaw
- when (time > now) $
- threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000
- atomically . writeTVar nowVar =<< getTime MonotonicRaw
-
- let sendBytes sp = do
- when (not $ null $ spAckedBy sp) $ do
- now <- getTime MonotonicRaw
- atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp
- { spTime = now
- , spRetryCount = spRetryCount sp + 1
- }
- case peerAddress peer of
- DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr
- PeerIceSession ice -> iceSend ice (spData sp)
-
- let sendNextPacket = do
- (secure, ackedBy, packet@(TransportPacket header content)) <-
- readTQueue (peerOutQueue peer)
-
- let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer)
- let plain = BL.toStrict $ BL.concat $
- (serializeObject $ transportToObject header)
- : map lazyLoadBytes content
-
- mbch <- readTVar (peerChannel peer) >>= \case
- ChannelEstablished ch -> return (Just ch)
- _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):)
- return Nothing
-
- return $ do
- mbs <- case mbch of
- Just ch -> do
- runExceptT (channelEncrypt ch plain) >>= \case
- Right ctext -> return $ Just ctext
- Left err -> do logd $ "Failed to encrypt data: " ++ err
- return Nothing
- Nothing | secure -> return Nothing
- | otherwise -> return $ Just plain
-
- case mbs of
- Just bs -> do
- sendBytes $ SentPacket
- { spTime = undefined
- , spRetryCount = -1
- , spAckedBy = ackedBy
- , spData = bs
- }
- Nothing -> return ()
-
- let retransmitPacket = do
- now <- readTVar nowVar
- (sp, rest) <- readTVar (peerSentPackets peer) >>= \case
- sps@(_:_) -> return (last sps, init sps)
- _ -> retry
- let nextTry = spTime sp + fromNanoSecs 1000000000
- if now < nextTry
- then do
- wait <- readTVar waitVar
- if wait <= now || nextTry < wait
- then do writeTVar waitVar nextTry
- return $ waitTill nextTry
- else retry
- else do
- writeTVar (peerSentPackets peer) rest
- return $ sendBytes sp
-
- forever $ join $ atomically $ do
- retransmitPacket <|> sendNextPacket
-
-processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM ()
-processAcknowledgements peer = mapM_ $ \hitem -> do
- modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy
-
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
(peer, npref) <- atomically (readTQueue $ serverDataResponse server)
@@ -489,7 +334,7 @@ dataResponseWorker server = forever $ do
Right _ -> return (Nothing, [])
atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list
- let reqs = concat $ map snd list
+ let reqs = map refDigest $ concat $ map snd list
when (not $ null reqs) $ do
let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []
ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ]
@@ -540,8 +385,7 @@ handlePacket :: UnifiedIdentity -> Bool
-> TransportHeader -> [PartialRef] -> IO ()
handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
- processAcknowledgements peer headers
- ochannel <- readTVar $ peerChannel peer
+ ochannel <- getPeerChannel peer
let sidentity = idData identity
plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
[ [ storedRef sidentity ]
@@ -555,89 +399,89 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do
let logd = liftSTM . writeTQueue (serverErrorLog server)
forM_ headers $ \case
- Acknowledged ref -> do
- readTVarP (peerChannel peer) >>= \case
- ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
- writeTVarP (peerChannel peer) $ ChannelEstablished ch
- liftSTM $ finalizedChannel peer identity
+ Acknowledged dgst -> do
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do
+ liftSTM $ finalizedChannel peer ch identity
_ -> return ()
- Rejected ref -> do
- logd $ "rejected by peer: " ++ show (refDigest ref)
+ Rejected dgst -> do
+ logd $ "rejected by peer: " ++ show dgst
- DataRequest ref
- | secure || refDigest ref `elem` plaintextRefs -> do
- Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref
- addHeader $ DataResponse ref
- addAckedBy [ Acknowledged ref, Rejected ref ]
+ DataRequest dgst
+ | secure || dgst `elem` plaintextRefs -> do
+ Right mref <- liftSTM $ unsafeIOToSTM $
+ copyRef (peerStorage peer) $
+ partialRefFromDigest (peerInStorage peer) dgst
+ addHeader $ DataResponse dgst
+ addAckedBy [ Acknowledged dgst, Rejected dgst ]
addBody $ mref
| otherwise -> do
- logd $ "unauthorized data request for " ++ show ref
- addHeader $ Rejected ref
+ logd $ "unauthorized data request for " ++ show dgst
+ addHeader $ Rejected dgst
- DataResponse ref -> if
- | ref `elem` prefs -> do
- addHeader $ Acknowledged ref
- liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref)
- | otherwise -> throwError $ "mismatched data response " ++ show ref
+ DataResponse dgst -> if
+ | Just pref <- find ((==dgst) . refDigest) prefs -> do
+ addHeader $ Acknowledged dgst
+ liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)
+ | otherwise -> throwError $ "mismatched data response " ++ show dgst
- AnnounceSelf pref
- | refDigest pref == refDigest (storedRef sidentity) -> return ()
+ AnnounceSelf dgst
+ | dgst == refDigest (storedRef sidentity) -> return ()
| otherwise -> do
- wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
+ wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityUnknown idwait -> do
- let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity
- addHeader $ AnnounceSelf ref
+ addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity
writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer
_ -> return ()
- AnnounceUpdate ref -> do
+ AnnounceUpdate dgst -> do
readTVarP (peerIdentityVar peer) >>= \case
PeerIdentityFull _ -> do
- void $ newWaitingRef ref $ handleIdentityUpdate peer
- addHeader $ Acknowledged ref
+ void $ newWaitingRef dgst $ handleIdentityUpdate peer
+ addHeader $ Acknowledged dgst
_ -> return ()
- TrChannelRequest reqref -> do
+ TrChannelRequest dgst -> do
let process = do
- addHeader $ Acknowledged reqref
- wref <- newWaitingRef reqref $ handleChannelRequest peer identity
- writeTVarP (peerChannel peer) $ ChannelPeerRequest wref
- reject = addHeader $ Rejected reqref
-
- readTVarP (peerChannel peer) >>= \case
- ChannelWait {} -> process
- ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process
+ addHeader $ Acknowledged dgst
+ wref <- newWaitingRef dgst $ handleChannelRequest peer identity
+ liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref
+ reject = addHeader $ Rejected dgst
+
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> process
+ ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
| otherwise -> reject
ChannelPeerRequest {} -> process
ChannelOurAccept {} -> reject
ChannelEstablished {} -> process
- TrChannelAccept accref -> do
+ TrChannelAccept dgst -> do
let process = do
- handleChannelAccept identity accref
- readTVarP (peerChannel peer) >>= \case
- ChannelWait {} -> process
+ handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> process
ChannelOurRequest {} -> process
ChannelPeerRequest {} -> process
- ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process
- | otherwise -> addHeader $ Rejected accref
+ ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> addHeader $ Rejected dgst
ChannelEstablished {} -> process
ServiceType _ -> return ()
- ServiceRef pref
+ ServiceRef dgst
| not secure -> throwError $ "service packet without secure channel"
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
- if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -}
+ if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
then do
- addHeader $ Acknowledged pref
- void $ newWaitingRef pref $ \ref ->
+ addHeader $ Acknowledged dgst
+ void $ newWaitingRef dgst $ \ref ->
liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
- else throwError $ "missing service object " ++ show pref
- | otherwise -> addHeader $ Rejected pref
+ else throwError $ "missing service object " ++ show dgst
+ | otherwise -> addHeader $ Rejected dgst
| otherwise -> throwError $ "service ref without type"
let logd = writeTQueue (serverErrorLog server)
@@ -647,7 +491,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
Right ph -> do
when (not $ null $ phHead ph) $ do
let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
- writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet)
+ sendToPeerS' secure peer (phAckedBy ph) packet
withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
@@ -660,18 +504,17 @@ withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer
setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
setupChannel identity peer upid = do
req <- createChannelRequest (peerStorage peer) identity upid
- let ist = peerInStorage peer
- let reqref = partialRef ist $ storedRef req
+ let reqref = refDigest $ storedRef req
let hitems =
[ TrChannelRequest reqref
- , AnnounceSelf $ partialRef ist $ storedRef $ idData identity
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
]
liftIO $ atomically $ do
- readTVar (peerChannel peer) >>= \case
- ChannelWait -> do
+ getPeerChannel peer >>= \case
+ ChannelNone -> do
sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
TransportPacket (TransportHeader hitems) [storedRef req]
- writeTVar (peerChannel peer) $ ChannelOurRequest req
+ setPeerChannel peer $ ChannelOurRequest req
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -679,10 +522,10 @@ handleChannelRequest peer identity req = do
withPeerIdentity peer $ \upid -> do
(acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req)
liftIO $ atomically $ do
- readTVar (peerChannel peer) >>= \case
+ getPeerChannel peer >>= \case
ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
- writeTVar (peerChannel peer) $ ChannelOurAccept acc ch
- let accref = (partialRef (peerInStorage peer) $ storedRef acc)
+ setPeerChannel peer $ ChannelOurAccept acc ch
+ let accref = refDigest $ storedRef acc
header = TrChannelAccept accref
ackedBy = [ Acknowledged accref, Rejected accref ]
sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat
@@ -702,32 +545,28 @@ handleChannelAccept identity accref = do
Right acc -> do
ch <- acceptedChannel identity upid (wrappedLoad acc)
liftIO $ atomically $ do
- sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []
- writeTVar (peerChannel peer) $ ChannelEstablished ch
- finalizedChannel peer identity
+ sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) []
+ finalizedChannel peer ch identity
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-finalizedChannel :: Peer -> UnifiedIdentity -> STM ()
-finalizedChannel peer self = do
- let ist = peerInStorage peer
+finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM ()
+finalizedChannel peer@Peer {..} ch self = do
+ setPeerChannel peer $ ChannelEstablished ch
-- Identity update
- do
- let selfRef = partialRef ist $ storedRef $ idData $ self
- updateRefs = selfRef : map (partialRef ist . storedRef) (idUpdates self)
+ writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do
+ let selfRef = refDigest $ storedRef $ idData $ self
+ updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self)
ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs
-- Notify services about new peer
- readTVar (peerIdentityVar peer) >>= \case
+ readTVar peerIdentityVar >>= \case
PeerIdentityFull _ -> notifyServicesOfPeer peer
_ -> return ()
- -- Outstanding service packets
- mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []
-
handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
@@ -775,22 +614,14 @@ notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
mkPeer :: Server -> PeerAddress -> IO Peer
-mkPeer server paddr = do
- pst <- deriveEphemeralStorage $ serverStorage server
- peer <- Peer
- <$> pure paddr
- <*> pure server
- <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO [])
- <*> newTVarIO ChannelWait
- <*> pure pst
- <*> derivePartialStorage pst
- <*> newTQueueIO
- <*> newTVarIO []
- <*> newTMVarIO M.empty
- <*> newTVarIO []
- <*> newTMVarIO []
- forkServerThread server $ sendWorker peer
- return peer
+mkPeer peerServer_ peerAddress = do
+ peerConnection <- newTVarIO (Left [])
+ peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO []
+ peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_
+ peerInStorage <- derivePartialStorage peerStorage_
+ peerServiceState <- newTMVarIO M.empty
+ peerWaitingRefs <- newTMVarIO []
+ return Peer {..}
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server paddr = do
@@ -798,10 +629,10 @@ serverPeer server paddr = do
serverPeer' server (DatagramAddress sock paddr)
serverPeerIce :: Server -> IceSession -> IO Peer
-serverPeerIce server ice = do
+serverPeerIce server@Server {..} ice = do
let paddr = PeerIceSession ice
peer <- serverPeer' server paddr
- iceSetChan ice (paddr,) $ serverChanPacket server
+ iceSetChan ice $ mapPath undefined (paddr,) serverRawPath
return peer
serverPeer' :: Server -> PeerAddress -> IO Peer
@@ -814,9 +645,10 @@ serverPeer' server paddr = do
return (M.insert paddr peer pvalue, (peer, True))
when hello $ do
identity <- serverIdentity server
- atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $
- TransportPacket
- (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ])
+ atomically $ do
+ writeFlow (serverNewConnection server) paddr
+ sendToPeerPlain peer [] $ TransportPacket
+ (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ])
[]
return peer
@@ -830,23 +662,28 @@ sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
let st = peerStorage peer
- pst = peerInStorage peer
srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
ServiceReply (Left x) use -> Just . (,use) <$> store st x
ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
ServiceFinally act -> act >> return Nothing
- prefs <- mapM (copyRef pst . fst) srefs
+ let dgsts = map (refDigest . fst) srefs
let content = map fst $ filter snd srefs
- header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)
+ header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
packet = TransportPacket header content
- ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ]
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
liftIO $ atomically $ sendToPeerS peer ackedBy packet
-sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
-sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet)
+sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS' secure Peer {..} ackedBy packet = do
+ readTVar peerConnection >>= \case
+ Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs
+ Right conn -> writeFlow (connData conn) (secure, packet, ackedBy)
+
+sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS = sendToPeerS' True
-sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM ()
-sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet)
+sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerPlain = sendToPeerS' False
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith peer fobj = do