diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2020-12-20 21:47:22 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2020-12-23 22:32:09 +0100 | 
| commit | 36b9a1ddbddf1477c61809d340cd0b86360a7a83 (patch) | |
| tree | 7b327df1b1635270e98391ec1cf63478b8730793 | |
| parent | 0c4c6618d43a8b7179f11b8edb1f289169b5f2bc (diff) | |
Network: STM-based synchronization rewrite
| -rw-r--r-- | erebos.cabal | 1 | ||||
| -rw-r--r-- | src/Attach.hs | 2 | ||||
| -rw-r--r-- | src/Contact.hs | 4 | ||||
| -rw-r--r-- | src/Main.hs | 56 | ||||
| -rw-r--r-- | src/Message.hs | 4 | ||||
| -rw-r--r-- | src/Network.hs | 825 | ||||
| -rw-r--r-- | src/Pairing.hs | 6 | ||||
| -rw-r--r-- | src/Storage/Internal.hs | 3 | 
8 files changed, 462 insertions, 439 deletions
| diff --git a/erebos.cabal b/erebos.cabal index 7012f71..9c3f086 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -71,6 +71,7 @@ executable erebos                         mime >= 0.4 && < 0.5,                         mtl >=2.2 && <2.3,                         network >= 3.0 && <3.1, +                       stm >=2.5 && <2.6,                         tagged >= 0.8 && <0.9,                         text >= 1.2 && <1.3,                         time >= 1.8 && <1.10, diff --git a/src/Attach.hs b/src/Attach.hs index 5acc608..4df7e5f 100644 --- a/src/Attach.hs +++ b/src/Attach.hs @@ -97,7 +97,7 @@ attachAccept printMsg h peer = do          PeerRequestConfirm -> do              liftIO $ printMsg $ "Accepted new attached device, seding updated identity"              owner <- liftIO $ mergeSharedIdentity h -            PeerIdentityFull pid <- return $ peerIdentity peer +            PeerIdentityFull pid <- peerIdentity peer              Just secret <- liftIO $ loadKey $ idKeyIdentity owner              liftIO $ do                  identity <- wrappedStore st =<< sign secret =<< wrappedStore st (emptyIdentityData $ idKeyIdentity pid) diff --git a/src/Contact.hs b/src/Contact.hs index 814e324..fefcd1f 100644 --- a/src/Contact.hs +++ b/src/Contact.hs @@ -121,7 +121,7 @@ contactAccept printMsg h peer = do              liftIO $ printMsg $ "Contact accepted, waiting for peer confirmation"              return (Nothing, OurRequestReady)          OurRequestConfirm (Just ContactAccepted) -> do -            PeerIdentityFull pid <- return $ peerIdentity peer +            PeerIdentityFull pid <- peerIdentity peer              liftIO $ do                  printMsg $ "Contact accepted"                  updateLocalState_ h $ finalizeContact pid @@ -129,7 +129,7 @@ contactAccept printMsg h peer = do          OurRequestReady -> throwError $ "alredy accepted, waiting for peer"          PeerRequest {} -> throwError $ "waiting for peer"          PeerRequestConfirm -> do -            PeerIdentityFull pid <- return $ peerIdentity peer +            PeerIdentityFull pid <- peerIdentity peer              liftIO $ do                  printMsg $ "Contact accepted"                  updateLocalState_ h $ finalizeContact pid diff --git a/src/Main.hs b/src/Main.hs index 0e8970f..a847bd1 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -109,16 +109,16 @@ interactiveLoop st bhost = runInputT defaultSettings $ do      peers <- liftIO $ newMVar []      void $ liftIO $ forkIO $ void $ forever $ do -        peer <- readChan $ serverChanPeer server -        if | PeerIdentityFull pid <- peerIdentity peer -> do -                 let update [] = ([peer], Nothing) -                     update (p:ps) | PeerIdentityFull pid' <- peerIdentity p -                                   , pid' `sameIdentity` pid = (peer : ps, Just p) -                                   | otherwise               = first (p:) $ update ps -                 op <- modifyMVar peers (return . update) -                 let shown = showPeer peer -                 when (Just shown /= (showPeer <$> op)) $ extPrint shown -           | otherwise -> return () +        peer <- getNextPeerChange server +        peerIdentity peer >>= \case +            pid@(PeerIdentityFull _) -> do +                let shown = showPeer pid $ peerAddress peer +                let update [] = ([(peer, shown)], Nothing) +                    update ((p,s):ps) | p == peer = ((peer, shown) : ps, Just s) +                                      | otherwise = first ((p,s):) $ update ps +                op <- modifyMVar peers (return . update) +                when (Just shown /= op) $ extPrint shown +            _ -> return ()      let getInputLines prompt = do              Just input <- lift $ getInputLine prompt @@ -129,12 +129,12 @@ interactiveLoop st bhost = runInputT defaultSettings $ do      let process :: CommandState -> MaybeT (InputT IO) CommandState          process cstate = do -            let pname = case csPeer cstate of -                             Nothing -> "" -                             Just peer -> case peerIdentity peer of -                                 PeerIdentityFull pid -> maybe "<unnamed>" T.unpack $ idName $ finalOwner pid -                                 PeerIdentityRef wref -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" -                                 PeerIdentityUnknown  -> "<unknown>" +            pname <- case csPeer cstate of +                        Nothing -> return "" +                        Just peer -> peerIdentity peer >>= return . \case +                            PeerIdentityFull pid -> maybe "<unnamed>" T.unpack $ idName $ finalOwner pid +                            PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" +                            PeerIdentityUnknown _  -> "<unknown>"              input <- getInputLines $ pname ++ "> "              let (CommandM cmd, line) = case input of                      '/':rest -> let (scmd, args) = dropWhile isSpace <$> span (\c -> isAlphaNum c || c == '-') rest @@ -171,7 +171,7 @@ data CommandInput = CommandInput      , ciServer :: Server      , ciLine :: String      , ciPrint :: String -> IO () -    , ciPeers :: CommandM [Peer] +    , ciPeers :: CommandM [(Peer, String)]      }  data CommandState = CommandState @@ -214,21 +214,21 @@ cmdUnknown cmd = liftIO $ putStrLn $ "Unknown command: " ++ cmd  cmdPeers :: Command  cmdPeers = do      peers <- join $ asks ciPeers -    forM_ (zip [1..] peers) $ \(i :: Int, p) -> do -        liftIO $ putStrLn $ show i ++ ": " ++ showPeer p +    forM_ (zip [1..] peers) $ \(i :: Int, (_, name)) -> do +        liftIO $ putStrLn $ show i ++ ": " ++ name -showPeer :: Peer -> String -showPeer peer = -    let name = case peerIdentity peer of -                    PeerIdentityUnknown  -> "<noid>" -                    PeerIdentityRef wref -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" -                    PeerIdentityFull pid -> T.unpack $ displayIdentity pid -     in name ++ " [" ++ show (peerAddress peer) ++ "]" +showPeer :: PeerIdentity -> PeerAddress -> String +showPeer pidentity paddr = +    let name = case pidentity of +                    PeerIdentityUnknown _  -> "<noid>" +                    PeerIdentityRef wref _ -> "<" ++ BC.unpack (showRefDigest $ wrDigest wref) ++ ">" +                    PeerIdentityFull pid   -> T.unpack $ displayIdentity pid +     in name ++ " [" ++ show paddr ++ "]"  cmdSetPeer :: Int -> Command  cmdSetPeer n | n < 1     = liftIO $ putStrLn "Invalid peer index"               | otherwise = do peers <- join $ asks ciPeers -                              modify $ \s -> s { csPeer = listToMaybe $ drop (n - 1) peers } +                              modify $ \s -> s { csPeer = fmap fst $ listToMaybe $ drop (n - 1) peers }  cmdSend :: Command  cmdSend = void $ do @@ -243,7 +243,7 @@ cmdHistory :: Command  cmdHistory = void $ do      ehead <- asks ciHead      Just peer <- gets csPeer -    PeerIdentityFull pid <- return $ peerIdentity peer +    PeerIdentityFull pid <- peerIdentity peer      let powner = finalOwner pid      Just thread <- return $ find (sameIdentity powner . msgPeer) $ diff --git a/src/Message.hs b/src/Message.hs index 874e375..2d00de2 100644 --- a/src/Message.hs +++ b/src/Message.hs @@ -115,8 +115,8 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do  sendDirectMessage :: (MonadIO m, MonadError String m) => Head LocalState -> Peer -> Text -> m (Stored DirectMessage)  sendDirectMessage h peer text = do -    pid <- case peerIdentity peer of PeerIdentityFull pid -> return pid -                                     _ -> throwError "incomplete peer identity" +    pid <- peerIdentity peer >>= \case PeerIdentityFull pid -> return pid +                                       _ -> throwError "incomplete peer identity"      let st = refStorage $ headRef h          self = headLocalIdentity h          powner = finalOwner pid diff --git a/src/Network.hs b/src/Network.hs index cbc68b6..52d83bb 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -1,11 +1,11 @@  module Network (      Server,      startServer, -    serverChanPeer, +    getNextPeerChange, -    Peer(..), -    PeerAddress(..), -    PeerIdentity(..), +    Peer, +    PeerAddress(..), peerAddress, +    PeerIdentity(..), peerIdentity,      PeerChannel(..),      WaitingRef, wrDigest,      Service(..), @@ -16,6 +16,7 @@ module Network (  ) where  import Control.Concurrent +import Control.Concurrent.STM  import Control.Exception  import Control.Monad  import Control.Monad.Except @@ -23,13 +24,15 @@ import Control.Monad.State  import qualified Data.ByteString.Char8 as BC  import qualified Data.ByteString.Lazy as BL -import Data.Either +import Data.Function  import Data.List  import Data.Map (Map)  import qualified Data.Map as M  import Data.Maybe  import Data.Typeable +import GHC.Conc.Sync (unsafeIOToSTM) +  import Network.Socket  import qualified Network.Socket.ByteString as S @@ -55,27 +58,33 @@ data Server = Server      , serverIdentity :: MVar UnifiedIdentity      , serverSocket :: MVar Socket      , serverChanPacket :: Chan (PeerAddress, BC.ByteString) +    , serverOutQueue :: TQueue (Peer, Bool, TransportPacket) +    , serverDataResponse :: TQueue (Peer, Maybe PartialRef) +    , serverIOActions :: TQueue (ExceptT String IO ())      , serverPeers :: MVar (Map PeerAddress Peer) -    , serverChanPeer' :: Chan Peer +    , serverChanPeer :: TChan Peer +    , serverErrorLog :: TQueue String      } -serverChanPeer :: Server -> Chan Peer -serverChanPeer = serverChanPeer' +getNextPeerChange :: Server -> IO Peer +getNextPeerChange = atomically . readTChan . serverChanPeer  data Peer = Peer      { peerAddress :: PeerAddress -    , peerIdentity :: PeerIdentity -    , peerIdentityUpdate :: [WaitingRef] -    , peerChannel :: PeerChannel +    , peerServer :: Server +    , peerIdentityVar :: TVar PeerIdentity +    , peerChannel :: TVar PeerChannel      , peerStorage :: Storage      , peerInStorage :: PartialStorage -    , peerServiceState :: MVar (M.Map ServiceID SomeServiceState) -    , peerServiceInQueue :: [(ServiceID, WaitingRef)] -    , peerServiceOutQueue :: MVar [TransportPacket] -    , peerWaitingRefs :: [WaitingRef] +    , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) +    , peerServiceOutQueue :: TVar [TransportPacket] +    , peerWaitingRefs :: TMVar [WaitingRef]      } +instance Eq Peer where +    (==) = (==) `on` peerIdentityVar +  data PeerAddress = DatagramAddress Socket SockAddr                   | PeerIceSession IceSession @@ -95,8 +104,8 @@ instance Ord PeerAddress where      compare (PeerIceSession ice    ) (PeerIceSession ice')     = compare ice ice' -data PeerIdentity = PeerIdentityUnknown -                  | PeerIdentityRef WaitingRef +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) +                  | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])                    | PeerIdentityFull UnifiedIdentity  data PeerChannel = ChannelWait @@ -105,6 +114,9 @@ data PeerChannel = ChannelWait                   | ChannelOurAccept (Stored ChannelAccept) (Stored Channel)                   | ChannelEstablished Channel +peerIdentity :: MonadIO m => Peer -> m PeerIdentity +peerIdentity = liftIO . atomically . readTVar . peerIdentityVar +  data TransportPacket = TransportPacket TransportHeader [Ref] @@ -162,53 +174,65 @@ lookupServiceType (_ : hs) = lookupServiceType hs  lookupServiceType [] = Nothing -data WaitingRef = WaitingRef Storage PartialRef (MVar [RefDigest]) +data WaitingRef = WaitingRef +    { wrefStorage :: Storage +    , wrefPartial :: PartialRef +    , wrefAction :: Ref -> WaitingRefCallback +    , wrefStatus :: TVar (Either [RefDigest] Ref) +    } + +type WaitingRefCallback = ExceptT String IO ()  wrDigest :: WaitingRef -> RefDigest -wrDigest (WaitingRef _ pref _) = refDigest pref +wrDigest = refDigest . wrefPartial -newWaitingRef :: Storage -> PartialRef -> PacketHandler WaitingRef -newWaitingRef st pref = do -    wref <- WaitingRef st pref <$> liftIO (newMVar []) -    updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p } +newWaitingRef :: PartialRef -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef pref act = do +    peer <- gets phPeer +    wref <- WaitingRef (peerStorage peer) pref act <$> liftSTM (newTVar (Left [])) +    modifyTMVarP (peerWaitingRefs peer) (wref:) +    liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)      return wref -copyOrRequestRef :: Storage -> PartialRef -> PacketHandler (Either WaitingRef Ref) -copyOrRequestRef st pref = copyRef st pref >>= \case -    Right ref -> return $ Right ref -    Left dgst -> do -        addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst -        wref <- WaitingRef st pref <$> liftIO (newMVar [dgst]) -        updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p } -        return $ Left wref - -checkWaitingRef :: WaitingRef -> PacketHandler (Maybe Ref) -checkWaitingRef (WaitingRef st pref mvar) = do -    liftIO (readMVar mvar) >>= \case -        [] -> copyRef st pref >>= \case -                  Right ref -> return $ Just ref -                  Left dgst -> do liftIO $ modifyMVar_ mvar $ return . (dgst:) -                                  addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst -                                  return Nothing -        _  -> return Nothing - -receivedWaitingRef :: PartialRef -> WaitingRef -> PacketHandler (Maybe Ref) -receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do -    liftIO $ modifyMVar_ mvar $ return . filter (/= refDigest nref) -    checkWaitingRef wr -  startServer :: Head LocalState -> (String -> IO ()) -> String -> [SomeService] -> IO Server -startServer origHead logd bhost services = do +startServer origHead logd' bhost services = do      let storage = refStorage $ headRef origHead      chanPacket <- newChan -    chanPeer <- newChan -    chanSvc <- newChan -    svcStates <- newMVar M.empty +    outQueue <- newTQueueIO +    dataResponse <- newTQueueIO +    ioActions <- newTQueueIO +    chanPeer <- newTChanIO +    chanSvc <- newTQueueIO +    svcStates <- newTMVarIO M.empty      peers <- newMVar M.empty      midentity <- newMVar $ headLocalIdentity origHead      mshared <- newMVar $ lsShared $ load $ headRef origHead      ssocket <- newEmptyMVar +    errlog <- newTQueueIO + +    let server = Server +            { serverStorage = storage +            , serverIdentity = midentity +            , serverSocket = ssocket +            , serverChanPacket = chanPacket +            , serverOutQueue = outQueue +            , serverDataResponse = dataResponse +            , serverIOActions = ioActions +            , serverPeers = peers +            , serverChanPeer = chanPeer +            , serverErrorLog = errlog +            } + +    let logd = writeTQueue errlog +    void $ forkIO $ forever $ do +        logd' =<< atomically (readTQueue errlog) + +    void $ forkIO $ sendWorker server +    void $ forkIO $ dataResponseWorker server +    void $ forkIO $ forever $ do +        either (atomically . logd) return =<< runExceptT =<< +            atomically (readTQueue $ serverIOActions server)      let open addr = do              sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) @@ -229,27 +253,26 @@ startServer origHead logd bhost services = do              let announceUpdate identity = do                      st <- derivePartialStorage storage -                    let plaintext = BL.toStrict $ serializeObject $ transportToObject $ TransportHeader $ -                            (AnnounceSelf $ partialRef st $ storedRef $ idData identity) : +                    let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) :                              map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity) +                    let packet = TransportPacket (TransportHeader hitems) []                      ps <- readMVar peers -                    forM_ ps $ \case -                      peer -                        | PeerIdentityFull _ <- peerIdentity peer -                        , ChannelEstablished ch <- peerChannel peer -                        -> runExceptT (channelEncrypt ch plaintext) >>= \case -                               Right ctext -> void $ sendTo peer ctext -                               Left err -> logd $ "Failed to encrypt data: " ++ err -                        | otherwise -> return () - -            let shareState self shared peer -                    | PeerIdentityFull pid <- peerIdentity peer -                    , finalOwner pid `sameIdentity` finalOwner self = do -                        forM_ shared $ \s -> runExceptT (sendToPeer self peer $ SyncPacket s) >>= \case -                            Left err -> logd $ "failed to sync state with peer: " ++ show err -                            Right () -> return () -                    | otherwise = return () +                    forM_ ps $ \peer -> atomically $ do +                        ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case +                            (PeerIdentityFull _, ChannelEstablished _) -> +                                writeTQueue outQueue (peer, True, packet) +                            _  -> return () + +            let shareState self shared peer = do +                    let hitems = (ServiceType $ serviceID @SyncService Proxy) :  +                            map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared +                        packet = TransportPacket (TransportHeader hitems) $ +                            map storedRef shared +                    atomically $ readTVar (peerIdentityVar peer) >>= \case +                        PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do +                            sendToPeerS peer packet +                        _  -> return ()              watchHead origHead $ \h -> do                  let idt = headLocalIdentity h @@ -269,42 +292,40 @@ startServer origHead logd bhost services = do              forever $ do                  (paddr, msg) <- readChan chanPacket -                modifyMVar_ peers $ \pvalue -> do -                    let mbpeer = M.lookup paddr pvalue -                    (peer, content, secure) <- if -                        | Just peer <- mbpeer -                        , Just ch <- case peerChannel peer of -                                          ChannelEstablished ch  -> Just ch -                                          ChannelOurAccept _ sch -> Just $ fromStored sch -                                          _                      -> Nothing -                        , Right plain <- runExcept $ channelDecrypt ch msg -                        -> return (peer, plain, True) - -                        | Just peer <- mbpeer -                        -> return (peer, msg, False) - -                        | otherwise -                        -> (, msg, False) <$> mkPeer storage paddr - -                    case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of -                         Right (obj:objs) -                             | Just header <- transportFromObject obj -> do -                                   forM_ objs $ storeObject $ peerInStorage peer -                                   identity <- readMVar midentity -                                   let svcs = map someServiceID services -                                   handlePacket logd origHead identity secure peer chanSvc svcs header >>= \case -                                       Just peer' -> do -                                           writeChan chanPeer peer' -                                           return $ M.insert paddr peer' pvalue -                                       Nothing -> return pvalue - -                             | otherwise -> do -                                   logd $ show paddr ++ ": invalid objects" -                                   logd $ show objs -                                   return pvalue - -                         _ -> do logd $ show paddr ++ ": invalid objects" -                                 return pvalue +                (peer, content, secure) <- modifyMVar peers $ \pvalue -> do +                    case M.lookup paddr pvalue of +                        Just peer -> do +                            mbch <- atomically (readTVar (peerChannel peer)) >>= return . \case +                                ChannelEstablished ch  -> Just ch +                                ChannelOurAccept _ sch -> Just $ fromStored sch +                                _                      -> Nothing + +                            if  | Just ch <- mbch +                                , Right plain <- runExcept $ channelDecrypt ch msg +                                -> return (pvalue, (peer, plain, True)) + +                                | otherwise +                                -> return (pvalue, (peer, msg, False)) + +                        Nothing -> do +                            peer <- mkPeer server paddr +                            return (M.insert paddr peer pvalue, (peer, msg, False)) + +                case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of +                     Right (obj:objs) +                         | Just header <- transportFromObject obj -> do +                               prefs <- forM objs $ storeObject $ peerInStorage peer +                               identity <- readMVar midentity +                               let svcs = map someServiceID services +                               handlePacket origHead identity secure peer chanSvc svcs header prefs >>= \case +                                   Just peer' -> atomically $ writeTChan chanPeer peer' +                                   Nothing -> return () + +                         | otherwise -> atomically $ do +                               logd $ show paddr ++ ": invalid objects" +                               logd $ show objs + +                     _ -> do atomically $ logd $ show paddr ++ ": invalid objects"      void $ forkIO $ withSocketsDo $ do          let hints = defaultHints @@ -314,49 +335,123 @@ startServer origHead logd bhost services = do          addr:_ <- getAddrInfo (Just hints) Nothing (Just discoveryPort)          bracket (open addr) close loop -    void $ forkIO $ forever $ readChan chanSvc >>= \case -        (peer, svc, ref) -            | PeerIdentityFull peerId <- peerIdentity peer -            -> modifyMVar_ svcStates $ \global -> -               modifyMVar (peerServiceState peer) $ \svcs -> -                   case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs, -                            maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of -                        (Just (SomeServiceState (proxy :: Proxy s) s), -                            Just (SomeServiceGlobalState (_ :: Proxy gs) gs)) -                            | Just (Refl :: s :~: gs) <- eqT -> do -                            let inp = ServiceInput -                                    { svcPeer = peerId -                                    , svcPrintOp = logd -                                    } -                            reloadHead origHead >>= \case -                                Nothing -> do -                                    logd $ "current head deleted" -                                    return (svcs, global) -                                Just h -> do -                                    (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s) -                                    identity <- readMVar midentity -                                    runExceptT (sendToPeerList identity peer rsp) >>= \case -                                        Left err -> logd $ "failed to send response to peer: " ++ show err -                                        Right () -> return () -                                    return (M.insert svc (SomeServiceState proxy s') svcs, -                                        M.insert svc (SomeServiceGlobalState proxy gs') global) -                        _ -> do -                            logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" -                            return (svcs, global) - -            | otherwise -> do -                logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) - -    return Server -        { serverStorage = storage -        , serverIdentity = midentity -        , serverSocket = ssocket -        , serverChanPacket = chanPacket -        , serverPeers = peers -        , serverChanPeer' = chanPeer -        } - -type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a +    void $ forkIO $ forever $ do +        (peer, svc, ref) <- atomically $ readTQueue chanSvc +        atomically (readTVar (peerIdentityVar peer)) >>= \case +            PeerIdentityFull peerId -> do +                (global, svcs) <- atomically $ (,) +                    <$> takeTMVar svcStates +                    <*> takeTMVar (peerServiceState peer) +                case (maybe (someServiceEmptyState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc svcs, +                         maybe (someServiceEmptyGlobalState <$> find ((svc ==) . someServiceID) services) Just $ M.lookup svc global) of +                     (Just (SomeServiceState (proxy :: Proxy s) s), +                         Just (SomeServiceGlobalState (_ :: Proxy gs) gs)) +                         | Just (Refl :: s :~: gs) <- eqT -> do +                         let inp = ServiceInput +                                 { svcPeer = peerId +                                 , svcPrintOp = atomically . logd +                                 } +                         reloadHead origHead >>= \case +                             Nothing -> atomically $ do +                                 logd $ "current head deleted" +                                 putTMVar (peerServiceState peer) svcs +                                 putTMVar svcStates global +                             Just h -> do +                                 (rsp, (s', gs')) <- handleServicePacket h inp s gs (wrappedLoad ref :: Stored s) +                                 identity <- readMVar midentity +                                 sendToPeerList identity peer rsp +                                 atomically $ do +                                     putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs +                                     putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global +                     _ -> atomically $ do +                         logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +                         putTMVar (peerServiceState peer) svcs +                         putTMVar svcStates global + +            _ -> do +                atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) + +    return server + +sendWorker :: Server -> IO () +sendWorker server = forever $ do +    (peer, secure, packet@(TransportPacket header content)) <- +        atomically (readTQueue $ serverOutQueue server) + +    let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) +    let plain = BL.toStrict $ BL.concat $ +            (serializeObject $ transportToObject header) +            : map lazyLoadBytes content +    mbs <- do +        mbch <- atomically $ do +            readTVar (peerChannel peer) >>= \case +                ChannelEstablished ch -> return (Just ch) +                _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:) +                        return Nothing + +        case mbch of +            Just ch -> do +                runExceptT (channelEncrypt ch plain) >>= \case +                    Right ctext -> return $ Just ctext +                    Left err -> do logd $ "Failed to encrypt data: " ++ err +                                   return Nothing +            Nothing | secure    -> return Nothing +                    | otherwise -> return $ Just plain + +    case mbs of +        Just bs -> case peerAddress peer of +            DatagramAddress sock addr -> void $ S.sendTo sock bs addr +            PeerIceSession ice        -> iceSend ice bs +        Nothing -> return () + +dataResponseWorker :: Server -> IO () +dataResponseWorker server = forever $ do +    (peer, npref) <- atomically (readTQueue $ serverDataResponse server) + +    wait <- atomically $ takeTMVar (peerWaitingRefs peer) +    list <- forM wait $ \wr@WaitingRef { wrefStatus = tvar } -> +        atomically (readTVar tvar) >>= \case +            Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of +                [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case +                          Right ref -> do +                              atomically (writeTVar tvar $ Right ref) +                              void $ forkIO $ runExceptT (wrefAction wr ref) >>= \case +                                  Left err -> atomically $ writeTQueue (serverErrorLog server) err +                                  Right () -> return () + +                              return (Nothing, []) +                          Left dgst -> do +                              atomically (writeTVar tvar $ Left [dgst]) +                              return (Just wr, [partialRefFromDigest (refStorage $ wrefPartial wr) dgst]) +                ds' -> do +                    atomically (writeTVar tvar $ Left ds') +                    return (Just wr, []) +            Right _ -> return (Nothing, []) +    atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list + +    let reqs = concat $ map snd list +    when (not $ null reqs) $ do +        let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] +        atomically $ sendToPeerPlain peer packet + + +newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a } +    deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String) + +instance MonadFail PacketHandler where +    fail = throwError + +liftSTM :: STM a -> PacketHandler a +liftSTM = PacketHandler . lift . lift + +readTVarP :: TVar a -> PacketHandler a +readTVarP = liftSTM . readTVar + +writeTVarP :: TVar a -> a -> PacketHandler () +writeTVarP v = liftSTM . writeTVar v + +modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler () +modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v  data PacketHandlerState = PacketHandlerState      { phPeer :: Peer @@ -365,9 +460,6 @@ data PacketHandlerState = PacketHandlerState      , phBody :: [Ref]      } -updatePeer :: (Peer -> Peer) -> PacketHandler () -updatePeer f = modify $ \ph -> ph { phPeer = f (phPeer ph), phPeerChanged = True } -  addHeader :: TransportHeaderItem -> PacketHandler ()  addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph } @@ -379,85 +471,74 @@ appendDistinct x (y:ys) | x == y    = y : ys                          | otherwise = y : appendDistinct x ys  appendDistinct x [] = [x] -handlePacket :: (String -> IO ()) -> Head LocalState -> UnifiedIdentity -> Bool -    -> Peer -> Chan (Peer, ServiceID, Ref) -> [ServiceID] -    -> TransportHeader -> IO (Maybe Peer) -handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader headers) = do +handlePacket :: Head LocalState -> UnifiedIdentity -> Bool +    -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] +    -> TransportHeader -> [PartialRef] -> IO (Maybe Peer) +handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do +    let server = peerServer peer +    ochannel <- readTVar $ peerChannel peer      let sidentity = idData identity          plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat              [ [ storedRef sidentity ]              , map storedRef $ idUpdates identity -            , case peerChannel opeer of +            , case ochannel of                     ChannelOurRequest req  -> [ storedRef req ]                     ChannelOurAccept acc _ -> [ storedRef acc ]                     _                      -> []              ] -    res <- runExceptT $ flip execStateT (PacketHandlerState opeer False [] []) $ do +    res <- runExceptT $ flip execStateT (PacketHandlerState peer False [] []) $ unPacketHandler $ do +        let logd = liftSTM . writeTQueue (serverErrorLog server)          forM_ headers $ \case              Acknowledged ref -> do -                gets (peerChannel . phPeer) >>= \case +                readTVarP (peerChannel peer) >>= \case                      ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do -                        updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) } -                        finalizedChannel origHead identity +                        writeTVarP (peerChannel peer) $ ChannelEstablished (fromStored ch) +                        liftSTM $ finalizedChannel peer origHead identity                      _ -> return () -            Rejected _ -> return () +            Rejected ref -> do +                logd $ "rejected by peer: " ++ show (refDigest ref)              DataRequest ref                  | secure || refDigest ref `elem` plaintextRefs -> do -                    Right mref <- copyRef (storedStorage sidentity) ref +                    Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref                      addHeader $ DataResponse ref                      addBody $ mref                  | otherwise -> do -                    liftIO $ logd $ "unauthorized data request for " ++ show ref +                    logd $ "unauthorized data request for " ++ show ref                      addHeader $ Rejected ref -            DataResponse ref -> do -                liftIO (ioLoadBytes ref) >>= \case -                    Right _  -> do -                        addHeader $ Acknowledged ref -                        wait <- gets $ peerWaitingRefs . phPeer -                        wait' <- flip filterM wait $ receivedWaitingRef ref >=> \case -                            Just _  -> return False -                            Nothing -> return True -                        updatePeer $ \p -> p { peerWaitingRefs = wait' } -                    Left _ -> throwError $ "mismatched data response " ++ show ref - -            AnnounceSelf ref -> do -                peer <- gets phPeer -                if | PeerIdentityRef wref <- peerIdentity peer, wrDigest wref == refDigest ref -> return () -                   | PeerIdentityFull pid <- peerIdentity peer, refDigest ref == (refDigest $ storedRef $ idData pid) -> return () -                   | refDigest ref == refDigest (storedRef sidentity) -> return () -                   | otherwise -> do -                        copyOrRequestRef (peerStorage peer) ref >>= \case -                            Right pref -                                | Just idt <- validateIdentity $ wrappedLoad pref -> -                                    case peerIdentity peer of -                                         PeerIdentityFull prev | not (prev `sameIdentity` idt) -> -                                             throwError $ "peer identity does not follow" -                                         _ -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt } -                                | otherwise -> throwError $ "broken identity " ++ show pref -                            Left wref -> do -                                addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity -                                updatePeer $ \p -> p { peerIdentity = PeerIdentityRef wref } +            DataResponse ref -> if +                | ref `elem` prefs -> do +                    addHeader $ Acknowledged ref +                    liftSTM $ writeTQueue (serverDataResponse server) (peer, Just ref) +                | otherwise -> throwError $ "mismatched data response " ++ show ref + +            AnnounceSelf pref +                | refDigest pref == refDigest (storedRef sidentity) -> return () +                | otherwise -> readTVarP (peerIdentityVar peer) >>= \case +                    PeerIdentityUnknown idwait -> do +                        wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer +                        addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity +                        writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait +                        modify $ \ph -> ph { phPeerChanged = True } +                    _ -> return ()              AnnounceUpdate ref -> do -                peer <- gets phPeer -                case peerIdentity peer of -                     PeerIdentityFull pid -> copyOrRequestRef (peerStorage peer) ref >>= \case -                         Right upd -> updatePeer $ \p -> p { peerIdentity = PeerIdentityFull $ updateOwners [wrappedLoad upd] pid } -                         Left wref -> updatePeer $ \p -> p { peerIdentityUpdate = wref : peerIdentityUpdate p } -                     _ -> return () +                readTVarP (peerIdentityVar peer) >>= \case +                    PeerIdentityFull _ -> do +                        void $ newWaitingRef ref $ handleIdentityUpdate peer +                    _ -> return ()              TrChannelRequest reqref -> do -                pst <- gets $ peerStorage . phPeer                  let process = do                          addHeader $ Acknowledged reqref -                        handleChannelRequest identity =<< newWaitingRef pst reqref +                        wref <- newWaitingRef reqref $ handleChannelRequest peer identity +                        writeTVarP (peerChannel peer) $ ChannelPeerRequest wref                      reject = addHeader $ Rejected reqref -                gets (peerChannel . phPeer) >>= \case +                readTVarP (peerChannel peer) >>= \case                      ChannelWait {} -> process                      ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process                                            | otherwise -> reject @@ -467,9 +548,8 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h              TrChannelAccept accref -> do                  let process = do -                        addHeader $ Acknowledged accref                          handleChannelAccept origHead identity accref -                gets (peerChannel . phPeer) >>= \case +                readTVarP (peerChannel peer) >>= \case                      ChannelWait {} -> process                      ChannelOurRequest {} -> process                      ChannelPeerRequest {} -> process @@ -482,207 +562,150 @@ handlePacket logd origHead identity secure opeer chanSvc svcs (TransportHeader h                  | not secure -> throwError $ "service packet without secure channel"                  | Just svc <- lookupServiceType headers -> if                      | svc `elem` svcs -> do -                        liftIO (ioLoadBytes pref) >>= \case -                            Right _ -> do +                        if pref `elem` prefs || True {- TODO: used by Message service to confirm receive -} +                           then do                                  addHeader $ Acknowledged pref -                                pst <- gets $ peerStorage . phPeer -                                wref <- newWaitingRef pst pref -                                updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p } -                            Left _ -> throwError $ "missing service object " ++ show pref +                                void $ newWaitingRef pref $ \ref -> +                                    liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) +                           else throwError $ "missing service object " ++ show pref                      | otherwise -> addHeader $ Rejected pref                  | otherwise -> throwError $ "service ref without type" -                 -        setupChannel identity -        handleIdentityUpdate -        handleServices chanSvc +    let logd = writeTQueue (serverErrorLog server)      case res of          Left err -> do -            logd $ "Error in handling packet from " ++ show (peerAddress opeer) ++ ": " ++ err +            logd $ "Error in handling packet from " ++ show (peerAddress peer) ++ ": " ++ err              return Nothing          Right ph -> do              when (not $ null $ phHead ph) $ do -                let plain = BL.toStrict $ BL.concat -                        [ serializeObject $ transportToObject $ TransportHeader $ phHead ph -                        , BL.concat $ map lazyLoadBytes $ phBody ph -                        ] -                case peerChannel $ phPeer ph of -                     ChannelEstablished ch -> do -                        x <- runExceptT (channelEncrypt ch plain) -                        case x of Right ctext -> void $ sendTo (phPeer ph) ctext -                                  Left err -> logd $ "Failed to encrypt data: " ++ err -                     _ -> void $ sendTo (phPeer ph) plain +                let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) +                writeTQueue (serverOutQueue server) (peer, secure, packet)              return $ if phPeerChanged ph then Just $ phPeer ph                                           else Nothing -getOrRequestIdentity :: PeerIdentity -> PacketHandler (Maybe UnifiedIdentity) -getOrRequestIdentity = \case -    PeerIdentityUnknown -> return Nothing -    PeerIdentityRef wref -> checkWaitingRef wref >>= \case -        Just ref -> case validateIdentity (wrappedLoad ref) of -                         Nothing  -> throwError $ "broken identity" -                         Just idt -> return $ Just idt -        Nothing -> return Nothing -    PeerIdentityFull idt -> return $ Just idt - - -setupChannel :: UnifiedIdentity -> PacketHandler () -setupChannel identity = gets phPeer >>= \case -    peer@Peer { peerChannel = ChannelWait } -> do -        getOrRequestIdentity (peerIdentity peer) >>= \case -            Just pid | Just upid <- toUnifiedIdentity pid -> do -                let ist = peerInStorage peer -                req <- createChannelRequest (peerStorage peer) identity upid -                updatePeer $ \p -> p { peerChannel = ChannelOurRequest req } -                addHeader $ TrChannelRequest $ partialRef ist $ storedRef req -                addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData identity -                addBody $ storedRef req +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case +    PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) +    PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) +    PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt) + + +setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback +setupChannel identity peer upid = do +    req <- createChannelRequest (peerStorage peer) identity upid +    let ist = peerInStorage peer +    let hitems = +            [ TrChannelRequest $ partialRef ist $ storedRef req +            , AnnounceSelf $ partialRef ist $ storedRef $ idData identity +            ] +    liftIO $ atomically $ do +        readTVar (peerChannel peer) >>= \case +            ChannelWait -> do +                sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req] +                writeTVar (peerChannel peer) $ ChannelOurRequest req              _ -> return () -    Peer { peerChannel = ChannelPeerRequest wref } -> do -        handleChannelRequest identity wref - -    _ -> return () - -handleChannelRequest :: UnifiedIdentity -> WaitingRef -> PacketHandler () -handleChannelRequest identity reqref = do -    ist <- gets $ peerInStorage . phPeer -    checkWaitingRef reqref >>= \case -        Just req -> do -            pid <- gets (peerIdentity . phPeer) >>= \case -                PeerIdentityFull pid -> return pid -                PeerIdentityRef wref -> do -                    Just idref <- checkWaitingRef wref -                    Just pid <- return $ validateIdentity $ wrappedLoad idref -                    return pid -                PeerIdentityUnknown -> throwError $ "unknown peer identity" - -            (acc, ch) <- case toUnifiedIdentity pid of -                Just upid -> acceptChannelRequest identity upid (wrappedLoad req) -                Nothing   -> throwError $ "non-unified peer identity" -            updatePeer $ \p -> p -                { peerIdentity = PeerIdentityFull pid -                , peerChannel = ChannelOurAccept acc ch -                } -            addHeader $ TrChannelAccept (partialRef ist $ storedRef acc) -            mapM_ addBody $ concat -                [ [ storedRef $ acc ] -                , [ storedRef $ signedData $ fromStored acc ] -                , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] -                , map storedRef $ signedSignature $ fromStored acc -                ] -        Nothing -> do -            updatePeer $ \p -> p { peerChannel = ChannelPeerRequest reqref } +handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback +handleChannelRequest peer identity req = do +    withPeerIdentity peer $ \upid -> do +        (acc, ch) <- acceptChannelRequest identity upid (wrappedLoad req) +        liftIO $ atomically $ do +            readTVar (peerChannel peer) >>= \case +                ChannelPeerRequest wr | wrDigest wr == refDigest req -> do +                    writeTVar (peerChannel peer) $ ChannelOurAccept acc ch +                    let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc) +                    sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat +                        [ [ storedRef $ acc ] +                        , [ storedRef $ signedData $ fromStored acc ] +                        , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] +                        , map storedRef $ signedSignature $ fromStored acc +                        ] +                _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request"  handleChannelAccept :: Head LocalState -> UnifiedIdentity -> PartialRef -> PacketHandler ()  handleChannelAccept oh identity accref = do -    pst <- gets $ peerStorage . phPeer -    copyRef pst accref >>= \case -        Right acc -> do -            pid <- gets (peerIdentity . phPeer) >>= \case -                PeerIdentityFull pid -> return pid -                PeerIdentityRef wref -> do -                    Just idref <- checkWaitingRef wref -                    Just pid <- return $ validateIdentity $ wrappedLoad idref -                    return pid -                PeerIdentityUnknown -> throwError $ "unknown peer identity" - -            ch <- case toUnifiedIdentity pid of -                Just upid -> acceptedChannel identity upid (wrappedLoad acc) -                Nothing   -> throwError $ "non-unified peer identity" -            updatePeer $ \p -> p -                { peerIdentity = PeerIdentityFull pid -                , peerChannel = ChannelEstablished $ fromStored ch -                } -            finalizedChannel oh identity -        Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) - - -finalizedChannel :: Head LocalState -> UnifiedIdentity -> PacketHandler () -finalizedChannel oh self = do -    -- Identity update -    ist <- gets $ peerInStorage . phPeer -    addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self -    mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self +    peer <- gets phPeer +    liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do +        withPeerIdentity peer $ \upid -> do +            copyRef (peerStorage peer) accref >>= \case +                Right acc -> do +                    ch <- acceptedChannel identity upid (wrappedLoad acc) +                    liftIO $ atomically $ do +                        sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) [] +                        writeTVar (peerChannel peer) $ ChannelEstablished $ fromStored ch +                        finalizedChannel peer oh identity -    -- Shared state -    gets phPeer >>= \case -        peer | PeerIdentityFull pid <- peerIdentity peer -             , finalOwner pid `sameIdentity` finalOwner self -> do -                 Just h <- liftIO $ reloadHead oh -                 let shared = lsShared $ headObject h -                 addHeader $ ServiceType $ serviceID @SyncService Proxy -                 mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared -                 mapM_ (addBody . storedRef) shared -             | otherwise -> return () +                Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) -    -- Outstanding service packets -    gets phPeer >>= \case -        peer@Peer -            { peerChannel = ChannelEstablished ch -            , peerServiceOutQueue = oqueue -            } -> do -                ps <- liftIO $ modifyMVar oqueue $ return . ([],) -                forM_ ps $ sendPacket peer ch -        _ -> return () +finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM () +finalizedChannel peer oh self = do +    -- Identity update +    let ist = peerInStorage peer +    sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $ +        ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) : +        ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self ) -handleIdentityUpdate :: PacketHandler () -handleIdentityUpdate = do -    peer <- gets phPeer -    case (peerIdentity peer, peerIdentityUpdate peer) of -         (PeerIdentityRef wref, _) -> checkWaitingRef wref >>= \case -            Just ref | Just pid <- validateIdentity $ wrappedLoad ref -> do -                updatePeer $ \p -> p { peerIdentity = PeerIdentityFull pid } -                handleIdentityUpdate -            _ -> return () +    -- Shared state +    readTVar (peerIdentityVar peer) >>= \case +        PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do +            writeTQueue (serverIOActions $ peerServer peer) $ do +                Just h <- liftIO $ reloadHead oh +                let shared = lsShared $ headObject h +                let hitems = (ServiceType $ serviceID @SyncService Proxy) :  +                        map (ServiceRef . partialRef ist . storedRef) shared +                liftIO $ atomically $ sendToPeerS peer $ +                    TransportPacket (TransportHeader hitems) $ map storedRef shared +        _ -> return () -         (PeerIdentityFull pid, wrefs@(_:_)) -> do -             (wrefs', upds) <- fmap partitionEithers $ forM wrefs $ \wref -> checkWaitingRef wref >>= \case -                 Just upd -> return $ Right $ wrappedLoad upd -                 Nothing -> return $ Left wref -             updatePeer $ \p -> p -                 { peerIdentity = PeerIdentityFull $ updateOwners upds pid -                 , peerIdentityUpdate = wrefs' -                 } - -         _ -> return () - - -handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler () -handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case -    [] -> return () -    queue -> do -        queue' <- flip filterM queue $ \case -            (svc, wref) -> checkWaitingRef wref >>= \case -                Just ref -> do -                    peer <- gets phPeer -                    liftIO $ writeChan chan (peer, svc, ref) -                    return False -                Nothing -> return True -        updatePeer $ \p -> p { peerServiceInQueue = queue' } - - -mkPeer :: Storage -> PeerAddress -> IO Peer -mkPeer st paddr = do -    pst <- deriveEphemeralStorage st -    ist <- derivePartialStorage pst -    svcs <- newMVar M.empty -    oqueue <- newMVar [] -    return $ Peer -        { peerAddress = paddr -        , peerIdentity = PeerIdentityUnknown -        , peerIdentityUpdate = [] -        , peerChannel = ChannelWait -        , peerStorage = pst -        , peerInStorage = ist -        , peerServiceState = svcs -        , peerServiceInQueue = [] -        , peerServiceOutQueue = oqueue -        , peerWaitingRefs = [] -        } +    -- Outstanding service packets +    mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] + + +handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback +handleIdentityAnnounce self peer ref = liftIO $ atomically $ do +    pidentity <- readTVar (peerIdentityVar peer) +    if  | PeerIdentityRef wref wact <- pidentity +        , wrDigest wref == refDigest ref +        -> case validateIdentity $ wrappedLoad ref of +            Just pid -> do +                writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid +                writeTChan (serverChanPeer $ peerServer peer) peer +                mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($pid)) . +                    reverse =<< readTVar wact +                writeTQueue (serverIOActions $ peerServer peer) $ do +                    setupChannel self peer pid +            Nothing -> return () + +        | otherwise -> return () + +handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback +handleIdentityUpdate peer ref = liftIO $ atomically $ do +    pidentity <- readTVar (peerIdentityVar peer) +    if  | PeerIdentityFull pid <- pidentity +        -> do +            writeTVar (peerIdentityVar peer) $ PeerIdentityFull $ +                updateOwners [wrappedLoad ref] pid +            writeTChan (serverChanPeer $ peerServer peer) peer + +        | otherwise -> return () + + +mkPeer :: Server -> PeerAddress -> IO Peer +mkPeer server paddr = do +    pst <- deriveEphemeralStorage $ serverStorage server +    Peer +        <$> pure paddr +        <*> pure server +        <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO []) +        <*> newTVarIO ChannelWait +        <*> pure pst +        <*> derivePartialStorage pst +        <*> newTMVarIO M.empty +        <*> newTVarIO [] +        <*> newTMVarIO []  serverPeer :: Server -> SockAddr -> IO Peer  serverPeer server paddr = do @@ -702,27 +725,24 @@ serverPeer' server paddr = do          case M.lookup paddr pvalue of               Just peer -> return (pvalue, (peer, False))               Nothing -> do -                 peer <- mkPeer (serverStorage server) paddr +                 peer <- mkPeer server paddr                   return (M.insert paddr peer pvalue, (peer, True))      when hello $ do          identity <- readMVar (serverIdentity server) -        void $ sendTo peer $ -            BL.toStrict $ serializeObject $ transportToObject $ TransportHeader -                [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ] +        atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $ +            TransportPacket +                (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]) +                []      return peer -sendTo :: Peer -> BC.ByteString -> IO () -sendTo Peer { peerAddress = DatagramAddress sock addr } msg = void $ S.sendTo sock msg addr -sendTo Peer { peerAddress = PeerIceSession ice } msg = iceSend ice msg - -sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m () +sendToPeer :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> s -> m ()  sendToPeer self peer packet = sendToPeerList self peer [ServiceReply (Left packet) True] -sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> Stored s -> m () +sendToPeerStored :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> Stored s -> m ()  sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True] -sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m () +sendToPeerList :: (Service s, MonadIO m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m ()  sendToPeerList _ peer parts = do      let st = peerStorage peer          pst = peerInStorage peer @@ -732,27 +752,26 @@ sendToPeerList _ peer parts = do      let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs)          header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)          packet = TransportPacket header content -    case peerChannel peer of -         ChannelEstablished ch -> do -             sendPacket peer ch packet -         _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:) +    liftIO $ atomically $ sendToPeerS peer packet -sendPacket :: (MonadIO m, MonadError String m) => Peer -> Channel -> TransportPacket -> m () -sendPacket peer ch (TransportPacket header content) = do -    let plain = BL.toStrict $ BL.concat $ -            (serializeObject $ transportToObject header) -            : map lazyLoadBytes content -    ctext <- channelEncrypt ch plain -    void $ liftIO $ sendTo peer ctext +sendToPeerS :: Peer -> TransportPacket -> STM () +sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet) + +sendToPeerPlain :: Peer -> TransportPacket -> STM () +sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet)  sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()  sendToPeerWith identity peer fobj = do      let sproxy = Proxy @s          sid = serviceID sproxy -    res <- liftIO $ modifyMVar (peerServiceState peer) $ \svcs -> do -        runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case +    res <- liftIO $ do +        svcs <- atomically $ takeTMVar (peerServiceState peer) +        (svcs', res) <- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case              Right (obj, s') -> return $ (M.insert sid (SomeServiceState sproxy s') svcs, Right obj)              Left err -> return $ (svcs, Left err) +        atomically $ putTMVar (peerServiceState peer) svcs' +        return res +      case res of           Right (Just obj) -> sendToPeer identity peer obj           Right Nothing -> return () diff --git a/src/Pairing.hs b/src/Pairing.hs index a0a19b3..9af33c7 100644 --- a/src/Pairing.hs +++ b/src/Pairing.hs @@ -155,9 +155,9 @@ confirmationNumber dgst = let (a:b:c:d:_) = map fromIntegral $ BA.unpack dgst ::  pairingRequest :: forall a m proxy. (PairingResult a, MonadIO m, MonadError String m) => proxy a -> UnifiedIdentity -> Peer -> m ()  pairingRequest _ self peer = do      nonce <- liftIO $ getRandomBytes 32 -    pid <- case peerIdentity peer of -                PeerIdentityFull pid -> return pid -                _ -> throwError "incomplete peer identity" +    pid <- peerIdentity peer >>= \case +        PeerIdentityFull pid -> return pid +        _ -> throwError "incomplete peer identity"      sendToPeerWith @(PairingService a) self peer $ \case          NoPairing -> return (Just $ PairingRequest (nonceDigest self pid nonce BA.empty), OurRequest nonce)          _ -> throwError "alredy in progress" diff --git a/src/Storage/Internal.hs b/src/Storage/Internal.hs index e4e4f00..a625efb 100644 --- a/src/Storage/Internal.hs +++ b/src/Storage/Internal.hs @@ -71,6 +71,9 @@ data StorageBacking c  newtype RefDigest = RefDigest (Digest Blake2b_256)      deriving (Eq, Ord, NFData, ByteArrayAccess) +instance Show RefDigest where +    show = BC.unpack . showRefDigest +  data Ref' c = Ref (Storage' c) RefDigest      deriving (Eq) |