summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network.hs53
1 files changed, 28 insertions, 25 deletions
diff --git a/src/Network.hs b/src/Network.hs
index d1343f4..d64124e 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -44,6 +44,7 @@ import PubKey
import Service
import State
import Storage
+import Storage.Merge
import Sync
@@ -330,9 +331,7 @@ startServer opt origHead logd' services = do
prefs <- forM objs $ storeObject $ peerInStorage peer
identity <- readMVar midentity
let svcs = map someServiceID services
- handlePacket origHead identity secure peer chanSvc svcs header prefs >>= \case
- Just peer' -> atomically $ writeTChan chanPeer peer'
- Nothing -> return ()
+ handlePacket origHead identity secure peer chanSvc svcs header prefs
| otherwise -> atomically $ do
logd $ show paddr ++ ": invalid objects"
@@ -473,7 +472,6 @@ modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v
data PacketHandlerState = PacketHandlerState
{ phPeer :: Peer
- , phPeerChanged :: Bool
, phHead :: [TransportHeaderItem]
, phBody :: [Ref]
}
@@ -491,7 +489,7 @@ appendDistinct x [] = [x]
handlePacket :: Head LocalState -> UnifiedIdentity -> Bool
-> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
- -> TransportHeader -> [PartialRef] -> IO (Maybe Peer)
+ -> TransportHeader -> [PartialRef] -> IO ()
handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
let server = peerServer peer
ochannel <- readTVar $ peerChannel peer
@@ -505,7 +503,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
_ -> []
]
- res <- runExceptT $ flip execStateT (PacketHandlerState peer False [] []) $ unPacketHandler $ do
+ res <- runExceptT $ flip execStateT (PacketHandlerState peer [] []) $ unPacketHandler $ do
let logd = liftSTM . writeTQueue (serverErrorLog server)
forM_ headers $ \case
Acknowledged ref -> do
@@ -535,13 +533,14 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
AnnounceSelf pref
| refDigest pref == refDigest (storedRef sidentity) -> return ()
- | otherwise -> readTVarP (peerIdentityVar peer) >>= \case
- PeerIdentityUnknown idwait -> do
- wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
- addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity
- writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
- modify $ \ph -> ph { phPeerChanged = True }
- _ -> return ()
+ | otherwise -> do
+ wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer
+ readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown idwait -> do
+ addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity
+ writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
+ liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer
+ _ -> return ()
AnnounceUpdate ref -> do
readTVarP (peerIdentityVar peer) >>= \case
@@ -593,15 +592,11 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers
case res of
Left err -> do
logd $ "Error in handling packet from " ++ show (peerAddress peer) ++ ": " ++ err
- return Nothing
Right ph -> do
when (not $ null $ phHead ph) $ do
let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
writeTQueue (serverOutQueue server) (peer, secure, packet)
- return $ if phPeerChanged ph then Just $ phPeer ph
- else Nothing
-
withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case
@@ -684,20 +679,28 @@ finalizedChannel peer oh self = do
handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
- pidentity <- readTVar (peerIdentityVar peer)
- if | PeerIdentityRef wref wact <- pidentity
- , wrDigest wref == refDigest ref
- -> case validateIdentity $ wrappedLoad ref of
- Just pid -> do
+ let validateAndUpdate upds act = case validateIdentity $ wrappedLoad ref of
+ Just pid' -> do
+ let pid = updateOwners upds pid'
writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid
writeTChan (serverChanPeer $ peerServer peer) peer
- mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) .
- reverse =<< readTVar wact
+ act pid
writeTQueue (serverIOActions $ peerServer peer) $ do
setupChannel self peer pid
Nothing -> return ()
- | otherwise -> return ()
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityRef wref wact
+ | wrDigest wref == refDigest ref
+ -> validateAndUpdate [] $ \pid -> do
+ mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) .
+ reverse =<< readTVar wact
+
+ PeerIdentityFull pid
+ | idData pid `precedes` wrappedLoad ref
+ -> validateAndUpdate (idUpdates pid) $ \_ -> return ()
+
+ _ -> return ()
handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback
handleIdentityUpdate peer ref = liftIO $ atomically $ do