diff options
| author | Roman Smrž <roman.smrz@seznam.cz> | 2021-12-21 22:36:58 +0100 | 
|---|---|---|
| committer | Roman Smrž <roman.smrz@seznam.cz> | 2021-12-21 22:47:45 +0100 | 
| commit | ed2fd1bf1f2e24565530bcfc9853cacbfa1c2a2a (patch) | |
| tree | 56b1c4cf414fcc9427ead5f030c8b7081574141e /src | |
| parent | 8416b3e959fd0f6ade7c2b61a6caea681ee03e15 (diff) | |
Network: external interface to run service handlers
Diffstat (limited to 'src')
| -rw-r--r-- | src/Network.hs | 105 | ||||
| -rw-r--r-- | src/Service.hs | 10 | 
2 files changed, 70 insertions, 45 deletions
| diff --git a/src/Network.hs b/src/Network.hs index 6ace27b..e27c2a3 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -12,6 +12,7 @@ module Network (      Service(..),      serverPeer, serverPeerIce,      sendToPeer, sendToPeerStored, sendToPeerWith, +    runPeerService,      discoveryPort,  ) where @@ -61,12 +62,15 @@ announceIntervalSeconds = 60  data Server = Server      { serverStorage :: Storage +    , serverOrigHead :: Head LocalState      , serverIdentity_ :: MVar UnifiedIdentity      , serverSocket :: MVar Socket      , serverChanPacket :: Chan (PeerAddress, BC.ByteString)      , serverOutQueue :: TQueue (Peer, Bool, TransportPacket)      , serverDataResponse :: TQueue (Peer, Maybe PartialRef)      , serverIOActions :: TQueue (ExceptT String IO ()) +    , serverServices :: [SomeService] +    , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState)      , serverPeers :: MVar (Map PeerAddress Peer)      , serverChanPeer :: TChan Peer      , serverErrorLog :: TQueue String @@ -236,12 +240,15 @@ startServer opt origHead logd' services = do      let server = Server              { serverStorage = storage +            , serverOrigHead = origHead              , serverIdentity_ = midentity              , serverSocket = ssocket              , serverChanPacket = chanPacket              , serverOutQueue = outQueue              , serverDataResponse = dataResponse              , serverIOActions = ioActions +            , serverServices = services +            , serverServiceStates = svcStates              , serverPeers = peers              , serverChanPeer = chanPeer              , serverErrorLog = errlog @@ -360,46 +367,9 @@ startServer opt origHead logd' services = do      void $ forkIO $ forever $ do          (peer, svc, ref) <- atomically $ readTQueue chanSvc -        atomically (readTVar (peerIdentityVar peer)) >>= \case -            PeerIdentityFull peerId -> do -                (global, svcs) <- atomically $ (,) -                    <$> takeTMVar svcStates -                    <*> takeTMVar (peerServiceState peer) -                case find ((svc ==) . someServiceID) services of -                    Just service@(SomeService (proxy :: Proxy s) attr) -> -                        case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs, -                                fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of -                            ((SomeServiceState (_ :: Proxy ps) ps), -                                    (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do -                                Just (Refl :: s :~: ps) <- return $ eqT -                                Just (Refl :: s :~: gs) <- return $ eqT - -                                let inp = ServiceInput -                                        { svcAttributes = attr -                                        , svcPeer = peer -                                        , svcPeerIdentity = peerId -                                        , svcServer = server -                                        , svcPrintOp = atomically . logd -                                        } -                                reloadHead origHead >>= \case -                                    Nothing -> atomically $ do -                                        logd $ "current head deleted" -                                        putTMVar (peerServiceState peer) svcs -                                        putTMVar svcStates global -                                    Just h -> do -                                        (rsp, (s', gs')) <- handleServicePacket h inp ps gs (wrappedLoad ref :: Stored s) -                                        when (not (null rsp)) $ do -                                            sendToPeerList peer rsp -                                        atomically $ do -                                            putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs -                                            putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global -                    _ -> atomically $ do -                        logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" -                        putTMVar (peerServiceState peer) svcs -                        putTMVar svcStates global - -            _ -> do -                atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) +        case find ((svc ==) . someServiceID) (serverServices server) of +            Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) +            _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"      return server @@ -812,6 +782,61 @@ sendToPeerWith peer fobj = do           Left err -> throwError err +lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) +    | Just (Refl :: s :~: t) <- eqT = Just (service, attr) +    | otherwise = lookupService proxy rest +lookupService _ [] = Nothing + +runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () +runPeerService = runPeerServiceOn Nothing + +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice peer handler = liftIO $ do +    let server = peerServer peer +        proxy = Proxy @s +        svc = serviceID proxy +        logd = writeTQueue (serverErrorLog server) +    case mbservice `mplus` lookupService proxy (serverServices server) of +        Just (service, attr) -> +            atomically (readTVar (peerIdentityVar peer)) >>= \case +                PeerIdentityFull peerId -> do +                    (global, svcs) <- atomically $ (,) +                        <$> takeTMVar (serverServiceStates server) +                        <*> takeTMVar (peerServiceState peer) +                    case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs, +                            fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of +                        ((SomeServiceState (_ :: Proxy ps) ps), +                                (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do +                            Just (Refl :: s :~: ps) <- return $ eqT +                            Just (Refl :: s :~: gs) <- return $ eqT + +                            let inp = ServiceInput +                                    { svcAttributes = attr +                                    , svcPeer = peer +                                    , svcPeerIdentity = peerId +                                    , svcServer = server +                                    , svcPrintOp = atomically . logd +                                    } +                            reloadHead (serverOrigHead server) >>= \case +                                Nothing -> atomically $ do +                                    logd $ "current head deleted" +                                    putTMVar (peerServiceState peer) svcs +                                    putTMVar (serverServiceStates server) global +                                Just h -> do +                                    (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler +                                    when (not (null rsp)) $ do +                                        sendToPeerList peer rsp +                                    atomically $ do +                                        putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs +                                        putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global +                _ -> do +                    atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer) + +        _ -> atomically $ do +            logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" + +  foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)  foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () diff --git a/src/Service.hs b/src/Service.hs index 90fd34a..0942159 100644 --- a/src/Service.hs +++ b/src/Service.hs @@ -8,7 +8,7 @@ module Service (      ServiceHandler,      ServiceInput(..),      ServiceReply(..), -    handleServicePacket, +    runServiceHandler,      svcGet, svcSet, svcModify,      svcGetGlobal, svcSetGlobal, svcModifyGlobal, @@ -110,10 +110,10 @@ data ServiceHandlerState s = ServiceHandlerState  newtype ServiceHandler s a = ServiceHandler (ReaderT (ServiceInput s) (WriterT [ServiceReply s] (StateT (ServiceHandlerState s) (ExceptT String IO))) a)      deriving (Functor, Applicative, Monad, MonadReader (ServiceInput s), MonadWriter [ServiceReply s], MonadState (ServiceHandlerState s), MonadError String, MonadIO) -handleServicePacket :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> Stored s -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s)) -handleServicePacket h input svc global packet = do +runServiceHandler :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> ServiceHandler s () -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s)) +runServiceHandler h input svc global shandler = do      let sstate = ServiceHandlerState { svcValue = svc, svcGlobal = global, svcLocal = headStoredObject h } -        ServiceHandler handler = serviceHandler packet +        ServiceHandler handler = shandler      (runExceptT $ flip runStateT sstate $ execWriterT $ flip runReaderT input $ handler) >>= \case          Left err -> do              svcPrintOp input $ "service failed: " ++ err @@ -121,7 +121,7 @@ handleServicePacket h input svc global packet = do          Right (rsp, sstate')              | svcLocal sstate' == svcLocal sstate -> return (rsp, (svcValue sstate', svcGlobal sstate'))              | otherwise -> replaceHead h (svcLocal sstate') >>= \case -                Left (Just h') -> handleServicePacket h' input svc global packet +                Left (Just h') -> runServiceHandler h' input svc global shandler                  _              -> return (rsp, (svcValue sstate', svcGlobal sstate'))  svcGet :: ServiceHandler s (ServiceState s) |