diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network.hs | 105 | ||||
-rw-r--r-- | src/Service.hs | 10 |
2 files changed, 70 insertions, 45 deletions
diff --git a/src/Network.hs b/src/Network.hs index 6ace27b..e27c2a3 100644 --- a/src/Network.hs +++ b/src/Network.hs @@ -12,6 +12,7 @@ module Network ( Service(..), serverPeer, serverPeerIce, sendToPeer, sendToPeerStored, sendToPeerWith, + runPeerService, discoveryPort, ) where @@ -61,12 +62,15 @@ announceIntervalSeconds = 60 data Server = Server { serverStorage :: Storage + , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity , serverSocket :: MVar Socket , serverChanPacket :: Chan (PeerAddress, BC.ByteString) , serverOutQueue :: TQueue (Peer, Bool, TransportPacket) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) , serverIOActions :: TQueue (ExceptT String IO ()) + , serverServices :: [SomeService] + , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) , serverPeers :: MVar (Map PeerAddress Peer) , serverChanPeer :: TChan Peer , serverErrorLog :: TQueue String @@ -236,12 +240,15 @@ startServer opt origHead logd' services = do let server = Server { serverStorage = storage + , serverOrigHead = origHead , serverIdentity_ = midentity , serverSocket = ssocket , serverChanPacket = chanPacket , serverOutQueue = outQueue , serverDataResponse = dataResponse , serverIOActions = ioActions + , serverServices = services + , serverServiceStates = svcStates , serverPeers = peers , serverChanPeer = chanPeer , serverErrorLog = errlog @@ -360,46 +367,9 @@ startServer opt origHead logd' services = do void $ forkIO $ forever $ do (peer, svc, ref) <- atomically $ readTQueue chanSvc - atomically (readTVar (peerIdentityVar peer)) >>= \case - PeerIdentityFull peerId -> do - (global, svcs) <- atomically $ (,) - <$> takeTMVar svcStates - <*> takeTMVar (peerServiceState peer) - case find ((svc ==) . someServiceID) services of - Just service@(SomeService (proxy :: Proxy s) attr) -> - case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs, - fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of - ((SomeServiceState (_ :: Proxy ps) ps), - (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do - Just (Refl :: s :~: ps) <- return $ eqT - Just (Refl :: s :~: gs) <- return $ eqT - - let inp = ServiceInput - { svcAttributes = attr - , svcPeer = peer - , svcPeerIdentity = peerId - , svcServer = server - , svcPrintOp = atomically . logd - } - reloadHead origHead >>= \case - Nothing -> atomically $ do - logd $ "current head deleted" - putTMVar (peerServiceState peer) svcs - putTMVar svcStates global - Just h -> do - (rsp, (s', gs')) <- handleServicePacket h inp ps gs (wrappedLoad ref :: Stored s) - when (not (null rsp)) $ do - sendToPeerList peer rsp - atomically $ do - putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs - putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global - _ -> atomically $ do - logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" - putTMVar (peerServiceState peer) svcs - putTMVar svcStates global - - _ -> do - atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer) + case find ((svc ==) . someServiceID) (serverServices server) of + Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref) + _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" return server @@ -812,6 +782,61 @@ sendToPeerWith peer fobj = do Left err -> throwError err +lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) +lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest) + | Just (Refl :: s :~: t) <- eqT = Just (service, attr) + | otherwise = lookupService proxy rest +lookupService _ [] = Nothing + +runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m () +runPeerService = runPeerServiceOn Nothing + +runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m () +runPeerServiceOn mbservice peer handler = liftIO $ do + let server = peerServer peer + proxy = Proxy @s + svc = serviceID proxy + logd = writeTQueue (serverErrorLog server) + case mbservice `mplus` lookupService proxy (serverServices server) of + Just (service, attr) -> + atomically (readTVar (peerIdentityVar peer)) >>= \case + PeerIdentityFull peerId -> do + (global, svcs) <- atomically $ (,) + <$> takeTMVar (serverServiceStates server) + <*> takeTMVar (peerServiceState peer) + case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs, + fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of + ((SomeServiceState (_ :: Proxy ps) ps), + (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do + Just (Refl :: s :~: ps) <- return $ eqT + Just (Refl :: s :~: gs) <- return $ eqT + + let inp = ServiceInput + { svcAttributes = attr + , svcPeer = peer + , svcPeerIdentity = peerId + , svcServer = server + , svcPrintOp = atomically . logd + } + reloadHead (serverOrigHead server) >>= \case + Nothing -> atomically $ do + logd $ "current head deleted" + putTMVar (peerServiceState peer) svcs + putTMVar (serverServiceStates server) global + Just h -> do + (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler + when (not (null rsp)) $ do + sendToPeerList peer rsp + atomically $ do + putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs + putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global + _ -> do + atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer) + + _ -> atomically $ do + logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" + + foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () diff --git a/src/Service.hs b/src/Service.hs index 90fd34a..0942159 100644 --- a/src/Service.hs +++ b/src/Service.hs @@ -8,7 +8,7 @@ module Service ( ServiceHandler, ServiceInput(..), ServiceReply(..), - handleServicePacket, + runServiceHandler, svcGet, svcSet, svcModify, svcGetGlobal, svcSetGlobal, svcModifyGlobal, @@ -110,10 +110,10 @@ data ServiceHandlerState s = ServiceHandlerState newtype ServiceHandler s a = ServiceHandler (ReaderT (ServiceInput s) (WriterT [ServiceReply s] (StateT (ServiceHandlerState s) (ExceptT String IO))) a) deriving (Functor, Applicative, Monad, MonadReader (ServiceInput s), MonadWriter [ServiceReply s], MonadState (ServiceHandlerState s), MonadError String, MonadIO) -handleServicePacket :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> Stored s -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s)) -handleServicePacket h input svc global packet = do +runServiceHandler :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> ServiceHandler s () -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s)) +runServiceHandler h input svc global shandler = do let sstate = ServiceHandlerState { svcValue = svc, svcGlobal = global, svcLocal = headStoredObject h } - ServiceHandler handler = serviceHandler packet + ServiceHandler handler = shandler (runExceptT $ flip runStateT sstate $ execWriterT $ flip runReaderT input $ handler) >>= \case Left err -> do svcPrintOp input $ "service failed: " ++ err @@ -121,7 +121,7 @@ handleServicePacket h input svc global packet = do Right (rsp, sstate') | svcLocal sstate' == svcLocal sstate -> return (rsp, (svcValue sstate', svcGlobal sstate')) | otherwise -> replaceHead h (svcLocal sstate') >>= \case - Left (Just h') -> handleServicePacket h' input svc global packet + Left (Just h') -> runServiceHandler h' input svc global shandler _ -> return (rsp, (svcValue sstate', svcGlobal sstate')) svcGet :: ServiceHandler s (ServiceState s) |