diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 23 | ||||
| -rw-r--r-- | src/Network/Protocol.hs | 43 | 
2 files changed, 40 insertions, 26 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 28a8ce5..c5ba393 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -25,7 +25,6 @@ import Control.Monad.Except  import Control.Monad.State  import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL  import Data.Function  import Data.IP qualified as IP  import Data.List @@ -68,7 +67,7 @@ data Server = Server      , serverThreads :: MVar [ThreadId]      , serverSocket :: MVar Socket      , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) -    , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress +    , serverControlFlow :: Flow (Connection PeerAddress) (ControlRequest PeerAddress)      , serverDataResponse :: TQueue (Peer, Maybe PartialRef)      , serverIOActions :: TQueue (ExceptT String IO ())      , serverServices :: [SomeService] @@ -184,7 +183,7 @@ startServer opt serverOrigHead logd' serverServices = do      serverThreads <- newMVar []      serverSocket <- newEmptyMVar      (serverRawPath, protocolRawPath) <- newFlowIO -    (serverNewConnection, protocolNewConnection) <- newFlowIO +    (serverControlFlow, protocolControlFlow) <- newFlowIO      serverDataResponse <- newTQueueIO      serverIOActions <- newTQueueIO      serverServiceStates <- newTMVarIO M.empty @@ -217,10 +216,7 @@ startServer opt serverOrigHead logd' serverServices = do          loop sock = do              when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do -                readMVar serverIdentity_ >>= \identity -> do -                    st <- derivePartialStorage serverStorage -                    let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] -                    mapM_ (void . S.sendTo sock packet) broadcastAddreses +                atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses                  threadDelay $ announceIntervalSeconds * 1000 * 1000              let announceUpdate identity = do @@ -264,7 +260,7 @@ startServer opt serverOrigHead logd' serverServices = do                      PeerIceSession ice   -> iceSend ice msg              forkServerThread server $ forever $ do -                conn <- readFlowIO serverNewConnection +                conn <- readFlowIO serverControlFlow                  let paddr = connAddress conn                  peer <- modifyMVar serverPeers $ \pvalue -> do                      case M.lookup paddr pvalue of @@ -286,7 +282,7 @@ startServer opt serverOrigHead logd' serverServices = do                      let svcs = map someServiceID serverServices                      handlePacket identity secure peer chanSvc svcs header prefs -            erebosNetworkProtocol logd protocolRawPath protocolNewConnection +            erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow      forkServerThread server $ withSocketsDo $ do          let hints = defaultHints @@ -642,13 +638,8 @@ serverPeer' server paddr = do               Nothing -> do                   peer <- mkPeer server paddr                   return (M.insert paddr peer pvalue, (peer, True)) -    when hello $ do -        identity <- serverIdentity server -        atomically $ do -            writeFlow (serverNewConnection server) paddr -            sendToPeerPlain peer [] $ TransportPacket -                (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]) -                [] +    when hello $ atomically $ do +        writeFlow (serverControlFlow server) (RequestConnection paddr)      return peer diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index 054c0fb..488080e 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -10,6 +10,7 @@ module Network.Protocol (      ChannelState(..), +    ControlRequest(..),      erebosNetworkProtocol,      Connection, @@ -40,6 +41,7 @@ import System.Clock  import Channel  import Flow +import Identity  import Service  import Storage @@ -95,9 +97,10 @@ transportFromObject _ = Nothing  data GlobalState addr = (Eq addr, Show addr) => GlobalState -    { gConnections :: TVar [Connection addr] +    { gIdentity :: TVar UnifiedIdentity +    , gConnections :: TVar [Connection addr]      , gDataFlow :: SymFlow (addr, ByteString) -    , gConnectionFlow :: Flow addr (Connection addr) +    , gControlFlow :: Flow (ControlRequest addr) (Connection addr)      , gLog :: String -> STM ()      , gStorage :: PartialStorage      , gNowVar :: TVar TimeSpec @@ -155,12 +158,18 @@ data SentPacket = SentPacket      } +data ControlRequest addr = RequestConnection addr +                         | SendAnnounce addr + +  erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) -                      => (String -> STM ()) +                      => UnifiedIdentity +                      -> (String -> STM ())                        -> SymFlow (addr, ByteString) -                      -> Flow addr (Connection addr) +                      -> Flow (ControlRequest addr) (Connection addr)                        -> IO () -erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do +erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do +    gIdentity <- newTVarIO initialIdentity      gConnections <- newTVarIO []      gStorage <- derivePartialStorage =<< memoryStorage @@ -204,7 +213,7 @@ getConnection GlobalState {..} addr = do              let conn = Connection {..}              writeTVar gConnections (conn : conns) -            writeFlow gConnectionFlow conn +            writeFlow gControlFlow conn              return conn  processIncomming :: GlobalState addr -> STM (IO ()) @@ -331,15 +340,29 @@ processOutgoing gs@GlobalState {..} = do                  writeTVar cSentPackets rest                  return $ sendBytes conn sp -    let establishNewConnection = do -            _ <- getConnection gs =<< readFlow gConnectionFlow -            return $ return () +    let handleControlRequests = readFlow gControlFlow >>= \case +            RequestConnection addr -> do +                _ <- getConnection gs addr +                identity <- readTVar gIdentity +                let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader +                        [ AnnounceSelf $ refDigest $ storedRef $ idData identity +                        ] +                writeFlow gDataFlow (addr, packet) +                return $ return () + +            SendAnnounce addr -> do +                identity <- readTVar gIdentity +                let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader +                        [ AnnounceSelf $ refDigest $ storedRef $ idData identity +                        ] +                writeFlow gDataFlow (addr, packet) +                return $ return ()      conns <- readTVar gConnections      msum $ concat $          [ map retransmitPacket conns          , map sendNextPacket conns -        , [ establishNewConnection ] +        , [ handleControlRequests ]          ]  processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM () |