From 36b9a1ddbddf1477c61809d340cd0b86360a7a83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roman=20Smr=C5=BE?= Date: Sun, 20 Dec 2020 21:47:22 +0100 Subject: Network: STM-based synchronization rewrite --- src/Network.hs | 825 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 422 insertions(+), 403 deletions(-) (limited to 'src/Network.hs') diff --git a/src/Network.hs b/src/Network.hs index cbc68b6..52d83bb 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,11 +1,11 @@ module Network ( Server, startServer, - serverChanPeer, + getNextPeerChange, - Peer(..), - PeerAddress(..), - PeerIdentity(..), + Peer, + PeerAddress(..), peerAddress, + PeerIdentity(..), peerIdentity, PeerChannel(..), WaitingRef, wrDigest, Service(..), @@ -16,6 +16,7 @@ module Network ( ) where import Control.Concurrent +import Control.Concurrent.STM import Control.Exception import Control.Monad import Control.Monad.Except @@ -23,13 +24,15 @@ import Control.Monad.State import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy as BL -import Data.Either +import Data.Function import Data.List import Data.Map (Map) import qualified Data.Map as M import Data.Maybe import Data.Typeable +import GHC.Conc.Sync (unsafeIOToSTM) + import Network.Socket import qualified Network.Socket.ByteString as S @@ -55,27 +58,33 @@ data Server = Server , serverIdentity :: MVar UnifiedIdentity , serverSocket :: MVar Socket , serverChanPacket :: Chan (PeerAddress, BC.ByteString) + , serverOutQueue :: TQueue (Peer, Bool, TransportPacket) + , serverDataResponse :: TQueue (Peer, Maybe PartialRef) + , serverIOActions :: TQueue (ExceptT String IO ()) , serverPeers :: MVar (Map PeerAddress Peer) - , serverChanPeer' :: Chan Peer + , serverChanPeer :: TChan Peer + , serverErrorLog :: TQueue String } -serverChanPeer :: Server -> Chan Peer -serverChanPeer = serverChanPeer' +getNextPeerChange :: Server -> IO Peer +getNextPeerChange = atomically . readTChan . serverChanPeer data Peer = Peer { peerAddress :: PeerAddress - , peerIdentity :: PeerIdentity - , peerIdentityUpdate :: [WaitingRef] - , peerChannel :: PeerChannel + , peerServer :: Server + , peerIdentityVar :: TVar PeerIdentity + , peerChannel :: TVar PeerChannel , peerStorage :: Storage , peerInStorage :: PartialStorage - , peerServiceState :: MVar (M.Map ServiceID SomeServiceState) - , peerServiceInQueue :: [(ServiceID, WaitingRef)] - , peerServiceOutQueue :: MVar [TransportPacket] - , peerWaitingRefs :: [WaitingRef] + , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) + , peerServiceOutQueue :: TVar [TransportPacket] + , peerWaitingRefs :: TMVar [WaitingRef] } +instance Eq Peer where + (==) = (==) `on` peerIdentityVar + data PeerAddress = DatagramAddress Socket SockAddr | PeerIceSession IceSession @@ -95,8 +104,8 @@ instance Ord PeerAddress where compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' -data PeerIdentity = PeerIdentityUnknown - | PeerIdentityRef WaitingRef +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) | PeerIdentityFull UnifiedIdentity data PeerChannel = ChannelWait @@ -105,6 +114,9 @@ data PeerChannel = ChannelWait | ChannelOurAccept (Stored ChannelAccept) (Stored Channel) | ChannelEstablished Channel +peerIdentity :: MonadIO m => Peer -> m PeerIdentity +peerIdentity = liftIO . atomically . readTVar . peerIdentityVar + data TransportPacket = TransportPacket TransportHeader [Ref] @@ -162,53 +174,65 @@ lookupServiceType (_ : hs) = lookupServiceType hs lookupServiceType [] = Nothing -data WaitingRef = WaitingRef Storage PartialRef (MVar [RefDigest]) +data WaitingRef = WaitingRef + { wrefStorage :: Storage + , wrefPartial :: PartialRef + , wrefAction :: Ref -> WaitingRefCallback + , wrefStatus :: TVar (Either [RefDigest] Ref) + } + +type WaitingRefCallback = ExceptT String IO () wrDigest :: WaitingRef -> RefDigest -wrDigest (WaitingRef _ pref _) = refDigest pref +wrDigest = refDigest . wrefPartial -newWaitingRef :: Storage -> PartialRef -> PacketHandler WaitingRef -newWaitingRef st pref = do - wref <- WaitingRef st pref <$> liftIO (newMVar []) - updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p } +newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef pref act = do + peer <- gets phPeer + wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left [])) + modifyTMVarP (peerWaitingRefs peer) (wref:) + liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) return wref -copyOrRequestRef :: Storage -> PartialRef -> PacketHandler (Either WaitingRef Ref) -copyOrRequestRef st pref = copyRef st pref >>= \case - Right ref -> return $ Right ref - Left dgst -> do - addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst - wref <- WaitingRef st pref <$> liftIO (newMVar [dgst]) - updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p } - return $ Left wref - -checkWaitingRef :: WaitingRef -> PacketHandler (Maybe Ref) -checkWaitingRef (WaitingRef st pref mvar) = do - liftIO (readMVar mvar) >>= \case - [] -> copyRef st pref >>= \case - Right ref -> return $ Just ref - Left dgst -> do liftIO $ modifyMVar_ mvar $ return . (dgst:) - addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst - return Nothing - _ -> return Nothing - -receivedWaitingRef :: PartialRef -> WaitingRef -> PacketHandler (Maybe Ref) -receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do - liftIO $ modifyMVar_ mvar $ return . filter (/= refDigest nref) - checkWaitingRef wr - startServer :: Head LocalState -> (String -> IO ()) -> String -> [SomeService] -> IO Server -startServer origHead logd bhost services = do +startServer origHead logd' bhost services = do let storage = refStorage $ headRef origHead chanPacket <- newChan - chanPeer <- newChan - chanSvc <- newChan - svcStates <- newMVar M.empty + outQueue <- newTQueueIO + dataResponse <- newTQueueIO + ioActions <- newTQueueIO + chanPeer <- newTChanIO + chanSvc <- newTQueueIO + svcStates <- newTMVarIO M.empty peers <- newMVar M.empty midentity <- newMVar $ headLocalIdentity origHead mshared <- newMVar $ lsShared $ load $ headRef origHead ssocket <- newEmptyMVar + errlog <- newTQueueIO + + let server = Server + { serverStorage = storage + , serverIdentity = midentity + , serverSocket = ssocket + , serverChanPacket = chanPacket + , serverOutQueue = outQueue + , serverDataResponse = dataResponse + , serverIOActions = ioActions + , serverPeers = peers + , serverChanPeer = chanPeer + , serverErrorLog = errlog + } + + let logd = writeTQueue errlog + void $ forkIO $ forever $ do + logd' =<< atomically (readTQueue errlog) + + void $ forkIO $ sendWorker server + void $ forkIO $ dataResponseWorker server + void $ forkIO $ forever $ do + either (atomically . logd) return =<< runExceptT =<< + atomically (readTQueue $ serverIOActions server) let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -229,27 +253,26 @@ startServer origHead logd bhost services = do let announceUpdate identity = do st <- derivePartialStorage storage - let plaintext = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader $ - (AnnounceSelf $ partialRef st $ storedRef $ idData identity) : + let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) : map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity) + let packet = TransportPacket (TransportHeader hitems) [] ps <- readMVar peers - forM_ ps $ \case - peer - | PeerIdentityFull _ <- peerIdentity peer - , ChannelEstablished ch <- peerChannel peer - -> runExceptT (channelEncrypt ch plaintext) >>= \case - Right ctext -> void $ sendTo peer ctext - Left err -> logd $ "Failed to encrypt data: " ++ err - | otherwise -> return () - - let shareState self shared peer - | PeerIdentityFull pid <- peerIdentity peer - , finalOwner pid `sameIdentity` finalOwner self = do - forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case - Left err -> logd $ "failed to sync state with peer: " ++ show err - Right () -> return () - | otherwise = return () + forM_ ps $ \peer -> atomically $ do + ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case + (PeerIdentityFull _, ChannelEstablished _) -> + writeTQueue outQueue (peer, True, packet) + _ -> return () + + let shareState self shared peer = do + let hitems = (ServiceType $ serviceID @SyncService Proxy) : + map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared + packet = TransportPacket (TransportHeader hitems) $ + map storedRef shared + atomically $ readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do + sendToPeerS peer packet + _ -> return () watchHead origHead $ \h -> do let idt = headLocalIdentity h @@ -269,42 +292,40 @@ startServer origHead logd bhost services = do forever $ do (paddr, msg) <- readChan chanPacket - modifyMVar_ peers $ \pvalue -> do - let mbpeer = M.lookup paddr pvalue - (peer, content, secure) <- if - | Just peer <- mbpeer - , Just ch <- case peerChannel peer of - ChannelEstablished ch -> Just ch - ChannelOurAccept _ sch -> Just $ fromStored sch - _ -> Nothing - , Right plain <- runExcept $ channelDecrypt ch msg - -> return (peer, plain, True) - - | Just peer <- mbpeer - -> return (peer, msg, False) - - | otherwise - -> (, msg, False) <$> mkPeer storage paddr - - case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of - Right (obj:objs) - | Just header <- transportFromObject obj -> do - forM_ objs $ storeObject $ peerInStorage peer - identity <- readMVar midentity - let svcs = map someServiceID services - handlePacket logd origHead identity secure peer chanSvc svcs header >>= \case - Just peer' -> do - writeChan chanPeer peer' - return $ M.insert paddr peer' pvalue - Nothing -> return pvalue - - | otherwise -> do - logd $ show paddr ++ ": invalid objects" - logd $ show objs - return pvalue - - _ -> do logd $ show paddr ++ ": invalid objects" - return pvalue + (peer, content, secure) <- modifyMVar peers $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> do + mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case + ChannelEstablished ch -> Just ch + ChannelOurAccept _ sch -> Just $ fromStored sch + _ -> Nothing + + if | Just ch <- mbch + , Right plain <- runExcept $ channelDecrypt ch msg + -> return (pvalue, (peer, plain, True)) + + | otherwise + -> return (pvalue, (peer, msg, False)) + + Nothing -> do + peer <- mkPeer server paddr + return (M.insert paddr peer pvalue, (peer, msg, False)) + + case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of + Right (obj:objs) + | Just header <- transportFromObject obj -> do + prefs <- forM objs $ storeObject $ peerInStorage peer + identity <- readMVar midentity + let svcs = map someServiceID services + handlePacket origHead identity secure peer chanSvc svcs header prefs >>= \case + Just peer' -> atomically $ writeTChan chanPeer peer' + Nothing -> return () + + | otherwise -> atomically $ do + logd $ show paddr ++ ": invalid objects" + logd $ show objs + + _ -> do atomically $ logd $ show paddr ++ ": invalid objects" void $ forkIO $ withSocketsDo $ do let hints = defaultHints @@ -314,49 +335,123 @@ startServer origHead logd bhost services = do addr:_ <- getAddrInfo (Just hints) Nothing (Just discoveryPort) bracket (open addr) close loop - void $ forkIO $ forever $ readChan chanSvc >>= \case - (peer, svc, ref) - | PeerIdentityFull peerId <- peerIdentity peer - -> modifyMVar_ svcStates $ \global -> - modifyMVar (peerServiceState peer) $ \svcs -> - case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs, - maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of - (Just (SomeServiceState (proxy :: Proxy s) s), - Just (SomeServiceGlobalState (_ :: Proxy gs) gs)) - | Just (Refl :: s :~: gs) <- eqT -> do - let inp = ServiceInput - { svcPeer = peerId - , svcPrintOp = logd - } - reloadHead origHead >>= \case - Nothing -> do - logd $ "current head deleted" - return (svcs, global) - Just h -> do - (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s) - identity <- readMVar midentity - runExceptT (sendToPeerList identity peer rsp) >>= \case - Left err -> logd $ "failed to send response to peer: " ++ show err - Right () -> return () - return (M.insert svc (SomeServiceState proxy s') svcs, - M.insert svc (SomeServiceGlobalState proxy gs') global) - _ -> do - logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" - return (svcs, global) - - | otherwise -> do - logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) - - return Server - { serverStorage = storage - , serverIdentity = midentity - , serverSocket = ssocket - , serverChanPacket = chanPacket - , serverPeers = peers - , serverChanPeer' = chanPeer - } - -type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a + void $ forkIO $ forever $ do + (peer, svc, ref) <- atomically $ readTQueue chanSvc + atomically (readTVar (peerIdentityVar peer)) >>= \case + PeerIdentityFull peerId -> do + (global, svcs) <- atomically $ (,) + <$> takeTMVar svcStates + <*> takeTMVar (peerServiceState peer) + case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs, + maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of + (Just (SomeServiceState (proxy :: Proxy s) s), + Just (SomeServiceGlobalState (_ :: Proxy gs) gs)) + | Just (Refl :: s :~: gs) <- eqT -> do + let inp = ServiceInput + { svcPeer = peerId + , svcPrintOp = atomically . logd + } + reloadHead origHead >>= \case + Nothing -> atomically $ do + logd $ "current head deleted" + putTMVar (peerServiceState peer) svcs + putTMVar svcStates global + Just h -> do + (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s) + identity <- readMVar midentity + sendToPeerList identity peer rsp + atomically $ do + putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs + putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global + _ -> atomically $ do + logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" + putTMVar (peerServiceState peer) svcs + putTMVar svcStates global + + _ -> do + atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) + + return server + +sendWorker :: Server -> IO () +sendWorker server = forever $ do + (peer, secure, packet@(TransportPacket header content)) <- + atomically (readTQueue $ serverOutQueue server) + + let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) + let plain = BL.toStrict $ BL.concat $ + (serializeObject $ transportToObject header) + : map lazyLoadBytes content + mbs <- do + mbch <- atomically $ do + readTVar (peerChannel peer) >>= \case + ChannelEstablished ch -> return (Just ch) + _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:) + return Nothing + + case mbch of + Just ch -> do + runExceptT (channelEncrypt ch plain) >>= \case + Right ctext -> return $ Just ctext + Left err -> do logd $ "Failed to encrypt data: " ++ err + return Nothing + Nothing | secure -> return Nothing + | otherwise -> return $ Just plain + + case mbs of + Just bs -> case peerAddress peer of + DatagramAddress sock addr -> void $ S.sendTo sock bs addr + PeerIceSession ice -> iceSend ice bs + Nothing -> return () + +dataResponseWorker :: Server -> IO () +dataResponseWorker server = forever $ do + (peer, npref) <- atomically (readTQueue $ serverDataResponse server) + + wait <- atomically $ takeTMVar (peerWaitingRefs peer) + list <- forM wait $ \wr@WaitingRef { wrefStatus = tvar } -> + atomically (readTVar tvar) >>= \case + Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of + [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case + Right ref -> do + atomically (writeTVar tvar $ Right ref) + void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case + Left err -> atomically $ writeTQueue (serverErrorLog server) err + Right () -> return () + + return (Nothing, []) + Left dgst -> do + atomically (writeTVar tvar $ Left [dgst]) + return (Just wr, [partialRefFromDigest (refStorage $ wrefPartial wr) dgst]) + ds' -> do + atomically (writeTVar tvar $ Left ds') + return (Just wr, []) + Right _ -> return (Nothing, []) + atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list + + let reqs = concat $ map snd list + when (not $ null reqs) $ do + let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] + atomically $ sendToPeerPlain peer packet + + +newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a } + deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String) + +instance MonadFail PacketHandler where + fail = throwError + +liftSTM :: STM a -> PacketHandler a +liftSTM = PacketHandler . lift . lift + +readTVarP :: TVar a -> PacketHandler a +readTVarP = liftSTM . readTVar + +writeTVarP :: TVar a -> a -> PacketHandler () +writeTVarP v = liftSTM . writeTVar v + +modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler () +modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v data PacketHandlerState = PacketHandlerState { phPeer :: Peer @@ -365,9 +460,6 @@ data PacketHandlerState = PacketHandlerState , phBody :: [Ref] } -updatePeer :: (Peer -> Peer) -> PacketHandler () -updatePeer f = modify $ \ph -> ph { phPeer = f (phPeer ph), phPeerChanged = True } - addHeader :: TransportHeaderItem -> PacketHandler () addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph } @@ -379,85 +471,74 @@ appendDistinct x (y:ys) | x == y = y : ys | otherwise = y : appendDistinct x ys appendDistinct x [] = [x] -handlePacket :: (String -> IO ()) -> Head LocalState -> UnifiedIdentity -> Bool - -> Peer -> Chan (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> IO (Maybe Peer) -handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader headers) = do +handlePacket :: Head LocalState -> UnifiedIdentity -> Bool + -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] + -> TransportHeader -> [PartialRef] -> IO (Maybe Peer) +handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do + let server = peerServer peer + ochannel <- readTVar $ peerChannel peer let sidentity = idData identity plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat [ [ storedRef sidentity ] , map storedRef $ idUpdates identity - , case peerChannel opeer of + , case ochannel of ChannelOurRequest req -> [ storedRef req ] ChannelOurAccept acc _ -> [ storedRef acc ] _ -> [] ] - res <- runExceptT $ flip execStateT (PacketHandlerState opeer False [] []) $ do + res <- runExceptT $ flip execStateT (PacketHandlerState peer False [] []) $ unPacketHandler $ do + let logd = liftSTM . writeTQueue (serverErrorLog server) forM_ headers $ \case Acknowledged ref -> do - gets (peerChannel . phPeer) >>= \case + readTVarP (peerChannel peer) >>= \case ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do - updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } - finalizedChannel origHead identity + writeTVarP (peerChannel peer) $ ChannelEstablished (fromStored ch) + liftSTM $ finalizedChannel peer origHead identity _ -> return () - Rejected _ -> return () + Rejected ref -> do + logd $ "rejected by peer: " ++ show (refDigest ref) DataRequest ref | secure || refDigest ref `elem` plaintextRefs -> do - Right mref <- copyRef (storedStorage sidentity) ref + Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref addHeader $ DataResponse ref addBody $ mref | otherwise -> do - liftIO $ logd $ "unauthorized data request for " ++ show ref + logd $ "unauthorized data request for " ++ show ref addHeader $ Rejected ref - DataResponse ref -> do - liftIO (ioLoadBytes ref) >>= \case - Right _ -> do - addHeader $ Acknowledged ref - wait <- gets $ peerWaitingRefs . phPeer - wait' <- flip filterM wait $ receivedWaitingRef ref >=> \case - Just _ -> return False - Nothing -> return True - updatePeer $ \p -> p { peerWaitingRefs = wait' } - Left _ -> throwError $ "mismatched data response " ++ show ref - - AnnounceSelf ref -> do - peer <- gets phPeer - if | PeerIdentityRef wref <- peerIdentity peer, wrDigest wref == refDigest ref -> return () - | PeerIdentityFull pid <- peerIdentity peer, refDigest ref == (refDigest $ storedRef $ idData pid) -> return () - | refDigest ref == refDigest (storedRef sidentity) -> return () - | otherwise -> do - copyOrRequestRef (peerStorage peer) ref >>= \case - Right pref - | Just idt <- validateIdentity $ wrappedLoad pref -> - case peerIdentity peer of - PeerIdentityFull prev | not (prev `sameIdentity` idt) -> - throwError $ "peer identity does not follow" - _ -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt } - | otherwise -> throwError $ "broken identity " ++ show pref - Left wref -> do - addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity - updatePeer $ \p -> p { peerIdentity = PeerIdentityRef wref } + DataResponse ref -> if + | ref `elem` prefs -> do + addHeader $ Acknowledged ref + liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref) + | otherwise -> throwError $ "mismatched data response " ++ show ref + + AnnounceSelf pref + | refDigest pref == refDigest (storedRef sidentity) -> return () + | otherwise -> readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityUnknown idwait -> do + wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer + addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity + writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait + modify $ \ph -> ph { phPeerChanged = True } + _ -> return () AnnounceUpdate ref -> do - peer <- gets phPeer - case peerIdentity peer of - PeerIdentityFull pid -> copyOrRequestRef (peerStorage peer) ref >>= \case - Right upd -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull $ updateOwners [wrappedLoad upd] pid } - Left wref -> updatePeer $ \p -> p { peerIdentityUpdate = wref : peerIdentityUpdate p } - _ -> return () + readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> do + void $ newWaitingRef ref $ handleIdentityUpdate peer + _ -> return () TrChannelRequest reqref -> do - pst <- gets $ peerStorage . phPeer let process = do addHeader $ Acknowledged reqref - handleChannelRequest identity =<< newWaitingRef pst reqref + wref <- newWaitingRef reqref $ handleChannelRequest peer identity + writeTVarP (peerChannel peer) $ ChannelPeerRequest wref reject = addHeader $ Rejected reqref - gets (peerChannel . phPeer) >>= \case + readTVarP (peerChannel peer) >>= \case ChannelWait {} -> process ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process | otherwise -> reject @@ -467,9 +548,8 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h TrChannelAccept accref -> do let process = do - addHeader $ Acknowledged accref handleChannelAccept origHead identity accref - gets (peerChannel . phPeer) >>= \case + readTVarP (peerChannel peer) >>= \case ChannelWait {} -> process ChannelOurRequest {} -> process ChannelPeerRequest {} -> process @@ -482,207 +562,150 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h | not secure -> throwError $ "service packet without secure channel" | Just svc <- lookupServiceType headers -> if | svc `elem` svcs -> do - liftIO (ioLoadBytes pref) >>= \case - Right _ -> do + if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -} + then do addHeader $ Acknowledged pref - pst <- gets $ peerStorage . phPeer - wref <- newWaitingRef pst pref - updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p } - Left _ -> throwError $ "missing service object " ++ show pref + void $ newWaitingRef pref $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) + else throwError $ "missing service object " ++ show pref | otherwise -> addHeader $ Rejected pref | otherwise -> throwError $ "service ref without type" - - setupChannel identity - handleIdentityUpdate - handleServices chanSvc + let logd = writeTQueue (serverErrorLog server) case res of Left err -> do - logd $ "Error in handling packet from " ++ show (peerAddress opeer) ++ ": " ++ err + logd $ "Error in handling packet from " ++ show (peerAddress peer) ++ ": " ++ err return Nothing Right ph -> do when (not $ null $ phHead ph) $ do - let plain = BL.toStrict $ BL.concat - [ serializeObject $ transportToObject $ TransportHeader $ phHead ph - , BL.concat $ map lazyLoadBytes $ phBody ph - ] - case peerChannel $ phPeer ph of - ChannelEstablished ch -> do - x <- runExceptT (channelEncrypt ch plain) - case x of Right ctext -> void $ sendTo (phPeer ph) ctext - Left err -> logd $ "Failed to encrypt data: " ++ err - _ -> void $ sendTo (phPeer ph) plain + let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + writeTQueue (serverOutQueue server) (peer, secure, packet) return $ if phPeerChanged ph then Just $ phPeer ph else Nothing -getOrRequestIdentity :: PeerIdentity -> PacketHandler (Maybe UnifiedIdentity) -getOrRequestIdentity = \case - PeerIdentityUnknown -> return Nothing - PeerIdentityRef wref -> checkWaitingRef wref >>= \case - Just ref -> case validateIdentity (wrappedLoad ref) of - Nothing -> throwError $ "broken identity" - Just idt -> return $ Just idt - Nothing -> return Nothing - PeerIdentityFull idt -> return $ Just idt - - -setupChannel :: UnifiedIdentity -> PacketHandler () -setupChannel identity = gets phPeer >>= \case - peer@Peer { peerChannel = ChannelWait } -> do - getOrRequestIdentity (peerIdentity peer) >>= \case - Just pid | Just upid <- toUnifiedIdentity pid -> do - let ist = peerInStorage peer - req <- createChannelRequest (peerStorage peer) identity upid - updatePeer $ \p -> p { peerChannel = ChannelOurRequest req } - addHeader $ TrChannelRequest $ partialRef ist $ storedRef req - addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData identity - addBody $ storedRef req +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case + PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) + PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) + PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt) + + +setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback +setupChannel identity peer upid = do + req <- createChannelRequest (peerStorage peer) identity upid + let ist = peerInStorage peer + let hitems = + [ TrChannelRequest $ partialRef ist $ storedRef req + , AnnounceSelf $ partialRef ist $ storedRef $ idData identity + ] + liftIO $ atomically $ do + readTVar (peerChannel peer) >>= \case + ChannelWait -> do + sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req] + writeTVar (peerChannel peer) $ ChannelOurRequest req _ -> return () - Peer { peerChannel = ChannelPeerRequest wref } -> do - handleChannelRequest identity wref - - _ -> return () - -handleChannelRequest :: UnifiedIdentity -> WaitingRef -> PacketHandler () -handleChannelRequest identity reqref = do - ist <- gets $ peerInStorage . phPeer - checkWaitingRef reqref >>= \case - Just req -> do - pid <- gets (peerIdentity . phPeer) >>= \case - PeerIdentityFull pid -> return pid - PeerIdentityRef wref -> do - Just idref <- checkWaitingRef wref - Just pid <- return $ validateIdentity $ wrappedLoad idref - return pid - PeerIdentityUnknown -> throwError $ "unknown peer identity" - - (acc, ch) <- case toUnifiedIdentity pid of - Just upid -> acceptChannelRequest identity upid (wrappedLoad req) - Nothing -> throwError $ "non-unified peer identity" - updatePeer $ \p -> p - { peerIdentity = PeerIdentityFull pid - , peerChannel = ChannelOurAccept acc ch - } - addHeader $ TrChannelAccept (partialRef ist $ storedRef acc) - mapM_ addBody $ concat - [ [ storedRef $ acc ] - , [ storedRef $ signedData $ fromStored acc ] - , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] - , map storedRef $ signedSignature $ fromStored acc - ] - Nothing -> do - updatePeer $ \p -> p { peerChannel = ChannelPeerRequest reqref } +handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback +handleChannelRequest peer identity req = do + withPeerIdentity peer $ \upid -> do + (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req) + liftIO $ atomically $ do + readTVar (peerChannel peer) >>= \case + ChannelPeerRequest wr | wrDigest wr == refDigest req -> do + writeTVar (peerChannel peer) $ ChannelOurAccept acc ch + let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc) + sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat + [ [ storedRef $ acc ] + , [ storedRef $ signedData $ fromStored acc ] + , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] + , map storedRef $ signedSignature $ fromStored acc + ] + _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request" handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler () handleChannelAccept oh identity accref = do - pst <- gets $ peerStorage . phPeer - copyRef pst accref >>= \case - Right acc -> do - pid <- gets (peerIdentity . phPeer) >>= \case - PeerIdentityFull pid -> return pid - PeerIdentityRef wref -> do - Just idref <- checkWaitingRef wref - Just pid <- return $ validateIdentity $ wrappedLoad idref - return pid - PeerIdentityUnknown -> throwError $ "unknown peer identity" - - ch <- case toUnifiedIdentity pid of - Just upid -> acceptedChannel identity upid (wrappedLoad acc) - Nothing -> throwError $ "non-unified peer identity" - updatePeer $ \p -> p - { peerIdentity = PeerIdentityFull pid - , peerChannel = ChannelEstablished $ fromStored ch - } - finalizedChannel oh identity - Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) - - -finalizedChannel :: Head LocalState -> UnifiedIdentity -> PacketHandler () -finalizedChannel oh self = do - -- Identity update - ist <- gets $ peerInStorage . phPeer - addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self - mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self + peer <- gets phPeer + liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do + withPeerIdentity peer $ \upid -> do + copyRef (peerStorage peer) accref >>= \case + Right acc -> do + ch <- acceptedChannel identity upid (wrappedLoad acc) + liftIO $ atomically $ do + sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) [] + writeTVar (peerChannel peer) $ ChannelEstablished $ fromStored ch + finalizedChannel peer oh identity - -- Shared state - gets phPeer >>= \case - peer | PeerIdentityFull pid <- peerIdentity peer - , finalOwner pid `sameIdentity` finalOwner self -> do - Just h <- liftIO $ reloadHead oh - let shared = lsShared $ headObject h - addHeader $ ServiceType $ serviceID @SyncService Proxy - mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared - mapM_ (addBody . storedRef) shared - | otherwise -> return () + Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) - -- Outstanding service packets - gets phPeer >>= \case - peer@Peer - { peerChannel = ChannelEstablished ch - , peerServiceOutQueue = oqueue - } -> do - ps <- liftIO $ modifyMVar oqueue $ return . ([],) - forM_ ps $ sendPacket peer ch - _ -> return () +finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () +finalizedChannel peer oh self = do + -- Identity update + let ist = peerInStorage peer + sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $ + ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) : + ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self ) -handleIdentityUpdate :: PacketHandler () -handleIdentityUpdate = do - peer <- gets phPeer - case (peerIdentity peer, peerIdentityUpdate peer) of - (PeerIdentityRef wref, _) -> checkWaitingRef wref >>= \case - Just ref | Just pid <- validateIdentity $ wrappedLoad ref -> do - updatePeer $ \p -> p { peerIdentity = PeerIdentityFull pid } - handleIdentityUpdate - _ -> return () + -- Shared state + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do + writeTQueue (serverIOActions $ peerServer peer) $ do + Just h <- liftIO $ reloadHead oh + let shared = lsShared $ headObject h + let hitems = (ServiceType $ serviceID @SyncService Proxy) : + map (ServiceRef . partialRef ist . storedRef) shared + liftIO $ atomically $ sendToPeerS peer $ + TransportPacket (TransportHeader hitems) $ map storedRef shared + _ -> return () - (PeerIdentityFull pid, wrefs@(_:_)) -> do - (wrefs', upds) <- fmap partitionEithers $ forM wrefs $ \wref -> checkWaitingRef wref >>= \case - Just upd -> return $ Right $ wrappedLoad upd - Nothing -> return $ Left wref - updatePeer $ \p -> p - { peerIdentity = PeerIdentityFull $ updateOwners upds pid - , peerIdentityUpdate = wrefs' - } - - _ -> return () - - -handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler () -handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case - [] -> return () - queue -> do - queue' <- flip filterM queue $ \case - (svc, wref) -> checkWaitingRef wref >>= \case - Just ref -> do - peer <- gets phPeer - liftIO $ writeChan chan (peer, svc, ref) - return False - Nothing -> return True - updatePeer $ \p -> p { peerServiceInQueue = queue' } - - -mkPeer :: Storage -> PeerAddress -> IO Peer -mkPeer st paddr = do - pst <- deriveEphemeralStorage st - ist <- derivePartialStorage pst - svcs <- newMVar M.empty - oqueue <- newMVar [] - return $ Peer - { peerAddress = paddr - , peerIdentity = PeerIdentityUnknown - , peerIdentityUpdate = [] - , peerChannel = ChannelWait - , peerStorage = pst - , peerInStorage = ist - , peerServiceState = svcs - , peerServiceInQueue = [] - , peerServiceOutQueue = oqueue - , peerWaitingRefs = [] - } + -- Outstanding service packets + mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] + + +handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback +handleIdentityAnnounce self peer ref = liftIO $ atomically $ do + pidentity <- readTVar (peerIdentityVar peer) + if | PeerIdentityRef wref wact <- pidentity + , wrDigest wref == refDigest ref + -> case validateIdentity $ wrappedLoad ref of + Just pid -> do + writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid + writeTChan (serverChanPeer $ peerServer peer) peer + mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) . + reverse =<< readTVar wact + writeTQueue (serverIOActions $ peerServer peer) $ do + setupChannel self peer pid + Nothing -> return () + + | otherwise -> return () + +handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback +handleIdentityUpdate peer ref = liftIO $ atomically $ do + pidentity <- readTVar (peerIdentityVar peer) + if | PeerIdentityFull pid <- pidentity + -> do + writeTVar (peerIdentityVar peer) $ PeerIdentityFull $ + updateOwners [wrappedLoad ref] pid + writeTChan (serverChanPeer $ peerServer peer) peer + + | otherwise -> return () + + +mkPeer :: Server -> PeerAddress -> IO Peer +mkPeer server paddr = do + pst <- deriveEphemeralStorage $ serverStorage server + Peer + <$> pure paddr + <*> pure server + <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) + <*> newTVarIO ChannelWait + <*> pure pst + <*> derivePartialStorage pst + <*> newTMVarIO M.empty + <*> newTVarIO [] + <*> newTMVarIO [] serverPeer :: Server -> SockAddr -> IO Peer serverPeer server paddr = do @@ -702,27 +725,24 @@ serverPeer' server paddr = do case M.lookup paddr pvalue of Just peer -> return (pvalue, (peer, False)) Nothing -> do - peer <- mkPeer (serverStorage server) paddr + peer <- mkPeer server paddr return (M.insert paddr peer pvalue, (peer, True)) when hello $ do identity <- readMVar (serverIdentity server) - void $ sendTo peer $ - BL.toStrict $ serializeObject $ transportToObject $ TransportHeader - [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ] + atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $ + TransportPacket + (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) + [] return peer -sendTo :: Peer -> BC.ByteString -> IO () -sendTo Peer { peerAddress = DatagramAddress sock addr } msg = void $ S.sendTo sock msg addr -sendTo Peer { peerAddress = PeerIceSession ice } msg = iceSend ice msg - -sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m () +sendToPeer :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> s -> m () sendToPeer self peer packet = sendToPeerList self peer [ServiceReply (Left packet) True] -sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> Stored s -> m () +sendToPeerStored :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> Stored s -> m () sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True] -sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m () sendToPeerList _ peer parts = do let st = peerStorage peer pst = peerInStorage peer @@ -732,27 +752,26 @@ sendToPeerList _ peer parts = do let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs) header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs) packet = TransportPacket header content - case peerChannel peer of - ChannelEstablished ch -> do - sendPacket peer ch packet - _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:) + liftIO $ atomically $ sendToPeerS peer packet -sendPacket :: (MonadIO m, MonadError String m) => Peer -> Channel -> TransportPacket -> m () -sendPacket peer ch (TransportPacket header content) = do - let plain = BL.toStrict $ BL.concat $ - (serializeObject $ transportToObject header) - : map lazyLoadBytes content - ctext <- channelEncrypt ch plain - void $ liftIO $ sendTo peer ctext +sendToPeerS :: Peer -> TransportPacket -> STM () +sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet) + +sendToPeerPlain :: Peer -> TransportPacket -> STM () +sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet) sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () sendToPeerWith identity peer fobj = do let sproxy = Proxy @s sid = serviceID sproxy - res <- liftIO $ modifyMVar (peerServiceState peer) $ \svcs -> do - runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case + res <- liftIO $ do + svcs <- atomically $ takeTMVar (peerServiceState peer) + (svcs', res) <- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case Right (obj, s') -> return $ (M.insert sid (SomeServiceState sproxy s') svcs, Right obj) Left err -> return $ (svcs, Left err) + atomically $ putTMVar (peerServiceState peer) svcs' + return res + case res of Right (Just obj) -> sendToPeer identity peer obj Right Nothing -> return () -- cgit v1.2.3