diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network.hs | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/src/Network.hs b/src/Network.hs index d1343f4..d64124e 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -44,6 +44,7 @@ import PubKey import Service import State import Storage +import Storage.Merge import Sync @@ -330,9 +331,7 @@ startServer opt origHead logd' services = do prefs <- forM objs $ storeObject $ peerInStorage peer identity <- readMVar midentity let svcs = map someServiceID services - handlePacket origHead identity secure peer chanSvc svcs header prefs >>= \case - Just peer' -> atomically $ writeTChan chanPeer peer' - Nothing -> return () + handlePacket origHead identity secure peer chanSvc svcs header prefs | otherwise -> atomically $ do logd $ show paddr ++ ": invalid objects" @@ -473,7 +472,6 @@ modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v data PacketHandlerState = PacketHandlerState { phPeer :: Peer - , phPeerChanged :: Bool , phHead :: [TransportHeaderItem] , phBody :: [Ref] } @@ -491,7 +489,7 @@ appendDistinct x [] = [x] handlePacket :: Head LocalState -> UnifiedIdentity -> Bool -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] - -> TransportHeader -> [PartialRef] -> IO (Maybe Peer) + -> TransportHeader -> [PartialRef] -> IO () handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do let server = peerServer peer ochannel <- readTVar $ peerChannel peer @@ -505,7 +503,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers _ -> [] ] - res <- runExceptT $ flip execStateT (PacketHandlerState peer False [] []) $ unPacketHandler $ do + res <- runExceptT $ flip execStateT (PacketHandlerState peer [] []) $ unPacketHandler $ do let logd = liftSTM . writeTQueue (serverErrorLog server) forM_ headers $ \case Acknowledged ref -> do @@ -535,13 +533,14 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers AnnounceSelf pref | refDigest pref == refDigest (storedRef sidentity) -> return () - | otherwise -> readTVarP (peerIdentityVar peer) >>= \case - PeerIdentityUnknown idwait -> do - wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer - addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity - writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait - modify $ \ph -> ph { phPeerChanged = True } - _ -> return () + | otherwise -> do + wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer + readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityUnknown idwait -> do + addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity + writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait + liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer + _ -> return () AnnounceUpdate ref -> do readTVarP (peerIdentityVar peer) >>= \case @@ -593,15 +592,11 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers case res of Left err -> do logd $ "Error in handling packet from " ++ show (peerAddress peer) ++ ": " ++ err - return Nothing Right ph -> do when (not $ null $ phHead ph) $ do let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) writeTQueue (serverOutQueue server) (peer, secure, packet) - return $ if phPeerChanged ph then Just $ phPeer ph - else Nothing - withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case @@ -684,20 +679,28 @@ finalizedChannel peer oh self = do handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback handleIdentityAnnounce self peer ref = liftIO $ atomically $ do - pidentity <- readTVar (peerIdentityVar peer) - if | PeerIdentityRef wref wact <- pidentity - , wrDigest wref == refDigest ref - -> case validateIdentity $ wrappedLoad ref of - Just pid -> do + let validateAndUpdate upds act = case validateIdentity $ wrappedLoad ref of + Just pid' -> do + let pid = updateOwners upds pid' writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid writeTChan (serverChanPeer $ peerServer peer) peer - mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) . - reverse =<< readTVar wact + act pid writeTQueue (serverIOActions $ peerServer peer) $ do setupChannel self peer pid Nothing -> return () - | otherwise -> return () + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityRef wref wact + | wrDigest wref == refDigest ref + -> validateAndUpdate [] $ \pid -> do + mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) . + reverse =<< readTVar wact + + PeerIdentityFull pid + | idData pid `precedes` wrappedLoad ref + -> validateAndUpdate (idUpdates pid) $ \_ -> return () + + _ -> return () handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback handleIdentityUpdate peer ref = liftIO $ atomically $ do |