summaryrefslogtreecommitdiff
path: root/src/Network.hs
diff options
context:
space:
mode:
authorRoman Smrž <roman.smrz@seznam.cz>2019-10-19 23:07:04 +0200
committerRoman Smrž <roman.smrz@seznam.cz>2019-10-19 23:07:04 +0200
commit0f8561a997952a76a92919e527b6bc05ade8ee65 (patch)
tree2922438457d847084f7f2bd76c2ee2cb9d0e10af /src/Network.hs
parent1aef7681082e411c135802881ebcd3ffd0168fcd (diff)
Network rewrite with data request and ack
Packet header is now composed of individual header items, which can be combined in various ways. Received data is properly acknowledged and missing objects can be requested using hashes.
Diffstat (limited to 'src/Network.hs')
-rw-r--r--src/Network.hs582
1 files changed, 375 insertions, 207 deletions
diff --git a/src/Network.hs b/src/Network.hs
index 053dbe5..5d86a24 100644
--- a/src/Network.hs
+++ b/src/Network.hs
@@ -1,6 +1,9 @@
module Network (
Peer(..),
PeerAddress(..),
+ PeerIdentity(..), peerIdentityRef,
+ PeerChannel(..),
+ WaitingRef, wrDigest,
startServer,
sendToPeer,
) where
@@ -9,10 +12,14 @@ import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.Except
+import Control.Monad.State
+
+import Crypto.Random
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map as M
+import Data.Maybe
import qualified Data.Text as T
import Network.Socket
@@ -30,87 +37,116 @@ discoveryPort = "29665"
data Peer = Peer
{ peerAddress :: PeerAddress
- , peerIdentity :: Maybe UnifiedIdentity
- , peerChannels :: [Channel]
+ , peerIdentity :: PeerIdentity
+ , peerOwner :: PeerIdentity
+ , peerChannel :: PeerChannel
, peerSocket :: Socket
, peerStorage :: Storage
, peerInStorage :: PartialStorage
+ , peerServiceQueue :: [(T.Text, WaitingRef)]
+ , peerWaitingRefs :: [WaitingRef]
}
- deriving (Show)
data PeerAddress = DatagramAddress SockAddr
deriving (Show)
+data PeerIdentity = PeerIdentityUnknown
+ | PeerIdentityRef WaitingRef
+ | PeerIdentityFull UnifiedIdentity
+
+peerIdentityRef :: Peer -> Maybe PartialRef
+peerIdentityRef peer = case peerIdentity peer of
+ PeerIdentityUnknown -> Nothing
+ PeerIdentityRef (WaitingRef _ pref _) -> Just pref
+ PeerIdentityFull idt -> Just $ partialRef (peerInStorage peer) $ storedRef $ idData idt
+
+data PeerChannel = ChannelWait
+ | ChannelOurRequest (Stored ChannelRequest)
+ | ChannelPeerRequest WaitingRef
+ | ChannelOurAccept (Stored ChannelAccept) (Stored Channel)
+ | ChannelEstablished Channel
+
-data TransportHeader = AnnouncePacket PartialRef
- | IdentityRequest PartialRef PartialRef
- | IdentityResponse PartialRef
- | TrChannelRequest PartialRef
- | TrChannelAccept PartialRef
+data TransportHeaderItem
+ = Acknowledged PartialRef
+ | DataRequest PartialRef
+ | DataResponse PartialRef
+ | AnnounceSelf PartialRef
+ | TrChannelRequest PartialRef
+ | TrChannelAccept PartialRef
+ | ServiceType T.Text
+ | ServiceRef PartialRef
-data ServiceHeader = ServiceHeader T.Text PartialRef
+data TransportHeader = TransportHeader [TransportHeaderItem]
transportToObject :: TransportHeader -> PartialObject
-transportToObject = \case
- AnnouncePacket ref -> Rec
- [ (BC.pack "TRANS", RecText $ T.pack "announce")
- , (BC.pack "identity", RecRef ref)
- ]
- IdentityRequest ref from -> Rec
- [ (BC.pack "TRANS", RecText $ T.pack "idreq")
- , (BC.pack "identity", RecRef ref)
- , (BC.pack "from", RecRef from)
- ]
- IdentityResponse ref -> Rec
- [ (BC.pack "TRANS", RecText $ T.pack "idresp")
- , (BC.pack "identity", RecRef ref)
- ]
- TrChannelRequest ref -> Rec
- [ (BC.pack "TRANS", RecText $ T.pack "chreq")
- , (BC.pack "req", RecRef ref)
- ]
- TrChannelAccept ref -> Rec
- [ (BC.pack "TRANS", RecText $ T.pack "chacc")
- , (BC.pack "acc", RecRef ref)
- ]
+transportToObject (TransportHeader items) = Rec $ map single items
+ where single = \case
+ Acknowledged ref -> (BC.pack "ACK", RecRef ref)
+ DataRequest ref -> (BC.pack "REQ", RecRef ref)
+ DataResponse ref -> (BC.pack "RSP", RecRef ref)
+ AnnounceSelf ref -> (BC.pack "ANN", RecRef ref)
+ TrChannelRequest ref -> (BC.pack "CRQ", RecRef ref)
+ TrChannelAccept ref -> (BC.pack "CAC", RecRef ref)
+ ServiceType stype -> (BC.pack "STP", RecText stype)
+ ServiceRef ref -> (BC.pack "SRF", RecRef ref)
transportFromObject :: PartialObject -> Maybe TransportHeader
-transportFromObject (Rec items)
- | Just (RecText trans) <- lookup (BC.pack "TRANS") items, trans == T.pack "announce"
- , Just (RecRef ref) <- lookup (BC.pack "identity") items
- = Just $ AnnouncePacket ref
+transportFromObject (Rec items) = case catMaybes $ map single items of
+ [] -> Nothing
+ titems -> Just $ TransportHeader titems
+ where single (name, content) = if
+ | name == BC.pack "ACK", RecRef ref <- content -> Just $ Acknowledged ref
+ | name == BC.pack "REQ", RecRef ref <- content -> Just $ DataRequest ref
+ | name == BC.pack "RSP", RecRef ref <- content -> Just $ DataResponse ref
+ | name == BC.pack "ANN", RecRef ref <- content -> Just $ AnnounceSelf ref
+ | name == BC.pack "CRQ", RecRef ref <- content -> Just $ TrChannelRequest ref
+ | name == BC.pack "CAC", RecRef ref <- content -> Just $ TrChannelAccept ref
+ | name == BC.pack "STP", RecText stype <- content -> Just $ ServiceType stype
+ | name == BC.pack "SRF", RecRef ref <- content -> Just $ ServiceRef ref
+ | otherwise -> Nothing
+transportFromObject _ = Nothing
- | Just (RecText trans) <- lookup (BC.pack "TRANS") items, trans == T.pack "idreq"
- , Just (RecRef ref) <- lookup (BC.pack "identity") items
- , Just (RecRef from) <- lookup (BC.pack "from") items
- = Just $ IdentityRequest ref from
+lookupServiceType :: [TransportHeaderItem] -> Maybe T.Text
+lookupServiceType (ServiceType stype : _) = Just stype
+lookupServiceType (_ : hs) = lookupServiceType hs
+lookupServiceType [] = Nothing
- | Just (RecText trans) <- lookup (BC.pack "TRANS") items, trans == T.pack "idresp"
- , Just (RecRef ref) <- lookup (BC.pack "identity") items
- = Just $ IdentityResponse ref
- | Just (RecText trans) <- lookup (BC.pack "TRANS") items, trans == T.pack "chreq"
- , Just (RecRef ref) <- lookup (BC.pack "req") items
- = Just $ TrChannelRequest ref
+data WaitingRef = WaitingRef Storage PartialRef (MVar [RefDigest])
- | Just (RecText trans) <- lookup (BC.pack "TRANS") items, trans == T.pack "chacc"
- , Just (RecRef ref) <- lookup (BC.pack "acc") items
- = Just $ TrChannelAccept ref
+wrDigest :: WaitingRef -> RefDigest
+wrDigest (WaitingRef _ pref _) = refDigest pref
-transportFromObject _ = Nothing
+newWaitingRef :: Storage -> PartialRef -> PacketHandler WaitingRef
+newWaitingRef st pref = do
+ wref <- WaitingRef st pref <$> liftIO (newMVar [])
+ updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p }
+ return wref
+
+copyOrRequestRef :: Storage -> PartialRef -> PacketHandler (Either WaitingRef Ref)
+copyOrRequestRef st pref = copyRef st pref >>= \case
+ Right ref -> return $ Right ref
+ Left dgst -> do
+ addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst
+ wref <- WaitingRef st pref <$> liftIO (newMVar [dgst])
+ updatePeer $ \p -> p { peerWaitingRefs = wref : peerWaitingRefs p }
+ return $ Left wref
-serviceToObject :: ServiceHeader -> PartialObject
-serviceToObject (ServiceHeader svc ref) = Rec
- [ (BC.pack "SVC", RecText svc)
- , (BC.pack "ref", RecRef ref)
- ]
+checkWaitingRef :: WaitingRef -> PacketHandler (Maybe Ref)
+checkWaitingRef (WaitingRef st pref mvar) = do
+ liftIO (readMVar mvar) >>= \case
+ [] -> copyRef st pref >>= \case
+ Right ref -> return $ Just ref
+ Left dgst -> do liftIO $ modifyMVar_ mvar $ return . (dgst:)
+ addHeader $ DataRequest $ partialRefFromDigest (refStorage pref) dgst
+ return Nothing
+ _ -> return Nothing
-serviceFromObject :: PartialObject -> Maybe ServiceHeader
-serviceFromObject (Rec items)
- | Just (RecText svc) <- lookup (BC.pack "SVC") items
- , Just (RecRef ref) <- lookup (BC.pack "ref") items
- = Just $ ServiceHeader svc ref
-serviceFromObject _ = Nothing
+receivedWaitingRef :: PartialRef -> WaitingRef -> PacketHandler (Maybe Ref)
+receivedWaitingRef nref wr@(WaitingRef _ _ mvar) = do
+ liftIO $ modifyMVar_ mvar $ return . filter (/= refDigest nref)
+ checkWaitingRef wr
startServer :: (String -> IO ()) -> String -> UnifiedIdentity -> IO (Chan Peer, Chan (Peer, T.Text, Ref))
@@ -131,149 +167,50 @@ startServer logd bhost identity = do
loop sock = do
st <- derivePartialStorage $ storedStorage sidentity
baddr:_ <- getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just bhost) (Just discoveryPort)
- void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ AnnouncePacket $ partialRef st $ storedRef sidentity) (addrAddress baddr)
+ void $ sendTo sock (BL.toStrict $ serializeObject $ transportToObject $ TransportHeader [ AnnounceSelf $ partialRef st $ storedRef sidentity ]) (addrAddress baddr)
forever $ do
(msg, paddr) <- recvFrom sock 4096
mbpeer <- M.lookup paddr <$> readMVar peers
- if | Just peer <- mbpeer
- , ch:_ <- peerChannels peer
- , Just plain <- channelDecrypt ch msg
- , Right (obj:objs) <- runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict plain
- , Just (ServiceHeader svc ref) <- serviceFromObject obj
- -> do forM_ objs $ storeObject $ peerInStorage peer
- copyRef (peerStorage peer) ref >>= \case
- Just pref -> writeChan chanSvc (peer, svc, pref)
- Nothing -> logd $ show paddr ++ ": incomplete service packet"
-
- | otherwise -> do
- (pst, ist) <- case mbpeer of
- Just peer -> return (peerStorage peer, peerInStorage peer)
- Nothing -> do pst <- deriveEphemeralStorage $ storedStorage sidentity
- ist <- derivePartialStorage pst
- return (pst, ist)
- if | Right (obj:objs) <- runExcept $ deserializeObjects ist $ BL.fromStrict msg
- , Just tpack <- transportFromObject obj
- -> packet sock paddr tpack objs pst ist
-
- | otherwise -> logd $ show paddr ++ ": invalid packet"
-
- packet sock paddr (AnnouncePacket ref) _ _ ist = do
- logd $ "Got announce: " ++ show ref ++ " from " ++ show paddr
- when (refDigest ref /= refDigest (storedRef sidentity)) $ void $ sendTo sock (BL.toStrict $ BL.concat
- [ serializeObject $ transportToObject $ IdentityRequest ref (partialRef ist $ storedRef sidentity)
- , BL.concat $ map (lazyLoadBytes . storedRef) $ collectStoredObjects $ wrappedLoad $ storedRef sidentity
- ]) paddr
-
- packet _ paddr (IdentityRequest ref from) [] _ _ = do
- logd $ "Got identity request: for " ++ show ref ++ " by " ++ show from ++ " from " ++ show paddr ++ " without content"
-
- packet sock paddr (IdentityRequest ref from) (obj:objs) pst ist = do
- logd $ "Got identity request: for " ++ show ref ++ " by " ++ show from ++ " from " ++ show paddr
- logd $ show (obj:objs)
- from' <- storeObject ist obj
- if from == from'
- then do forM_ objs $ storeObject ist
- copyRef pst from >>= \case
- Nothing -> logd $ "Incomplete peer identity"
- Just sfrom | Just pidentity <- verifyIdentity (wrappedLoad sfrom) -> do
- let peer = Peer (DatagramAddress paddr) (Just pidentity) [] sock pst ist
- modifyMVar_ peers $ return . M.insert paddr peer
- writeChan chanPeer peer
- void $ sendTo sock (BL.toStrict $ BL.concat
- [ serializeObject $ transportToObject $ IdentityResponse (partialRef ist $ storedRef sidentity)
- , BL.concat $ map (lazyLoadBytes . storedRef) $ collectStoredObjects $ wrappedLoad $ storedRef sidentity
- ]) paddr
- Just _ -> logd $ "Peer identity verification failed"
- else logd $ "Mismatched content"
-
- packet _ paddr (IdentityResponse ref) [] _ _ = do
- logd $ "Got identity response: by " ++ show ref ++ " from " ++ show paddr ++ " without content"
-
- packet sock paddr (IdentityResponse ref) (obj:objs) pst ist = do
- logd $ "Got identity response: by " ++ show ref ++ " from " ++ show paddr
- logd $ show (obj:objs)
- ref' <- storeObject ist obj
- if ref == ref'
- then do forM_ objs $ storeObject ist
- copyRef pst ref >>= \case
- Nothing -> logd $ "Incomplete peer identity"
- Just sref | Just pidentity <- verifyIdentity (wrappedLoad sref) -> do
- let peer = Peer (DatagramAddress paddr) (Just pidentity) [] sock pst ist
- modifyMVar_ peers $ return . M.insert paddr peer
- writeChan chanPeer peer
- req <- createChannelRequest pst identity pidentity
- void $ sendTo sock (BL.toStrict $ BL.concat
- [ serializeObject $ transportToObject $ TrChannelRequest (partialRef ist $ storedRef req)
- , lazyLoadBytes $ storedRef req
- , lazyLoadBytes $ storedRef $ signedData $ fromStored req
- , lazyLoadBytes $ storedRef $ crKey $ fromStored $ signedData $ fromStored req
- , BL.concat $ map (lazyLoadBytes . storedRef) $ signedSignature $ fromStored req
- ]) paddr
- Just _ -> logd $ "Peer identity verification failed"
- else logd $ "Mismatched content"
-
- packet _ paddr (TrChannelRequest _) [] _ _ = do
- logd $ "Got channel request: from " ++ show paddr ++ " without content"
-
- packet sock paddr (TrChannelRequest ref) (obj:objs) pst ist = do
- logd $ "Got channel request: from " ++ show paddr
- logd $ show (obj:objs)
- ref' <- storeObject ist obj
- if ref == ref'
- then do forM_ objs $ storeObject ist
- copyRef pst ref >>= \case
- Nothing -> logd $ "Incomplete channel request"
- Just sref -> do
- let request = wrappedLoad sref :: Stored ChannelRequest
- modifyMVar_ peers $ \pval -> case M.lookup paddr pval of
- Just peer | Just pid <- peerIdentity peer ->
- runExceptT (acceptChannelRequest identity pid request) >>= \case
- Left errs -> do mapM_ logd ("Invalid channel request" : errs)
- return pval
- Right (acc, channel) -> do
- logd $ "Got channel: " ++ show (storedRef channel)
- let peer' = peer { peerChannels = fromStored channel : peerChannels peer }
- writeChan chanPeer peer'
- void $ sendTo sock (BL.toStrict $ BL.concat
- [ serializeObject $ transportToObject $ TrChannelAccept (partialRef ist $ storedRef acc)
- , lazyLoadBytes $ storedRef acc
- , lazyLoadBytes $ storedRef $ signedData $ fromStored acc
- , lazyLoadBytes $ storedRef $ caKey $ fromStored $ signedData $ fromStored acc
- , BL.concat $ map (lazyLoadBytes . storedRef) $ signedSignature $ fromStored acc
- ]) paddr
- return $ M.insert paddr peer' pval
-
- _ -> do logd $ "Invalid channel request - no peer identity"
- return pval
- else logd $ "Mismatched content"
-
- packet _ paddr (TrChannelAccept _) [] _ _ = do
- logd $ "Got channel accept: from " ++ show paddr ++ " without content"
-
- packet _ paddr (TrChannelAccept ref) (obj:objs) pst ist = do
- logd $ "Got channel accept: from " ++ show paddr
- logd $ show (obj:objs)
- ref' <- storeObject ist obj
- if ref == ref'
- then do forM_ objs $ storeObject ist
- copyRef pst ref >>= \case
- Nothing -> logd $ "Incomplete channel accept"
- Just sref -> do
- let accepted = wrappedLoad sref :: Stored ChannelAccept
- modifyMVar_ peers $ \pval -> case M.lookup paddr pval of
- Just peer | Just pid <- peerIdentity peer ->
- runExceptT (acceptedChannel identity pid accepted) >>= \case
- Left errs -> do mapM_ logd ("Invalid channel accept" : errs)
- return pval
- Right channel -> do
- logd $ "Got channel: " ++ show (storedRef channel)
- let peer' = peer { peerChannels = fromStored channel : peerChannels peer }
- writeChan chanPeer peer'
- return $ M.insert paddr peer' pval
- _ -> do logd $ "Invalid channel accept - no peer identity"
- return pval
-
- else logd $ "Mismatched content"
+ (peer, content, secure) <- if
+ | Just peer <- mbpeer
+ , ChannelEstablished ch <- peerChannel peer
+ , Right plain <- runExcept $ channelDecrypt ch msg
+ -> return (peer, plain, True)
+
+ | Just peer <- mbpeer
+ -> return (peer, msg, False)
+
+ | otherwise -> do
+ pst <- deriveEphemeralStorage $ storedStorage sidentity
+ ist <- derivePartialStorage pst
+ let peer = Peer
+ { peerAddress = DatagramAddress paddr
+ , peerIdentity = PeerIdentityUnknown
+ , peerOwner = PeerIdentityUnknown
+ , peerChannel = ChannelWait
+ , peerSocket = sock
+ , peerStorage = pst
+ , peerInStorage = ist
+ , peerServiceQueue = []
+ , peerWaitingRefs = []
+ }
+ return (peer, msg, False)
+
+ case runExcept $ deserializeObjects (peerInStorage peer) $ BL.fromStrict content of
+ Right (obj:objs)
+ | Just header <- transportFromObject obj -> do
+ forM_ objs $ storeObject $ peerInStorage peer
+ handlePacket logd identity secure peer chanSvc header >>= \case
+ Just peer' -> do
+ modifyMVar_ peers $ return . M.insert paddr peer'
+ writeChan chanPeer peer'
+ Nothing -> return ()
+
+ | otherwise -> do
+ logd $ show paddr ++ ": invalid objects"
+ logd $ show objs
+
+ _ -> logd $ show paddr ++ ": invalid objects"
void $ forkIO $ withSocketsDo $ do
let hints = defaultHints
@@ -285,18 +222,249 @@ startServer logd bhost identity = do
return (chanPeer, chanSvc)
+type PacketHandler a = StateT PacketHandlerState (ExceptT String IO) a
-sendToPeer :: Storable a => UnifiedIdentity -> Peer -> T.Text -> a -> IO ()
-sendToPeer _ peer@Peer { peerChannels = ch:_ } svc obj = do
+data PacketHandlerState = PacketHandlerState
+ { phPeer :: Peer
+ , phPeerChanged :: Bool
+ , phHead :: [TransportHeaderItem]
+ , phBody :: [Ref]
+ }
+
+updatePeer :: (Peer -> Peer) -> PacketHandler ()
+updatePeer f = modify $ \ph -> ph { phPeer = f (phPeer ph), phPeerChanged = True }
+
+addHeader :: TransportHeaderItem -> PacketHandler ()
+addHeader h = modify $ \ph -> ph { phHead = h : phHead ph }
+
+addBody :: Ref -> PacketHandler ()
+addBody r = modify $ \ph -> ph { phBody = r : phBody ph }
+
+handlePacket :: (String -> IO ()) -> UnifiedIdentity -> Bool
+ -> Peer -> Chan (Peer, T.Text, Ref)
+ -> TransportHeader -> IO (Maybe Peer)
+handlePacket logd identity secure opeer chanSvc (TransportHeader headers) = do
+ let sidentity = idData identity
+ DatagramAddress paddr = peerAddress opeer
+ plaintextRefs = map (refDigest . storedRef) $ concatMap (collectStoredObjects . wrappedLoad) $ concat
+ [ [ storedRef sidentity ]
+ , case peerChannel opeer of
+ ChannelOurRequest req -> [ storedRef req ]
+ ChannelOurAccept acc _ -> [ storedRef acc ]
+ _ -> []
+ ]
+
+ res <- runExceptT $ flip execStateT (PacketHandlerState opeer False [] []) $ do
+ forM_ headers $ \case
+ Acknowledged ref -> do
+ gets (peerChannel . phPeer) >>= \case
+ ChannelOurAccept acc ch | refDigest (storedRef acc) == refDigest ref ->
+ updatePeer $ \p -> p { peerChannel = ChannelEstablished (fromStored ch) }
+ _ -> return ()
+
+ DataRequest ref
+ | secure || refDigest ref `elem` plaintextRefs -> do
+ Right mref <- copyRef (storedStorage sidentity) ref
+ addHeader $ DataResponse ref
+ addBody $ mref
+ | otherwise -> throwError $ "unauthorized data request for " ++ show ref
+
+ DataResponse ref -> do
+ liftIO (ioLoadBytes ref) >>= \case
+ Right _ -> do
+ addHeader $ Acknowledged ref
+ wait <- gets $ peerWaitingRefs . phPeer
+ wait' <- flip filterM wait $ receivedWaitingRef ref >=> \case
+ Just _ -> return False
+ Nothing -> return True
+ updatePeer $ \p -> p { peerWaitingRefs = wait' }
+ Left _ -> throwError $ "mismatched data response " ++ show ref
+
+ AnnounceSelf ref -> do
+ peer <- gets phPeer
+ if | Just ref' <- peerIdentityRef peer, refDigest ref' == refDigest ref -> return ()
+ | refDigest ref == refDigest (storedRef sidentity) -> return ()
+ | otherwise -> do
+ copyOrRequestRef (peerStorage peer) ref >>= \case
+ Right pref
+ | Just idt <- verifyIdentity (wrappedLoad pref) -> do
+ updatePeer $ \p -> p { peerIdentity = PeerIdentityFull idt
+ , peerOwner = PeerIdentityFull $ finalOwner idt
+ }
+ | otherwise -> throwError $ "broken identity " ++ show pref
+ Left wref -> updatePeer $ \p -> p { peerIdentity = PeerIdentityRef wref }
+
+ TrChannelRequest reqref -> do
+ addHeader $ Acknowledged reqref
+ pst <- gets $ peerStorage . phPeer
+ let process = handleChannelRequest identity =<< newWaitingRef pst reqref
+ gets (peerChannel . phPeer) >>= \case
+ ChannelWait {} -> process
+ ChannelOurRequest our | refDigest reqref < refDigest (storedRef our) -> process
+ | otherwise -> return ()
+ ChannelPeerRequest {} -> process
+ ChannelOurAccept {} -> return ()
+ ChannelEstablished {} -> process
+
+ TrChannelAccept accref -> do
+ addHeader $ Acknowledged accref
+ let process = handleChannelAccept identity accref
+ gets (peerChannel . phPeer) >>= \case
+ ChannelWait {} -> process
+ ChannelOurRequest {} -> process
+ ChannelPeerRequest {} -> process
+ ChannelOurAccept our _ | refDigest accref < refDigest (storedRef our) -> process
+ | otherwise -> return ()
+ ChannelEstablished {} -> process
+
+ ServiceType _ -> return ()
+ ServiceRef pref
+ | not secure -> throwError $ "service packet without secure channeel"
+ | Just svc <- lookupServiceType headers -> do
+ liftIO (ioLoadBytes pref) >>= \case
+ Right _ -> do
+ addHeader $ Acknowledged pref
+ pst <- gets $ peerStorage . phPeer
+ wref <- newWaitingRef pst pref
+ updatePeer $ \p -> p { peerServiceQueue = (svc, wref) : peerServiceQueue p }
+ Left _ -> throwError $ "missing service object " ++ show pref
+ | otherwise -> throwError $ "service ref without type"
+
+ setupChannel identity
+ handleServices chanSvc
+
+ case res of
+ Left err -> do
+ logd $ "Error in handling packet from " ++ show paddr ++ ": " ++ err
+ return Nothing
+ Right ph -> do
+ when (not $ null $ phHead ph) $ do
+ let plain = BL.toStrict $ BL.concat
+ [ serializeObject $ transportToObject $ TransportHeader $ reverse $ phHead ph
+ , BL.concat $ map lazyLoadBytes $ phBody ph
+ ]
+ case peerChannel opeer of
+ ChannelEstablished ch -> do
+ x <- runExceptT (channelEncrypt ch plain)
+ case x of Right ctext -> void $ sendTo (peerSocket $ phPeer ph) ctext paddr
+ Left err -> logd $ "Failed to encrypt data: " ++ err
+ _ -> void $ sendTo (peerSocket $ phPeer ph) plain paddr
+
+ return $ if phPeerChanged ph then Just $ phPeer ph
+ else Nothing
+
+
+getOrRequestIdentity :: PeerIdentity -> PacketHandler (Maybe UnifiedIdentity)
+getOrRequestIdentity = \case
+ PeerIdentityUnknown -> return Nothing
+ PeerIdentityRef wref -> checkWaitingRef wref >>= \case
+ Just ref -> case verifyIdentity $ wrappedLoad ref of
+ Nothing -> throwError $ "broken identity"
+ Just idt -> return $ Just idt
+ Nothing -> return Nothing
+ PeerIdentityFull idt -> return $ Just idt
+
+
+setupChannel :: UnifiedIdentity -> PacketHandler ()
+setupChannel identity = gets phPeer >>= \case
+ peer@Peer { peerChannel = ChannelWait } -> do
+ getOrRequestIdentity (peerIdentity peer) >>= \case
+ Just pid -> do
+ let ist = peerInStorage peer
+ req <- createChannelRequest (peerStorage peer) identity pid
+ updatePeer $ \p -> p { peerChannel = ChannelOurRequest req }
+ addHeader $ TrChannelRequest $ partialRef ist $ storedRef req
+ addHeader $ AnnounceSelf $ partialRef ist $ storedRef $ idData identity
+ addBody $ storedRef req
+ Nothing -> return ()
+
+ Peer { peerChannel = ChannelPeerRequest wref } -> do
+ handleChannelRequest identity wref
+
+ _ -> return ()
+
+handleChannelRequest :: UnifiedIdentity -> WaitingRef -> PacketHandler ()
+handleChannelRequest identity reqref = do
+ ist <- gets $ peerInStorage . phPeer
+ checkWaitingRef reqref >>= \case
+ Just req -> do
+ pid <- gets (peerIdentity . phPeer) >>= \case
+ PeerIdentityFull pid -> return pid
+ PeerIdentityRef wref -> do
+ Just idref <- checkWaitingRef wref
+ Just pid <- return $ verifyIdentity $ wrappedLoad idref
+ return pid
+ PeerIdentityUnknown -> throwError $ "unknown peer identity"
+
+ (acc, ch) <- acceptChannelRequest identity pid (wrappedLoad req)
+ updatePeer $ \p -> p
+ { peerIdentity = PeerIdentityFull pid
+ , peerOwner = case peerOwner p of
+ PeerIdentityUnknown -> PeerIdentityFull $ finalOwner pid
+ owner -> owner
+ , peerChannel = ChannelOurAccept acc ch
+ }
+ addHeader $ TrChannelAccept (partialRef ist $ storedRef acc)
+ mapM_ addBody $ concat
+ [ [ storedRef $ acc ]
+ , [ storedRef $ signedData $ fromStored acc ]
+ , [ storedRef $ caKey $ fromStored $ signedData $ fromStored acc ]
+ , map storedRef $ signedSignature $ fromStored acc
+ ]
+ Nothing -> do
+ updatePeer $ \p -> p { peerChannel = ChannelPeerRequest reqref }
+
+handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler ()
+handleChannelAccept identity accref = do
+ pst <- gets $ peerStorage . phPeer
+ copyRef pst accref >>= \case
+ Right acc -> do
+ pid <- gets (peerIdentity . phPeer) >>= \case
+ PeerIdentityFull pid -> return pid
+ PeerIdentityRef wref -> do
+ Just idref <- checkWaitingRef wref
+ Just pid <- return $ verifyIdentity $ wrappedLoad idref
+ return pid
+ PeerIdentityUnknown -> throwError $ "unknown peer identity"
+
+ ch <- acceptedChannel identity pid (wrappedLoad acc)
+ updatePeer $ \p -> p
+ { peerIdentity = PeerIdentityFull pid
+ , peerOwner = case peerOwner p of
+ PeerIdentityUnknown -> PeerIdentityFull $ finalOwner pid
+ owner -> owner
+ , peerChannel = ChannelEstablished $ fromStored ch
+ }
+ Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)
+
+
+handleServices :: Chan (Peer, T.Text, Ref) -> PacketHandler ()
+handleServices chan = gets (peerServiceQueue . phPeer) >>= \case
+ [] -> return ()
+ queue -> do
+ queue' <- flip filterM queue $ \case
+ (svc, wref) -> checkWaitingRef wref >>= \case
+ Just ref -> do
+ peer <- gets phPeer
+ liftIO $ writeChan chan (peer, svc, ref)
+ return False
+ Nothing -> return True
+ updatePeer $ \p -> p { peerServiceQueue = queue' }
+
+
+sendToPeer :: (Storable a, MonadIO m, MonadError String m, MonadRandom m) => UnifiedIdentity -> Peer -> T.Text -> a -> m ()
+sendToPeer _ peer@Peer { peerChannel = ChannelEstablished ch } svc obj = do
let st = peerInStorage peer
- ref <- store st obj
- Just bytes <- return $ lazyLoadBytes ref
+ ref <- liftIO $ store st obj
+ bytes <- case lazyLoadBytes ref of
+ Right bytes -> return bytes
+ Left dgst -> throwError $ "incomplete ref " ++ show ref ++ ", missing " ++ BC.unpack (showRefDigest dgst)
let plain = BL.toStrict $ BL.concat
- [ serializeObject $ serviceToObject $ ServiceHeader svc ref
+ [ serializeObject $ transportToObject $ TransportHeader [ServiceType svc, ServiceRef ref]
, bytes
]
ctext <- channelEncrypt ch plain
let DatagramAddress paddr = peerAddress peer
- void $ sendTo (peerSocket peer) ctext paddr
+ void $ liftIO $ sendTo (peerSocket peer) ctext paddr
-sendToPeer _ _ _ _ = putStrLn $ "No channel to peer"
+sendToPeer _ _ _ _ = throwError $ "no channel to peer"