diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Erebos/Chatroom.hs | 133 | ||||
-rw-r--r-- | src/Erebos/Network.hs | 16 | ||||
-rw-r--r-- | src/Erebos/Network/Protocol.hs | 6 | ||||
-rw-r--r-- | src/Erebos/Network/ifaddrs.c | 50 | ||||
-rw-r--r-- | src/windows/Erebos/Storage/Platform.hs | 13 |
5 files changed, 197 insertions, 21 deletions
diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index 673c59f..dcd7b42 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -16,7 +16,9 @@ module Erebos.Chatroom ( ChatroomSetChange(..), watchChatrooms, - ChatMessage, cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave, + ChatMessage, + cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave, + cmsgRoom, cmsgRoomData, ChatMessageData(..), chatroomMessageByStateData, @@ -29,6 +31,7 @@ import Control.Monad.Except import Control.Monad.IO.Class import Data.Bool +import Data.Either import Data.IORef import Data.List import Data.Maybe @@ -111,6 +114,11 @@ data ChatMessage = ChatMessage { cmsgData :: Stored (Signed ChatMessageData) } +validateSingleMessage :: Stored (Signed ChatMessageData) -> Maybe ChatMessage +validateSingleMessage sdata = do + guard $ fromStored sdata `isSignedBy` idKeyMessage (mdFrom (fromSigned sdata)) + return $ ChatMessage sdata + cmsgFrom :: ChatMessage -> ComposedIdentity cmsgFrom = mdFrom . fromSigned . cmsgData @@ -126,6 +134,12 @@ cmsgText = mdText . fromSigned . cmsgData cmsgLeave :: ChatMessage -> Bool cmsgLeave = mdLeave . fromSigned . cmsgData +cmsgRoom :: ChatMessage -> Maybe Chatroom +cmsgRoom = either (const Nothing) Just . runExcept . validateChatroom . cmsgRoomData + +cmsgRoomData :: ChatMessage -> [ Stored (Signed ChatroomData) ] +cmsgRoomData = concat . findProperty ((\case [] -> Nothing; xs -> Just xs) . mdRoom . fromStored . signedData) . (: []) . cmsgData + instance Storable ChatMessageData where store' ChatMessageData {..} = storeRec $ do mapM_ (storeRef "SPREV") mdPrev @@ -152,13 +166,11 @@ threadToList thread = helper S.empty $ thread helper :: S.Set (Stored (Signed ChatMessageData)) -> [Stored (Signed ChatMessageData)] -> [ChatMessage] helper seen msgs | msg : msgs' <- filter (`S.notMember` seen) $ reverse $ sortBy (comparing cmpView) msgs = - messageFromData msg : helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg)) + maybe id (:) (validateSingleMessage msg) $ + helper (S.insert msg seen) (msgs' ++ mdPrev (fromSigned msg)) | otherwise = [] cmpView msg = (zonedTimeToUTC $ mdTime $ fromSigned msg, msg) - messageFromData :: Stored (Signed ChatMessageData) -> ChatMessage - messageFromData sdata = ChatMessage { cmsgData = sdata } - chatroomMessageByStateData :: (MonadStorage m, MonadHead LocalState m, MonadError String m) => Stored ChatroomStateData -> Text -> m () @@ -170,7 +182,9 @@ chatroomMessageByStateData lookupData msg = void $ findAndUpdateChatroomState $ time <- liftIO getZonedTime mdata <- mstore =<< sign secret =<< mstore ChatMessageData { mdPrev = roomStateMessageData cstate - , mdRoom = [] + , mdRoom = if null (roomStateMessageData cstate) + then maybe [] roomData (roomStateRoom cstate) + else [] , mdFrom = self , mdReplyTo = Nothing , mdTime = time @@ -365,13 +379,18 @@ makeChatroomDiff [] ys = map (AddedChatroom . snd) ys data ChatroomService = ChatroomService { chatRoomQuery :: Bool , chatRoomInfo :: [Stored (Signed ChatroomData)] + , chatRoomSubscribe :: [Stored (Signed ChatroomData)] + , chatRoomUnsubscribe :: [Stored (Signed ChatroomData)] , chatRoomMessage :: [Stored (Signed ChatMessageData)] } + deriving (Eq) emptyPacket :: ChatroomService emptyPacket = ChatroomService { chatRoomQuery = False , chatRoomInfo = [] + , chatRoomSubscribe = [] + , chatRoomUnsubscribe = [] , chatRoomMessage = [] } @@ -379,17 +398,22 @@ instance Storable ChatroomService where store' ChatroomService {..} = storeRec $ do when chatRoomQuery $ storeEmpty "room-query" forM_ chatRoomInfo $ storeRef "room-info" + forM_ chatRoomSubscribe $ storeRef "room-subscribe" + forM_ chatRoomUnsubscribe $ storeRef "room-unsubscribe" forM_ chatRoomMessage $ storeRef "room-message" load' = loadRec $ do chatRoomQuery <- isJust <$> loadMbEmpty "room-query" chatRoomInfo <- loadRefs "room-info" + chatRoomSubscribe <- loadRefs "room-subscribe" + chatRoomUnsubscribe <- loadRefs "room-unsubscribe" chatRoomMessage <- loadRefs "room-message" return ChatroomService {..} data PeerState = PeerState { psSendRoomUpdates :: Bool , psLastList :: [(Stored ChatroomStateData, ChatroomState)] + , psSubscribedTo :: [ Stored (Signed ChatroomData) ] -- least root for each room } instance Service ChatroomService where @@ -399,6 +423,7 @@ instance Service ChatroomService where emptyServiceState _ = PeerState { psSendRoomUpdates = False , psLastList = [] + , psSubscribedTo = [] } serviceHandler spacket = do @@ -420,7 +445,7 @@ instance Service ChatroomService where maybe [] roomData . roomStateRoom let prev = concatMap roomStateData $ filter isCurrentRoom rooms - prevRoom = concatMap (rsdRoom . fromStored) prev + prevRoom = filterAncestors $ concat $ findProperty ((\case [] -> Nothing; xs -> Just xs) . rsdRoom) prev room = filterAncestors $ (roomInfo : ) prevRoom -- update local state only if we got roomInfo not present there @@ -436,6 +461,51 @@ instance Service ChatroomService where else return set foldM upd roomSet chatRoomInfo + forM_ chatRoomSubscribe $ \subscribeData -> do + mbRoomState <- findChatroomByRoomData subscribeData + forM_ mbRoomState $ \roomState -> + forM (roomStateRoom roomState) $ \room -> do + let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room + svcModify $ \ps -> ps { psSubscribedTo = leastRoot : psSubscribedTo ps } + replyPacket emptyPacket + { chatRoomMessage = roomStateMessageData roomState + } + + forM_ chatRoomUnsubscribe $ \unsubscribeData -> do + mbRoomState <- findChatroomByRoomData unsubscribeData + forM_ (mbRoomState >>= roomStateRoom) $ \room -> do + let leastRoot = head . filterAncestors . concatMap storedRoots . roomData $ room + svcModify $ \ps -> ps { psSubscribedTo = filter (/= leastRoot) (psSubscribedTo ps) } + + when (not (null chatRoomMessage)) $ do + updateLocalHead_ $ updateSharedState_ $ \roomSet -> do + let rooms = fromSetBy (comparing $ roomName <=< roomStateRoom) roomSet + upd set (msgData :: Stored (Signed ChatMessageData)) + | Just msg <- validateSingleMessage msgData = do + let roomInfo = cmsgRoomData msg + currentRoots = filterAncestors $ concatMap storedRoots roomInfo + isCurrentRoom = any ((`intersectsSorted` currentRoots) . storedRoots) . + maybe [] roomData . roomStateRoom + + let prevData = concatMap roomStateData $ filter isCurrentRoom rooms + prev = mergeSorted prevData + prevMessages = roomStateMessageData prev + messages = filterAncestors $ msgData : prevMessages + + -- update local state only if subscribed and we got some new messages + if roomStateSubscribe prev && messages /= prevMessages + then do + sdata <- mstore ChatroomStateData + { rsdPrev = prevData + , rsdRoom = [] + , rsdSubscribe = Nothing + , rsdMessages = messages + } + storeSetAddComponent sdata set + else return set + | otherwise = return set + foldM upd roomSet chatRoomMessage + serviceNewPeer = do replyPacket emptyPacket { chatRoomQuery = True } @@ -447,11 +517,50 @@ syncChatroomsToPeer set = do ps@PeerState {..} <- svcGet when psSendRoomUpdates $ do let curList = chatroomSetToList set - updates <- fmap (concat . catMaybes) $ - forM (makeChatroomDiff psLastList curList) $ return . \case + diff = makeChatroomDiff psLastList curList + + roomUpdates <- fmap (concat . catMaybes) $ + forM diff $ return . \case AddedChatroom room -> roomData <$> roomStateRoom room RemovedChatroom {} -> Nothing - UpdatedChatroom _ room -> roomData <$> roomStateRoom room - when (not $ null updates) $ do - replyPacket $ emptyPacket { chatRoomInfo = updates } + UpdatedChatroom oldroom room + | roomStateData oldroom /= roomStateData room -> roomData <$> roomStateRoom room + | otherwise -> Nothing + + (subscribe, unsubscribe) <- fmap (partitionEithers . concat . catMaybes) $ + forM diff $ return . \case + AddedChatroom room + | roomStateSubscribe room + -> map Left . roomData <$> roomStateRoom room + RemovedChatroom oldroom + | roomStateSubscribe oldroom + -> map Right . roomData <$> roomStateRoom oldroom + UpdatedChatroom oldroom room + | roomStateSubscribe oldroom /= roomStateSubscribe room + -> map (if roomStateSubscribe room then Left else Right) . roomData <$> roomStateRoom room + _ -> Nothing + + messages <- fmap concat $ do + let leastRootFor = head . filterAncestors . concatMap storedRoots . roomData + forM diff $ return . \case + AddedChatroom rstate + | Just room <- roomStateRoom rstate + , leastRootFor room `elem` psSubscribedTo + -> roomStateMessageData rstate + UpdatedChatroom oldstate rstate + | Just room <- roomStateRoom rstate + , leastRootFor room `elem` psSubscribedTo + , roomStateMessageData oldstate /= roomStateMessageData rstate + -> roomStateMessageData rstate + _ -> [] + + let packet = emptyPacket + { chatRoomInfo = roomUpdates + , chatRoomSubscribe = subscribe + , chatRoomUnsubscribe = unsubscribe + , chatRoomMessage = messages + } + + when (packet /= emptyPacket) $ do + replyPacket packet svcSet $ ps { psLastList = curList } diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 41b6279..3896829 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -301,10 +301,11 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - case paddr of - DatagramAddress addr -> void $ S.sendTo sock msg addr + handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + case paddr of + DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg + PeerIceSession ice -> iceSend ice msg #endif forkServerThread server $ forever $ do @@ -922,6 +923,9 @@ getBroadcastAddresses port = do w <- peekElemOff ptr i if w == 0 then return [] else (SockAddrInet port w:) <$> parse (i + 1) - addrs <- parse 0 - cFree ptr - return addrs + if ptr == nullPtr + then return [] + else do + addrs <- parse 0 + cFree ptr + return addrs diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index a009ad1..b0047fc 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -440,7 +440,7 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage - startTime <- getTime MonotonicRaw + startTime <- getTime Monotonic gNowVar <- newTVarIO startTime gNextTimeout <- newTVarIO startTime gInitConfig <- store mStorage $ (Rec [] :: Object) @@ -448,7 +448,7 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do let gs = GlobalState {..} let signalTimeouts = forever $ do - now <- getTime MonotonicRaw + now <- getTime Monotonic next <- atomically $ do writeTVar gNowVar now readTVar gNextTimeout @@ -715,7 +715,7 @@ reservePacket conn@Connection {..} = do resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () resendBytes Connection {..} reserved sp = do let GlobalState {..} = cGlobalState - now <- getTime MonotonicRaw + now <- getTime Monotonic atomically $ do when (isJust reserved) $ do modifyTVar' cReservedPackets (subtract 1) diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c index 37c3e00..efeca18 100644 --- a/src/Erebos/Network/ifaddrs.c +++ b/src/Erebos/Network/ifaddrs.c @@ -1,5 +1,7 @@ #include "ifaddrs.h" +#ifndef _WIN32 + #include <arpa/inet.h> #include <ifaddrs.h> #include <net/if.h> @@ -39,3 +41,51 @@ uint32_t * broadcast_addresses(void) ret[count] = 0; return ret; } + +#else // _WIN32 + +#include <winsock2.h> +#include <ws2tcpip.h> + +#pragma comment(lib, "ws2_32.lib") + +uint32_t * broadcast_addresses(void) +{ + uint32_t * ret = NULL; + SOCKET wsock = INVALID_SOCKET; + + struct WSAData wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) + return NULL; + + wsock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0); + if (wsock == INVALID_SOCKET) + goto cleanup; + + INTERFACE_INFO InterfaceList[32]; + unsigned long nBytesReturned; + + if (WSAIoctl(wsock, SIO_GET_INTERFACE_LIST, 0, 0, + InterfaceList, sizeof(InterfaceList), + &nBytesReturned, 0, 0) == SOCKET_ERROR) + goto cleanup; + + int numInterfaces = nBytesReturned / sizeof(INTERFACE_INFO); + + size_t capacity = 16, count = 0; + ret = malloc(sizeof(uint32_t) * capacity); + + for (int i = 0; i < numInterfaces && count < capacity - 1; i++) + if (InterfaceList[i].iiFlags & IFF_BROADCAST) + ret[count++] = InterfaceList[i].iiBroadcastAddress.AddressIn.sin_addr.s_addr; + + ret[count] = 0; +cleanup: + if (wsock != INVALID_SOCKET) + closesocket(wsock); + WSACleanup(); + + return ret; +} + +#endif diff --git a/src/windows/Erebos/Storage/Platform.hs b/src/windows/Erebos/Storage/Platform.hs new file mode 100644 index 0000000..76c940b --- /dev/null +++ b/src/windows/Erebos/Storage/Platform.hs @@ -0,0 +1,13 @@ +module Erebos.Storage.Platform ( + createFileExclusive, +) where + +import Data.Bits + +import System.IO +import System.Win32.File +import System.Win32.Types + +createFileExclusive :: FilePath -> IO Handle +createFileExclusive path = do + hANDLEToHandle =<< createFile path gENERIC_WRITE (fILE_SHARE_READ .|. fILE_SHARE_DELETE) Nothing cREATE_NEW fILE_ATTRIBUTE_NORMAL Nothing |