diff options
Diffstat (limited to 'src/Network.hs')
-rw-r--r-- | src/Network.hs | 102 |
1 files changed, 64 insertions, 38 deletions
diff --git a/src/Network.hs b/src/Network.hs index 0209853..b7d3c2f 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,7 +1,7 @@ module Network ( Peer(..), PeerAddress(..), - PeerIdentity(..), peerIdentityRef, + PeerIdentity(..), PeerChannel(..), WaitingRef, wrDigest, Service(..), @@ -17,6 +17,7 @@ import Control.Monad.State import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy as BL +import Data.Either import qualified Data.Map as M import Data.Maybe import qualified Data.Text as T @@ -42,7 +43,7 @@ announceIntervalSeconds = 60 data Peer = Peer { peerAddress :: PeerAddress , peerIdentity :: PeerIdentity - , peerOwner :: PeerIdentity + , peerIdentityUpdate :: [WaitingRef] , peerChannel :: PeerChannel , peerSocket :: Socket , peerStorage :: Storage @@ -59,12 +60,6 @@ data PeerIdentity = PeerIdentityUnknown | PeerIdentityRef WaitingRef | PeerIdentityFull UnifiedIdentity -peerIdentityRef :: Peer -> Maybe PartialRef -peerIdentityRef peer = case peerIdentity peer of - PeerIdentityUnknown -> Nothing - PeerIdentityRef (WaitingRef _ pref _) -> Just pref - PeerIdentityFull idt -> Just $ partialRef (peerInStorage peer) $ storedRef $ idData idt - data PeerChannel = ChannelWait | ChannelOurRequest (Stored ChannelRequest) | ChannelPeerRequest WaitingRef @@ -77,6 +72,7 @@ data TransportHeaderItem | DataRequest PartialRef | DataResponse PartialRef | AnnounceSelf PartialRef + | AnnounceUpdate PartialRef | TrChannelRequest PartialRef | TrChannelAccept PartialRef | ServiceType T.Text @@ -91,6 +87,7 @@ transportToObject (TransportHeader items) = Rec $ map single items DataRequest ref -> (BC.pack "REQ", RecRef ref) DataResponse ref -> (BC.pack "RSP", RecRef ref) AnnounceSelf ref -> (BC.pack "ANN", RecRef ref) + AnnounceUpdate ref -> (BC.pack "ANU", RecRef ref) TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref) TrChannelAccept ref -> (BC.pack "CAC", RecRef ref) ServiceType stype -> (BC.pack "STP", RecText stype) @@ -105,6 +102,7 @@ transportFromObject (Rec items) = case catMaybes $ map single items of | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref + | name == BC.pack "ANU", RecRef ref <- content -> Just $ AnnounceUpdate ref | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref | name == BC.pack "STP", RecText stype <- content -> Just $ ServiceType stype @@ -160,10 +158,7 @@ startServer origHead logd bhost services = do chanPeer <- newChan chanSvc <- newChan peers <- newMVar M.empty - - Just self <- return $ verifyIdentity $ lsIdentity $ - fromStored $ wrappedLoad $ headRef origHead - midentity <- newMVar $ self + midentity <- newMVar $ headLocalIdentity origHead let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -184,9 +179,8 @@ startServer origHead logd bhost services = do threadDelay $ announceIntervalSeconds * 1000 * 1000 watchHead origHead $ \h -> do - idt <- modifyMVar midentity $ \cur -> do - return $ (\x -> (x,x)) $ fromMaybe cur $ verifyIdentity $ lsIdentity $ - fromStored $ wrappedLoad $ headRef h + let idt = headLocalIdentity h + modifyMVar_ midentity $ \_ -> return idt announce idt forever $ do @@ -208,7 +202,7 @@ startServer origHead logd bhost services = do let peer = Peer { peerAddress = DatagramAddress paddr , peerIdentity = PeerIdentityUnknown - , peerOwner = PeerIdentityUnknown + , peerIdentityUpdate = [] , peerChannel = ChannelWait , peerSocket = sock , peerStorage = pst @@ -247,14 +241,13 @@ startServer origHead logd bhost services = do void $ forkIO $ forever $ readChan chanSvc >>= \case (peer, svc, ref) | PeerIdentityFull peerId <- peerIdentity peer - , PeerIdentityFull peerOwnerId <- peerOwner peer -> modifyMVar_ (peerServiceState peer) $ \svcs -> case maybe (lookup svc services) Just $ M.lookup svc svcs of Nothing -> do logd $ "unhandled service '" ++ T.unpack svc ++ "'" return svcs Just (SomeService s) -> do let inp = ServiceInput - { svcPeer = peerId, svcPeerOwner = peerOwnerId + { svcPeer = peerId , svcPrintOp = logd } (rsp, s') <- handleServicePacket storage inp s (wrappedLoad ref) @@ -295,6 +288,7 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do DatagramAddress paddr = peerAddress opeer plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat [ [ storedRef sidentity ] + , map storedRef $ idUpdates identity , case peerChannel opeer of ChannelOurRequest req -> [ storedRef req ] ChannelOurAccept acc _ -> [ storedRef acc ] @@ -305,8 +299,9 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do forM_ headers $ \case Acknowledged ref -> do gets (peerChannel . phPeer) >>= \case - ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> + ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } + sendIdentityUpdate identity _ -> return () DataRequest ref @@ -329,18 +324,28 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do AnnounceSelf ref -> do peer <- gets phPeer - if | Just ref' <- peerIdentityRef peer, refDigest ref' == refDigest ref -> return () + if | PeerIdentityRef wref <- peerIdentity peer, wrDigest wref == refDigest ref -> return () + | PeerIdentityFull pid <- peerIdentity peer, refDigest ref == (refDigest $ storedRef $ idData pid) -> return () | refDigest ref == refDigest (storedRef sidentity) -> return () | otherwise -> do copyOrRequestRef (peerStorage peer) ref >>= \case Right pref - | Just idt <- verifyIdentity (wrappedLoad pref) -> do - updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt - , peerOwner = PeerIdentityFull $ finalOwner idt - } + | Just idt <- validateIdentity $ wrappedLoad pref -> + case peerIdentity peer of + PeerIdentityFull prev | not (prev `sameIdentity` idt) -> + throwError $ "peer identity does not follow" + _ -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt } | otherwise -> throwError $ "broken identity " ++ show pref Left wref -> updatePeer $ \p -> p { peerIdentity = PeerIdentityRef wref } + AnnounceUpdate ref -> do + peer <- gets phPeer + case peerIdentity peer of + PeerIdentityFull pid -> copyOrRequestRef (peerStorage peer) ref >>= \case + Right upd -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull $ updateOwners [wrappedLoad upd] pid } + Left wref -> updatePeer $ \p -> p { peerIdentityUpdate = wref : peerIdentityUpdate p } + _ -> return () + TrChannelRequest reqref -> do addHeader $ Acknowledged reqref pst <- gets $ peerStorage . phPeer @@ -378,6 +383,7 @@ handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do | otherwise -> throwError $ "service ref without type" setupChannel identity + handleIdentityUpdate handleServices chanSvc case res of @@ -405,7 +411,7 @@ getOrRequestIdentity :: PeerIdentity -> PacketHandler (Maybe UnifiedIdentity) getOrRequestIdentity = \case PeerIdentityUnknown -> return Nothing PeerIdentityRef wref -> checkWaitingRef wref >>= \case - Just ref -> case verifyIdentity $ wrappedLoad ref of + Just ref -> case validateIdentity (wrappedLoad ref) of Nothing -> throwError $ "broken identity" Just idt -> return $ Just idt Nothing -> return Nothing @@ -416,14 +422,14 @@ setupChannel :: UnifiedIdentity -> PacketHandler () setupChannel identity = gets phPeer >>= \case peer@Peer { peerChannel = ChannelWait } -> do getOrRequestIdentity (peerIdentity peer) >>= \case - Just pid -> do + Just pid | Just upid <- toUnifiedIdentity pid -> do let ist = peerInStorage peer - req <- createChannelRequest (peerStorage peer) identity pid + req <- createChannelRequest (peerStorage peer) identity upid updatePeer $ \p -> p { peerChannel = ChannelOurRequest req } addHeader $ TrChannelRequest $ partialRef ist $ storedRef req addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData identity addBody $ storedRef req - Nothing -> return () + _ -> return () Peer { peerChannel = ChannelPeerRequest wref } -> do handleChannelRequest identity wref @@ -439,16 +445,15 @@ handleChannelRequest identity reqref = do PeerIdentityFull pid -> return pid PeerIdentityRef wref -> do Just idref <- checkWaitingRef wref - Just pid <- return $ verifyIdentity $ wrappedLoad idref + Just pid <- return $ validateIdentity $ wrappedLoad idref return pid PeerIdentityUnknown -> throwError $ "unknown peer identity" - (acc, ch) <- acceptChannelRequest identity pid (wrappedLoad req) + (acc, ch) <- case toUnifiedIdentity pid of + Just upid -> acceptChannelRequest identity upid (wrappedLoad req) + Nothing -> throwError $ "non-unified peer identity" updatePeer $ \p -> p { peerIdentity = PeerIdentityFull pid - , peerOwner = case peerOwner p of - PeerIdentityUnknown -> PeerIdentityFull $ finalOwner pid - owner -> owner , peerChannel = ChannelOurAccept acc ch } addHeader $ TrChannelAccept (partialRef ist $ storedRef acc) @@ -470,21 +475,42 @@ handleChannelAccept identity accref = do PeerIdentityFull pid -> return pid PeerIdentityRef wref -> do Just idref <- checkWaitingRef wref - Just pid <- return $ verifyIdentity $ wrappedLoad idref + Just pid <- return $ validateIdentity $ wrappedLoad idref return pid PeerIdentityUnknown -> throwError $ "unknown peer identity" - ch <- acceptedChannel identity pid (wrappedLoad acc) + ch <- case toUnifiedIdentity pid of + Just upid -> acceptedChannel identity upid (wrappedLoad acc) + Nothing -> throwError $ "non-unified peer identity" updatePeer $ \p -> p { peerIdentity = PeerIdentityFull pid - , peerOwner = case peerOwner p of - PeerIdentityUnknown -> PeerIdentityFull $ finalOwner pid - owner -> owner , peerChannel = ChannelEstablished $ fromStored ch } + sendIdentityUpdate identity Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) +sendIdentityUpdate :: UnifiedIdentity -> PacketHandler () +sendIdentityUpdate self = do + ist <- gets $ peerInStorage . phPeer + mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self + + +handleIdentityUpdate :: PacketHandler () +handleIdentityUpdate = do + peer <- gets phPeer + case (peerIdentity peer, peerIdentityUpdate peer) of + (PeerIdentityFull pid, wrefs@(_:_)) -> do + (wrefs', upds) <- fmap partitionEithers $ forM wrefs $ \wref -> checkWaitingRef wref >>= \case + Just upd -> return $ Right $ wrappedLoad upd + Nothing -> return $ Left wref + updatePeer $ \p -> p + { peerIdentity = PeerIdentityFull $ updateOwners upds pid + , peerIdentityUpdate = wrefs' + } + _ -> return () + + handleServices :: Chan (Peer, T.Text, Ref) -> PacketHandler () handleServices chan = gets (peerServiceQueue . phPeer) >>= \case [] -> return () |