summaryrefslogtreecommitdiff
path: root/src/Erebos/Network.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r--src/Erebos/Network.hs927
1 files changed, 927 insertions, 0 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
new file mode 100644
index 0000000..41b6279
--- /dev/null
+++ b/src/Erebos/Network.hs
@@ -0,0 +1,927 @@
+{-# LANGUAGE CPP #-}
+
+module Erebos.Network (
+ Server,
+ startServer,
+ stopServer,
+ getCurrentPeerList,
+ getNextPeerChange,
+ ServerOptions(..), serverIdentity, defaultServerOptions,
+
+ Peer, peerServer, peerStorage,
+ PeerAddress(..), peerAddress,
+ PeerIdentity(..), peerIdentity,
+ WaitingRef, wrDigest,
+ Service(..),
+ serverPeer,
+#ifdef ENABLE_ICE_SUPPORT
+ serverPeerIce,
+#endif
+ dropPeer,
+ isPeerDropped,
+ sendToPeer, sendToPeerStored, sendToPeerWith,
+ runPeerService,
+
+ discoveryPort,
+) where
+
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Exception
+import Control.Monad
+import Control.Monad.Except
+import Control.Monad.Reader
+import Control.Monad.State
+
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
+import Data.Function
+import Data.IP qualified as IP
+import Data.List
+import Data.Map (Map)
+import qualified Data.Map as M
+import Data.Maybe
+import Data.Typeable
+import Data.Word
+
+import Foreign.Ptr
+import Foreign.Storable
+
+import GHC.Conc.Sync (unsafeIOToSTM)
+
+import Network.Socket hiding (ControlMessage)
+import qualified Network.Socket.ByteString as S
+
+import Erebos.Channel
+#ifdef ENABLE_ICE_SUPPORT
+import Erebos.ICE
+#endif
+import Erebos.Identity
+import Erebos.Network.Protocol
+import Erebos.PubKey
+import Erebos.Service
+import Erebos.State
+import Erebos.Storage
+import Erebos.Storage.Key
+import Erebos.Storage.Merge
+
+
+discoveryPort :: PortNumber
+discoveryPort = 29665
+
+announceIntervalSeconds :: Int
+announceIntervalSeconds = 60
+
+
+data Server = Server
+ { serverStorage :: Storage
+ , serverOrigHead :: Head LocalState
+ , serverIdentity_ :: MVar UnifiedIdentity
+ , serverThreads :: MVar [ThreadId]
+ , serverSocket :: MVar Socket
+ , serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
+ , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
+ , serverDataResponse :: TQueue (Peer, Maybe PartialRef)
+ , serverIOActions :: TQueue (ExceptT String IO ())
+ , serverServices :: [SomeService]
+ , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState)
+ , serverPeers :: MVar (Map PeerAddress Peer)
+ , serverChanPeer :: TChan Peer
+ , serverErrorLog :: TQueue String
+ }
+
+serverIdentity :: Server -> IO UnifiedIdentity
+serverIdentity = readMVar . serverIdentity_
+
+getCurrentPeerList :: Server -> IO [Peer]
+getCurrentPeerList = fmap M.elems . readMVar . serverPeers
+
+getNextPeerChange :: Server -> IO Peer
+getNextPeerChange = atomically . readTChan . serverChanPeer
+
+data ServerOptions = ServerOptions
+ { serverPort :: PortNumber
+ , serverLocalDiscovery :: Bool
+ }
+
+defaultServerOptions :: ServerOptions
+defaultServerOptions = ServerOptions
+ { serverPort = discoveryPort
+ , serverLocalDiscovery = True
+ }
+
+
+data Peer = Peer
+ { peerAddress :: PeerAddress
+ , peerServer_ :: Server
+ , peerState :: TVar PeerState
+ , peerIdentityVar :: TVar PeerIdentity
+ , peerStorage_ :: Storage
+ , peerInStorage :: PartialStorage
+ , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
+ , peerWaitingRefs :: TMVar [WaitingRef]
+ }
+
+peerServer :: Peer -> Server
+peerServer = peerServer_
+
+peerStorage :: Peer -> Storage
+peerStorage = peerStorage_
+
+getPeerChannel :: Peer -> STM ChannelState
+getPeerChannel Peer {..} =
+ readTVar peerState >>= \case
+ PeerInit _ -> return ChannelNone
+ PeerConnected conn -> connGetChannel conn
+ PeerDropped -> return ChannelClosed
+
+setPeerChannel :: Peer -> ChannelState -> STM ()
+setPeerChannel Peer {..} ch = do
+ readTVar peerState >>= \case
+ PeerInit _ -> retry
+ PeerConnected conn -> connSetChannel conn ch
+ PeerDropped -> return ()
+
+instance Eq Peer where
+ (==) = (==) `on` peerIdentityVar
+
+data PeerAddress = DatagramAddress SockAddr
+#ifdef ENABLE_ICE_SUPPORT
+ | PeerIceSession IceSession
+#endif
+
+instance Show PeerAddress where
+ show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
+ Just (IP.IPv6 ipv6, port)
+ | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
+ -> [show (IP.toIPv4w ipv4), show port]
+ Just (addr, port)
+ -> [show addr, show port]
+ _ -> [show saddr]
+#ifdef ENABLE_ICE_SUPPORT
+ show (PeerIceSession ice) = show ice
+#endif
+
+instance Eq PeerAddress where
+ DatagramAddress addr == DatagramAddress addr' = addr == addr'
+#ifdef ENABLE_ICE_SUPPORT
+ PeerIceSession ice == PeerIceSession ice' = ice == ice'
+ _ == _ = False
+#endif
+
+instance Ord PeerAddress where
+ compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
+#ifdef ENABLE_ICE_SUPPORT
+ compare (DatagramAddress _ ) _ = LT
+ compare _ (DatagramAddress _ ) = GT
+ compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice'
+#endif
+
+
+data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()])
+ | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
+ | PeerIdentityFull UnifiedIdentity
+
+peerIdentity :: MonadIO m => Peer -> m PeerIdentity
+peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
+
+
+data PeerState = PeerInit [(SecurityRequirement, TransportPacket Ref, [TransportHeaderItem])]
+ | PeerConnected (Connection PeerAddress)
+ | PeerDropped
+
+
+lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
+lookupServiceType (ServiceType stype : _) = Just stype
+lookupServiceType (_ : hs) = lookupServiceType hs
+lookupServiceType [] = Nothing
+
+lookupNewStreams :: [TransportHeaderItem] -> [Word8]
+lookupNewStreams (StreamOpen num : rest) = num : lookupNewStreams rest
+lookupNewStreams (_ : rest) = lookupNewStreams rest
+lookupNewStreams [] = []
+
+
+newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
+newWaitingRef dgst act = do
+ peer@Peer {..} <- gets phPeer
+ wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
+ modifyTMVarP peerWaitingRefs (wref:)
+ liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
+ return wref
+
+
+forkServerThread :: Server -> IO () -> IO ()
+forkServerThread server act = do
+ modifyMVar_ (serverThreads server) $ \ts -> do
+ t <- forkIO $ do
+ t <- myThreadId
+ act
+ modifyMVar_ (serverThreads server) $ return . filter (/=t)
+ return (t:ts)
+
+startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
+startServer opt serverOrigHead logd' serverServices = do
+ let serverStorage = headStorage serverOrigHead
+ serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead
+ serverThreads <- newMVar []
+ serverSocket <- newEmptyMVar
+ (serverRawPath, protocolRawPath) <- newFlowIO
+ (serverControlFlow, protocolControlFlow) <- newFlowIO
+ serverDataResponse <- newTQueueIO
+ serverIOActions <- newTQueueIO
+ serverServiceStates <- newTMVarIO M.empty
+ serverPeers <- newMVar M.empty
+ serverChanPeer <- newTChanIO
+ serverErrorLog <- newTQueueIO
+ let server = Server {..}
+
+ chanSvc <- newTQueueIO
+
+ let logd = writeTQueue serverErrorLog
+ forkServerThread server $ forever $ do
+ logd' =<< atomically (readTQueue serverErrorLog)
+
+ forkServerThread server $ dataResponseWorker server
+ forkServerThread server $ forever $ do
+ either (atomically . logd) return =<< runExceptT =<<
+ atomically (readTQueue serverIOActions)
+
+ broadcastAddreses <- getBroadcastAddresses discoveryPort
+
+ let open addr = do
+ sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
+ putMVar serverSocket sock
+ setSocketOption sock ReuseAddr 1
+ setSocketOption sock Broadcast 1
+ withFdSocket sock setCloseOnExecIfNeeded
+ bind sock (addrAddress addr)
+ return sock
+
+ loop sock = do
+ when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do
+ atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses
+ threadDelay $ announceIntervalSeconds * 1000 * 1000
+
+ let announceUpdate identity = do
+ st <- derivePartialStorage serverStorage
+ let selfRef = partialRef st $ storedRef $ idExtData identity
+ updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity)
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
+ hitems = map AnnounceUpdate updateRefs
+ packet = TransportPacket (TransportHeader $ hitems) []
+
+ ps <- readMVar serverPeers
+ forM_ ps $ \peer -> atomically $ do
+ ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case
+ (PeerIdentityFull _, ChannelEstablished _) ->
+ sendToPeerS peer ackedBy packet
+ _ -> return ()
+
+ void $ watchHead serverOrigHead $ \h -> do
+ let idt = headLocalIdentity h
+ changedId <- modifyMVar serverIdentity_ $ \cur ->
+ return (idt, cur /= idt)
+ when changedId $ do
+ writeFlowIO serverControlFlow $ UpdateSelfIdentity idt
+ announceUpdate idt
+
+ forM_ serverServices $ \(SomeService service _) -> do
+ forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+
+ forkServerThread server $ forever $ do
+ (msg, saddr) <- S.recvFrom sock 4096
+ writeFlowIO serverRawPath (DatagramAddress saddr, msg)
+
+ forkServerThread server $ forever $ do
+ (paddr, msg) <- readFlowIO serverRawPath
+ case paddr of
+ DatagramAddress addr -> void $ S.sendTo sock msg addr
+#ifdef ENABLE_ICE_SUPPORT
+ PeerIceSession ice -> iceSend ice msg
+#endif
+
+ forkServerThread server $ forever $ do
+ readFlowIO serverControlFlow >>= \case
+ NewConnection conn mbpid -> do
+ let paddr = connAddress conn
+ peer <- modifyMVar serverPeers $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> return (pvalue, peer)
+ Nothing -> do
+ peer <- mkPeer server paddr
+ return (M.insert paddr peer pvalue, peer)
+
+ forkServerThread server $ do
+ atomically $ do
+ readTVar (peerState peer) >>= \case
+ PeerInit packets -> do
+ writeFlowBulk (connData conn) $ reverse packets
+ writeTVar (peerState peer) (PeerConnected conn)
+ PeerConnected _ -> do
+ writeTVar (peerState peer) (PeerConnected conn)
+ PeerDropped -> do
+ connClose conn
+
+ case mbpid of
+ Just dgst -> do
+ identity <- readMVar serverIdentity_
+ atomically $ runPacketHandler False peer $ do
+ wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer
+ readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown idwait -> do
+ addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity
+ writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
+ liftSTM $ writeTChan serverChanPeer peer
+ _ -> return ()
+ Nothing -> return ()
+
+ let peerLoop = readFlowIO (connData conn) >>= \case
+ Just (secure, TransportPacket header objs) -> do
+ prefs <- forM objs $ storeObject $ peerInStorage peer
+ identity <- readMVar serverIdentity_
+ let svcs = map someServiceID serverServices
+ handlePacket identity secure peer chanSvc svcs header prefs
+ peerLoop
+ Nothing -> do
+ dropPeer peer
+ atomically $ writeTChan serverChanPeer peer
+ peerLoop
+
+ ReceivedAnnounce addr _ -> do
+ void $ serverPeer' server addr
+
+ erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow
+
+ forkServerThread server $ withSocketsDo $ do
+ let hints = defaultHints
+ { addrFlags = [AI_PASSIVE]
+ , addrFamily = AF_INET6
+ , addrSocketType = Datagram
+ }
+ addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt)
+ bracket (open addr) close loop
+
+ forkServerThread server $ forever $ do
+ (peer, svc, ref) <- atomically $ readTQueue chanSvc
+ case find ((svc ==) . someServiceID) serverServices of
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
+ return server
+
+stopServer :: Server -> IO ()
+stopServer Server {..} = do
+ mapM_ killThread =<< takeMVar serverThreads
+
+dataResponseWorker :: Server -> IO ()
+dataResponseWorker server = forever $ do
+ (peer, npref) <- atomically (readTQueue $ serverDataResponse server)
+
+ wait <- atomically $ takeTMVar (peerWaitingRefs peer)
+ list <- forM wait $ \wr@WaitingRef { wrefStatus = tvar } ->
+ atomically (readTVar tvar) >>= \case
+ Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of
+ [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
+ Right ref -> do
+ atomically (writeTVar tvar $ Right ref)
+ forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case
+ Left err -> atomically $ writeTQueue (serverErrorLog server) err
+ Right () -> return ()
+
+ return (Nothing, [])
+ Left dgst -> do
+ atomically (writeTVar tvar $ Left [dgst])
+ return (Just wr, [dgst])
+ ds' -> do
+ atomically (writeTVar tvar $ Left ds')
+ return (Just wr, [])
+ Right _ -> return (Nothing, [])
+ atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list
+
+ let reqs = concat $ map snd list
+ when (not $ null reqs) $ do
+ let packet = TransportPacket (TransportHeader $ map DataRequest reqs) []
+ ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ]
+ atomically $ sendToPeerPlain peer ackedBy packet
+
+
+newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a }
+ deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String)
+
+instance MonadFail PacketHandler where
+ fail = throwError
+
+runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
+runPacketHandler secure peer@Peer {..} act = do
+ let logd = writeTQueue $ serverErrorLog peerServer_
+ runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case
+ Left err -> do
+ logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
+ Right ph -> do
+ when (not $ null $ phHead ph) $ do
+ let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
+ secreq = case (secure, phPlaintextReply ph) of
+ (True, _) -> EncryptedOnly
+ (False, False) -> PlaintextAllowed
+ (False, True) -> PlaintextOnly
+ sendToPeerS' secreq peer (phAckedBy ph) packet
+
+liftSTM :: STM a -> PacketHandler a
+liftSTM = PacketHandler . lift . lift
+
+readTVarP :: TVar a -> PacketHandler a
+readTVarP = liftSTM . readTVar
+
+writeTVarP :: TVar a -> a -> PacketHandler ()
+writeTVarP v = liftSTM . writeTVar v
+
+modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler ()
+modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v
+
+data PacketHandlerState = PacketHandlerState
+ { phPeer :: Peer
+ , phHead :: [TransportHeaderItem]
+ , phAckedBy :: [TransportHeaderItem]
+ , phBody :: [Ref]
+ , phPlaintextReply :: Bool
+ }
+
+addHeader :: TransportHeaderItem -> PacketHandler ()
+addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph }
+
+addAckedBy :: [TransportHeaderItem] -> PacketHandler ()
+addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs }
+
+addBody :: Ref -> PacketHandler ()
+addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
+
+keepPlaintextReply :: PacketHandler ()
+keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True }
+
+openStream :: PacketHandler RawStreamWriter
+openStream = do
+ Peer {..} <- gets phPeer
+ conn <- readTVarP peerState >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't open stream without established connection"
+ (hdr, writer, handler) <- liftSTM (connAddWriteStream conn) >>= \case
+ Right res -> return res
+ Left err -> throwError err
+
+ liftSTM $ writeTQueue (serverIOActions peerServer_) (liftIO $ forkServerThread peerServer_ handler)
+ addHeader hdr
+ return writer
+
+acceptStream :: Word8 -> PacketHandler RawStreamReader
+acceptStream streamNumber = do
+ Peer {..} <- gets phPeer
+ conn <- readTVarP peerState >>= \case
+ PeerConnected conn -> return conn
+ _ -> throwError "can't accept stream without established connection"
+ liftSTM $ connAddReadStream conn streamNumber
+
+appendDistinct :: Eq a => a -> [a] -> [a]
+appendDistinct x (y:ys) | x == y = y : ys
+ | otherwise = y : appendDistinct x ys
+appendDistinct x [] = [x]
+
+handlePacket :: UnifiedIdentity -> Bool
+ -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
+ -> TransportHeader -> [PartialRef] -> IO ()
+handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do
+ let server = peerServer peer
+ ochannel <- getPeerChannel peer
+ let sidentity = idData identity
+ plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
+ [ [ storedRef sidentity ]
+ , map storedRef $ idUpdates identity
+ , case ochannel of
+ ChannelOurRequest req -> [ storedRef req ]
+ ChannelOurAccept acc _ -> [ storedRef acc ]
+ _ -> []
+ ]
+
+ runPacketHandler secure peer $ do
+ let logd = liftSTM . writeTQueue (serverErrorLog server)
+ forM_ headers $ \case
+ Acknowledged dgst -> do
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == dgst -> do
+ liftSTM $ finalizedChannel peer ch identity
+ _ -> return ()
+
+ Rejected dgst -> do
+ logd $ "rejected by peer: " ++ show dgst
+
+ DataRequest dgst
+ | secure || dgst `elem` plaintextRefs -> do
+ Right mref <- liftSTM $ unsafeIOToSTM $
+ copyRef (peerStorage peer) $
+ partialRefFromDigest (peerInStorage peer) dgst
+ addHeader $ DataResponse dgst
+ addAckedBy [ Acknowledged dgst, Rejected dgst ]
+
+ -- Plaintext request may indicate the peer has restarted/changed or
+ -- otherwise lost the channel, so keep the reply plaintext as well.
+ when (not secure) keepPlaintextReply
+
+ let bytes = lazyLoadBytes mref
+ -- TODO: MTU
+ if (secure && BL.length bytes > 500)
+ then do
+ stream <- openStream
+ liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
+ writeByteStringToStream stream bytes
+ else do
+ addBody $ mref
+ | otherwise -> do
+ logd $ "unauthorized data request for " ++ show dgst
+ addHeader $ Rejected dgst
+
+ DataResponse dgst -> if
+ | Just pref <- find ((==dgst) . refDigest) prefs -> do
+ when (not secure) $ do
+ addHeader $ Acknowledged dgst
+ liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref)
+
+ | streamNumber : _ <- lookupNewStreams headers -> do
+ streamReader <- acceptStream streamNumber
+ liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do
+ (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case
+ Left err -> atomically $ writeTQueue (serverErrorLog server) $
+ "failed to receive object from stream: " <> err
+ Right objs -> do
+ forM_ objs $ \obj -> do
+ pref <- storeObject (peerInStorage peer) obj
+ atomically $ writeTQueue (serverDataResponse server) (peer, Just pref)
+
+ | otherwise -> throwError $ "mismatched data response " ++ show dgst
+
+ AnnounceSelf dgst
+ | dgst == refDigest (storedRef sidentity) -> return ()
+ | otherwise -> do
+ wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer
+ readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown idwait -> do
+ addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity
+ writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait
+ liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer
+ _ -> return ()
+
+ AnnounceUpdate dgst -> do
+ readTVarP (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> do
+ void $ newWaitingRef dgst $ handleIdentityUpdate peer
+ _ -> return ()
+
+ TrChannelRequest dgst -> do
+ let process = do
+ addHeader $ Acknowledged dgst
+ wref <- newWaitingRef dgst $ handleChannelRequest peer identity
+ liftSTM $ setPeerChannel peer $ ChannelPeerRequest wref
+ reject = addHeader $ Rejected dgst
+
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> return ()
+ ChannelCookieWait {} -> return ()
+ ChannelCookieReceived {} -> process
+ ChannelCookieConfirmed {} -> process
+ ChannelOurRequest our | dgst < refDigest (storedRef our) -> process
+ | otherwise -> reject
+ ChannelPeerRequest {} -> process
+ ChannelOurAccept {} -> reject
+ ChannelEstablished {} -> process
+ ChannelClosed {} -> return ()
+
+ TrChannelAccept dgst -> do
+ let process = do
+ handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst
+ reject = addHeader $ Rejected dgst
+ liftSTM (getPeerChannel peer) >>= \case
+ ChannelNone {} -> reject
+ ChannelCookieWait {} -> reject
+ ChannelCookieReceived {} -> reject
+ ChannelCookieConfirmed {} -> reject
+ ChannelOurRequest {} -> process
+ ChannelPeerRequest {} -> process
+ ChannelOurAccept our _ | dgst < refDigest (storedRef our) -> process
+ | otherwise -> addHeader $ Rejected dgst
+ ChannelEstablished {} -> process
+ ChannelClosed {} -> return ()
+
+ ServiceType _ -> return ()
+ ServiceRef dgst
+ | not secure -> throwError $ "service packet without secure channel"
+ | Just svc <- lookupServiceType headers -> if
+ | svc `elem` svcs -> do
+ if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -}
+ then do
+ void $ newWaitingRef dgst $ \ref ->
+ liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref)
+ else throwError $ "missing service object " ++ show dgst
+ | otherwise -> addHeader $ Rejected dgst
+ | otherwise -> throwError $ "service ref without type"
+
+ _ -> return ()
+
+
+withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
+withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityUnknown tvar -> modifyTVar' tvar (act:)
+ PeerIdentityRef _ tvar -> modifyTVar' tvar (act:)
+ PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt)
+
+
+setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
+setupChannel identity peer upid = do
+ req <- flip runReaderT (peerStorage peer) $ createChannelRequest identity upid
+ let reqref = refDigest $ storedRef req
+ let hitems =
+ [ TrChannelRequest reqref
+ , AnnounceSelf $ refDigest $ storedRef $ idData identity
+ ]
+ liftIO $ atomically $ do
+ getPeerChannel peer >>= \case
+ ChannelCookieConfirmed -> do
+ sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $
+ TransportPacket (TransportHeader hitems) [storedRef req]
+ setPeerChannel peer $ ChannelOurRequest req
+ _ -> return ()
+
+handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
+handleChannelRequest peer identity req = do
+ withPeerIdentity peer $ \upid -> do
+ (acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req)
+ liftIO $ atomically $ do
+ getPeerChannel peer >>= \case
+ ChannelPeerRequest wr | wrDigest wr == refDigest req -> do
+ setPeerChannel peer $ ChannelOurAccept acc ch
+ let accref = refDigest $ storedRef acc
+ header = TrChannelAccept accref
+ ackedBy = [ Acknowledged accref, Rejected accref ]
+ sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat
+ [ [ storedRef $ acc ]
+ , [ storedRef $ signedData $ fromStored acc ]
+ , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ]
+ , map storedRef $ signedSignature $ fromStored acc
+ ]
+ _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request"
+
+handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler ()
+handleChannelAccept identity accref = do
+ peer <- gets phPeer
+ liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do
+ withPeerIdentity peer $ \upid -> do
+ copyRef (peerStorage peer) accref >>= \case
+ Right acc -> do
+ ch <- acceptedChannel identity upid (wrappedLoad acc)
+ liftIO $ atomically $ do
+ sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) []
+ finalizedChannel peer ch identity
+
+ Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
+
+
+finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM ()
+finalizedChannel peer@Peer {..} ch self = do
+ setPeerChannel peer $ ChannelEstablished ch
+
+ -- Identity update
+ writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do
+ let selfRef = refDigest $ storedRef $ idExtData $ self
+ updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self)
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ]
+ sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs
+
+ -- Notify services about new peer
+ readTVar peerIdentityVar >>= \case
+ PeerIdentityFull _ -> notifyServicesOfPeer peer
+ _ -> return ()
+
+
+handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
+handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
+ let validateAndUpdate upds act = case validateIdentity $ wrappedLoad ref of
+ Just pid' -> do
+ let pid = fromMaybe pid' $ toUnifiedIdentity (updateIdentity upds pid')
+ writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid
+ writeTChan (serverChanPeer $ peerServer peer) peer
+ act pid
+ writeTQueue (serverIOActions $ peerServer peer) $ do
+ setupChannel self peer pid
+ Nothing -> return ()
+
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityRef wref wact
+ | wrDigest wref == refDigest ref
+ -> validateAndUpdate [] $ \pid -> do
+ mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($ pid)) .
+ reverse =<< readTVar wact
+
+ PeerIdentityFull pid
+ | idData pid `precedes` wrappedLoad ref
+ -> validateAndUpdate (idUpdates pid) $ \_ -> do
+ notifyServicesOfPeer peer
+
+ _ -> return ()
+
+handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback
+handleIdentityUpdate peer ref = liftIO $ atomically $ do
+ pidentity <- readTVar (peerIdentityVar peer)
+ if | PeerIdentityFull pid <- pidentity
+ , Just pid' <- toUnifiedIdentity $ updateIdentity [wrappedLoad ref] pid
+ -> do
+ writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid'
+ writeTChan (serverChanPeer $ peerServer peer) peer
+ when (idData pid /= idData pid') $ notifyServicesOfPeer peer
+
+ | otherwise -> return ()
+
+notifyServicesOfPeer :: Peer -> STM ()
+notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do
+ writeTQueue serverIOActions $ do
+ forM_ serverServices $ \service@(SomeService _ attrs) ->
+ runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
+
+
+mkPeer :: Server -> PeerAddress -> IO Peer
+mkPeer peerServer_ peerAddress = do
+ peerState <- newTVarIO (PeerInit [])
+ peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO []
+ peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_
+ peerInStorage <- derivePartialStorage peerStorage_
+ peerServiceState <- newTMVarIO M.empty
+ peerWaitingRefs <- newTMVarIO []
+ return Peer {..}
+
+serverPeer :: Server -> SockAddr -> IO Peer
+serverPeer server paddr = do
+ let paddr' = case IP.fromSockAddr paddr of
+ Just (IP.IPv4 ipv4, port)
+ -> IP.toSockAddr (IP.IPv6 $ IP.toIPv6w (0, 0, 0xffff, IP.fromIPv4w ipv4), port)
+ _ -> paddr
+ serverPeer' server (DatagramAddress paddr')
+
+#ifdef ENABLE_ICE_SUPPORT
+serverPeerIce :: Server -> IceSession -> IO Peer
+serverPeerIce server@Server {..} ice = do
+ let paddr = PeerIceSession ice
+ peer <- serverPeer' server paddr
+ iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath
+ return peer
+#endif
+
+serverPeer' :: Server -> PeerAddress -> IO Peer
+serverPeer' server paddr = do
+ (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> return (pvalue, (peer, False))
+ Nothing -> do
+ peer <- mkPeer server paddr
+ return (M.insert paddr peer pvalue, (peer, True))
+ when hello $ atomically $ do
+ writeFlow (serverControlFlow server) (RequestConnection paddr)
+ return peer
+
+dropPeer :: MonadIO m => Peer -> m ()
+dropPeer peer = liftIO $ do
+ modifyMVar_ (serverPeers $ peerServer peer) $ \pvalue -> do
+ atomically $ do
+ readTVar (peerState peer) >>= \case
+ PeerConnected conn -> connClose conn
+ _ -> return()
+ writeTVar (peerState peer) PeerDropped
+ return $ M.delete (peerAddress peer) pvalue
+
+isPeerDropped :: MonadIO m => Peer -> m Bool
+isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case
+ PeerDropped -> return True
+ _ -> return False
+
+sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
+sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True]
+
+sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
+sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True]
+
+sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
+sendToPeerList peer parts = do
+ let st = peerStorage peer
+ srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
+ ServiceReply (Left x) use -> Just . (,use) <$> store st x
+ ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
+ ServiceFinally act -> act >> return Nothing
+ let dgsts = map (refDigest . fst) srefs
+ let content = map fst $ filter (\(ref, use) -> use && BL.length (lazyLoadBytes ref) < 500) srefs -- TODO: MTU
+ header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts)
+ packet = TransportPacket header content
+ ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
+ liftIO $ atomically $ sendToPeerS peer ackedBy packet
+
+sendToPeerS' :: SecurityRequirement -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS' secure Peer {..} ackedBy packet = do
+ readTVar peerState >>= \case
+ PeerInit xs -> writeTVar peerState $ PeerInit $ (secure, packet, ackedBy) : xs
+ PeerConnected conn -> writeFlow (connData conn) (secure, packet, ackedBy)
+ PeerDropped -> return ()
+
+sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerS = sendToPeerS' EncryptedOnly
+
+sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
+sendToPeerPlain = sendToPeerS' PlaintextAllowed
+
+sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
+sendToPeerWith peer fobj = do
+ let sproxy = Proxy @s
+ sid = serviceID sproxy
+ res <- liftIO $ do
+ svcs <- atomically $ takeTMVar (peerServiceState peer)
+ (svcs', res) <- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case
+ Right (obj, s') -> return $ (M.insert sid (SomeServiceState sproxy s') svcs, Right obj)
+ Left err -> return $ (svcs, Left err)
+ atomically $ putTMVar (peerServiceState peer) svcs'
+ return res
+
+ case res of
+ Right (Just obj) -> sendToPeer peer obj
+ Right Nothing -> return ()
+ Left err -> throwError err
+
+
+lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
+ | Just (Refl :: s :~: t) <- eqT = Just (service, attr)
+ | otherwise = lookupService proxy rest
+lookupService _ [] = Nothing
+
+runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
+runPeerService = runPeerServiceOn Nothing
+
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice peer handler = liftIO $ do
+ let server = peerServer peer
+ proxy = Proxy @s
+ svc = serviceID proxy
+ logd = writeTQueue (serverErrorLog server)
+ case mbservice `mplus` lookupService proxy (serverServices server) of
+ Just (service, attr) ->
+ atomically (readTVar (peerIdentityVar peer)) >>= \case
+ PeerIdentityFull peerId -> do
+ (global, svcs) <- atomically $ (,)
+ <$> takeTMVar (serverServiceStates server)
+ <*> takeTMVar (peerServiceState peer)
+ case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs,
+ fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of
+ ((SomeServiceState (_ :: Proxy ps) ps),
+ (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do
+ Just (Refl :: s :~: ps) <- return $ eqT
+ Just (Refl :: s :~: gs) <- return $ eqT
+
+ let inp = ServiceInput
+ { svcAttributes = attr
+ , svcPeer = peer
+ , svcPeerIdentity = peerId
+ , svcServer = server
+ , svcPrintOp = atomically . logd
+ }
+ reloadHead (serverOrigHead server) >>= \case
+ Nothing -> atomically $ do
+ logd $ "current head deleted"
+ putTMVar (peerServiceState peer) svcs
+ putTMVar (serverServiceStates server) global
+ Just h -> do
+ (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler
+ moveKeys (peerStorage peer) (serverStorage server)
+ when (not (null rsp)) $ do
+ sendToPeerList peer rsp
+ atomically $ do
+ putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
+ putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global
+ _ -> do
+ atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer)
+
+ _ -> atomically $ do
+ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
+
+foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
+foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO ()
+
+getBroadcastAddresses :: PortNumber -> IO [SockAddr]
+getBroadcastAddresses port = do
+ ptr <- cBroadcastAddresses
+ let parse i = do
+ w <- peekElemOff ptr i
+ if w == 0 then return []
+ else (SockAddrInet port w:) <$> parse (i + 1)
+ addrs <- parse 0
+ cFree ptr
+ return addrs