diff options
| -rw-r--r-- | erebos.cabal | 1 | ||||
| -rw-r--r-- | src/Network.hs | 210 | 
2 files changed, 147 insertions, 64 deletions
| diff --git a/erebos.cabal b/erebos.cabal index bf091d1..d6fcdc5 100644 --- a/erebos.cabal +++ b/erebos.cabal @@ -63,6 +63,7 @@ executable erebos                         binary >=0.8 && <0.9,                         bytestring >=0.10 && <0.12,                         cereal >= 0.5 && <0.6, +                       clock >=0.8 && < 0.9,                         containers >= 0.6 && <0.7,                         cryptonite >=0.25 && <0.30,                         deepseq >= 1.4 && <1.5, diff --git a/src/Network.hs b/src/Network.hs index e78ae7c..01caef7 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -17,6 +17,7 @@ module Network (      discoveryPort,  ) where +import Control.Applicative  import Control.Concurrent  import Control.Concurrent.STM  import Control.Exception @@ -42,6 +43,8 @@ import GHC.Conc.Sync (unsafeIOToSTM)  import Network.Socket  import qualified Network.Socket.ByteString as S +import System.Clock +  import Channel  import ICE  import Identity @@ -66,7 +69,6 @@ data Server = Server      , serverIdentity_ :: MVar UnifiedIdentity      , serverSocket :: MVar Socket      , serverChanPacket :: Chan (PeerAddress, BC.ByteString) -    , serverOutQueue :: TQueue (Peer, Bool, TransportPacket)      , serverDataResponse :: TQueue (Peer, Maybe PartialRef)      , serverIOActions :: TQueue (ExceptT String IO ())      , serverServices :: [SomeService] @@ -101,11 +103,20 @@ data Peer = Peer      , peerChannel :: TVar PeerChannel      , peerStorage :: Storage      , peerInStorage :: PartialStorage +    , peerOutQueue :: TQueue (Bool, [TransportHeaderItem], TransportPacket) +    , peerSentPackets :: TVar [SentPacket]      , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) -    , peerServiceOutQueue :: TVar [TransportPacket] +    , peerServiceOutQueue :: TVar [([TransportHeaderItem], TransportPacket)]      , peerWaitingRefs :: TMVar [WaitingRef]      } +data SentPacket = SentPacket +    { spTime :: TimeSpec +    , spRetryCount :: Int +    , spAckedBy :: [TransportHeaderItem] +    , spData :: BC.ByteString +    } +  peerServer :: Peer -> Server  peerServer = peerServer_ @@ -226,7 +237,6 @@ startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeSer  startServer opt origHead logd' services = do      let storage = refStorage $ headRef origHead      chanPacket <- newChan -    outQueue <- newTQueueIO      dataResponse <- newTQueueIO      ioActions <- newTQueueIO      chanPeer <- newTChanIO @@ -244,7 +254,6 @@ startServer opt origHead logd' services = do              , serverIdentity_ = midentity              , serverSocket = ssocket              , serverChanPacket = chanPacket -            , serverOutQueue = outQueue              , serverDataResponse = dataResponse              , serverIOActions = ioActions              , serverServices = services @@ -258,7 +267,6 @@ startServer opt origHead logd' services = do      void $ forkIO $ forever $ do          logd' =<< atomically (readTQueue errlog) -    void $ forkIO $ sendWorker server      void $ forkIO $ dataResponseWorker server      void $ forkIO $ forever $ do          either (atomically . logd) return =<< runExceptT =<< @@ -285,25 +293,27 @@ startServer opt origHead logd' services = do              let announceUpdate identity = do                      st <- derivePartialStorage storage -                    let hitems = (AnnounceSelf $ partialRef st $ storedRef $ idData identity) : -                            map (AnnounceUpdate . partialRef st . storedRef) (idUpdates identity) -                    let packet = TransportPacket (TransportHeader hitems) [] +                    let selfRef = partialRef st $ storedRef $ idData identity +                        updateRefs = map (partialRef st . storedRef) $ idUpdates identity +                        hitems = AnnounceSelf selfRef : map AnnounceUpdate updateRefs +                        packet = TransportPacket (TransportHeader $  hitems) []                      ps <- readMVar peers                      forM_ ps $ \peer -> atomically $ do                          ((,) <$> readTVar (peerIdentityVar peer) <*> readTVar (peerChannel peer)) >>= \case                              (PeerIdentityFull _, ChannelEstablished _) -> -                                writeTQueue outQueue (peer, True, packet) +                                writeTQueue (peerOutQueue peer) (True, [], packet)                              _  -> return ()              let shareState self shared peer = do -                    let hitems = (ServiceType $ serviceID @SyncService Proxy) :  -                            map (ServiceRef . partialRef (peerInStorage peer) . storedRef) shared +                    let refs = map (partialRef (peerInStorage peer) . storedRef) shared +                        hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef refs +                        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- refs ]                          packet = TransportPacket (TransportHeader hitems) $                              map storedRef shared                      atomically $ readTVar (peerIdentityVar peer) >>= \case                          PeerIdentityFull pid | finalOwner pid `sameIdentity` finalOwner self -> do -                            sendToPeerS peer packet +                            sendToPeerS peer ackedBy packet                          _  -> return ()              void $ watchHead origHead $ \h -> do @@ -373,36 +383,86 @@ startServer opt origHead logd' services = do      return server -sendWorker :: Server -> IO () -sendWorker server = forever $ do -    (peer, secure, packet@(TransportPacket header content)) <- -        atomically (readTQueue $ serverOutQueue server) - -    let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) -    let plain = BL.toStrict $ BL.concat $ -            (serializeObject $ transportToObject header) -            : map lazyLoadBytes content -    mbs <- do -        mbch <- atomically $ do -            readTVar (peerChannel peer) >>= \case +sendWorker :: Peer -> IO () +sendWorker peer = do +    startTime <- getTime MonotonicRaw +    nowVar <- newTVarIO startTime +    waitVar <- newTVarIO startTime + +    let waitTill time = void $ forkIO $ do +            now <- getTime MonotonicRaw +            when (time > now) $ +                threadDelay $ fromInteger (toNanoSecs (time - now)) `div` 1000 +            atomically . writeTVar nowVar =<< getTime MonotonicRaw + +    let sendBytes sp = do +            when (not $ null $ spAckedBy sp) $ do +                now <- getTime MonotonicRaw +                atomically $ modifyTVar' (peerSentPackets peer) $ (:) sp +                    { spTime = now +                    , spRetryCount = spRetryCount sp + 1 +                    } +            case peerAddress peer of +                DatagramAddress sock addr -> void $ S.sendTo sock (spData sp) addr +                PeerIceSession ice        -> iceSend ice (spData sp) + +    let sendNextPacket = do +            (secure, ackedBy, packet@(TransportPacket header content)) <- +                readTQueue (peerOutQueue peer) + +            let logd = atomically . writeTQueue (serverErrorLog $ peerServer peer) +            let plain = BL.toStrict $ BL.concat $ +                    (serializeObject $ transportToObject header) +                    : map lazyLoadBytes content + +            mbch <- readTVar (peerChannel peer) >>= \case                  ChannelEstablished ch -> return (Just ch) -                _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) (packet:) +                _ -> do when secure $ modifyTVar' (peerServiceOutQueue peer) ((ackedBy, packet):)                          return Nothing -        case mbch of -            Just ch -> do -                runExceptT (channelEncrypt ch plain) >>= \case -                    Right ctext -> return $ Just ctext -                    Left err -> do logd $ "Failed to encrypt data: " ++ err -                                   return Nothing -            Nothing | secure    -> return Nothing -                    | otherwise -> return $ Just plain - -    case mbs of -        Just bs -> case peerAddress peer of -            DatagramAddress sock addr -> void $ S.sendTo sock bs addr -            PeerIceSession ice        -> iceSend ice bs -        Nothing -> return () +            return $ do +                mbs <- case mbch of +                    Just ch -> do +                        runExceptT (channelEncrypt ch plain) >>= \case +                            Right ctext -> return $ Just ctext +                            Left err -> do logd $ "Failed to encrypt data: " ++ err +                                           return Nothing +                    Nothing | secure    -> return Nothing +                            | otherwise -> return $ Just plain + +                case mbs of +                    Just bs -> do +                        sendBytes $ SentPacket +                            { spTime = undefined +                            , spRetryCount = -1 +                            , spAckedBy = ackedBy +                            , spData = bs +                            } +                    Nothing -> return () + +    let retransmitPacket = do +            now <- readTVar nowVar +            (sp, rest) <- readTVar (peerSentPackets peer) >>= \case +                sps@(_:_) -> return (last sps, init sps) +                _         -> retry +            let nextTry = spTime sp + fromNanoSecs 1000000000 +            if now < nextTry +              then do +                wait <- readTVar waitVar +                if wait <= now || nextTry < wait +                   then do writeTVar waitVar nextTry +                           return $ waitTill nextTry +                   else retry +              else do +                writeTVar (peerSentPackets peer) rest +                return $ sendBytes sp + +    forever $ join $ atomically $ do +        retransmitPacket <|> sendNextPacket + +processAcknowledgements :: Peer -> [TransportHeaderItem] -> STM () +processAcknowledgements peer = mapM_ $ \hitem -> do +    modifyTVar' (peerSentPackets peer) $ filter $ (hitem `notElem`) . spAckedBy  dataResponseWorker :: Server -> IO ()  dataResponseWorker server = forever $ do @@ -432,7 +492,8 @@ dataResponseWorker server = forever $ do      let reqs = concat $ map snd list      when (not $ null reqs) $ do          let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] -        atomically $ sendToPeerPlain peer packet +            ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] +        atomically $ sendToPeerPlain peer ackedBy packet  newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a } @@ -456,12 +517,16 @@ modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v  data PacketHandlerState = PacketHandlerState      { phPeer :: Peer      , phHead :: [TransportHeaderItem] +    , phAckedBy :: [TransportHeaderItem]      , phBody :: [Ref]      }  addHeader :: TransportHeaderItem -> PacketHandler ()  addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph } +addAckedBy :: [TransportHeaderItem] -> PacketHandler () +addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs } +  addBody :: Ref -> PacketHandler ()  addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } @@ -475,6 +540,7 @@ handlePacket :: Head LocalState -> UnifiedIdentity -> Bool      -> TransportHeader -> [PartialRef] -> IO ()  handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do      let server = peerServer peer +    processAcknowledgements peer headers      ochannel <- readTVar $ peerChannel peer      let sidentity = idData identity          plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat @@ -486,7 +552,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers                     _                      -> []              ] -    res <- runExceptT $ flip execStateT (PacketHandlerState peer [] []) $ unPacketHandler $ do +    res <- runExceptT $ flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler $ do          let logd = liftSTM . writeTQueue (serverErrorLog server)          forM_ headers $ \case              Acknowledged ref -> do @@ -503,6 +569,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers                  | secure || refDigest ref `elem` plaintextRefs -> do                      Right mref <- liftSTM $ unsafeIOToSTM $ copyRef (storedStorage sidentity) ref                      addHeader $ DataResponse ref +                    addAckedBy [ Acknowledged ref, Rejected ref ]                      addBody $ mref                  | otherwise -> do                      logd $ "unauthorized data request for " ++ show ref @@ -520,15 +587,17 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers                      wref <- newWaitingRef pref $ handleIdentityAnnounce identity peer                      readTVarP (peerIdentityVar peer) >>= \case                          PeerIdentityUnknown idwait -> do -                            addHeader $ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity +                            let ref = partialRef (peerInStorage peer) $ storedRef $ idData identity +                            addHeader $ AnnounceSelf ref                              writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait                              liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer -                        _ -> return () +                        _ -> addHeader $ Acknowledged pref              AnnounceUpdate ref -> do                  readTVarP (peerIdentityVar peer) >>= \case                      PeerIdentityFull _ -> do                          void $ newWaitingRef ref $ handleIdentityUpdate peer +                        addHeader $ Acknowledged ref                      _ -> return ()              TrChannelRequest reqref -> do @@ -578,7 +647,7 @@ handlePacket origHead identity secure peer chanSvc svcs (TransportHeader headers          Right ph -> do              when (not $ null $ phHead ph) $ do                  let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) -                writeTQueue (serverOutQueue server) (peer, secure, packet) +                writeTQueue (peerOutQueue peer) (secure, phAckedBy ph, packet)  withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () @@ -592,14 +661,16 @@ setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback  setupChannel identity peer upid = do      req <- createChannelRequest (peerStorage peer) identity upid      let ist = peerInStorage peer +    let reqref = partialRef ist $ storedRef req      let hitems = -            [ TrChannelRequest $ partialRef ist $ storedRef req +            [ TrChannelRequest reqref              , AnnounceSelf $ partialRef ist $ storedRef $ idData identity              ]      liftIO $ atomically $ do          readTVar (peerChannel peer) >>= \case              ChannelWait -> do -                sendToPeerPlain peer $ TransportPacket (TransportHeader hitems) [storedRef req] +                sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ +                    TransportPacket (TransportHeader hitems) [storedRef req]                  writeTVar (peerChannel peer) $ ChannelOurRequest req              _ -> return () @@ -611,8 +682,10 @@ handleChannelRequest peer identity req = do              readTVar (peerChannel peer) >>= \case                  ChannelPeerRequest wr | wrDigest wr == refDigest req -> do                      writeTVar (peerChannel peer) $ ChannelOurAccept acc ch -                    let header = TrChannelAccept (partialRef (peerInStorage peer) $ storedRef acc) -                    sendToPeerPlain peer $ TransportPacket (TransportHeader [header]) $ concat +                    let accref = (partialRef (peerInStorage peer) $ storedRef acc) +                        header = TrChannelAccept accref +                        ackedBy = [ Acknowledged accref, Rejected accref ] +                    sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat                          [ [ storedRef $ acc ]                          , [ storedRef $ signedData $ fromStored acc ]                          , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] @@ -629,7 +702,7 @@ handleChannelAccept oh identity accref = do                  Right acc -> do                      ch <- acceptedChannel identity upid (wrappedLoad acc)                      liftIO $ atomically $ do -                        sendToPeerS peer $ TransportPacket (TransportHeader [Acknowledged accref]) [] +                        sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged accref]) []                          writeTVar (peerChannel peer) $ ChannelEstablished ch                          finalizedChannel peer oh identity @@ -638,11 +711,14 @@ handleChannelAccept oh identity accref = do  finalizedChannel :: Peer -> Head LocalState -> UnifiedIdentity -> STM ()  finalizedChannel peer oh self = do -    -- Identity update      let ist = peerInStorage peer -    sendToPeerS peer $ flip TransportPacket [] $ TransportHeader $ -        ( AnnounceSelf $ partialRef ist $ storedRef $ idData $ self ) : -        ( map (AnnounceUpdate . partialRef ist . storedRef) $ idUpdates $ self ) + +    -- Identity update +    do +        let selfRef = partialRef ist $ storedRef $ idData $ self +            updateRefs = map (partialRef ist . storedRef) $ idUpdates $ self +        sendToPeerS peer [] $ flip TransportPacket [] $ TransportHeader $ +            AnnounceSelf selfRef : map AnnounceUpdate updateRefs      -- Shared state      readTVar (peerIdentityVar peer) >>= \case @@ -650,14 +726,15 @@ finalizedChannel peer oh self = do              writeTQueue (serverIOActions $ peerServer peer) $ do                  Just h <- liftIO $ reloadHead oh                  let shared = lsShared $ headObject h -                let hitems = (ServiceType $ serviceID @SyncService Proxy) :  -                        map (ServiceRef . partialRef ist . storedRef) shared -                liftIO $ atomically $ sendToPeerS peer $ +                let hitems = (ServiceType $ serviceID @SyncService Proxy) : map ServiceRef srefs +                    srefs = map (partialRef ist . storedRef) shared +                    ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- srefs ] +                liftIO $ atomically $ sendToPeerS peer ackedBy $                      TransportPacket (TransportHeader hitems) $ map storedRef shared          _ -> return ()      -- Outstanding service packets -    mapM_ (sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) [] +    mapM_ (uncurry $ sendToPeerS peer) =<< swapTVar (peerServiceOutQueue peer) []  handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback @@ -700,16 +777,20 @@ handleIdentityUpdate peer ref = liftIO $ atomically $ do  mkPeer :: Server -> PeerAddress -> IO Peer  mkPeer server paddr = do      pst <- deriveEphemeralStorage $ serverStorage server -    Peer +    peer <- Peer          <$> pure paddr          <*> pure server          <*> (newTVarIO . PeerIdentityUnknown =<< newTVarIO [])          <*> newTVarIO ChannelWait          <*> pure pst          <*> derivePartialStorage pst +        <*> newTQueueIO +        <*> newTVarIO []          <*> newTMVarIO M.empty          <*> newTVarIO []          <*> newTMVarIO [] +    void $ forkIO $ sendWorker peer +    return peer  serverPeer :: Server -> SockAddr -> IO Peer  serverPeer server paddr = do @@ -733,7 +814,7 @@ serverPeer' server paddr = do                   return (M.insert paddr peer pvalue, (peer, True))      when hello $ do          identity <- serverIdentity server -        atomically $ writeTQueue (serverOutQueue server) $ (peer, False,) $ +        atomically $ writeTQueue (peerOutQueue peer) $ (False, [],) $              TransportPacket                  (TransportHeader [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ])                  [] @@ -756,13 +837,14 @@ sendToPeerList peer parts = do      let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs)          header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)          packet = TransportPacket header content -    liftIO $ atomically $ sendToPeerS peer packet +        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- prefs ] +    liftIO $ atomically $ sendToPeerS peer ackedBy packet -sendToPeerS :: Peer -> TransportPacket -> STM () -sendToPeerS peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, True, packet) +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () +sendToPeerS peer ackedBy packet = writeTQueue (peerOutQueue peer) (True, ackedBy, packet) -sendToPeerPlain :: Peer -> TransportPacket -> STM () -sendToPeerPlain peer packet = writeTQueue (serverOutQueue $ peerServer peer) (peer, False, packet) +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket -> STM () +sendToPeerPlain peer ackedBy packet = writeTQueue (peerOutQueue peer) (False, ackedBy, packet)  sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()  sendToPeerWith peer fobj = do |