summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2023-08-13 16:28:33 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2023-08-27 12:01:16 +0200
commit251452dfb0c239ac1bc9f70c620a2cdef18ae739 (patch)
treec6b167fa397a901637aeb740ed01009f0bd3b87e
parent0ef84b829ef2b27ce73dc84ad549d6099b28c377 (diff)
Network: send announce using protocol control request
-rw-r--r--src/Network.hs23
-rw-r--r--src/Network/Protocol.hs43
2 files changed, 40 insertions, 26 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 28a8ce5..c5ba393 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -25,7 +25,6 @@ import Control.Monad.Except
import Control.Monad.State
import qualified Data.ByteString.Char8 as BC
-import qualified Data.ByteString.Lazy as BL
import Data.Function
import Data.IP qualified as IP
import Data.List
@@ -68,7 +67,7 @@ data Server = Server
, serverThreads :: MVar [ThreadId]
, serverSocket :: MVar Socket
, serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
- , serverNewConnection :: Flow (Connection PeerAddress) PeerAddress
+ , serverControlFlow :: Flow (Connection PeerAddress) (ControlRequest PeerAddress)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
, serverIOActions :: TQueue (ExceptT String IO ())
, serverServices :: [SomeService]
@@ -184,7 +183,7 @@ startServer opt serverOrigHead logd' serverServices = do
serverThreads <- newMVar []
serverSocket <- newEmptyMVar
(serverRawPath, protocolRawPath) <- newFlowIO
- (serverNewConnection, protocolNewConnection) <- newFlowIO
+ (serverControlFlow, protocolControlFlow) <- newFlowIO
serverDataResponse <- newTQueueIO
serverIOActions <- newTQueueIO
serverServiceStates <- newTMVarIO M.empty
@@ -217,10 +216,7 @@ startServer opt serverOrigHead logd' serverServices = do
loop sock = do
when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
- readMVar serverIdentity_ >>= \identity -> do
- st <- derivePartialStorage serverStorage
- let packet = BL.toStrict $ serializeObject $ transportToObject st $ TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ]
- mapM_ (void . S.sendTo sock packet) broadcastAddreses
+ atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses
threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
@@ -264,7 +260,7 @@ startServer opt serverOrigHead logd' serverServices = do
PeerIceSession ice -> iceSend ice msg
forkServerThread server $ forever $ do
- conn <- readFlowIO serverNewConnection
+ conn <- readFlowIO serverControlFlow
let paddr = connAddress conn
peer <- modifyMVar serverPeers $ \pvalue -> do
case M.lookup paddr pvalue of
@@ -286,7 +282,7 @@ startServer opt serverOrigHead logd' serverServices = do
let svcs = map someServiceID serverServices
handlePacket identity secure peer chanSvc svcs header prefs
- erebosNetworkProtocol logd protocolRawPath protocolNewConnection
+ erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow
forkServerThread server $ withSocketsDo $ do
let hints = defaultHints
@@ -642,13 +638,8 @@ serverPeer' server paddr = do
Nothing -> do
peer <- mkPeer server paddr
return (M.insert paddr peer pvalue, (peer, True))
- when hello $ do
- identity <- serverIdentity server
- atomically $ do
- writeFlow (serverNewConnection server) paddr
- sendToPeerPlain peer [] $ TransportPacket
- (TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ])
- []
+ when hello $ atomically $ do
+ writeFlow (serverControlFlow server) (RequestConnection paddr)
return peer
diff --git a/src/Network/Protocol.hs b/src/Network/Protocol.hs
index 054c0fb..488080e 100644
--- a/src/Network/Protocol.hs
+++ b/src/Network/Protocol.hs
@@ -10,6 +10,7 @@ module Network.Protocol (
ChannelState(..),
+ ControlRequest(..),
erebosNetworkProtocol,
Connection,
@@ -40,6 +41,7 @@ import System.Clock
import Channel
import Flow
+import Identity
import Service
import Storage
@@ -95,9 +97,10 @@ transportFromObject _ = Nothing
data GlobalState addr = (Eq addr, Show addr) => GlobalState
- { gConnections :: TVar [Connection addr]
+ { gIdentity :: TVar UnifiedIdentity
+ , gConnections :: TVar [Connection addr]
, gDataFlow :: SymFlow (addr, ByteString)
- , gConnectionFlow :: Flow addr (Connection addr)
+ , gControlFlow :: Flow (ControlRequest addr) (Connection addr)
, gLog :: String -> STM ()
, gStorage :: PartialStorage
, gNowVar :: TVar TimeSpec
@@ -155,12 +158,18 @@ data SentPacket = SentPacket
}
+data ControlRequest addr = RequestConnection addr
+ | SendAnnounce addr
+
+
erebosNetworkProtocol :: (Eq addr, Ord addr, Show addr)
- => (String -> STM ())
+ => UnifiedIdentity
+ -> (String -> STM ())
-> SymFlow (addr, ByteString)
- -> Flow addr (Connection addr)
+ -> Flow (ControlRequest addr) (Connection addr)
-> IO ()
-erebosNetworkProtocol gLog gDataFlow gConnectionFlow = do
+erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do
+ gIdentity <- newTVarIO initialIdentity
gConnections <- newTVarIO []
gStorage <- derivePartialStorage =<< memoryStorage
@@ -204,7 +213,7 @@ getConnection GlobalState {..} addr = do
let conn = Connection {..}
writeTVar gConnections (conn : conns)
- writeFlow gConnectionFlow conn
+ writeFlow gControlFlow conn
return conn
processIncomming :: GlobalState addr -> STM (IO ())
@@ -331,15 +340,29 @@ processOutgoing gs@GlobalState {..} = do
writeTVar cSentPackets rest
return $ sendBytes conn sp
- let establishNewConnection = do
- _ <- getConnection gs =<< readFlow gConnectionFlow
- return $ return ()
+ let handleControlRequests = readFlow gControlFlow >>= \case
+ RequestConnection addr -> do
+ _ <- getConnection gs addr
+ identity <- readTVar gIdentity
+ let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader
+ [ AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ]
+ writeFlow gDataFlow (addr, packet)
+ return $ return ()
+
+ SendAnnounce addr -> do
+ identity <- readTVar gIdentity
+ let packet = BL.toStrict $ serializeObject $ transportToObject gStorage $ TransportHeader
+ [ AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ]
+ writeFlow gDataFlow (addr, packet)
+ return $ return ()
conns <- readTVar gConnections
msum $ concat $
[ map retransmitPacket conns
, map sendNextPacket conns
- , [ establishNewConnection ]
+ , [ handleControlRequests ]
]
processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM ()