diff options
-rw-r--r-- | src/Network.hs | 23 | ||||
-rw-r--r-- | src/Network/Protocol.hs | 43 |
2 files changed, 40 insertions, 26 deletions
diff --git a/src/Network.hs b/src/Network.hs index 28a8ce5..c5ba393 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -25,7 +25,6 @@ import Control.Monad.Except import Control.Monad.State import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL import Data.Function import Data.IP qualified as IP import Data.List @@ -68,7 +67,7 @@ data Server = Server , serverThreads :: MVar [ThreadId] , serverSocket :: MVar Socket , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) - , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress + , serverControlFlow :: Flow (Connection PeerAddress) (ControlRequest PeerAddress) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) , serverIOActions :: TQueue (ExceptT String IO ()) , serverServices :: [SomeService] @@ -184,7 +183,7 @@ startServer opt serverOrigHead logd' serverServices = do serverThreads <- newMVar [] serverSocket <- newEmptyMVar (serverRawPath, protocolRawPath) <- newFlowIO - (serverNewConnection, protocolNewConnection) <- newFlowIO + (serverControlFlow, protocolControlFlow) <- newFlowIO serverDataResponse <- newTQueueIO serverIOActions <- newTQueueIO serverServiceStates <- newTMVarIO M.empty @@ -217,10 +216,7 @@ startServer opt serverOrigHead logd' serverServices = do loop sock = do when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do - readMVar serverIdentity_ >>= \identity -> do - st <- derivePartialStorage serverStorage - let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] - mapM_ (void . S.sendTo sock packet) broadcastAddreses + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do @@ -264,7 +260,7 @@ startServer opt serverOrigHead logd' serverServices = do PeerIceSession ice -> iceSend ice msg forkServerThread server $ forever $ do - conn <- readFlowIO serverNewConnection + conn <- readFlowIO serverControlFlow let paddr = connAddress conn peer <- modifyMVar serverPeers $ \pvalue -> do case M.lookup paddr pvalue of @@ -286,7 +282,7 @@ startServer opt serverOrigHead logd' serverServices = do let svcs = map someServiceID serverServices handlePacket identity secure peer chanSvc svcs header prefs - erebosNetworkProtocol logd protocolRawPath protocolNewConnection + erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow forkServerThread server $ withSocketsDo $ do let hints = defaultHints @@ -642,13 +638,8 @@ serverPeer' server paddr = do Nothing -> do peer <- mkPeer server paddr return (M.insert paddr peer pvalue, (peer, True)) - when hello $ do - identity <- serverIdentity server - atomically $ do - writeFlow (serverNewConnection server) paddr - sendToPeerPlain peer [] $ TransportPacket - (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]) - [] + when hello $ atomically $ do + writeFlow (serverControlFlow server) (RequestConnection paddr) return peer diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs index 054c0fb..488080e 100644 --- a/src/Network/Protocol.hs +++ b/src/Network/Protocol.hs @@ -10,6 +10,7 @@ module Network.Protocol ( ChannelState(..), + ControlRequest(..), erebosNetworkProtocol, Connection, @@ -40,6 +41,7 @@ import System.Clock import Channel import Flow +import Identity import Service import Storage @@ -95,9 +97,10 @@ transportFromObject _ = Nothing data GlobalState addr = (Eq addr, Show addr) => GlobalState - { gConnections :: TVar [Connection addr] + { gIdentity :: TVar UnifiedIdentity + , gConnections :: TVar [Connection addr] , gDataFlow :: SymFlow (addr, ByteString) - , gConnectionFlow :: Flow addr (Connection addr) + , gControlFlow :: Flow (ControlRequest addr) (Connection addr) , gLog :: String -> STM () , gStorage :: PartialStorage , gNowVar :: TVar TimeSpec @@ -155,12 +158,18 @@ data SentPacket = SentPacket } +data ControlRequest addr = RequestConnection addr + | SendAnnounce addr + + erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr) - => (String -> STM ()) + => UnifiedIdentity + -> (String -> STM ()) -> SymFlow (addr, ByteString) - -> Flow addr (Connection addr) + -> Flow (ControlRequest addr) (Connection addr) -> IO () -erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do +erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do + gIdentity <- newTVarIO initialIdentity gConnections <- newTVarIO [] gStorage <- derivePartialStorage =<< memoryStorage @@ -204,7 +213,7 @@ getConnection GlobalState {..} addr = do let conn = Connection {..} writeTVar gConnections (conn : conns) - writeFlow gConnectionFlow conn + writeFlow gControlFlow conn return conn processIncomming :: GlobalState addr -> STM (IO ()) @@ -331,15 +340,29 @@ processOutgoing gs@GlobalState {..} = do writeTVar cSentPackets rest return $ sendBytes conn sp - let establishNewConnection = do - _ <- getConnection gs =<< readFlow gConnectionFlow - return $ return () + let handleControlRequests = readFlow gControlFlow >>= \case + RequestConnection addr -> do + _ <- getConnection gs addr + identity <- readTVar gIdentity + let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader + [ AnnounceSelf $ refDigest $ storedRef $ idData identity + ] + writeFlow gDataFlow (addr, packet) + return $ return () + + SendAnnounce addr -> do + identity <- readTVar gIdentity + let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader + [ AnnounceSelf $ refDigest $ storedRef $ idData identity + ] + writeFlow gDataFlow (addr, packet) + return $ return () conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns - , [ establishNewConnection ] + , [ handleControlRequests ] ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM () |