summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network.hs105
-rw-r--r--src/Service.hs10
2 files changed, 70 insertions, 45 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 6ace27b..e27c2a3 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -12,6 +12,7 @@ module Network (
Service(..),
serverPeer, serverPeerIce,
sendToPeer, sendToPeerStored, sendToPeerWith,
+ runPeerService,
discoveryPort,
) where
@@ -61,12 +62,15 @@ announceIntervalSeconds = 60
data Server = Server
{ serverStorage :: Storage
+ , serverOrigHead :: Head LocalState
, serverIdentity_ :: MVar UnifiedIdentity
, serverSocket :: MVar Socket
, serverChanPacket :: Chan (PeerAddress, BC.ByteString)
, serverOutQueue :: TQueue (Peer, Bool, TransportPacket)
, serverDataResponse :: TQueue (Peer, Maybe PartialRef)
, serverIOActions :: TQueue (ExceptT String IO ())
+ , serverServices :: [SomeService]
+ , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState)
, serverPeers :: MVar (Map PeerAddress Peer)
, serverChanPeer :: TChan Peer
, serverErrorLog :: TQueue String
@@ -236,12 +240,15 @@ startServer opt origHead logd' services = do
let server = Server
{ serverStorage = storage
+ , serverOrigHead = origHead
, serverIdentity_ = midentity
, serverSocket = ssocket
, serverChanPacket = chanPacket
, serverOutQueue = outQueue
, serverDataResponse = dataResponse
, serverIOActions = ioActions
+ , serverServices = services
+ , serverServiceStates = svcStates
, serverPeers = peers
, serverChanPeer = chanPeer
, serverErrorLog = errlog
@@ -360,46 +367,9 @@ startServer opt origHead logd' services = do
void $ forkIO $ forever $ do
(peer, svc, ref) <- atomically $ readTQueue chanSvc
- atomically (readTVar (peerIdentityVar peer)) >>= \case
- PeerIdentityFull peerId -> do
- (global, svcs) <- atomically $ (,)
- <$> takeTMVar svcStates
- <*> takeTMVar (peerServiceState peer)
- case find ((svc ==) . someServiceID) services of
- Just service@(SomeService (proxy :: Proxy s) attr) ->
- case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs,
- fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of
- ((SomeServiceState (_ :: Proxy ps) ps),
- (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do
- Just (Refl :: s :~: ps) <- return $ eqT
- Just (Refl :: s :~: gs) <- return $ eqT
-
- let inp = ServiceInput
- { svcAttributes = attr
- , svcPeer = peer
- , svcPeerIdentity = peerId
- , svcServer = server
- , svcPrintOp = atomically . logd
- }
- reloadHead origHead >>= \case
- Nothing -> atomically $ do
- logd $ "current head deleted"
- putTMVar (peerServiceState peer) svcs
- putTMVar svcStates global
- Just h -> do
- (rsp, (s', gs')) <- handleServicePacket h inp ps gs (wrappedLoad ref :: Stored s)
- when (not (null rsp)) $ do
- sendToPeerList peer rsp
- atomically $ do
- putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
- putTMVar svcStates $ M.insert svc (SomeServiceGlobalState proxy gs') global
- _ -> atomically $ do
- logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
- putTMVar (peerServiceState peer) svcs
- putTMVar svcStates global
-
- _ -> do
- atomically $ logd $ "service packet from peer with incomplete identity " ++ show (peerAddress peer)
+ case find ((svc ==) . someServiceID) (serverServices server) of
+ Just service@(SomeService (_ :: Proxy s) attr) -> runPeerServiceOn (Just (service, attr)) peer (serviceHandler $ wrappedLoad @s ref)
+ _ -> atomically $ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
return server
@@ -812,6 +782,61 @@ sendToPeerWith peer fobj = do
Left err -> throwError err
+lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
+ | Just (Refl :: s :~: t) <- eqT = Just (service, attr)
+ | otherwise = lookupService proxy rest
+lookupService _ [] = Nothing
+
+runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
+runPeerService = runPeerServiceOn Nothing
+
+runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
+runPeerServiceOn mbservice peer handler = liftIO $ do
+ let server = peerServer peer
+ proxy = Proxy @s
+ svc = serviceID proxy
+ logd = writeTQueue (serverErrorLog server)
+ case mbservice `mplus` lookupService proxy (serverServices server) of
+ Just (service, attr) ->
+ atomically (readTVar (peerIdentityVar peer)) >>= \case
+ PeerIdentityFull peerId -> do
+ (global, svcs) <- atomically $ (,)
+ <$> takeTMVar (serverServiceStates server)
+ <*> takeTMVar (peerServiceState peer)
+ case (fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs,
+ fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global) of
+ ((SomeServiceState (_ :: Proxy ps) ps),
+ (SomeServiceGlobalState (_ :: Proxy gs) gs)) -> do
+ Just (Refl :: s :~: ps) <- return $ eqT
+ Just (Refl :: s :~: gs) <- return $ eqT
+
+ let inp = ServiceInput
+ { svcAttributes = attr
+ , svcPeer = peer
+ , svcPeerIdentity = peerId
+ , svcServer = server
+ , svcPrintOp = atomically . logd
+ }
+ reloadHead (serverOrigHead server) >>= \case
+ Nothing -> atomically $ do
+ logd $ "current head deleted"
+ putTMVar (peerServiceState peer) svcs
+ putTMVar (serverServiceStates server) global
+ Just h -> do
+ (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler
+ when (not (null rsp)) $ do
+ sendToPeerList peer rsp
+ atomically $ do
+ putTMVar (peerServiceState peer) $ M.insert svc (SomeServiceState proxy s') svcs
+ putTMVar (serverServiceStates server) $ M.insert svc (SomeServiceGlobalState proxy gs') global
+ _ -> do
+ atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer)
+
+ _ -> atomically $ do
+ logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
+
foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO ()
diff --git a/src/Service.hs b/src/Service.hs
index 90fd34a..0942159 100644
--- a/src/Service.hs
+++ b/src/Service.hs
@@ -8,7 +8,7 @@ module Service (
ServiceHandler,
ServiceInput(..),
ServiceReply(..),
- handleServicePacket,
+ runServiceHandler,
svcGet, svcSet, svcModify,
svcGetGlobal, svcSetGlobal, svcModifyGlobal,
@@ -110,10 +110,10 @@ data ServiceHandlerState s = ServiceHandlerState
newtype ServiceHandler s a = ServiceHandler (ReaderT (ServiceInput s) (WriterT [ServiceReply s] (StateT (ServiceHandlerState s) (ExceptT String IO))) a)
deriving (Functor, Applicative, Monad, MonadReader (ServiceInput s), MonadWriter [ServiceReply s], MonadState (ServiceHandlerState s), MonadError String, MonadIO)
-handleServicePacket :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> Stored s -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s))
-handleServicePacket h input svc global packet = do
+runServiceHandler :: Service s => Head LocalState -> ServiceInput s -> ServiceState s -> ServiceGlobalState s -> ServiceHandler s () -> IO ([ServiceReply s], (ServiceState s, ServiceGlobalState s))
+runServiceHandler h input svc global shandler = do
let sstate = ServiceHandlerState { svcValue = svc, svcGlobal = global, svcLocal = headStoredObject h }
- ServiceHandler handler = serviceHandler packet
+ ServiceHandler handler = shandler
(runExceptT $ flip runStateT sstate $ execWriterT $ flip runReaderT input $ handler) >>= \case
Left err -> do
svcPrintOp input $ "service failed: " ++ err
@@ -121,7 +121,7 @@ handleServicePacket h input svc global packet = do
Right (rsp, sstate')
| svcLocal sstate' == svcLocal sstate -> return (rsp, (svcValue sstate', svcGlobal sstate'))
| otherwise -> replaceHead h (svcLocal sstate') >>= \case
- Left (Just h') -> handleServicePacket h' input svc global packet
+ Left (Just h') -> runServiceHandler h' input svc global shandler
_ -> return (rsp, (svcValue sstate', svcGlobal sstate'))
svcGet :: ServiceHandler s (ServiceState s)