summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2020-05-10 23:30:59 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2020-05-11 23:13:40 +0200
commitb08e5a3e6d82ca5e5a2e29e791a2e61bf08964a4 (patch)
tree59b122e5f04de61d9069f72309f6ba7d524cff97
parentfd633795d755049c528d6594e6645fd15a1c57e6 (diff)
Network: support adding custom peers
-rw-r--r--src/Main.hs6
-rw-r--r--src/Network.hs232
2 files changed, 154 insertions, 84 deletions
diff --git a/src/Main.hs b/src/Main.hs
index 0398233..34c2b3b 100644
--- a/src/Main.hs
+++ b/src/Main.hs
@@ -96,7 +96,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do
False -> error "Requires terminal"
extPrint <- getExternalPrint
let extPrintLn str = extPrint $ str ++ "\n";
- chanPeer <- liftIO $ do
+ server <- liftIO $ do
erebosHead <- loadLocalStateHead st
startServer erebosHead extPrintLn bhost
[ SomeService @AttachService Proxy
@@ -107,7 +107,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do
peers <- liftIO $ newMVar []
void $ liftIO $ forkIO $ void $ forever $ do
- peer <- readChan chanPeer
+ peer <- readChan $ serverChanPeer server
if | PeerIdentityFull pid <- peerIdentity peer -> do
let update [] = ([peer], Nothing)
update (p:ps) | PeerIdentityFull pid' <- peerIdentity p
@@ -142,6 +142,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do
curIdentity <- liftIO $ loadLocalIdentity st
res <- liftIO $ runExceptT $ flip execStateT cstate $ runReaderT cmd CommandInput
{ ciSelf = curIdentity
+ , ciServer = server
, ciLine = line
, ciPrint = extPrintLn
, ciPeers = liftIO $ readMVar peers
@@ -158,6 +159,7 @@ interactiveLoop st bhost = runInputT defaultSettings $ do
data CommandInput = CommandInput
{ ciSelf :: UnifiedIdentity
+ , ciServer :: Server
, ciLine :: String
, ciPrint :: String -> IO ()
, ciPeers :: CommandM [Peer]
diff --git a/src/Network.hs b/src/Network.hs
index 429dee1..f07e7ce 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -1,12 +1,18 @@
module Network (
+ Server,
+ startServer,
+ serverChanPeer,
+
Peer(..),
PeerAddress(..),
PeerIdentity(..),
PeerChannel(..),
WaitingRef, wrDigest,
Service(..),
- startServer,
+ serverPeer,
sendToPeer, sendToPeerStored, sendToPeerWith,
+
+ discoveryPort,
) where
import Control.Concurrent
@@ -19,6 +25,7 @@ import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import Data.Either
import Data.List
+import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe
import Data.Typeable
@@ -42,6 +49,18 @@ announceIntervalSeconds :: Int
announceIntervalSeconds = 60
+data Server = Server
+ { serverStorage :: Storage
+ , serverIdentity :: MVar UnifiedIdentity
+ , serverSocket :: MVar Socket
+ , serverPeers :: MVar (Map SockAddr Peer)
+ , serverChanPeer' :: Chan Peer
+ }
+
+serverChanPeer :: Server -> Chan Peer
+serverChanPeer = serverChanPeer'
+
+
data Peer = Peer
{ peerAddress :: PeerAddress
, peerIdentity :: PeerIdentity
@@ -51,7 +70,8 @@ data Peer = Peer
, peerStorage :: Storage
, peerInStorage :: PartialStorage
, peerServiceState :: MVar (M.Map ServiceID SomeServiceState)
- , peerServiceQueue :: [(ServiceID, WaitingRef)]
+ , peerServiceInQueue :: [(ServiceID, WaitingRef)]
+ , peerServiceOutQueue :: MVar [TransportPacket]
, peerWaitingRefs :: [WaitingRef]
}
@@ -69,6 +89,9 @@ data PeerChannel = ChannelWait
| ChannelEstablished Channel
+data TransportPacket = TransportPacket TransportHeader [Ref]
+
+
data TransportHeaderItem
= Acknowledged PartialRef
| Rejected PartialRef
@@ -158,7 +181,7 @@ receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do
checkWaitingRef wr
-startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO (Chan Peer)
+startServer :: Head -> (String -> IO ()) -> String -> [SomeService] -> IO Server
startServer origHead logd bhost services = do
let storage = refStorage $ headRef origHead
chanPeer <- newChan
@@ -167,9 +190,11 @@ startServer origHead logd bhost services = do
peers <- newMVar M.empty
midentity <- newMVar $ headLocalIdentity origHead
mshared <- newMVar $ lsShared $ load $ headRef origHead
+ ssocket <- newEmptyMVar
let open addr = do
sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)
+ putMVar ssocket sock
setSocketOption sock ReuseAddr 1
setSocketOption sock Broadcast 1
setCloseOnExecIfNeeded =<< fdSocket sock
@@ -223,54 +248,42 @@ startServer origHead logd bhost services = do
forever $ do
(msg, paddr) <- recvFrom sock 4096
- mbpeer <- M.lookup paddr <$> readMVar peers
- (peer, content, secure) <- if
- | Just peer <- mbpeer
- , Just ch <- case peerChannel peer of
- ChannelEstablished ch -> Just ch
- ChannelOurAccept _ sch -> Just $ fromStored sch
- _ -> Nothing
- , Right plain <- runExcept $ channelDecrypt ch msg
- -> return (peer, plain, True)
-
- | Just peer <- mbpeer
- -> return (peer, msg, False)
-
- | otherwise -> do
- pst <- deriveEphemeralStorage storage
- ist <- derivePartialStorage pst
- svcs <- newMVar M.empty
- let peer = Peer
- { peerAddress = DatagramAddress paddr
- , peerIdentity = PeerIdentityUnknown
- , peerIdentityUpdate = []
- , peerChannel = ChannelWait
- , peerSocket = sock
- , peerStorage = pst
- , peerInStorage = ist
- , peerServiceState = svcs
- , peerServiceQueue = []
- , peerWaitingRefs = []
- }
- return (peer, msg, False)
-
- case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
- Right (obj:objs)
- | Just header <- transportFromObject obj -> do
- forM_ objs $ storeObject $ peerInStorage peer
- identity <- readMVar midentity
- let svcs = map someServiceID services
- handlePacket logd identity secure peer chanSvc svcs header >>= \case
- Just peer' -> do
- modifyMVar_ peers $ return . M.insert paddr peer'
- writeChan chanPeer peer'
- Nothing -> return ()
-
- | otherwise -> do
- logd $ show paddr ++ ": invalid objects"
- logd $ show objs
-
- _ -> logd $ show paddr ++ ": invalid objects"
+ modifyMVar_ peers $ \pvalue -> do
+ let mbpeer = M.lookup paddr pvalue
+ (peer, content, secure) <- if
+ | Just peer <- mbpeer
+ , Just ch <- case peerChannel peer of
+ ChannelEstablished ch -> Just ch
+ ChannelOurAccept _ sch -> Just $ fromStored sch
+ _ -> Nothing
+ , Right plain <- runExcept $ channelDecrypt ch msg
+ -> return (peer, plain, True)
+
+ | Just peer <- mbpeer
+ -> return (peer, msg, False)
+
+ | otherwise
+ -> (, msg, False) <$> mkPeer storage sock paddr
+
+ case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
+ Right (obj:objs)
+ | Just header <- transportFromObject obj -> do
+ forM_ objs $ storeObject $ peerInStorage peer
+ identity <- readMVar midentity
+ let svcs = map someServiceID services
+ handlePacket logd identity secure peer chanSvc svcs header >>= \case
+ Just peer' -> do
+ writeChan chanPeer peer'
+ return $ M.insert paddr peer' pvalue
+ Nothing -> return pvalue
+
+ | otherwise -> do
+ logd $ show paddr ++ ": invalid objects"
+ logd $ show objs
+ return pvalue
+
+ _ -> do logd $ show paddr ++ ": invalid objects"
+ return pvalue
void $ forkIO $ withSocketsDo $ do
let hints = defaultHints
@@ -308,7 +321,13 @@ startServer origHead logd bhost services = do
| DatagramAddress paddr <- peerAddress peer -> do
logd $ "service packet from peer with incomplete identity " ++ show paddr
- return chanPeer
+ return Server
+ { serverStorage = storage
+ , serverIdentity = midentity
+ , serverSocket = ssocket
+ , serverPeers = peers
+ , serverChanPeer' = chanPeer
+ }
type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a
@@ -354,9 +373,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) =
gets (peerChannel . phPeer) >>= \case
ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref -> do
updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }
- sendIdentityUpdate identity
- sendSharedState identity . lsShared . fromStored =<<
- liftIO (loadLocalState $ storedStorage $ idData identity)
+ finalizedChannel identity
_ -> return ()
Rejected _ -> return ()
@@ -444,7 +461,7 @@ handlePacket logd identity secure opeer chanSvc svcs (TransportHeader headers) =
addHeader $ Acknowledged pref
pst <- gets $ peerStorage . phPeer
wref <- newWaitingRef pst pref
- updatePeer $ \p -> p { peerServiceQueue = (svc, wref) : peerServiceQueue p }
+ updatePeer $ \p -> p { peerServiceInQueue = (svc, wref) : peerServiceInQueue p }
Left _ -> throwError $ "missing service object " ++ show pref
| otherwise -> addHeader $ Rejected pref
| otherwise -> throwError $ "service ref without type"
@@ -553,30 +570,39 @@ handleChannelAccept identity accref = do
{ peerIdentity = PeerIdentityFull pid
, peerChannel = ChannelEstablished $ fromStored ch
}
- sendIdentityUpdate identity
- sendSharedState identity . lsShared . fromStored =<<
- liftIO (loadLocalState $ storedStorage $ idData identity)
+ finalizedChannel identity
Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
-sendIdentityUpdate :: UnifiedIdentity -> PacketHandler ()
-sendIdentityUpdate self = do
+finalizedChannel :: UnifiedIdentity -> PacketHandler ()
+finalizedChannel self = do
+ -- Identity update
ist <- gets $ peerInStorage . phPeer
addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData $ self
mapM_ addHeader . map (AnnounceUpdate . partialRef ist . storedRef) . idUpdates $ self
-
-sendSharedState :: UnifiedIdentity -> [Stored a] -> PacketHandler ()
-sendSharedState self shared = do
+ -- Shared state
gets phPeer >>= \case
peer | PeerIdentityFull pid <- peerIdentity peer
, finalOwner pid `sameIdentity` finalOwner self -> do
- ist <- gets $ peerInStorage . phPeer
+ shared <- lsShared . fromStored <$>
+ liftIO (loadLocalState $ storedStorage $ idData self)
addHeader $ ServiceType $ serviceID @SyncService Proxy
mapM_ (addHeader . ServiceRef . partialRef ist . storedRef) shared
mapM_ (addBody . storedRef) shared
| otherwise -> return ()
+ -- Outstanding service packets
+ gets phPeer >>= \case
+ Peer { peerChannel = ChannelEstablished ch
+ , peerAddress = DatagramAddress paddr
+ , peerServiceOutQueue = oqueue
+ , peerSocket = sock
+ } -> do
+ ps <- liftIO $ modifyMVar oqueue $ return . ([],)
+ forM_ ps $ sendPacket sock paddr ch
+ _ -> return ()
+
handleIdentityUpdate :: PacketHandler ()
handleIdentityUpdate = do
@@ -601,7 +627,7 @@ handleIdentityUpdate = do
handleServices :: Chan (Peer, ServiceID, Ref) -> PacketHandler ()
-handleServices chan = gets (peerServiceQueue . phPeer) >>= \case
+handleServices chan = gets (peerServiceInQueue . phPeer) >>= \case
[] -> return ()
queue -> do
queue' <- flip filterM queue $ \case
@@ -611,7 +637,45 @@ handleServices chan = gets (peerServiceQueue . phPeer) >>= \case
liftIO $ writeChan chan (peer, svc, ref)
return False
Nothing -> return True
- updatePeer $ \p -> p { peerServiceQueue = queue' }
+ updatePeer $ \p -> p { peerServiceInQueue = queue' }
+
+
+mkPeer :: Storage -> Socket -> SockAddr -> IO Peer
+mkPeer st sock paddr = do
+ pst <- deriveEphemeralStorage st
+ ist <- derivePartialStorage pst
+ svcs <- newMVar M.empty
+ oqueue <- newMVar []
+ return $ Peer
+ { peerAddress = DatagramAddress paddr
+ , peerIdentity = PeerIdentityUnknown
+ , peerIdentityUpdate = []
+ , peerChannel = ChannelWait
+ , peerSocket = sock
+ , peerStorage = pst
+ , peerInStorage = ist
+ , peerServiceState = svcs
+ , peerServiceInQueue = []
+ , peerServiceOutQueue = oqueue
+ , peerWaitingRefs = []
+ }
+
+serverPeer :: Server -> SockAddr -> IO Peer
+serverPeer server paddr = do
+ sock <- readMVar $ serverSocket server
+ (peer, hello) <- modifyMVar (serverPeers server) $ \pvalue -> do
+ case M.lookup paddr pvalue of
+ Just peer -> return (pvalue, (peer, False))
+ Nothing -> do
+ peer <- mkPeer (serverStorage server) sock paddr
+ return (M.insert paddr peer pvalue, (peer, True))
+ when hello $ do
+ identity <- readMVar (serverIdentity server)
+ void $ sendTo sock
+ (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader
+ [ AnnounceSelf $ partialRef (peerInStorage peer) $ storedRef $ idData identity ]
+ ) paddr
+ return peer
sendToPeer :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> s -> m ()
@@ -621,24 +685,28 @@ sendToPeerStored :: (Service s, MonadIO m, MonadError String m) => UnifiedIdenti
sendToPeerStored self peer spacket = sendToPeerList self peer [ServiceReply (Right spacket) True]
sendToPeerList :: (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> [ServiceReply s] -> m ()
-sendToPeerList _ peer@Peer { peerChannel = ChannelEstablished ch } parts = do
- let st = peerInStorage peer
+sendToPeerList _ peer parts = do
+ let st = peerStorage peer
+ pst = peerInStorage peer
srefs <- liftIO $ forM parts $ \case ServiceReply (Left x) _ -> store st x
- ServiceReply (Right sx) _ -> copyRef st (storedRef sx)
-
- bytes <- forM (zip parts srefs) $
- \case (ServiceReply _ False, _) -> return BL.empty
- (ServiceReply _ True, ref) -> case lazyLoadBytes ref of
- Right bytes -> return bytes
- Left dgst -> throwError $ "incomplete ref " ++ show ref ++ ", missing " ++ BC.unpack (showRefDigest dgst)
+ ServiceReply (Right sx) _ -> return $ storedRef sx
+ prefs <- mapM (copyRef pst) srefs
+ let content = map snd $ filter (\(ServiceReply _ use, _) -> use) (zip parts srefs)
+ header = TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef prefs)
+ packet = TransportPacket header content
+ case peerChannel peer of
+ ChannelEstablished ch -> do
+ let DatagramAddress paddr = peerAddress peer
+ sendPacket (peerSocket peer) paddr ch packet
+ _ -> liftIO $ modifyMVar_ (peerServiceOutQueue peer) $ return . (packet:)
+
+sendPacket :: (MonadIO m, MonadError String m) => Socket -> SockAddr -> Channel -> TransportPacket -> m ()
+sendPacket sock addr ch (TransportPacket header content) = do
let plain = BL.toStrict $ BL.concat $
- (serializeObject $ transportToObject $ TransportHeader (ServiceType (serviceID $ head parts) : map ServiceRef srefs))
- : bytes
+ (serializeObject $ transportToObject header)
+ : map lazyLoadBytes content
ctext <- channelEncrypt ch plain
- let DatagramAddress paddr = peerAddress peer
- void $ liftIO $ sendTo (peerSocket peer) ctext paddr
-
-sendToPeerList _ _ _ = throwError $ "no channel to peer"
+ void $ liftIO $ sendTo sock ctext addr
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => UnifiedIdentity -> Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith identity peer fobj = do