summaryrefslogtreecommitdiff
path: root/src/Erebos
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos')
-rw-r--r--src/Erebos/Conversation.hs2
-rw-r--r--src/Erebos/Discovery.hs278
-rw-r--r--src/Erebos/Flow.hs37
-rw-r--r--src/Erebos/ICE.chs7
-rw-r--r--src/Erebos/ICE/pjproject.c97
-rw-r--r--src/Erebos/ICE/pjproject.h7
-rw-r--r--src/Erebos/Message.hs21
-rw-r--r--src/Erebos/Network.hs60
-rw-r--r--src/Erebos/Service.hs7
-rw-r--r--src/Erebos/Storage.hs2
10 files changed, 338 insertions, 180 deletions
diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs
index f0ffa70..c165343 100644
--- a/src/Erebos/Conversation.hs
+++ b/src/Erebos/Conversation.hs
@@ -71,7 +71,7 @@ directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Con
directMessageConversation peer = do
(find (sameIdentity peer . msgPeer) . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead) >>= \case
Just thread -> return $ DirectMessageConversation thread
- Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] []
+ Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] []
chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation)
chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate)
diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs
index f156c85..d098f98 100644
--- a/src/Erebos/Discovery.hs
+++ b/src/Erebos/Discovery.hs
@@ -3,7 +3,9 @@
module Erebos.Discovery (
DiscoveryService(..),
DiscoveryAttributes(..),
- DiscoveryConnection(..)
+ DiscoveryConnection(..),
+
+ discoverySearch,
) where
import Control.Concurrent
@@ -12,9 +14,13 @@ import Control.Monad.Except
import Control.Monad.Reader
import Data.IP qualified as IP
+import Data.List
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as M
import Data.Maybe
+import Data.Proxy
+import Data.Set (Set)
+import Data.Set qualified as S
import Data.Text (Text)
import Data.Text qualified as T
import Data.Word
@@ -30,6 +36,13 @@ import Erebos.Service
import Erebos.Storage
+#ifndef ENABLE_ICE_SUPPORT
+type IceConfig = ()
+type IceSession = ()
+type IceRemoteInfo = Stored Object
+#endif
+
+
data DiscoveryService
= DiscoverySelf [ Text ] (Maybe Int)
| DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16)
@@ -57,11 +70,7 @@ data DiscoveryConnection = DiscoveryConnection
{ dconnSource :: Ref
, dconnTarget :: Ref
, dconnAddress :: Maybe Text
-#ifdef ENABLE_ICE_SUPPORT
, dconnIceInfo :: Maybe IceRemoteInfo
-#else
- , dconnIceInfo :: Maybe (Stored Object)
-#endif
}
emptyConnection :: Ref -> Ref -> DiscoveryConnection
@@ -90,12 +99,13 @@ instance Storable DiscoveryService where
DiscoveryConnectionRequest conn -> storeConnection "request" conn
DiscoveryConnectionResponse conn -> storeConnection "response" conn
- where storeConnection ctype conn = do
- storeText "connection" $ ctype
- storeRawRef "source" $ dconnSource conn
- storeRawRef "target" $ dconnTarget conn
- storeMbText "address" $ dconnAddress conn
- storeMbRef "ice-info" $ dconnIceInfo conn
+ where
+ storeConnection ctype DiscoveryConnection {..} = do
+ storeText "connection" $ ctype
+ storeRawRef "source" dconnSource
+ storeRawRef "target" dconnTarget
+ storeMbText "address" dconnAddress
+ storeMbRef "ice-info" dconnIceInfo
load' = loadRec $ msum
[ do
@@ -120,22 +130,40 @@ instance Storable DiscoveryService where
, loadConnection "request" DiscoveryConnectionRequest
, loadConnection "response" DiscoveryConnectionResponse
]
- where loadConnection ctype ctor = do
- ctype' <- loadText "connection"
- guard $ ctype == ctype'
- return . ctor =<< DiscoveryConnection
- <$> loadRawRef "source"
- <*> loadRawRef "target"
- <*> loadMbText "address"
- <*> loadMbRef "ice-info"
+ where
+ loadConnection ctype ctor = do
+ ctype' <- loadText "connection"
+ guard $ ctype == ctype'
+ dconnSource <- loadRawRef "source"
+ dconnTarget <- loadRawRef "target"
+ dconnAddress <- loadMbText "address"
+ dconnIceInfo <- loadMbRef "ice-info"
+ return $ ctor DiscoveryConnection {..}
data DiscoveryPeer = DiscoveryPeer
{ dpPriority :: Int
, dpPeer :: Maybe Peer
, dpAddress :: [ Text ]
-#ifdef ENABLE_ICE_SUPPORT
, dpIceSession :: Maybe IceSession
-#endif
+ }
+
+emptyPeer :: DiscoveryPeer
+emptyPeer = DiscoveryPeer
+ { dpPriority = 0
+ , dpPeer = Nothing
+ , dpAddress = []
+ , dpIceSession = Nothing
+ }
+
+data DiscoveryPeerState = DiscoveryPeerState
+ { dpsStunServer :: Maybe ( Text, Word16 )
+ , dpsTurnServer :: Maybe ( Text, Word16 )
+ , dpsIceConfig :: Maybe IceConfig
+ }
+
+data DiscoveryGlobalState = DiscoveryGlobalState
+ { dgsPeers :: Map RefDigest DiscoveryPeer
+ , dgsSearchingFor :: Set RefDigest
}
instance Service DiscoveryService where
@@ -144,13 +172,18 @@ instance Service DiscoveryService where
type ServiceAttributes DiscoveryService = DiscoveryAttributes
defaultServiceAttributes _ = defaultDiscoveryAttributes
-#ifdef ENABLE_ICE_SUPPORT
- type ServiceState DiscoveryService = Maybe IceConfig
- emptyServiceState _ = Nothing
-#endif
+ type ServiceState DiscoveryService = DiscoveryPeerState
+ emptyServiceState _ = DiscoveryPeerState
+ { dpsStunServer = Nothing
+ , dpsTurnServer = Nothing
+ , dpsIceConfig = Nothing
+ }
- type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer
- emptyServiceGlobalState _ = M.empty
+ type ServiceGlobalState DiscoveryService = DiscoveryGlobalState
+ emptyServiceGlobalState _ = DiscoveryGlobalState
+ { dgsPeers = M.empty
+ , dgsSearchingFor = S.empty
+ }
serviceHandler msg = case fromStored msg of
DiscoverySelf addrs priority -> do
@@ -171,15 +204,14 @@ instance Service DiscoveryService where
| otherwise -> return Nothing
- forM_ (idDataF =<< unfoldOwners pid) $ \s ->
- svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer
- { dpPriority = fromMaybe 0 priority
- , dpPeer = Just peer
- , dpAddress = addrs
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ forM_ (idDataF =<< unfoldOwners pid) $ \sdata -> do
+ let dp = DiscoveryPeer
+ { dpPriority = fromMaybe 0 priority
+ , dpPeer = Just peer
+ , dpAddress = addrs
+ , dpIceSession = Nothing
+ }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.insertWith insertHelper (refDigest $ storedRef sdata) dp $ dgsPeers s }
attrs <- asks svcAttributes
replyPacket $ DiscoveryAcknowledged matchedAddrs
(discoveryStunServer attrs)
@@ -188,7 +220,6 @@ instance Service DiscoveryService where
(discoveryTurnPort attrs)
DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do
-#ifdef ENABLE_ICE_SUPPORT
paddr <- asks (peerAddress . svcPeer) >>= return . \case
(DatagramAddress saddr) -> case IP.fromSockAddr saddr of
Just (IP.IPv6 ipv6, _)
@@ -204,100 +235,90 @@ instance Service DiscoveryService where
toIceServer (Just server) Nothing = Just ( server, 0 )
toIceServer (Just server) (Just port) = Just ( server, port )
- cfg <- liftIO $ iceCreateConfig
- (toIceServer stunServer stunPort)
- (toIceServer turnServer turnPort)
- svcSet cfg
-#endif
- return ()
+ svcModify $ \s -> s
+ { dpsStunServer = toIceServer stunServer stunPort
+ , dpsTurnServer = toIceServer turnServer turnPort
+ }
DiscoverySearch ref -> do
- dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal
+ dpeer <- M.lookup (refDigest ref) . dgsPeers <$> svcGetGlobal
replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer
DiscoveryResult ref [] -> do
svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found"
DiscoveryResult ref addrs -> do
+ let dgst = refDigest ref
-- TODO: check if we really requested that
server <- asks svcServer
self <- svcSelf
- mbIceConfig <- svcGet
discoveryPeer <- asks svcPeer
let runAsService = runPeerService @DiscoveryService discoveryPeer
- liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if
+ forM_ addrs $ \addr -> if
| addr == T.pack "ICE"
-#ifdef ENABLE_ICE_SUPPORT
- , Just config <- mbIceConfig
-> do
- ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
- rinfo <- iceRemoteInfo ice
- res <- runExceptT $ sendToPeer discoveryPeer $
- DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
- case res of
- Right _ -> return ()
- Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
-
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Nothing
- , dpAddress = []
- , dpIceSession = Just ice
- }
-#else
- -> do
- return ()
+#ifdef ENABLE_ICE_SUPPORT
+ getIceConfig >>= \case
+ Just config -> void $ liftIO $ forkIO $ do
+ ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do
+ rinfo <- iceRemoteInfo ice
+
+ res <- runExceptT $ sendToPeer discoveryPeer $
+ DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo }
+ case res of
+ Right _ -> return ()
+ Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err
+
+ runAsService $ do
+ let upd dp = dp { dpIceSession = Just ice }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
+
+ Nothing -> do
+ return ()
#endif
+ return ()
| [ ipaddr, port ] <- words (T.unpack addr) -> do
- saddr <- head <$>
- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
- peer <- serverPeer server (addrAddress saddr)
- runAsService $ do
- svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer
- { dpPriority = 0
- , dpPeer = Just peer
- , dpAddress = []
-#ifdef ENABLE_ICE_SUPPORT
- , dpIceSession = Nothing
-#endif
- }
+ void $ liftIO $ forkIO $ do
+ saddr <- head <$>
+ getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
+ peer <- serverPeer server (addrAddress saddr)
+ runAsService $ do
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) dgst $ dgsPeers s }
| otherwise -> do
- runAsService $ do
- svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
+ svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr
DiscoveryConnectionRequest conn -> do
self <- svcSelf
let rconn = emptyConnection (dconnSource conn) (dconnTarget conn)
- if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
- then do
+ if refDigest (dconnTarget conn) `elem` identityDigests self
+ then if
#ifdef ENABLE_ICE_SUPPORT
- -- request for us, create ICE sesssion
+ -- request for us, create ICE sesssion
+ | Just prinfo <- dconnIceInfo conn -> do
server <- asks svcServer
peer <- asks svcPeer
- svcGet >>= \case
+ getIceConfig >>= \case
Just config -> do
liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do
rinfo <- iceRemoteInfo ice
res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo }
case res of
- Right _ -> do
- case dconnIceInfo conn of
- Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice
- Nothing -> putStrLn $ "Discovery: connection request without ICE remote info"
+ Right _ -> iceConnect ice prinfo $ void $ serverPeerIce server ice
Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err
Nothing -> do
- svcPrint $ "Discovery: ICE request from peer without ICE configuration"
-#else
- return ()
+ return ()
#endif
- else do
+ | otherwise -> do
+ svcPrint $ "Discovery: unsupported connection request"
+
+ else do
-- request to some of our peers, relay
- mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal
+ mbdp <- M.lookup (refDigest $ dconnTarget conn) . dgsPeers <$> svcGetGlobal
case mbdp of
Nothing -> replyPacket $ DiscoveryConnectionResponse rconn
Just dp
@@ -307,29 +328,28 @@ instance Service DiscoveryService where
DiscoveryConnectionResponse conn -> do
self <- svcSelf
- dpeers <- svcGetGlobal
- if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self)
+ dpeers <- dgsPeers <$> svcGetGlobal
+ if refDigest (dconnSource conn) `elem` identityDigests self
then do
-- response to our request, try to connect to the peer
-#ifdef ENABLE_ICE_SUPPORT
server <- asks svcServer
if | Just addr <- dconnAddress conn
, [ipaddr, port] <- words (T.unpack addr) -> do
saddr <- liftIO $ head <$>
getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port)
peer <- liftIO $ serverPeer server (addrAddress saddr)
- svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $
- DiscoveryPeer 0 (Just peer) [] Nothing
+ let upd dp = dp { dpPeer = Just peer }
+ svcModifyGlobal $ \s -> s
+ { dgsPeers = M.alter (Just . upd . fromMaybe emptyPeer) (refDigest $ dconnTarget conn) $ dgsPeers s }
+#ifdef ENABLE_ICE_SUPPORT
| Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers
, Just ice <- dpIceSession dp
, Just rinfo <- dconnIceInfo conn -> do
liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice
+#endif
| otherwise -> svcPrint $ "Discovery: connection request failed"
-#else
- return ()
-#endif
else do
-- response to relayed request
case M.lookup (refDigest $ dconnSource conn) dpeers of
@@ -340,6 +360,7 @@ instance Service DiscoveryService where
serviceNewPeer = do
server <- asks svcServer
peer <- asks svcPeer
+ st <- getStorage
let addrToText saddr = do
( addr, port ) <- IP.fromSockAddr saddr
@@ -351,5 +372,60 @@ instance Service DiscoveryService where
#endif
]
+ pid <- asks svcPeerIdentity
+ gs <- svcGetGlobal
+ let searchingFor = foldl' (flip S.delete) (dgsSearchingFor gs) (identityDigests pid)
+ svcModifyGlobal $ \s -> s { dgsSearchingFor = searchingFor }
+
when (not $ null addrs) $ do
sendToPeer peer $ DiscoverySelf addrs Nothing
+ forM_ searchingFor $ \dgst -> do
+ liftIO (refFromDigest st dgst) >>= \case
+ Just ref -> sendToPeer peer $ DiscoverySearch ref
+ Nothing -> return ()
+
+#ifdef ENABLE_ICE_SUPPORT
+ serviceStopServer _ _ _ pstates = do
+ forM_ pstates $ \( _, DiscoveryPeerState {..} ) -> do
+ mapM_ iceStopThread dpsIceConfig
+#endif
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
+
+
+getIceConfig :: ServiceHandler DiscoveryService (Maybe IceConfig)
+getIceConfig = do
+ dpsIceConfig <$> svcGet >>= \case
+ Just cfg -> return $ Just cfg
+ Nothing -> do
+#ifdef ENABLE_ICE_SUPPORT
+ stun <- dpsStunServer <$> svcGet
+ turn <- dpsTurnServer <$> svcGet
+ liftIO (iceCreateConfig stun turn) >>= \case
+ Just cfg -> do
+ svcModify $ \s -> s { dpsIceConfig = Just cfg }
+ return $ Just cfg
+ Nothing -> do
+ svcPrint $ "Discovery: failed to create ICE config"
+ return Nothing
+#else
+ return Nothing
+#endif
+
+
+discoverySearch :: (MonadIO m, MonadError String m) => Server -> Ref -> m ()
+discoverySearch server ref = do
+ peers <- liftIO $ getCurrentPeerList server
+ match <- forM peers $ \peer -> do
+ peerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ refDigest ref `elem` identityDigests pid
+ _ -> return False
+ when (not $ or match) $ do
+ modifyServiceGlobalState server (Proxy @DiscoveryService) $ \s -> (, ()) s
+ { dgsSearchingFor = S.insert (refDigest ref) $ dgsSearchingFor s
+ }
+ forM_ peers $ \peer -> do
+ sendToPeer peer $ DiscoverySearch ref
diff --git a/src/Erebos/Flow.hs b/src/Erebos/Flow.hs
index ba2607a..1e1a521 100644
--- a/src/Erebos/Flow.hs
+++ b/src/Erebos/Flow.hs
@@ -11,54 +11,53 @@ module Erebos.Flow (
import Control.Concurrent.STM
-data Flow r w = Flow (TMVar [r]) (TMVar [w])
- | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
+data Flow r w
+ = Flow (TBQueue r) (TBQueue w)
+ | forall r' w'. MappedFlow (r' -> r) (w -> w') (Flow r' w')
type SymFlow a = Flow a a
newFlow :: STM (Flow a b, Flow b a)
newFlow = do
- x <- newEmptyTMVar
- y <- newEmptyTMVar
+ x <- newTBQueue 16
+ y <- newTBQueue 16
return (Flow x y, Flow y x)
newFlowIO :: IO (Flow a b, Flow b a)
newFlowIO = atomically newFlow
readFlow :: Flow r w -> STM r
-readFlow (Flow rvar _) = takeTMVar rvar >>= \case
- (x:[]) -> return x
- (x:xs) -> putTMVar rvar xs >> return x
- [] -> error "Flow: empty list"
+readFlow (Flow rvar _) = readTBQueue rvar
readFlow (MappedFlow f _ up) = f <$> readFlow up
tryReadFlow :: Flow r w -> STM (Maybe r)
-tryReadFlow (Flow rvar _) = tryTakeTMVar rvar >>= \case
- Just (x:[]) -> return (Just x)
- Just (x:xs) -> putTMVar rvar xs >> return (Just x)
- Just [] -> error "Flow: empty list"
- Nothing -> return Nothing
+tryReadFlow (Flow rvar _) = tryReadTBQueue rvar
tryReadFlow (MappedFlow f _ up) = fmap f <$> tryReadFlow up
canReadFlow :: Flow r w -> STM Bool
-canReadFlow (Flow rvar _) = not <$> isEmptyTMVar rvar
+canReadFlow (Flow rvar _) = not <$> isEmptyTBQueue rvar
canReadFlow (MappedFlow _ _ up) = canReadFlow up
writeFlow :: Flow r w -> w -> STM ()
-writeFlow (Flow _ wvar) = putTMVar wvar . (:[])
+writeFlow (Flow _ wvar) = writeTBQueue wvar
writeFlow (MappedFlow _ f up) = writeFlow up . f
writeFlowBulk :: Flow r w -> [w] -> STM ()
writeFlowBulk _ [] = return ()
-writeFlowBulk (Flow _ wvar) xs = putTMVar wvar xs
+writeFlowBulk (Flow _ wvar) xs = mapM_ (writeTBQueue wvar) xs
writeFlowBulk (MappedFlow _ f up) xs = writeFlowBulk up $ map f xs
tryWriteFlow :: Flow r w -> w -> STM Bool
-tryWriteFlow (Flow _ wvar) = tryPutTMVar wvar . (:[])
-tryWriteFlow (MappedFlow _ f up) = tryWriteFlow up . f
+tryWriteFlow (Flow _ wvar) x = do
+ isFullTBQueue wvar >>= \case
+ True -> return False
+ False -> do
+ writeTBQueue wvar x
+ return True
+tryWriteFlow (MappedFlow _ f up) x = tryWriteFlow up $ f x
canWriteFlow :: Flow r w -> STM Bool
-canWriteFlow (Flow _ wvar) = isEmptyTMVar wvar
+canWriteFlow (Flow _ wvar) = not <$> isFullTBQueue wvar
canWriteFlow (MappedFlow _ _ up) = canWriteFlow up
readFlowIO :: Flow r w -> IO r
diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs
index 6f61451..cc2fdcc 100644
--- a/src/Erebos/ICE.chs
+++ b/src/Erebos/ICE.chs
@@ -8,6 +8,7 @@ module Erebos.ICE (
IceRemoteInfo,
iceCreateConfig,
+ iceStopThread,
iceCreateSession,
iceDestroy,
iceRemoteInfo,
@@ -138,6 +139,12 @@ iceCreateConfig stun turn =
then return Nothing
else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg
+foreign import ccall unsafe "pjproject.h ice_cfg_stop_thread"
+ ice_cfg_stop_thread :: Ptr PjIceStransCfg -> IO ()
+
+iceStopThread :: IceConfig -> IO ()
+iceStopThread (IceConfig fcfg) = withForeignPtr fcfg ice_cfg_stop_thread
+
{#pointer *pj_ice_strans as ^ #}
iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession
diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c
index e79fb9d..e9446fe 100644
--- a/src/Erebos/ICE/pjproject.c
+++ b/src/Erebos/ICE/pjproject.c
@@ -1,6 +1,7 @@
#include "pjproject.h"
#include "Erebos/ICE_stub.h"
+#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
@@ -15,6 +16,13 @@ static struct
pj_sockaddr def_addr;
} ice;
+struct erebos_ice_cfg
+{
+ pj_ice_strans_cfg cfg;
+ pj_thread_t * thread;
+ atomic_bool exit;
+};
+
struct user_data
{
pj_ice_sess_role role;
@@ -30,17 +38,17 @@ static void ice_perror(const char * msg, pj_status_t status)
fprintf(stderr, "ICE: %s: %s\n", msg, err);
}
-static int ice_worker_thread(void * vcfg)
+static int ice_worker_thread( void * vcfg )
{
- pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg;
+ struct erebos_ice_cfg * ecfg = (struct erebos_ice_cfg *)( vcfg );
- while (true) {
+ while( ! ecfg->exit ){
pj_time_val max_timeout = { 0, 0 };
pj_time_val timeout = { 0, 0 };
max_timeout.msec = 500;
- pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout);
+ pj_timer_heap_poll( ecfg->cfg.stun_cfg.timer_heap, &timeout );
pj_assert(timeout.sec >= 0 && timeout.msec >= 0);
if (timeout.msec >= 1000)
@@ -49,7 +57,7 @@ static int ice_worker_thread(void * vcfg)
if (PJ_TIME_VAL_GT(timeout, max_timeout))
timeout = max_timeout;
- int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout);
+ int c = pj_ioqueue_poll( ecfg->cfg.stun_cfg.ioqueue, &timeout );
if (c < 0)
pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout));
}
@@ -131,80 +139,91 @@ exit:
pthread_mutex_unlock(&mutex);
}
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port )
{
ice_init();
- pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) );
- pj_ice_strans_cfg_default( cfg );
+ struct erebos_ice_cfg * ecfg = malloc( sizeof(struct erebos_ice_cfg) );
+ pj_ice_strans_cfg_default( &ecfg->cfg );
+ ecfg->exit = false;
+ ecfg->thread = NULL;
- cfg->stun_cfg.pf = &ice.cp.factory;
+ ecfg->cfg.stun_cfg.pf = &ice.cp.factory;
if( pj_timer_heap_create( ice.pool, 100,
- &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){
+ &ecfg->cfg.stun_cfg.timer_heap ) != PJ_SUCCESS ){
fprintf( stderr, "pj_timer_heap_create failed\n" );
goto fail;
}
- if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){
+ if( pj_ioqueue_create( ice.pool, 16, &ecfg->cfg.stun_cfg.ioqueue ) != PJ_SUCCESS ){
fprintf( stderr, "pj_ioqueue_create failed\n" );
goto fail;
}
- pj_thread_t * thread;
if( pj_thread_create( ice.pool, NULL, &ice_worker_thread,
- cfg, 0, 0, &thread ) != PJ_SUCCESS ){
+ ecfg, 0, 0, &ecfg->thread ) != PJ_SUCCESS ){
fprintf( stderr, "pj_thread_create failed\n" );
goto fail;
}
- cfg->af = pj_AF_INET();
- cfg->opt.aggressive = PJ_TRUE;
+ ecfg->cfg.af = pj_AF_INET();
+ ecfg->cfg.opt.aggressive = PJ_TRUE;
if( stun_server ){
- cfg->stun.server.ptr = malloc( strlen( stun_server ));
- pj_strcpy2( &cfg->stun.server, stun_server );
+ ecfg->cfg.stun.server.ptr = malloc( strlen( stun_server ));
+ pj_strcpy2( &ecfg->cfg.stun.server, stun_server );
if( stun_port )
- cfg->stun.port = stun_port;
+ ecfg->cfg.stun.port = stun_port;
}
if( turn_server ){
- cfg->turn.server.ptr = malloc( strlen( turn_server ));
- pj_strcpy2( &cfg->turn.server, turn_server );
+ ecfg->cfg.turn.server.ptr = malloc( strlen( turn_server ));
+ pj_strcpy2( &ecfg->cfg.turn.server, turn_server );
if( turn_port )
- cfg->turn.port = turn_port;
- cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
- cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
- cfg->turn.conn_type = PJ_TURN_TP_UDP;
+ ecfg->cfg.turn.port = turn_port;
+ ecfg->cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC;
+ ecfg->cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+ ecfg->cfg.turn.conn_type = PJ_TURN_TP_UDP;
}
- return cfg;
+ return ecfg;
fail:
- ice_cfg_free( cfg );
+ ice_cfg_free( ecfg );
return NULL;
}
-void ice_cfg_free( pj_ice_strans_cfg * cfg )
+void ice_cfg_free( struct erebos_ice_cfg * ecfg )
{
- if( ! cfg )
+ if( ! ecfg )
return;
- if( cfg->turn.server.ptr )
- free( cfg->turn.server.ptr );
+ ecfg->exit = true;
+ pj_thread_join( ecfg->thread );
- if( cfg->stun.server.ptr )
- free( cfg->stun.server.ptr );
+ if( ecfg->cfg.turn.server.ptr )
+ free( ecfg->cfg.turn.server.ptr );
- if( cfg->stun_cfg.ioqueue )
- pj_ioqueue_destroy( cfg->stun_cfg.ioqueue );
+ if( ecfg->cfg.stun.server.ptr )
+ free( ecfg->cfg.stun.server.ptr );
- if( cfg->stun_cfg.timer_heap )
- pj_timer_heap_destroy( cfg->stun_cfg.timer_heap );
+ if( ecfg->cfg.stun_cfg.ioqueue )
+ pj_ioqueue_destroy( ecfg->cfg.stun_cfg.ioqueue );
- free( cfg );
+ if( ecfg->cfg.stun_cfg.timer_heap )
+ pj_timer_heap_destroy( ecfg->cfg.stun_cfg.timer_heap );
+
+ free( ecfg );
+}
+
+void ice_cfg_stop_thread( struct erebos_ice_cfg * ecfg )
+{
+ if( ! ecfg )
+ return;
+ ecfg->exit = true;
}
-pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg * ecfg, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb )
{
ice_init();
@@ -221,7 +240,7 @@ pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role
.on_ice_complete = cb_on_ice_complete,
};
- pj_status_t status = pj_ice_strans_create( NULL, cfg, 1,
+ pj_status_t status = pj_ice_strans_create( NULL, &ecfg->cfg, 1,
udata, &icecb, &res );
if (status != PJ_SUCCESS)
diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h
index e4fcbdb..c31e227 100644
--- a/src/Erebos/ICE/pjproject.h
+++ b/src/Erebos/ICE/pjproject.h
@@ -3,11 +3,12 @@
#include <pjnath.h>
#include <HsFFI.h>
-pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
+struct erebos_ice_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port,
const char * turn_server, uint16_t turn_port );
-void ice_cfg_free( pj_ice_strans_cfg * cfg );
+void ice_cfg_free( struct erebos_ice_cfg * cfg );
+void ice_cfg_stop_thread( struct erebos_ice_cfg * cfg );
-pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role,
+pj_ice_strans * ice_create( const struct erebos_ice_cfg *, pj_ice_sess_role role,
HsStablePtr sptr, HsStablePtr cb );
void ice_destroy(pj_ice_strans * strans);
diff --git a/src/Erebos/Message.hs b/src/Erebos/Message.hs
index 5ef27f3..78fb5e7 100644
--- a/src/Erebos/Message.hs
+++ b/src/Erebos/Message.hs
@@ -29,6 +29,7 @@ import qualified Data.Text as T
import Data.Time.Format
import Data.Time.LocalTime
+import Erebos.Discovery
import Erebos.Identity
import Erebos.Network
import Erebos.Service
@@ -103,8 +104,10 @@ instance Service DirectMessage where
serviceNewPeer = syncDirectMessageToPeer . lookupSharedValue . lsShared . fromStored =<< svcGetLocal
- serviceStorageWatchers _ = (:[]) $
- SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ serviceStorageWatchers _ =
+ [ SomeStorageWatcher (lookupSharedValue . lsShared . fromStored) syncDirectMessageToPeer
+ , GlobalStorageWatcher (lookupSharedValue . lsShared . fromStored) findMissingPeers
+ ]
data MessageState = MessageState
@@ -210,12 +213,19 @@ syncDirectMessageToPeer (DirectMessageThreads mss _) = do
else do
return unchanged
+findMissingPeers :: Server -> DirectMessageThreads -> ExceptT String IO ()
+findMissingPeers server threads = do
+ forM_ (toThreadList threads) $ \thread -> do
+ when (msgHead thread /= msgReceived thread) $ do
+ mapM_ (discoverySearch server) $ map storedRef $ idDataF $ msgPeer thread
+
data DirectMessageThread = DirectMessageThread
{ msgPeer :: ComposedIdentity
- , msgHead :: [Stored DirectMessage]
- , msgSent :: [Stored DirectMessage]
- , msgSeen :: [Stored DirectMessage]
+ , msgHead :: [ Stored DirectMessage ]
+ , msgSent :: [ Stored DirectMessage ]
+ , msgSeen :: [ Stored DirectMessage ]
+ , msgReceived :: [ Stored DirectMessage ]
}
threadToList :: DirectMessageThread -> [DirectMessage]
@@ -249,6 +259,7 @@ messageThreadFor peer mss =
, msgHead = filterAncestors $ ready ++ received
, msgSent = filterAncestors $ sent ++ received
, msgSeen = filterAncestors $ ready ++ seen
+ , msgReceived = filterAncestors $ received
}
diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs
index d8e868a..fb2b5e9 100644
--- a/src/Erebos/Network.hs
+++ b/src/Erebos/Network.hs
@@ -24,6 +24,7 @@ module Erebos.Network (
sendToPeerStored, sendManyToPeerStored,
sendToPeerWith,
runPeerService,
+ modifyServiceGlobalState,
discoveryPort,
) where
@@ -300,13 +301,18 @@ startServer serverOptions serverOrigHead logd' serverServices = do
announceUpdate idt
forM_ serverServices $ \(SomeService service _) -> do
- forM_ (serviceStorageWatchers service) $ \(SomeStorageWatcher sel act) -> do
- watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
- withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
- readTVar (peerIdentityVar peer) >>= \case
- PeerIdentityFull _ -> writeTQueue serverIOActions $ do
- runPeerService peer $ act x
- _ -> return ()
+ forM_ (serviceStorageWatchers service) $ \case
+ SomeStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ withMVar serverPeers $ mapM_ $ \peer -> atomically $ do
+ readTVar (peerIdentityVar peer) >>= \case
+ PeerIdentityFull _ -> writeTQueue serverIOActions $ do
+ runPeerService peer $ act x
+ _ -> return ()
+ GlobalStorageWatcher sel act -> do
+ watchHeadWith serverOrigHead (sel . headStoredObject) $ \x -> do
+ atomically $ writeTQueue serverIOActions $ do
+ act server x
forkServerThread server $ forever $ do
(msg, saddr) <- S.recvFrom sock 4096
@@ -391,8 +397,21 @@ startServer serverOptions serverOrigHead logd' serverServices = do
return server
stopServer :: Server -> IO ()
-stopServer Server {..} = do
- mapM_ killThread =<< takeMVar serverThreads
+stopServer server@Server {..} = do
+ withMVar serverPeers $ \peers -> do
+ ( global, peerStates ) <- atomically $ (,)
+ <$> takeTMVar serverServiceStates
+ <*> (forM (M.elems peers) $ \p@Peer {..} -> ( p, ) <$> takeTMVar peerServiceState)
+
+ forM_ global $ \(SomeServiceGlobalState (proxy :: Proxy s) gs) -> do
+ ps <- forM peerStates $ \( peer, states ) ->
+ return $ ( peer, ) $ case M.lookup (serviceID proxy) states of
+ Just (SomeServiceState (_ :: Proxy ps) pstate)
+ | Just (Refl :: s :~: ps) <- eqT
+ -> pstate
+ _ -> emptyServiceState proxy
+ serviceStopServer proxy server gs ps
+ mapM_ killThread =<< takeMVar serverThreads
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
@@ -899,7 +918,7 @@ sendToPeerWith peer fobj = do
Left err -> throwError err
-lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
+lookupService :: forall s proxy. Service s => proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy (service@(SomeService (_ :: Proxy t) attr) : rest)
| Just (Refl :: s :~: t) <- eqT = Just (service, attr)
| otherwise = lookupService proxy rest
@@ -954,6 +973,27 @@ runPeerServiceOn mbservice peer handler = liftIO $ do
_ -> atomically $ do
logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+modifyServiceGlobalState
+ :: forall s a m proxy. (Service s, MonadIO m, MonadError String m)
+ => Server -> proxy s
+ -> (ServiceGlobalState s -> ( ServiceGlobalState s, a ))
+ -> m a
+modifyServiceGlobalState server proxy f = do
+ let svc = serviceID proxy
+ case lookupService proxy (serverServices server) of
+ Just ( service, _ ) -> do
+ liftIO $ atomically $ do
+ global <- takeTMVar (serverServiceStates server)
+ ( global', res ) <- case fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global of
+ SomeServiceGlobalState (_ :: Proxy gs) gs -> do
+ (Refl :: s :~: gs) <- return $ fromMaybe (error "service ID mismatch in global map") eqT
+ let ( gs', res ) = f gs
+ return ( M.insert svc (SomeServiceGlobalState (Proxy @s) gs') global, res )
+ putTMVar (serverServiceStates server) global'
+ return res
+ Nothing -> do
+ throwError $ "unhandled service '" ++ show (toUUID svc) ++ "'"
+
foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32)
foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress)
diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs
index d1943e1..b5e52dd 100644
--- a/src/Erebos/Service.hs
+++ b/src/Erebos/Service.hs
@@ -71,6 +71,9 @@ class (
serviceStorageWatchers :: proxy s -> [SomeStorageWatcher s]
serviceStorageWatchers _ = []
+ serviceStopServer :: proxy s -> Server -> ServiceGlobalState s -> [ ( Peer, ServiceState s ) ] -> IO ()
+ serviceStopServer _ _ _ _ = return ()
+
data SomeService = forall s. Service s => SomeService (Proxy s) (ServiceAttributes s)
@@ -100,7 +103,9 @@ someServiceEmptyGlobalState :: SomeService -> SomeServiceGlobalState
someServiceEmptyGlobalState (SomeService p _) = SomeServiceGlobalState p (emptyServiceGlobalState p)
-data SomeStorageWatcher s = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+data SomeStorageWatcher s
+ = forall a. Eq a => SomeStorageWatcher (Stored LocalState -> a) (a -> ServiceHandler s ())
+ | forall a. Eq a => GlobalStorageWatcher (Stored LocalState -> a) (Server -> a -> ExceptT String IO ())
newtype ServiceID = ServiceID UUID
diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs
index c1e9664..9ccfdde 100644
--- a/src/Erebos/Storage.hs
+++ b/src/Erebos/Storage.hs
@@ -4,7 +4,7 @@ module Erebos.Storage (
deriveEphemeralStorage, derivePartialStorage,
Ref, PartialRef, RefDigest,
- refDigest,
+ refDigest, refFromDigest,
readRef, showRef, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,