diff options
Diffstat (limited to 'src/Erebos/Network.hs')
-rw-r--r-- | src/Erebos/Network.hs | 815 |
1 files changed, 815 insertions, 0 deletions
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs new file mode 100644 index 0000000..b26ada5 --- /dev/null +++ b/src/Erebos/Network.hs @@ -0,0 +1,815 @@ +{-# LANGUAGE CPP #-} + +module Erebos.Network ( + Server, + startServer, + stopServer, + getNextPeerChange, + ServerOptions(..), serverIdentity, defaultServerOptions, + + Peer, peerServer, peerStorage, + PeerAddress(..), peerAddress, + PeerIdentity(..), peerIdentity, + WaitingRef, wrDigest, + Service(..), + serverPeer, +#ifdef ENABLE_ICE_SUPPORT + serverPeerIce, +#endif + sendToPeer, sendToPeerStored, sendToPeerWith, + runPeerService, + + discoveryPort, +) where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Control.Monad.Except +import Control.Monad.Reader +import Control.Monad.State + +import qualified Data.ByteString.Char8 as BC +import Data.Function +import Data.IP qualified as IP +import Data.List +import Data.Map (Map) +import qualified Data.Map as M +import Data.Maybe +import Data.Typeable +import Data.Word + +import Foreign.Ptr +import Foreign.Storable + +import GHC.Conc.Sync (unsafeIOToSTM) + +import Network.Socket hiding (ControlMessage) +import qualified Network.Socket.ByteString as S + +import Erebos.Channel +#ifdef ENABLE_ICE_SUPPORT +import Erebos.ICE +#endif +import Erebos.Identity +import Erebos.Network.Protocol +import Erebos.PubKey +import Erebos.Service +import Erebos.State +import Erebos.Storage +import Erebos.Storage.Key +import Erebos.Storage.Merge + + +discoveryPort :: PortNumber +discoveryPort = 29665 + +announceIntervalSeconds :: Int +announceIntervalSeconds = 60 + + +data Server = Server + { serverStorage :: Storage + , serverOrigHead :: Head LocalState + , serverIdentity_ :: MVar UnifiedIdentity + , serverThreads :: MVar [ThreadId] + , serverSocket :: MVar Socket + , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) + , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress) + , serverDataResponse :: TQueue (Peer, Maybe PartialRef) + , serverIOActions :: TQueue (ExceptT String IO ()) + , serverServices :: [SomeService] + , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) + , serverPeers :: MVar (Map PeerAddress Peer) + , serverChanPeer :: TChan Peer + , serverErrorLog :: TQueue String + } + +serverIdentity :: Server -> IO UnifiedIdentity +serverIdentity = readMVar . serverIdentity_ + +getNextPeerChange :: Server -> IO Peer +getNextPeerChange = atomically . readTChan . serverChanPeer + +data ServerOptions = ServerOptions + { serverPort :: PortNumber + , serverLocalDiscovery :: Bool + } + +defaultServerOptions :: ServerOptions +defaultServerOptions = ServerOptions + { serverPort = discoveryPort + , serverLocalDiscovery = True + } + + +data Peer = Peer + { peerAddress :: PeerAddress + , peerServer_ :: Server + , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress)) + , peerIdentityVar :: TVar PeerIdentity + , peerStorage_ :: Storage + , peerInStorage :: PartialStorage + , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState) + , peerWaitingRefs :: TMVar [WaitingRef] + } + +peerServer :: Peer -> Server +peerServer = peerServer_ + +peerStorage :: Peer -> Storage +peerStorage = peerStorage_ + +getPeerChannel :: Peer -> STM ChannelState +getPeerChannel Peer {..} = either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection + +setPeerChannel :: Peer -> ChannelState -> STM () +setPeerChannel Peer {..} ch = do + readTVar peerConnection >>= \case + Left _ -> retry + Right conn -> connSetChannel conn ch + +instance Eq Peer where + (==) = (==) `on` peerIdentityVar + +data PeerAddress = DatagramAddress SockAddr +#ifdef ENABLE_ICE_SUPPORT + | PeerIceSession IceSession +#endif + +instance Show PeerAddress where + show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of + Just (IP.IPv6 ipv6, port) + | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 + -> [show (IP.toIPv4w ipv4), show port] + Just (addr, port) + -> [show addr, show port] + _ -> [show saddr] +#ifdef ENABLE_ICE_SUPPORT + show (PeerIceSession ice) = show ice +#endif + +instance Eq PeerAddress where + DatagramAddress addr == DatagramAddress addr' = addr == addr' +#ifdef ENABLE_ICE_SUPPORT + PeerIceSession ice == PeerIceSession ice' = ice == ice' + _ == _ = False +#endif + +instance Ord PeerAddress where + compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr' +#ifdef ENABLE_ICE_SUPPORT + compare (DatagramAddress _ ) _ = LT + compare _ (DatagramAddress _ ) = GT + compare (PeerIceSession ice ) (PeerIceSession ice') = compare ice ice' +#endif + + +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) + | PeerIdentityFull UnifiedIdentity + +peerIdentity :: MonadIO m => Peer -> m PeerIdentity +peerIdentity = liftIO . atomically . readTVar . peerIdentityVar + + +lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID +lookupServiceType (ServiceType stype : _) = Just stype +lookupServiceType (_ : hs) = lookupServiceType hs +lookupServiceType [] = Nothing + + +newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef +newWaitingRef dgst act = do + peer@Peer {..} <- gets phPeer + wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left [])) + modifyTMVarP peerWaitingRefs (wref:) + liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing) + return wref + + +forkServerThread :: Server -> IO () -> IO () +forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do + (:ts) <$> forkIO act + +startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server +startServer opt serverOrigHead logd' serverServices = do + let serverStorage = headStorage serverOrigHead + serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead + serverThreads <- newMVar [] + serverSocket <- newEmptyMVar + (serverRawPath, protocolRawPath) <- newFlowIO + (serverControlFlow, protocolControlFlow) <- newFlowIO + serverDataResponse <- newTQueueIO + serverIOActions <- newTQueueIO + serverServiceStates <- newTMVarIO M.empty + serverPeers <- newMVar M.empty + serverChanPeer <- newTChanIO + serverErrorLog <- newTQueueIO + let server = Server {..} + + chanSvc <- newTQueueIO + + let logd = writeTQueue serverErrorLog + forkServerThread server $ forever $ do + logd' =<< atomically (readTQueue serverErrorLog) + + forkServerThread server $ dataResponseWorker server + forkServerThread server $ forever $ do + either (atomically . logd) return =<< runExceptT =<< + atomically (readTQueue serverIOActions) + + broadcastAddreses <- getBroadcastAddresses discoveryPort + + let open addr = do + sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) + putMVar serverSocket sock + setSocketOption sock ReuseAddr 1 + setSocketOption sock Broadcast 1 + withFdSocket sock setCloseOnExecIfNeeded + bind sock (addrAddress addr) + return sock + + loop sock = do + when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses + threadDelay $ announceIntervalSeconds * 1000 * 1000 + + let announceUpdate identity = do + st <- derivePartialStorage serverStorage + let selfRef = partialRef st $ storedRef $ idExtData identity + updateRefs = map refDigest $ selfRef : map (partialRef st . storedRef) (idUpdates identity) + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] + hitems = map AnnounceUpdate updateRefs + packet = TransportPacket (TransportHeader $ hitems) [] + + ps <- readMVar serverPeers + forM_ ps $ \peer -> atomically $ do + ((,) <$> readTVar (peerIdentityVar peer) <*> getPeerChannel peer) >>= \case + (PeerIdentityFull _, ChannelEstablished _) -> + sendToPeerS peer ackedBy packet + _ -> return () + + void $ watchHead serverOrigHead $ \h -> do + let idt = headLocalIdentity h + changedId <- modifyMVar serverIdentity_ $ \cur -> + return (idt, cur /= idt) + when changedId $ do + writeFlowIO serverControlFlow $ UpdateSelfIdentity idt + announceUpdate idt + + forM_ serverServices $ \(SomeService service _) -> do + forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do + watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do + withMVar serverPeers $ mapM_ $ \peer -> atomically $ do + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> writeTQueue serverIOActions $ do + runPeerService peer $ act x + _ -> return () + + forkServerThread server $ forever $ do + (msg, saddr) <- S.recvFrom sock 4096 + writeFlowIO serverRawPath (DatagramAddress saddr, msg) + + forkServerThread server $ forever $ do + (paddr, msg) <- readFlowIO serverRawPath + case paddr of + DatagramAddress addr -> void $ S.sendTo sock msg addr +#ifdef ENABLE_ICE_SUPPORT + PeerIceSession ice -> iceSend ice msg +#endif + + forkServerThread server $ forever $ do + readFlowIO serverControlFlow >>= \case + NewConnection conn mbpid -> do + let paddr = connAddress conn + peer <- modifyMVar serverPeers $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> return (pvalue, peer) + Nothing -> do + peer <- mkPeer server paddr + return (M.insert paddr peer pvalue, peer) + + forkServerThread server $ do + atomically $ do + readTVar (peerConnection peer) >>= \case + Left packets -> writeFlowBulk (connData conn) $ reverse packets + Right _ -> return () + writeTVar (peerConnection peer) (Right conn) + + case mbpid of + Just dgst -> do + identity <- readMVar serverIdentity_ + atomically $ runPacketHandler False peer $ do + wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer + readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityUnknown idwait -> do + addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity + writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait + liftSTM $ writeTChan serverChanPeer peer + _ -> return () + Nothing -> return () + + forever $ do + (secure, TransportPacket header objs) <- readFlowIO $ connData conn + prefs <- forM objs $ storeObject $ peerInStorage peer + identity <- readMVar serverIdentity_ + let svcs = map someServiceID serverServices + handlePacket identity secure peer chanSvc svcs header prefs + + ReceivedAnnounce addr _ -> do + void $ serverPeer' server addr + + erebosNetworkProtocol (headLocalIdentity serverOrigHead) logd protocolRawPath protocolControlFlow + + forkServerThread server $ withSocketsDo $ do + let hints = defaultHints + { addrFlags = [AI_PASSIVE] + , addrFamily = AF_INET6 + , addrSocketType = Datagram + } + addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) + bracket (open addr) close loop + + forkServerThread server $ forever $ do + (peer, svc, ref) <- atomically $ readTQueue chanSvc + case find ((svc ==) . someServiceID) serverServices of + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" + + return server + +stopServer :: Server -> IO () +stopServer Server {..} = do + mapM_ killThread =<< takeMVar serverThreads + +dataResponseWorker :: Server -> IO () +dataResponseWorker server = forever $ do + (peer, npref) <- atomically (readTQueue $ serverDataResponse server) + + wait <- atomically $ takeTMVar (peerWaitingRefs peer) + list <- forM wait $ \wr@WaitingRef { wrefStatus = tvar } -> + atomically (readTVar tvar) >>= \case + Left ds -> case maybe id (filter . (/=) . refDigest) npref $ ds of + [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case + Right ref -> do + atomically (writeTVar tvar $ Right ref) + forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case + Left err -> atomically $ writeTQueue (serverErrorLog server) err + Right () -> return () + + return (Nothing, []) + Left dgst -> do + atomically (writeTVar tvar $ Left [dgst]) + return (Just wr, [dgst]) + ds' -> do + atomically (writeTVar tvar $ Left ds') + return (Just wr, []) + Right _ -> return (Nothing, []) + atomically $ putTMVar (peerWaitingRefs peer) $ catMaybes $ map fst list + + let reqs = concat $ map snd list + when (not $ null reqs) $ do + let packet = TransportPacket (TransportHeader $ map DataRequest reqs) [] + ackedBy = concat [[ Rejected r, DataResponse r ] | r <- reqs ] + atomically $ sendToPeerPlain peer ackedBy packet + + +newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a } + deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String) + +instance MonadFail PacketHandler where + fail = throwError + +runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () +runPacketHandler secure peer@Peer {..} act = do + let logd = writeTQueue $ serverErrorLog peerServer_ + runExceptT (flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler act) >>= \case + Left err -> do + logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err + Right ph -> do + when (not $ null $ phHead ph) $ do + let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + sendToPeerS' secure peer (phAckedBy ph) packet + +liftSTM :: STM a -> PacketHandler a +liftSTM = PacketHandler . lift . lift + +readTVarP :: TVar a -> PacketHandler a +readTVarP = liftSTM . readTVar + +writeTVarP :: TVar a -> a -> PacketHandler () +writeTVarP v = liftSTM . writeTVar v + +modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler () +modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v + +data PacketHandlerState = PacketHandlerState + { phPeer :: Peer + , phHead :: [TransportHeaderItem] + , phAckedBy :: [TransportHeaderItem] + , phBody :: [Ref] + } + +addHeader :: TransportHeaderItem -> PacketHandler () +addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph } + +addAckedBy :: [TransportHeaderItem] -> PacketHandler () +addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs } + +addBody :: Ref -> PacketHandler () +addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } + +appendDistinct :: Eq a => a -> [a] -> [a] +appendDistinct x (y:ys) | x == y = y : ys + | otherwise = y : appendDistinct x ys +appendDistinct x [] = [x] + +handlePacket :: UnifiedIdentity -> Bool + -> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID] + -> TransportHeader -> [PartialRef] -> IO () +handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = atomically $ do + let server = peerServer peer + ochannel <- getPeerChannel peer + let sidentity = idData identity + plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat + [ [ storedRef sidentity ] + , map storedRef $ idUpdates identity + , case ochannel of + ChannelOurRequest _ req -> [ storedRef req ] + ChannelOurAccept _ acc _ -> [ storedRef acc ] + _ -> [] + ] + + runPacketHandler secure peer $ do + let logd = liftSTM . writeTQueue (serverErrorLog server) + forM_ headers $ \case + Acknowledged dgst -> do + liftSTM (getPeerChannel peer) >>= \case + ChannelOurAccept _ acc ch | refDigest (storedRef acc) == dgst -> do + liftSTM $ finalizedChannel peer ch identity + _ -> return () + + Rejected dgst -> do + logd $ "rejected by peer: " ++ show dgst + + DataRequest dgst + | secure || dgst `elem` plaintextRefs -> do + Right mref <- liftSTM $ unsafeIOToSTM $ + copyRef (peerStorage peer) $ + partialRefFromDigest (peerInStorage peer) dgst + addHeader $ DataResponse dgst + addAckedBy [ Acknowledged dgst, Rejected dgst ] + addBody $ mref + | otherwise -> do + logd $ "unauthorized data request for " ++ show dgst + addHeader $ Rejected dgst + + DataResponse dgst -> if + | Just pref <- find ((==dgst) . refDigest) prefs -> do + when (not secure) $ do + addHeader $ Acknowledged dgst + liftSTM $ writeTQueue (serverDataResponse server) (peer, Just pref) + | otherwise -> throwError $ "mismatched data response " ++ show dgst + + AnnounceSelf dgst + | dgst == refDigest (storedRef sidentity) -> return () + | otherwise -> do + wref <- newWaitingRef dgst $ handleIdentityAnnounce identity peer + readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityUnknown idwait -> do + addHeader $ AnnounceSelf $ refDigest $ storedRef $ idData identity + writeTVarP (peerIdentityVar peer) $ PeerIdentityRef wref idwait + liftSTM $ writeTChan (serverChanPeer $ peerServer peer) peer + _ -> return () + + AnnounceUpdate dgst -> do + readTVarP (peerIdentityVar peer) >>= \case + PeerIdentityFull _ -> do + void $ newWaitingRef dgst $ handleIdentityUpdate peer + _ -> return () + + TrChannelRequest dgst -> do + let process cookie = do + addHeader $ Acknowledged dgst + wref <- newWaitingRef dgst $ handleChannelRequest peer identity + liftSTM $ setPeerChannel peer $ ChannelPeerRequest cookie wref + reject = addHeader $ Rejected dgst + + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> return () + ChannelCookieWait {} -> return () + ChannelCookieReceived cookie -> process $ Just cookie + ChannelCookieConfirmed cookie -> process $ Just cookie + ChannelOurRequest mbcookie our | dgst < refDigest (storedRef our) -> process mbcookie + | otherwise -> reject + ChannelPeerRequest mbcookie _ -> process mbcookie + ChannelOurAccept {} -> reject + ChannelEstablished {} -> process Nothing + + TrChannelAccept dgst -> do + let process = do + handleChannelAccept identity $ partialRefFromDigest (peerInStorage peer) dgst + reject = addHeader $ Rejected dgst + liftSTM (getPeerChannel peer) >>= \case + ChannelNone {} -> reject + ChannelCookieWait {} -> reject + ChannelCookieReceived {} -> reject + ChannelCookieConfirmed {} -> reject + ChannelOurRequest {} -> process + ChannelPeerRequest {} -> process + ChannelOurAccept _ our _ | dgst < refDigest (storedRef our) -> process + | otherwise -> addHeader $ Rejected dgst + ChannelEstablished {} -> process + + ServiceType _ -> return () + ServiceRef dgst + | not secure -> throwError $ "service packet without secure channel" + | Just svc <- lookupServiceType headers -> if + | svc `elem` svcs -> do + if dgst `elem` map refDigest prefs || True {- TODO: used by Message service to confirm receive -} + then do + void $ newWaitingRef dgst $ \ref -> + liftIO $ atomically $ writeTQueue chanSvc (peer, svc, ref) + else throwError $ "missing service object " ++ show dgst + | otherwise -> addHeader $ Rejected dgst + | otherwise -> throwError $ "service ref without type" + + _ -> return () + + +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case + PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) + PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) + PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt) + + +setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback +setupChannel identity peer upid = do + req <- flip runReaderT (peerStorage peer) $ createChannelRequest identity upid + let reqref = refDigest $ storedRef req + let hitems = + [ TrChannelRequest reqref + , AnnounceSelf $ refDigest $ storedRef $ idData identity + ] + liftIO $ atomically $ do + getPeerChannel peer >>= \case + ChannelCookieConfirmed cookie -> do + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] + setPeerChannel peer $ ChannelOurRequest (Just cookie) req + _ -> return () + +handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback +handleChannelRequest peer identity req = do + withPeerIdentity peer $ \upid -> do + (acc, ch) <- flip runReaderT (peerStorage peer) $ acceptChannelRequest identity upid (wrappedLoad req) + liftIO $ atomically $ do + getPeerChannel peer >>= \case + ChannelPeerRequest mbcookie wr | wrDigest wr == refDigest req -> do + setPeerChannel peer $ ChannelOurAccept mbcookie acc ch + let accref = refDigest $ storedRef acc + header = TrChannelAccept accref + ackedBy = [ Acknowledged accref, Rejected accref ] + sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [header]) $ concat + [ [ storedRef $ acc ] + , [ storedRef $ signedData $ fromStored acc ] + , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ] + , map storedRef $ signedSignature $ fromStored acc + ] + _ -> writeTQueue (serverErrorLog $ peerServer peer) $ "unexpected channel request" + +handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler () +handleChannelAccept identity accref = do + peer <- gets phPeer + liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $ do + withPeerIdentity peer $ \upid -> do + copyRef (peerStorage peer) accref >>= \case + Right acc -> do + ch <- acceptedChannel identity upid (wrappedLoad acc) + liftIO $ atomically $ do + sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] + finalizedChannel peer ch identity + + Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) + + +finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () +finalizedChannel peer@Peer {..} ch self = do + setPeerChannel peer $ ChannelEstablished ch + + -- Identity update + writeTQueue (serverIOActions peerServer_) $ liftIO $ atomically $ do + let selfRef = refDigest $ storedRef $ idExtData $ self + updateRefs = selfRef : map (refDigest . storedRef) (idUpdates self) + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- updateRefs ] + sendToPeerS peer ackedBy $ flip TransportPacket [] $ TransportHeader $ map AnnounceUpdate updateRefs + + -- Notify services about new peer + readTVar peerIdentityVar >>= \case + PeerIdentityFull _ -> notifyServicesOfPeer peer + _ -> return () + + +handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback +handleIdentityAnnounce self peer ref = liftIO $ atomically $ do + let validateAndUpdate upds act = case validateIdentity $ wrappedLoad ref of + Just pid' -> do + let pid = fromMaybe pid' $ toUnifiedIdentity (updateIdentity upds pid') + writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid + writeTChan (serverChanPeer $ peerServer peer) peer + act pid + writeTQueue (serverIOActions $ peerServer peer) $ do + setupChannel self peer pid + Nothing -> return () + + readTVar (peerIdentityVar peer) >>= \case + PeerIdentityRef wref wact + | wrDigest wref == refDigest ref + -> validateAndUpdate [] $ \pid -> do + mapM_ (writeTQueue (serverIOActions $ peerServer peer) . ($ pid)) . + reverse =<< readTVar wact + + PeerIdentityFull pid + | idData pid `precedes` wrappedLoad ref + -> validateAndUpdate (idUpdates pid) $ \_ -> do + notifyServicesOfPeer peer + + _ -> return () + +handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback +handleIdentityUpdate peer ref = liftIO $ atomically $ do + pidentity <- readTVar (peerIdentityVar peer) + if | PeerIdentityFull pid <- pidentity + , Just pid' <- toUnifiedIdentity $ updateIdentity [wrappedLoad ref] pid + -> do + writeTVar (peerIdentityVar peer) $ PeerIdentityFull pid' + writeTChan (serverChanPeer $ peerServer peer) peer + when (idData pid /= idData pid') $ notifyServicesOfPeer peer + + | otherwise -> return () + +notifyServicesOfPeer :: Peer -> STM () +notifyServicesOfPeer peer@Peer { peerServer_ = Server {..} } = do + writeTQueue serverIOActions $ do + forM_ serverServices $ \service@(SomeService _ attrs) -> + runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer + + +mkPeer :: Server -> PeerAddress -> IO Peer +mkPeer peerServer_ peerAddress = do + peerConnection <- newTVarIO (Left []) + peerIdentityVar <- newTVarIO . PeerIdentityUnknown =<< newTVarIO [] + peerStorage_ <- deriveEphemeralStorage $ serverStorage peerServer_ + peerInStorage <- derivePartialStorage peerStorage_ + peerServiceState <- newTMVarIO M.empty + peerWaitingRefs <- newTMVarIO [] + return Peer {..} + +serverPeer :: Server -> SockAddr -> IO Peer +serverPeer server paddr = do + serverPeer' server (DatagramAddress paddr) + +#ifdef ENABLE_ICE_SUPPORT +serverPeerIce :: Server -> IceSession -> IO Peer +serverPeerIce server@Server {..} ice = do + let paddr = PeerIceSession ice + peer <- serverPeer' server paddr + iceSetChan ice $ mapFlow undefined (paddr,) serverRawPath + return peer +#endif + +serverPeer' :: Server -> PeerAddress -> IO Peer +serverPeer' server paddr = do + (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do + case M.lookup paddr pvalue of + Just peer -> return (pvalue, (peer, False)) + Nothing -> do + peer <- mkPeer server paddr + return (M.insert paddr peer pvalue, (peer, True)) + when hello $ atomically $ do + writeFlow (serverControlFlow server) (RequestConnection paddr) + return peer + + +sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () +sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] + +sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () +sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] + +sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () +sendToPeerList peer parts = do + let st = peerStorage peer + srefs <- liftIO $ fmap catMaybes $ forM parts $ \case + ServiceReply (Left x) use -> Just . (,use) <$> store st x + ServiceReply (Right sx) use -> return $ Just (storedRef sx, use) + ServiceFinally act -> act >> return Nothing + let dgsts = map (refDigest . fst) srefs + let content = map fst $ filter snd srefs + header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef dgsts) + packet = TransportPacket header content + ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ] + liftIO $ atomically $ sendToPeerS peer ackedBy packet + +sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS' secure Peer {..} ackedBy packet = do + readTVar peerConnection >>= \case + Left xs -> writeTVar peerConnection $ Left $ (secure, packet, ackedBy) : xs + Right conn -> writeFlow (connData conn) (secure, packet, ackedBy) + +sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerS = sendToPeerS' True + +sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () +sendToPeerPlain = sendToPeerS' False + +sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () +sendToPeerWith peer fobj = do + let sproxy = Proxy @s + sid = serviceID sproxy + res <- liftIO $ do + svcs <- atomically $ takeTMVar (peerServiceState peer) + (svcs', res) <- runExceptT (fobj $ fromMaybe (emptyServiceState sproxy) $ fromServiceState sproxy =<< M.lookup sid svcs) >>= \case + Right (obj, s') -> return $ (M.insert sid (SomeServiceState sproxy s') svcs, Right obj) + Left err -> return $ (svcs, Left err) + atomically $ putTMVar (peerServiceState peer) svcs' + return res + + case res of + Right (Just obj) -> sendToPeer peer obj + Right Nothing -> return () + Left err -> throwError err + + +lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) + | Just (Refl :: s :~: t) <- eqT = Just (service, attr) + | otherwise = lookupService proxy rest +lookupService _ [] = Nothing + +runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () +runPeerService = runPeerServiceOn Nothing + +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice peer handler = liftIO $ do + let server = peerServer peer + proxy = Proxy @s + svc = serviceID proxy + logd = writeTQueue (serverErrorLog server) + case mbservice `mplus` lookupService proxy (serverServices server) of + Just (service, attr) -> + atomically (readTVar (peerIdentityVar peer)) >>= \case + PeerIdentityFull peerId -> do + (global, svcs) <- atomically $ (,) + <$> takeTMVar (serverServiceStates server) + <*> takeTMVar (peerServiceState peer) + case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs, + fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of + ((SomeServiceState (_ :: Proxy ps) ps), + (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do + Just (Refl :: s :~: ps) <- return $ eqT + Just (Refl :: s :~: gs) <- return $ eqT + + let inp = ServiceInput + { svcAttributes = attr + , svcPeer = peer + , svcPeerIdentity = peerId + , svcServer = server + , svcPrintOp = atomically . logd + } + reloadHead (serverOrigHead server) >>= \case + Nothing -> atomically $ do + logd $ "current head deleted" + putTMVar (peerServiceState peer) svcs + putTMVar (serverServiceStates server) global + Just h -> do + (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler + moveKeys (peerStorage peer) (serverStorage server) + when (not (null rsp)) $ do + sendToPeerList peer rsp + atomically $ do + putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs + putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global + _ -> do + atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer) + + _ -> atomically $ do + logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" + + +foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) +foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () + +getBroadcastAddresses :: PortNumber -> IO [SockAddr] +getBroadcastAddresses port = do + ptr <- cBroadcastAddresses + let parse i = do + w <- peekElemOff ptr i + if w == 0 then return [] + else (SockAddrInet port w:) <$> parse (i + 1) + addrs <- parse 0 + cFree ptr + return addrs |