summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs120
1 files changed, 87 insertions, 33 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index 41b6279..2064d1c 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -19,7 +19,9 @@ module Erebos.Network (
#endif
dropPeer,
isPeerDropped,
- sendToPeer, sendToPeerStored, sendToPeerWith,
+ sendToPeer, sendManyToPeer,
+ sendToPeerStored, sendManyToPeerStored,
+ sendToPeerWith,
runPeerService,
discoveryPort,
@@ -52,6 +54,9 @@ import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
import qualified Network.Socket.ByteString as S
+import Foreign.C.Types
+import Foreign.Marshal.Alloc
+
import Erebos.Channel
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
@@ -69,6 +74,9 @@ import Erebos.Storage.Merge
discoveryPort :: PortNumber
discoveryPort = 29665
+discoveryMulticastGroup :: HostAddress6
+discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1
+
announceIntervalSeconds :: Int
announceIntervalSeconds = 60
@@ -247,8 +255,6 @@ startServer opt serverOrigHead logd' serverServices = do
either (atomically . logd) return =<< runExceptT =<<
atomically (readTQueue serverIOActions)
- broadcastAddreses <- getBroadcastAddresses discoveryPort
-
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
putMVar serverSocket sock
@@ -259,9 +265,14 @@ startServer opt serverOrigHead logd' serverServices = do
return sock
loop sock = do
- when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
- atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses
- threadDelay $ announceIntervalSeconds * 1000 * 1000
+ when (serverLocalDiscovery opt) $ forkServerThread server $ do
+ announceAddreses <- fmap concat $ sequence $
+ [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock
+ , getBroadcastAddresses discoveryPort
+ ]
+ forever $ do
+ atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses
+ threadDelay $ announceIntervalSeconds * 1000 * 1000
let announceUpdate identity = do
st <- derivePartialStorage serverStorage
@@ -301,10 +312,11 @@ startServer opt serverOrigHead logd' serverServices = do
forkServerThread server $ forever $ do
(paddr, msg) <- readFlowIO serverRawPath
- case paddr of
- DatagramAddress addr -> void $ S.sendTo sock msg addr
+ handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do
+ case paddr of
+ DatagramAddress addr -> void $ S.sendTo sock msg addr
#ifdef ENABLE_ICE_SUPPORT
- PeerIceSession ice -> iceSend ice msg
+ PeerIceSession ice -> iceSend ice msg
#endif
forkServerThread server $ forever $ do
@@ -421,12 +433,18 @@ instance MonadFail PacketHandler where
runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler secure peer@Peer {..} act = do
let logd = writeTQueue $ serverErrorLog peerServer_
- runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case
+ runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case
Left err -> do
logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
Right ph -> do
when (not $ null $ phHead ph) $ do
- let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ body <- case phBodyStream ph of
+ Nothing -> return $ phBody ph
+ Just stream -> do
+ writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph
+ return []
+ let packet = TransportPacket (TransportHeader $ phHead ph) body
secreq = case (secure, phPlaintextReply ph) of
(True, _) -> EncryptedOnly
(False, False) -> PlaintextAllowed
@@ -450,6 +468,7 @@ data PacketHandlerState = PacketHandlerState
, phHead :: [TransportHeaderItem]
, phAckedBy :: [TransportHeaderItem]
, phBody :: [Ref]
+ , phBodyStream :: Maybe RawStreamWriter
, phPlaintextReply :: Bool
}
@@ -462,6 +481,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+sendBodyAsStream :: PacketHandler ()
+sendBodyAsStream = do
+ gets phBodyStream >>= \case
+ Nothing -> do
+ stream <- openStream
+ modify $ \ph -> ph { phBodyStream = Just stream }
+ Just _ -> return ()
+
keepPlaintextReply :: PacketHandler ()
keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
@@ -517,8 +544,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
liftSTM $ finalizedChannel peer ch identity
_ -> return ()
- Rejected dgst -> do
- logd $ "rejected by peer: " ++ show dgst
+ Rejected dgst
+ | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers
+ , peerRequest < dgst
+ -> return () -- Our request was rejected due to lower priority
+
+ | otherwise -> logd $ "rejected by peer: " ++ show dgst
DataRequest dgst
| secure || dgst `elem` plaintextRefs -> do
@@ -532,15 +563,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
-- otherwise lost the channel, so keep the reply plaintext as well.
when (not secure) keepPlaintextReply
- let bytes = lazyLoadBytes mref
-- TODO: MTU
- if (secure && BL.length bytes > 500)
- then do
- stream <- openStream
- liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
- writeByteStringToStream stream bytes
- else do
- addBody $ mref
+ when (secure && BL.length (lazyLoadBytes mref) > 500)
+ sendBodyAsStream
+
+ addBody $ mref
| otherwise -> do
logd $ "unauthorized data request for " ++ show dgst
addHeader $ Rejected dgst
@@ -593,9 +620,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs =
ChannelCookieWait {} -> return ()
ChannelCookieReceived {} -> process
ChannelCookieConfirmed {} -> process
- ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
- | otherwise -> reject
- ChannelPeerRequest {} -> process
+ ChannelOurRequest our
+ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> do
+ -- Reject peer channel request with lower priority
+ addHeader $ TrChannelRequest $ refDigest $ storedRef our
+ reject
+ ChannelPeerRequest prev
+ | dgst == wrDigest prev -> addHeader $ Acknowledged dgst
+ | otherwise -> process
ChannelOurAccept {} -> reject
ChannelEstablished {} -> process
ChannelClosed {} -> return ()
@@ -647,12 +680,14 @@ setupChannel identity peer upid = do
[ TrChannelRequest reqref
, AnnounceSelf $ refDigest $ storedRef $ idData identity
]
+ let sendChannelRequest = do
+ sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
+ TransportPacket (TransportHeader hitems) [storedRef req]
+ setPeerChannel peer $ ChannelOurRequest req
liftIO $ atomically $ do
getPeerChannel peer >>= \case
- ChannelCookieConfirmed -> do
- sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
- TransportPacket (TransportHeader hitems) [storedRef req]
- setPeerChannel peer $ ChannelOurRequest req
+ ChannelCookieReceived -> sendChannelRequest
+ ChannelCookieConfirmed -> sendChannelRequest
_ -> return ()
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
@@ -806,10 +841,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
_ -> return False
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
-sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
+sendToPeer peer = sendManyToPeer peer . (: [])
+
+sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m ()
+sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True)
sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
-sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True]
+sendToPeerStored peer = sendManyToPeerStored peer . (: [])
+
+sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m ()
+sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True)
sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
@@ -912,9 +953,19 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO ()
+joinMulticast :: Socket -> IO [ Word32 ]
+joinMulticast sock =
+ withFdSocket sock $ \fd ->
+ alloca $ \pcount -> do
+ ptr <- cJoinMulticast fd pcount
+ count <- fromIntegral <$> peek pcount
+ forM [ 0 .. count - 1 ] $ \i ->
+ peekElemOff ptr i
+
getBroadcastAddresses :: PortNumber -> IO [SockAddr]
getBroadcastAddresses port = do
ptr <- cBroadcastAddresses
@@ -922,6 +973,9 @@ getBroadcastAddresses port = do
w <- peekElemOff ptr i
if w == 0 then return []
else (SockAddrInet port w:) <$> parse (i + 1)
- addrs <- parse 0
- cFree ptr
- return addrs
+ if ptr == nullPtr
+ then return []
+ else do
+ addrs <- parse 0
+ cFree ptr
+ return addrs