summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2020-12-20 21:47:22 +0100
committerRoman Smrž <roman.smrz@seznam.cz>2020-12-23 22:32:09 +0100
commit36b9a1ddbddf1477c61809d340cd0b86360a7a83 (patch)
tree7b327df1b1635270e98391ec1cf63478b8730793 /src/Network.hs
parent0c4c6618d43a8b7179f11b8edb1f289169b5f2bc (diff)
Network: STM-based synchronization rewrite
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs825
1 files changed, 422 insertions, 403 deletions
diff --git a/src/Network.hs b/src/Network.hs
index cbc68b6..52d83bb 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -1,11 +1,11 @@
module Network (
Server,
startServer,
- serverChanPeer,
+ getNextPeerChange,
- Peer(..),
- PeerAddress(..),
- PeerIdentity(..),
+ Peer,
+ PeerAddress(..), peerAddress,
+ PeerIdentity(..), peerIdentity,
PeerChannel(..),
WaitingRef, wrDigest,
Service(..),
@@ -16,6 +16,7 @@ module Network (
) where
import Control.Concurrent
+import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Except
@@ -23,13 +24,15 @@ import Control.Monad.State
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
-import Data.Either
+import Data.Function
import Data.List
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe
import Data.Typeable
+import GHC.Conc.Sync (unsafeIOToSTM)
+
import Network.Socket
import qualified Network.Socket.ByteString as S
@@ -55,27 +58,33 @@ data Server = Server
, serverIdentity :: MVar UnifiedIdentity
, serverSocket :: MVar Socket
, serverChanPacket :: Chan (PeerAddress, BC.ByteString)
+ , serverOutQueue :: TQueue (Peer, Bool, TransportPacket)
+ , serverDataResponse :: TQueue (Peer, Maybe PartialRef)
+ , serverIOActions :: TQueue (ExceptT String IO ())
, serverPeers :: MVar (Map PeerAddress Peer)
- , serverChanPeer' :: Chan Peer
+ , serverChanPeer :: TChan Peer
+ , serverErrorLog :: TQueue String
}
-serverChanPeer :: Server -> Chan Peer
-serverChanPeer = serverChanPeer'
+getNextPeerChange :: Server -> IO Peer
+getNextPeerChange = atomically . readTChan . serverChanPeer
data Peer = Peer
{ peerAddress :: PeerAddress
- , peerIdentity :: PeerIdentity
- , peerIdentityUpdate :: [WaitingRef]
- , peerChannel :: PeerChannel
+ , peerServer :: Server
+ , peerIdentityVar :: TVar PeerIdentity
+ , peerChannel :: TVar PeerChannel
, peerStorage :: Storage
, peerInStorage :: PartialStorage
- , peerServiceState :: MVar (M.Map ServiceID SomeServiceState)
- , peerServiceInQueue :: [(ServiceID, WaitingRef)]
- , peerServiceOutQueue :: MVar [TransportPacket]
- , peerWaitingRefs :: [WaitingRef]
+ , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
+ , peerServiceOutQueue :: TVar [TransportPacket]
+ , peerWaitingRefs :: TMVar [WaitingRef]
}
+instance Eq Peer where
+ (==) = (==) `on` peerIdentityVar
+
data PeerAddress = DatagramAddress Socket SockAddr
| PeerIceSession IceSession
@@ -95,8 +104,8 @@ instance Ord PeerAddress where
compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
-data PeerIdentity = PeerIdentityUnknown
- | PeerIdentityRef WaitingRef
+data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()])
+ | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
| PeerIdentityFull UnifiedIdentity
data PeerChannel = ChannelWait
@@ -105,6 +114,9 @@ data PeerChannel = ChannelWait
| ChannelOurAccept (Stored ChannelAccept) (Stored Channel)
| ChannelEstablished Channel
+peerIdentity :: MonadIO m => Peer -> m PeerIdentity
+peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
+
data TransportPacket = TransportPacket TransportHeader [Ref]
@@ -162,53 +174,65 @@ lookupServiceType (_ : hs) = lookupServiceType hs
lookupServiceType [] = Nothing
-data WaitingRef = WaitingRef Storage PartialRef (MVar [RefDigest])
+data WaitingRef = WaitingRef
+ { wrefStorage :: Storage
+ , wrefPartial :: PartialRef
+ , wrefAction :: Ref -> WaitingRefCallback
+ , wrefStatus :: TVar (Either [RefDigest] Ref)
+ }
+
+type WaitingRefCallback = ExceptT String IO ()
wrDigest :: WaitingRef -> RefDigest
-wrDigest (WaitingRef _ pref _) = refDigest pref
+wrDigest = refDigest . wrefPartial
-newWaitingRef :: Storage -> PartialRef -> PacketHandler WaitingRef
-newWaitingRef st pref = do
- wref <- WaitingRef st pref <$> liftIO (newMVar [])
- updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p }
+newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
+newWaitingRef pref act = do
+ peer <- gets phPeer
+ wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left []))
+ modifyTMVarP (peerWaitingRefs peer) (wref:)
+ liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
return wref
-copyOrRequestRef :: Storage -> PartialRef -> PacketHandler (Either WaitingRef Ref)
-copyOrRequestRef st pref = copyRef st pref >>= \case
- Right ref -> return $ Right ref
- Left dgst -> do
- addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst
- wref <- WaitingRef st pref <$> liftIO (newMVar [dgst])
- updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p }
- return $ Left wref
-
-checkWaitingRef :: WaitingRef -> PacketHandler (Maybe Ref)
-checkWaitingRef (WaitingRef st pref mvar) = do
- liftIO (readMVar mvar) >>= \case
- [] -> copyRef st pref >>= \case
- Right ref -> return $ Just ref
- Left dgst -> do liftIO $ modifyMVar_ mvar $ return . (dgst:)
- addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst
- return Nothing
- _ -> return Nothing
-
-receivedWaitingRef :: PartialRef -> WaitingRef -> PacketHandler (Maybe Ref)
-receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do
- liftIO $ modifyMVar_ mvar $ return . filter (/= refDigest nref)
- checkWaitingRef wr
-
startServer :: Head LocalState -> (String -> IO ()) -> String -> [SomeService] -> IO Server
-startServer origHead logd bhost services = do
+startServer origHead logd' bhost services = do
let storage = refStorage $ headRef origHead
chanPacket <- newChan
- chanPeer <- newChan
- chanSvc <- newChan
- svcStates <- newMVar M.empty
+ outQueue <- newTQueueIO
+ dataResponse <- newTQueueIO
+ ioActions <- newTQueueIO
+ chanPeer <- newTChanIO
+ chanSvc <- newTQueueIO
+ svcStates <- newTMVarIO M.empty
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
mshared <- newMVar $ lsShared $ load $ headRef origHead
ssocket <- newEmptyMVar
+ errlog <- newTQueueIO
+
+ let server = Server
+ { serverStorage = storage
+ , serverIdentity = midentity
+ , serverSocket = ssocket
+ , serverChanPacket = chanPacket
+ , serverOutQueue = outQueue
+ , serverDataResponse = dataResponse
+ , serverIOActions = ioActions
+ , serverPeers = peers
+ , serverChanPeer = chanPeer
+ , serverErrorLog = errlog
+ }
+
+ let logd = writeTQueue errlog
+ void $ forkIO $ forever $ do
+ logd' =<< atomically (readTQueue errlog)
+
+ void $ forkIO $ sendWorker server
+ void $ forkIO $ dataResponseWorker server
+ void $ forkIO $ forever $ do
+ either (atomically . logd) return =<< runExceptT =<<
+ atomically (readTQueue $ serverIOActions server)
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
@@ -229,27 +253,26 @@ startServer origHead logd bhost services = do
let announceUpdate identity = do
st <- derivePartialStorage storage
- let plaintext = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader $
- (AnnounceSelf $ partialRef st $ storedRef $ idData identity) :
+ let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) :
map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity)
+ let packet = TransportPacket (TransportHeader hitems) []
ps <- readMVar peers
- forM_ ps $ \case
- peer
- | PeerIdentityFull _ <- peerIdentity peer
- , ChannelEstablished ch <- peerChannel peer
- -> runExceptT (channelEncrypt ch plaintext) >>= \case
- Right ctext -> void $ sendTo peer ctext
- Left err -> logd $ "Failed to encrypt data: " ++ err
- | otherwise -> return ()
-
- let shareState self shared peer
- | PeerIdentityFull pid <- peerIdentity peer
- , finalOwner pid `sameIdentity` finalOwner self = do
- forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case
- Left err -> logd $ "failed to sync state with peer: " ++ show err
- Right () -> return ()
- | otherwise = return ()
+ forM_ ps $ \peer -> atomically $ do
+ ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case
+ (PeerIdentityFull _, ChannelEstablished _) ->
+ writeTQueue outQueue (peer, True, packet)
+ _ -> return ()
+
+ let shareState self shared peer = do
+ let hitems = (ServiceType $ serviceID @SyncService Proxy) :
+ map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared
+ packet = TransportPacket (TransportHeader hitems) $
+ map storedRef shared
+ atomically $ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
+ sendToPeerS peer packet
+ _ -> return ()
watchHead origHead $ \h -> do
let idt = headLocalIdentity h
@@ -269,42 +292,40 @@ startServer origHead logd bhost services = do
forever $ do
(paddr, msg) <- readChan chanPacket
- modifyMVar_ peers $ \pvalue -> do
- let mbpeer = M.lookup paddr pvalue
- (peer, content, secure) <- if
- | Just peer <- mbpeer
- , Just ch <- case peerChannel peer of
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ sch -> Just $ fromStored sch
- _ -> Nothing
- , Right plain <- runExcept $ channelDecrypt ch msg
- -> return (peer, plain, True)
-
- | Just peer <- mbpeer
- -> return (peer, msg, False)
-
- | otherwise
- -> (, msg, False) <$> mkPeer storage paddr
-
- case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
- Right (obj:objs)
- | Just header <- transportFromObject obj -> do
- forM_ objs $ storeObject $ peerInStorage peer
- identity <- readMVar midentity
- let svcs = map someServiceID services
- handlePacket logd origHead identity secure peer chanSvc svcs header >>= \case
- Just peer' -> do
- writeChan chanPeer peer'
- return $ M.insert paddr peer' pvalue
- Nothing -> return pvalue
-
- | otherwise -> do
- logd $ show paddr ++ ": invalid objects"
- logd $ show objs
- return pvalue
-
- _ -> do logd $ show paddr ++ ": invalid objects"
- return pvalue
+ (peer, content, secure) <- modifyMVar peers $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> do
+ mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ sch -> Just $ fromStored sch
+ _ -> Nothing
+
+ if | Just ch <- mbch
+ , Right plain <- runExcept $ channelDecrypt ch msg
+ -> return (pvalue, (peer, plain, True))
+
+ | otherwise
+ -> return (pvalue, (peer, msg, False))
+
+ Nothing -> do
+ peer <- mkPeer server paddr
+ return (M.insert paddr peer pvalue, (peer, msg, False))
+
+ case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
+ Right (obj:objs)
+ | Just header <- transportFromObject obj -> do
+ prefs <- forM objs $ storeObject $ peerInStorage peer
+ identity <- readMVar midentity
+ let svcs = map someServiceID services
+ handlePacket origHead identity secure peer chanSvc svcs header prefs >>= \case
+ Just peer' -> atomically $ writeTChan chanPeer peer'
+ Nothing -> return ()
+
+ | otherwise -> atomically $ do
+ logd $ show paddr ++ ": invalid objects"
+ logd $ show objs
+
+ _ -> do atomically $ logd $ show paddr ++ ": invalid objects"
void $ forkIO $ withSocketsDo $ do
let hints = defaultHints
@@ -314,49 +335,123 @@ startServer origHead logd bhost services = do
addr:_ <- getAddrInfo (Just hints) Nothing (Just discoveryPort)
bracket (open addr) close loop
- void $ forkIO $ forever $ readChan chanSvc >>= \case
- (peer, svc, ref)
- | PeerIdentityFull peerId <- peerIdentity peer
- -> modifyMVar_ svcStates $ \global ->
- modifyMVar (peerServiceState peer) $ \svcs ->
- case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs,
- maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of
- (Just (SomeServiceState (proxy :: Proxy s) s),
- Just (SomeServiceGlobalState (_ :: Proxy gs) gs))
- | Just (Refl :: s :~: gs) <- eqT -> do
- let inp = ServiceInput
- { svcPeer = peerId
- , svcPrintOp = logd
- }
- reloadHead origHead >>= \case
- Nothing -> do
- logd $ "current head deleted"
- return (svcs, global)
- Just h -> do
- (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s)
- identity <- readMVar midentity
- runExceptT (sendToPeerList identity peer rsp) >>= \case
- Left err -> logd $ "failed to send response to peer: " ++ show err
- Right () -> return ()
- return (M.insert svc (SomeServiceState proxy s') svcs,
- M.insert svc (SomeServiceGlobalState proxy gs') global)
- _ -> do
- logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
- return (svcs, global)
-
- | otherwise -> do
- logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer)
-
- return Server
- { serverStorage = storage
- , serverIdentity = midentity
- , serverSocket = ssocket
- , serverChanPacket = chanPacket
- , serverPeers = peers
- , serverChanPeer' = chanPeer
- }
-
-type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a
+ void $ forkIO $ forever $ do
+ (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ atomically (readTVar (peerIdentityVar peer)) >>= \case
+ PeerIdentityFull peerId -> do
+ (global, svcs) <- atomically $ (,)
+ <$> takeTMVar svcStates
+ <*> takeTMVar (peerServiceState peer)
+ case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs,
+ maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of
+ (Just (SomeServiceState (proxy :: Proxy s) s),
+ Just (SomeServiceGlobalState (_ :: Proxy gs) gs))
+ | Just (Refl :: s :~: gs) <- eqT -> do
+ let inp = ServiceInput
+ { svcPeer = peerId
+ , svcPrintOp = atomically . logd
+ }
+ reloadHead origHead >>= \case
+ Nothing -> atomically $ do
+ logd $ "current head deleted"
+ putTMVar (peerServiceState peer) svcs
+ putTMVar svcStates global
+ Just h -> do
+ (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s)
+ identity <- readMVar midentity
+ sendToPeerList identity peer rsp
+ atomically $ do
+ putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
+ putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global
+ _ -> atomically $ do
+ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+ putTMVar (peerServiceState peer) svcs
+ putTMVar svcStates global
+
+ _ -> do
+ atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer)
+
+ return server
+
+sendWorker :: Server -> IO ()
+sendWorker server = forever $ do
+ (peer, secure, packet@(TransportPacket header content)) <-
+ atomically (readTQueue $ serverOutQueue server)
+
+ let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer)
+ let plain = BL.toStrict $ BL.concat $
+ (serializeObject $ transportToObject header)
+ : map lazyLoadBytes content
+ mbs <- do
+ mbch <- atomically $ do
+ readTVar (peerChannel peer) >>= \case
+ ChannelEstablished ch -> return (Just ch)
+ _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:)
+ return Nothing
+
+ case mbch of
+ Just ch -> do
+ runExceptT (channelEncrypt ch plain) >>= \case
+ Right ctext -> return $ Just ctext
+ Left err -> do logd $ "Failed to encrypt data: " ++ err
+ return Nothing
+ Nothing | secure -> return Nothing
+ | otherwise -> return $ Just plain
+
+ case mbs of
+ Just bs -> case peerAddress peer of
+ DatagramAddress sock addr -> void $ S.sendTo sock bs addr
+ PeerIceSession ice -> iceSend ice bs
+ Nothing -> return ()
+
+dataResponseWorker :: Server -> IO ()
+dataResponseWorker server = forever $ do
+ (peer, npref) <- atomically (readTQueue $ serverDataResponse server)
+
+ wait <- atomically $ takeTMVar (peerWaitingRefs peer)
+ list <- forM wait $ \wr@WaitingRef { wrefStatus = tvar } ->
+ atomically (readTVar tvar) >>= \case
+ Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of
+ [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
+ Right ref -> do
+ atomically (writeTVar tvar $ Right ref)
+ void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case
+ Left err -> atomically $ writeTQueue (serverErrorLog server) err
+ Right () -> return ()
+
+ return (Nothing, [])
+ Left dgst -> do
+ atomically (writeTVar tvar $ Left [dgst])
+ return (Just wr, [partialRefFromDigest (refStorage $ wrefPartial wr) dgst])
+ ds' -> do
+ atomically (writeTVar tvar $ Left ds')
+ return (Just wr, [])
+ Right _ -> return (Nothing, [])
+ atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list
+
+ let reqs = concat $ map snd list
+ when (not $ null reqs) $ do
+ let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []
+ atomically $ sendToPeerPlain peer packet
+
+
+newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a }
+ deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String)
+
+instance MonadFail PacketHandler where
+ fail = throwError
+
+liftSTM :: STM a -> PacketHandler a
+liftSTM = PacketHandler . lift . lift
+
+readTVarP :: TVar a -> PacketHandler a
+readTVarP = liftSTM . readTVar
+
+writeTVarP :: TVar a -> a -> PacketHandler ()
+writeTVarP v = liftSTM . writeTVar v
+
+modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler ()
+modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v
data PacketHandlerState = PacketHandlerState
{ phPeer :: Peer
@@ -365,9 +460,6 @@ data PacketHandlerState = PacketHandlerState
, phBody :: [Ref]
}
-updatePeer :: (Peer -> Peer) -> PacketHandler ()
-updatePeer f = modify $ \ph -> ph { phPeer = f (phPeer ph), phPeerChanged = True }
-
addHeader :: TransportHeaderItem -> PacketHandler ()
addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph }
@@ -379,85 +471,74 @@ appendDistinct x (y:ys) | x == y = y : ys
| otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
-handlePacket :: (String -> IO ()) -> Head LocalState -> UnifiedIdentity -> Bool
- -> Peer -> Chan (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> IO (Maybe Peer)
-handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader headers) = do
+handlePacket :: Head LocalState -> UnifiedIdentity -> Bool
+ -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
+ -> TransportHeader -> [PartialRef] -> IO (Maybe Peer)
+handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+ let server = peerServer peer
+ ochannel <- readTVar $ peerChannel peer
let sidentity = idData identity
plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
[ [ storedRef sidentity ]
, map storedRef $ idUpdates identity
- , case peerChannel opeer of
+ , case ochannel of
ChannelOurRequest req -> [ storedRef req ]
ChannelOurAccept acc _ -> [ storedRef acc ]
_ -> []
]
- res <- runExceptT $ flip execStateT (PacketHandlerState opeer False [] []) $ do
+ res <- runExceptT $ flip execStateT (PacketHandlerState peer False [] []) $ unPacketHandler $ do
+ let logd = liftSTM . writeTQueue (serverErrorLog server)
forM_ headers $ \case
Acknowledged ref -> do
- gets (peerChannel . phPeer) >>= \case
+ readTVarP (peerChannel peer) >>= \case
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
- updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }
- finalizedChannel origHead identity
+ writeTVarP (peerChannel peer) $ ChannelEstablished (fromStored ch)
+ liftSTM $ finalizedChannel peer origHead identity
_ -> return ()
- Rejected _ -> return ()
+ Rejected ref -> do
+ logd $ "rejected by peer: " ++ show (refDigest ref)
DataRequest ref
| secure || refDigest ref `elem` plaintextRefs -> do
- Right mref <- copyRef (storedStorage sidentity) ref
+ Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref
addHeader $ DataResponse ref
addBody $ mref
| otherwise -> do
- liftIO $ logd $ "unauthorized data request for " ++ show ref
+ logd $ "unauthorized data request for " ++ show ref
addHeader $ Rejected ref
- DataResponse ref -> do
- liftIO (ioLoadBytes ref) >>= \case
- Right _ -> do
- addHeader $ Acknowledged ref
- wait <- gets $ peerWaitingRefs . phPeer
- wait' <- flip filterM wait $ receivedWaitingRef ref >=> \case
- Just _ -> return False
- Nothing -> return True
- updatePeer $ \p -> p { peerWaitingRefs = wait' }
- Left _ -> throwError $ "mismatched data response " ++ show ref
-
- AnnounceSelf ref -> do
- peer <- gets phPeer
- if | PeerIdentityRef wref <- peerIdentity peer, wrDigest wref == refDigest ref -> return ()
- | PeerIdentityFull pid <- peerIdentity peer, refDigest ref == (refDigest $ storedRef $ idData pid) -> return ()
- | refDigest ref == refDigest (storedRef sidentity) -> return ()
- | otherwise -> do
- copyOrRequestRef (peerStorage peer) ref >>= \case
- Right pref
- | Just idt <- validateIdentity $ wrappedLoad pref ->
- case peerIdentity peer of
- PeerIdentityFull prev | not (prev `sameIdentity` idt) ->
- throwError $ "peer identity does not follow"
- _ -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt }
- | otherwise -> throwError $ "broken identity " ++ show pref
- Left wref -> do
- addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity
- updatePeer $ \p -> p { peerIdentity = PeerIdentityRef wref }
+ DataResponse ref -> if
+ | ref `elem` prefs -> do
+ addHeader $ Acknowledged ref
+ liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref)
+ | otherwise -> throwError $ "mismatched data response " ++ show ref
+
+ AnnounceSelf pref
+ | refDigest pref == refDigest (storedRef sidentity) -> return ()
+ | otherwise -> readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown idwait -> do
+ wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
+ addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity
+ writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
+ modify $ \ph -> ph { phPeerChanged = True }
+ _ -> return ()
AnnounceUpdate ref -> do
- peer <- gets phPeer
- case peerIdentity peer of
- PeerIdentityFull pid -> copyOrRequestRef (peerStorage peer) ref >>= \case
- Right upd -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull $ updateOwners [wrappedLoad upd] pid }
- Left wref -> updatePeer $ \p -> p { peerIdentityUpdate = wref : peerIdentityUpdate p }
- _ -> return ()
+ readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> do
+ void $ newWaitingRef ref $ handleIdentityUpdate peer
+ _ -> return ()
TrChannelRequest reqref -> do
- pst <- gets $ peerStorage . phPeer
let process = do
addHeader $ Acknowledged reqref
- handleChannelRequest identity =<< newWaitingRef pst reqref
+ wref <- newWaitingRef reqref $ handleChannelRequest peer identity
+ writeTVarP (peerChannel peer) $ ChannelPeerRequest wref
reject = addHeader $ Rejected reqref
- gets (peerChannel . phPeer) >>= \case
+ readTVarP (peerChannel peer) >>= \case
ChannelWait {} -> process
ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process
| otherwise -> reject
@@ -467,9 +548,8 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h
TrChannelAccept accref -> do
let process = do
- addHeader $ Acknowledged accref
handleChannelAccept origHead identity accref
- gets (peerChannel . phPeer) >>= \case
+ readTVarP (peerChannel peer) >>= \case
ChannelWait {} -> process
ChannelOurRequest {} -> process
ChannelPeerRequest {} -> process
@@ -482,207 +562,150 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h
| not secure -> throwError $ "service packet without secure channel"
| Just svc <- lookupServiceType headers -> if
| svc `elem` svcs -> do
- liftIO (ioLoadBytes pref) >>= \case
- Right _ -> do
+ if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -}
+ then do
addHeader $ Acknowledged pref
- pst <- gets $ peerStorage . phPeer
- wref <- newWaitingRef pst pref
- updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p }
- Left _ -> throwError $ "missing service object " ++ show pref
+ void $ newWaitingRef pref $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
+ else throwError $ "missing service object " ++ show pref
| otherwise -> addHeader $ Rejected pref
| otherwise -> throwError $ "service ref without type"
-
- setupChannel identity
- handleIdentityUpdate
- handleServices chanSvc
+ let logd = writeTQueue (serverErrorLog server)
case res of
Left err -> do
- logd $ "Error in handling packet from " ++ show (peerAddress opeer) ++ ": " ++ err
+ logd $ "Error in handling packet from " ++ show (peerAddress peer) ++ ": " ++ err
return Nothing
Right ph -> do
when (not $ null $ phHead ph) $ do
- let plain = BL.toStrict $ BL.concat
- [ serializeObject $ transportToObject $ TransportHeader $ phHead ph
- , BL.concat $ map lazyLoadBytes $ phBody ph
- ]
- case peerChannel $ phPeer ph of
- ChannelEstablished ch -> do
- x <- runExceptT (channelEncrypt ch plain)
- case x of Right ctext -> void $ sendTo (phPeer ph) ctext
- Left err -> logd $ "Failed to encrypt data: " ++ err
- _ -> void $ sendTo (phPeer ph) plain
+ let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ writeTQueue (serverOutQueue server) (peer, secure, packet)
return $ if phPeerChanged ph then Just $ phPeer ph
else Nothing
-getOrRequestIdentity :: PeerIdentity -> PacketHandler (Maybe UnifiedIdentity)
-getOrRequestIdentity = \case
- PeerIdentityUnknown -> return Nothing
- PeerIdentityRef wref -> checkWaitingRef wref >>= \case
- Just ref -> case validateIdentity (wrappedLoad ref) of
- Nothing -> throwError $ "broken identity"
- Just idt -> return $ Just idt
- Nothing -> return Nothing
- PeerIdentityFull idt -> return $ Just idt
-
-
-setupChannel :: UnifiedIdentity -> PacketHandler ()
-setupChannel identity = gets phPeer >>= \case
- peer@Peer { peerChannel = ChannelWait } -> do
- getOrRequestIdentity (peerIdentity peer) >>= \case
- Just pid | Just upid <- toUnifiedIdentity pid -> do
- let ist = peerInStorage peer
- req <- createChannelRequest (peerStorage peer) identity upid
- updatePeer $ \p -> p { peerChannel = ChannelOurRequest req }
- addHeader $ TrChannelRequest $ partialRef ist $ storedRef req
- addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData identity
- addBody $ storedRef req
+withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
+withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown tvar -> modifyTVar' tvar (act:)
+ PeerIdentityRef _ tvar -> modifyTVar' tvar (act:)
+ PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt)
+
+
+setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
+setupChannel identity peer upid = do
+ req <- createChannelRequest (peerStorage peer) identity upid
+ let ist = peerInStorage peer
+ let hitems =
+ [ TrChannelRequest $ partialRef ist $ storedRef req
+ , AnnounceSelf $ partialRef ist $ storedRef $ idData identity
+ ]
+ liftIO $ atomically $ do
+ readTVar (peerChannel peer) >>= \case
+ ChannelWait -> do
+ sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req]
+ writeTVar (peerChannel peer) $ ChannelOurRequest req
_ -> return ()
- Peer { peerChannel = ChannelPeerRequest wref } -> do
- handleChannelRequest identity wref
-
- _ -> return ()
-
-handleChannelRequest :: UnifiedIdentity -> WaitingRef -> PacketHandler ()
-handleChannelRequest identity reqref = do
- ist <- gets $ peerInStorage . phPeer
- checkWaitingRef reqref >>= \case
- Just req -> do
- pid <- gets (peerIdentity . phPeer) >>= \case
- PeerIdentityFull pid -> return pid
- PeerIdentityRef wref -> do
- Just idref <- checkWaitingRef wref
- Just pid <- return $ validateIdentity $ wrappedLoad idref
- return pid
- PeerIdentityUnknown -> throwError $ "unknown peer identity"
-
- (acc, ch) <- case toUnifiedIdentity pid of
- Just upid -> acceptChannelRequest identity upid (wrappedLoad req)
- Nothing -> throwError $ "non-unified peer identity"
- updatePeer $ \p -> p
- { peerIdentity = PeerIdentityFull pid
- , peerChannel = ChannelOurAccept acc ch
- }
- addHeader $ TrChannelAccept (partialRef ist $ storedRef acc)
- mapM_ addBody $ concat
- [ [ storedRef $ acc ]
- , [ storedRef $ signedData $ fromStored acc ]
- , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ]
- , map storedRef $ signedSignature $ fromStored acc
- ]
- Nothing -> do
- updatePeer $ \p -> p { peerChannel = ChannelPeerRequest reqref }
+handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
+handleChannelRequest peer identity req = do
+ withPeerIdentity peer $ \upid -> do
+ (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req)
+ liftIO $ atomically $ do
+ readTVar (peerChannel peer) >>= \case
+ ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
+ writeTVar (peerChannel peer) $ ChannelOurAccept acc ch
+ let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc)
+ sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat
+ [ [ storedRef $ acc ]
+ , [ storedRef $ signedData $ fromStored acc ]
+ , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ]
+ , map storedRef $ signedSignature $ fromStored acc
+ ]
+ _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request"
handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler ()
handleChannelAccept oh identity accref = do
- pst <- gets $ peerStorage . phPeer
- copyRef pst accref >>= \case
- Right acc -> do
- pid <- gets (peerIdentity . phPeer) >>= \case
- PeerIdentityFull pid -> return pid
- PeerIdentityRef wref -> do
- Just idref <- checkWaitingRef wref
- Just pid <- return $ validateIdentity $ wrappedLoad idref
- return pid
- PeerIdentityUnknown -> throwError $ "unknown peer identity"
-
- ch <- case toUnifiedIdentity pid of
- Just upid -> acceptedChannel identity upid (wrappedLoad acc)
- Nothing -> throwError $ "non-unified peer identity"
- updatePeer $ \p -> p
- { peerIdentity = PeerIdentityFull pid
- , peerChannel = ChannelEstablished $ fromStored ch
- }
- finalizedChannel oh identity
- Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-
-
-finalizedChannel :: Head LocalState -> UnifiedIdentity -> PacketHandler ()
-finalizedChannel oh self = do
- -- Identity update
- ist <- gets $ peerInStorage . phPeer
- addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self
- mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self
+ peer <- gets phPeer
+ liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do
+ withPeerIdentity peer $ \upid -> do
+ copyRef (peerStorage peer) accref >>= \case
+ Right acc -> do
+ ch <- acceptedChannel identity upid (wrappedLoad acc)
+ liftIO $ atomically $ do
+ sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) []
+ writeTVar (peerChannel peer) $ ChannelEstablished $ fromStored ch
+ finalizedChannel peer oh identity
- -- Shared state
- gets phPeer >>= \case
- peer | PeerIdentityFull pid <- peerIdentity peer
- , finalOwner pid `sameIdentity` finalOwner self -> do
- Just h <- liftIO $ reloadHead oh
- let shared = lsShared $ headObject h
- addHeader $ ServiceType $ serviceID @SyncService Proxy
- mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared
- mapM_ (addBody . storedRef) shared
- | otherwise -> return ()
+ Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
- -- Outstanding service packets
- gets phPeer >>= \case
- peer@Peer
- { peerChannel = ChannelEstablished ch
- , peerServiceOutQueue = oqueue
- } -> do
- ps <- liftIO $ modifyMVar oqueue $ return . ([],)
- forM_ ps $ sendPacket peer ch
- _ -> return ()
+finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM ()
+finalizedChannel peer oh self = do
+ -- Identity update
+ let ist = peerInStorage peer
+ sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $
+ ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) :
+ ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self )
-handleIdentityUpdate :: PacketHandler ()
-handleIdentityUpdate = do
- peer <- gets phPeer
- case (peerIdentity peer, peerIdentityUpdate peer) of
- (PeerIdentityRef wref, _) -> checkWaitingRef wref >>= \case
- Just ref | Just pid <- validateIdentity $ wrappedLoad ref -> do
- updatePeer $ \p -> p { peerIdentity = PeerIdentityFull pid }
- handleIdentityUpdate
- _ -> return ()
+ -- Shared state
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do
+ writeTQueue (serverIOActions $ peerServer peer) $ do
+ Just h <- liftIO $ reloadHead oh
+ let shared = lsShared $ headObject h
+ let hitems = (ServiceType $ serviceID @SyncService Proxy) :
+ map (ServiceRef . partialRef ist . storedRef) shared
+ liftIO $ atomically $ sendToPeerS peer $
+ TransportPacket (TransportHeader hitems) $ map storedRef shared
+ _ -> return ()
- (PeerIdentityFull pid, wrefs@(_:_)) -> do
- (wrefs', upds) <- fmap partitionEithers $ forM wrefs $ \wref -> checkWaitingRef wref >>= \case
- Just upd -> return $ Right $ wrappedLoad upd
- Nothing -> return $ Left wref
- updatePeer $ \p -> p
- { peerIdentity = PeerIdentityFull $ updateOwners upds pid
- , peerIdentityUpdate = wrefs'
- }
-
- _ -> return ()
-
-
-handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler ()
-handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case
- [] -> return ()
- queue -> do
- queue' <- flip filterM queue $ \case
- (svc, wref) -> checkWaitingRef wref >>= \case
- Just ref -> do
- peer <- gets phPeer
- liftIO $ writeChan chan (peer, svc, ref)
- return False
- Nothing -> return True
- updatePeer $ \p -> p { peerServiceInQueue = queue' }
-
-
-mkPeer :: Storage -> PeerAddress -> IO Peer
-mkPeer st paddr = do
- pst <- deriveEphemeralStorage st
- ist <- derivePartialStorage pst
- svcs <- newMVar M.empty
- oqueue <- newMVar []
- return $ Peer
- { peerAddress = paddr
- , peerIdentity = PeerIdentityUnknown
- , peerIdentityUpdate = []
- , peerChannel = ChannelWait
- , peerStorage = pst
- , peerInStorage = ist
- , peerServiceState = svcs
- , peerServiceInQueue = []
- , peerServiceOutQueue = oqueue
- , peerWaitingRefs = []
- }
+ -- Outstanding service packets
+ mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []
+
+
+handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
+handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
+ pidentity <- readTVar (peerIdentityVar peer)
+ if | PeerIdentityRef wref wact <- pidentity
+ , wrDigest wref == refDigest ref
+ -> case validateIdentity $ wrappedLoad ref of
+ Just pid -> do
+ writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid
+ writeTChan (serverChanPeer $ peerServer peer) peer
+ mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) .
+ reverse =<< readTVar wact
+ writeTQueue (serverIOActions $ peerServer peer) $ do
+ setupChannel self peer pid
+ Nothing -> return ()
+
+ | otherwise -> return ()
+
+handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback
+handleIdentityUpdate peer ref = liftIO $ atomically $ do
+ pidentity <- readTVar (peerIdentityVar peer)
+ if | PeerIdentityFull pid <- pidentity
+ -> do
+ writeTVar (peerIdentityVar peer) $ PeerIdentityFull $
+ updateOwners [wrappedLoad ref] pid
+ writeTChan (serverChanPeer $ peerServer peer) peer
+
+ | otherwise -> return ()
+
+
+mkPeer :: Server -> PeerAddress -> IO Peer
+mkPeer server paddr = do
+ pst <- deriveEphemeralStorage $ serverStorage server
+ Peer
+ <$> pure paddr
+ <*> pure server
+ <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO [])
+ <*> newTVarIO ChannelWait
+ <*> pure pst
+ <*> derivePartialStorage pst
+ <*> newTMVarIO M.empty
+ <*> newTVarIO []
+ <*> newTMVarIO []
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server paddr = do
@@ -702,27 +725,24 @@ serverPeer' server paddr = do
case M.lookup paddr pvalue of
Just peer -> return (pvalue, (peer, False))
Nothing -> do
- peer <- mkPeer (serverStorage server) paddr
+ peer <- mkPeer server paddr
return (M.insert paddr peer pvalue, (peer, True))
when hello $ do
identity <- readMVar (serverIdentity server)
- void $ sendTo peer $
- BL.toStrict $ serializeObject $ transportToObject $ TransportHeader
- [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]
+ atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $
+ TransportPacket
+ (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ])
+ []
return peer
-sendTo :: Peer -> BC.ByteString -> IO ()
-sendTo Peer { peerAddress = DatagramAddress sock addr } msg = void $ S.sendTo sock msg addr
-sendTo Peer { peerAddress = PeerIceSession ice } msg = iceSend ice msg
-
-sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m ()
+sendToPeer :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> s -> m ()
sendToPeer self peer packet = sendToPeerList self peer [ServiceReply (Left packet) True]
-sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> Stored s -> m ()
+sendToPeerStored :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> Stored s -> m ()
sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True]
-sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m ()
+sendToPeerList :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m ()
sendToPeerList _ peer parts = do
let st = peerStorage peer
pst = peerInStorage peer
@@ -732,27 +752,26 @@ sendToPeerList _ peer parts = do
let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs)
header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)
packet = TransportPacket header content
- case peerChannel peer of
- ChannelEstablished ch -> do
- sendPacket peer ch packet
- _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:)
+ liftIO $ atomically $ sendToPeerS peer packet
-sendPacket :: (MonadIO m, MonadError String m) => Peer -> Channel -> TransportPacket -> m ()
-sendPacket peer ch (TransportPacket header content) = do
- let plain = BL.toStrict $ BL.concat $
- (serializeObject $ transportToObject header)
- : map lazyLoadBytes content
- ctext <- channelEncrypt ch plain
- void $ liftIO $ sendTo peer ctext
+sendToPeerS :: Peer -> TransportPacket -> STM ()
+sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet)
+
+sendToPeerPlain :: Peer -> TransportPacket -> STM ()
+sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet)
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith identity peer fobj = do
let sproxy = Proxy @s
sid = serviceID sproxy
- res <- liftIO $ modifyMVar (peerServiceState peer) $ \svcs -> do
- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case
+ res <- liftIO $ do
+ svcs <- atomically $ takeTMVar (peerServiceState peer)
+ (svcs', res) <- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case
Right (obj, s') -> return $ (M.insert sid (SomeServiceState sproxy s') svcs, Right obj)
Left err -> return $ (svcs, Left err)
+ atomically $ putTMVar (peerServiceState peer) svcs'
+ return res
+
case res of
Right (Just obj) -> sendToPeer identity peer obj
Right Nothing -> return ()