diff options
Diffstat (limited to 'src')
34 files changed, 2798 insertions, 1657 deletions
diff --git a/src/Erebos/Attach.hs b/src/Erebos/Attach.hs index bd2f521..df61406 100644 --- a/src/Erebos/Attach.hs +++ b/src/Erebos/Attach.hs @@ -20,7 +20,7 @@ import Erebos.Pairing import Erebos.PubKey import Erebos.Service import Erebos.State -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Key type AttachService = PairingService AttachIdentity @@ -113,11 +113,11 @@ instance PairingResult AttachIdentity where svcPrint $ "Attachement failed" } -attachToOwner :: (MonadIO m, MonadError String m) => Peer -> m () +attachToOwner :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () attachToOwner = pairingRequest @AttachIdentity Proxy -attachAccept :: (MonadIO m, MonadError String m) => Peer -> m () +attachAccept :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () attachAccept = pairingAccept @AttachIdentity Proxy -attachReject :: (MonadIO m, MonadError String m) => Peer -> m () +attachReject :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () attachReject = pairingReject @AttachIdentity Proxy diff --git a/src/Erebos/Chatroom.hs b/src/Erebos/Chatroom.hs index dcd7b42..2d4f272 100644 --- a/src/Erebos/Chatroom.hs +++ b/src/Erebos/Chatroom.hs @@ -6,11 +6,16 @@ module Erebos.Chatroom ( ChatroomState(..), ChatroomStateData(..), createChatroom, + deleteChatroomByStateData, updateChatroomByStateData, listChatrooms, findChatroomByRoomData, findChatroomByStateData, chatroomSetSubscribe, + chatroomMembers, + joinChatroom, joinChatroomByStateData, + joinChatroomAs, joinChatroomAsByStateData, + leaveChatroom, leaveChatroomByStateData, getMessagesSinceState, ChatroomSetChange(..), @@ -20,7 +25,8 @@ module Erebos.Chatroom ( cmsgFrom, cmsgReplyTo, cmsgTime, cmsgText, cmsgLeave, cmsgRoom, cmsgRoomData, ChatMessageData(..), - chatroomMessageByStateData, + sendChatroomMessage, + sendChatroomMessageByStateData, ChatroomService(..), ) where @@ -32,6 +38,8 @@ import Control.Monad.IO.Class import Data.Bool import Data.Either +import Data.Foldable +import Data.Function import Data.IORef import Data.List import Data.Maybe @@ -46,7 +54,8 @@ import Erebos.PubKey import Erebos.Service import Erebos.Set import Erebos.State -import Erebos.Storage +import Erebos.Storable +import Erebos.Storage.Head import Erebos.Storage.Merge import Erebos.Util @@ -160,8 +169,8 @@ instance Storable ChatMessageData where mdLeave <- isJust <$> loadMbEmpty "leave" return ChatMessageData {..} -threadToList :: [Stored (Signed ChatMessageData)] -> [ChatMessage] -threadToList thread = helper S.empty $ thread +threadToListSince :: [ Stored (Signed ChatMessageData) ] -> [ Stored (Signed ChatMessageData) ] -> [ ChatMessage ] +threadToListSince since thread = helper (S.fromList since) thread where helper :: S.Set (Stored (Signed ChatMessageData)) -> [Stored (Signed ChatMessageData)] -> [ChatMessage] helper seen msgs @@ -171,30 +180,38 @@ threadToList thread = helper S.empty $ thread | otherwise = [] cmpView msg = (zonedTimeToUTC $ mdTime $ fromSigned msg, msg) -chatroomMessageByStateData - :: (MonadStorage m, MonadHead LocalState m, MonadError String m) +sendChatroomMessage + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => ChatroomState -> Text -> m () +sendChatroomMessage rstate msg = sendChatroomMessageByStateData (head $ roomStateData rstate) msg + +sendChatroomMessageByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) => Stored ChatroomStateData -> Text -> m () -chatroomMessageByStateData lookupData msg = void $ findAndUpdateChatroomState $ \cstate -> do +sendChatroomMessageByStateData lookupData msg = sendRawChatroomMessageByStateData lookupData Nothing Nothing (Just msg) False + +sendRawChatroomMessageByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => Stored ChatroomStateData -> Maybe UnifiedIdentity -> Maybe (Stored (Signed ChatMessageData)) -> Maybe Text -> Bool -> m () +sendRawChatroomMessageByStateData lookupData mbIdentity mdReplyTo mdText mdLeave = void $ findAndUpdateChatroomState $ \cstate -> do guard $ any (lookupData `precedesOrEquals`) $ roomStateData cstate Just $ do - self <- finalOwner . localIdentity . fromStored <$> getLocalHead - secret <- loadKey $ idKeyMessage self - time <- liftIO getZonedTime - mdata <- mstore =<< sign secret =<< mstore ChatMessageData - { mdPrev = roomStateMessageData cstate - , mdRoom = if null (roomStateMessageData cstate) - then maybe [] roomData (roomStateRoom cstate) - else [] - , mdFrom = self - , mdReplyTo = Nothing - , mdTime = time - , mdText = Just msg - , mdLeave = False - } - mergeSorted . (:[]) <$> mstore ChatroomStateData + mdFrom <- finalOwner <$> if + | Just identity <- mbIdentity -> return identity + | Just identity <- roomStateIdentity cstate -> return identity + | otherwise -> localIdentity . fromStored <$> getLocalHead + secret <- loadKey $ idKeyMessage mdFrom + mdTime <- liftIO getZonedTime + let mdPrev = roomStateMessageData cstate + mdRoom = if null (roomStateMessageData cstate) + then maybe [] roomData (roomStateRoom cstate) + else [] + + mdata <- mstore =<< sign secret =<< mstore ChatMessageData {..} + mergeSorted . (:[]) <$> mstore emptyChatroomStateData { rsdPrev = roomStateData cstate - , rsdRoom = [] - , rsdSubscribe = Just True + , rsdSubscribe = Just (not mdLeave) + , rsdIdentity = mbIdentity , rsdMessages = [ mdata ] } @@ -202,15 +219,29 @@ chatroomMessageByStateData lookupData msg = void $ findAndUpdateChatroomState $ data ChatroomStateData = ChatroomStateData { rsdPrev :: [Stored ChatroomStateData] , rsdRoom :: [Stored (Signed ChatroomData)] + , rsdDelete :: Bool , rsdSubscribe :: Maybe Bool + , rsdIdentity :: Maybe UnifiedIdentity , rsdMessages :: [Stored (Signed ChatMessageData)] } +emptyChatroomStateData :: ChatroomStateData +emptyChatroomStateData = ChatroomStateData + { rsdPrev = [] + , rsdRoom = [] + , rsdDelete = False + , rsdSubscribe = Nothing + , rsdIdentity = Nothing + , rsdMessages = [] + } + data ChatroomState = ChatroomState { roomStateData :: [Stored ChatroomStateData] , roomStateRoom :: Maybe Chatroom , roomStateMessageData :: [Stored (Signed ChatMessageData)] + , roomStateDeleted :: Bool , roomStateSubscribe :: Bool + , roomStateIdentity :: Maybe UnifiedIdentity , roomStateMessages :: [ChatMessage] } @@ -218,13 +249,17 @@ instance Storable ChatroomStateData where store' ChatroomStateData {..} = storeRec $ do forM_ rsdPrev $ storeRef "PREV" forM_ rsdRoom $ storeRef "room" + when rsdDelete $ storeEmpty "delete" forM_ rsdSubscribe $ storeInt "subscribe" . bool @Int 0 1 + forM_ rsdIdentity $ storeRef "id" . idExtData forM_ rsdMessages $ storeRef "msg" load' = loadRec $ do rsdPrev <- loadRefs "PREV" rsdRoom <- loadRefs "room" + rsdDelete <- isJust <$> loadMbEmpty "delete" rsdSubscribe <- fmap ((/=) @Int 0) <$> loadMbInt "subscribe" + rsdIdentity <- loadMbUnifiedIdentity "id" rsdMessages <- loadRefs "msg" return ChatroomStateData {..} @@ -237,8 +272,10 @@ instance Mergeable ChatroomState where roomStateMessageData = filterAncestors $ concat $ flip findProperty roomStateData $ \case ChatroomStateData {..} | null rsdMessages -> Nothing | otherwise -> Just rsdMessages - roomStateSubscribe = fromMaybe False $ findPropertyFirst rsdSubscribe roomStateData - roomStateMessages = threadToList $ concatMap (rsdMessages . fromStored) roomStateData + roomStateDeleted = any (rsdDelete . fromStored) roomStateData + roomStateSubscribe = not roomStateDeleted && (fromMaybe False $ findPropertyFirst rsdSubscribe roomStateData) + roomStateIdentity = findPropertyFirst rsdIdentity roomStateData + roomStateMessages = threadToListSince [] $ concatMap (rsdMessages . fromStored) roomStateData in ChatroomState {..} toComponents = roomStateData @@ -246,16 +283,14 @@ instance Mergeable ChatroomState where instance SharedType (Set ChatroomState) where sharedTypeID _ = mkSharedTypeID "7bc71cbf-bc43-42b1-b413-d3a2c9a2aae0" -createChatroom :: (MonadStorage m, MonadHead LocalState m, MonadIO m, MonadError String m) => Maybe Text -> Maybe Text -> m ChatroomState +createChatroom :: (MonadStorage m, MonadHead LocalState m, MonadIO m, MonadError e m, FromErebosError e) => Maybe Text -> Maybe Text -> m ChatroomState createChatroom rdName rdDescription = do (secret, rdKey) <- liftIO . generateKeys =<< getStorage let rdPrev = [] rdata <- mstore =<< sign secret =<< mstore ChatroomData {..} - cstate <- mergeSorted . (:[]) <$> mstore ChatroomStateData - { rsdPrev = [] - , rsdRoom = [ rdata ] + cstate <- mergeSorted . (:[]) <$> mstore emptyChatroomStateData + { rsdRoom = [ rdata ] , rsdSubscribe = Just True - , rsdMessages = [] } updateLocalHead $ updateSharedState $ \rooms -> do @@ -281,8 +316,19 @@ findAndUpdateChatroomState f = do return (roomSet, Just upd) [] -> return (roomSet, Nothing) +deleteChatroomByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => Stored ChatroomStateData -> m () +deleteChatroomByStateData lookupData = void $ findAndUpdateChatroomState $ \cstate -> do + guard $ any (lookupData `precedesOrEquals`) $ roomStateData cstate + Just $ do + mergeSorted . (:[]) <$> mstore emptyChatroomStateData + { rsdPrev = roomStateData cstate + , rsdDelete = True + } + updateChatroomByStateData - :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) => Stored ChatroomStateData -> Maybe Text -> Maybe Text @@ -298,16 +344,16 @@ updateChatroomByStateData lookupData newName newDesc = findAndUpdateChatroomStat , rdDescription = newDesc , rdKey = roomKey room } - mergeSorted . (:[]) <$> mstore ChatroomStateData + mergeSorted . (:[]) <$> mstore emptyChatroomStateData { rsdPrev = roomStateData cstate , rsdRoom = [ rdata ] , rsdSubscribe = Just True - , rsdMessages = [] } listChatrooms :: MonadHead LocalState m => m [ChatroomState] -listChatrooms = fromSetBy (comparing $ roomName <=< roomStateRoom) . +listChatrooms = filter (not . roomStateDeleted) . + fromSetBy (comparing $ roomName <=< roomStateRoom) . lookupSharedValue . lsShared . fromStored <$> getLocalHead findChatroom :: MonadHead LocalState m => (ChatroomState -> Bool) -> m (Maybe ChatroomState) @@ -323,23 +369,58 @@ findChatroomByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> findChatroomByStateData cdata = findChatroom $ any (cdata `precedesOrEquals`) . roomStateData chatroomSetSubscribe - :: (MonadStorage m, MonadHead LocalState m, MonadError String m) + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) => Stored ChatroomStateData -> Bool -> m () chatroomSetSubscribe lookupData subscribe = void $ findAndUpdateChatroomState $ \cstate -> do guard $ any (lookupData `precedesOrEquals`) $ roomStateData cstate Just $ do - mergeSorted . (:[]) <$> mstore ChatroomStateData + mergeSorted . (:[]) <$> mstore emptyChatroomStateData { rsdPrev = roomStateData cstate - , rsdRoom = [] , rsdSubscribe = Just subscribe - , rsdMessages = [] } +chatroomMembers :: ChatroomState -> [ ComposedIdentity ] +chatroomMembers ChatroomState {..} = + map (mdFrom . fromSigned . head) $ + filter (any $ not . mdLeave . fromSigned) $ -- keep only users that hasn't left + map (filterAncestors . map snd) $ -- gather message data per each identity and filter ancestors + groupBy ((==) `on` fst) $ -- group on identity root + sortBy (comparing fst) $ -- sort by first root of identity data + map (\x -> ( head . filterAncestors . concatMap storedRoots . idDataF . mdFrom . fromSigned $ x, x )) $ + toList $ ancestors $ roomStateMessageData + +joinChatroom + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => ChatroomState -> m () +joinChatroom rstate = joinChatroomByStateData (head $ roomStateData rstate) + +joinChatroomByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => Stored ChatroomStateData -> m () +joinChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing Nothing False + +joinChatroomAs + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => UnifiedIdentity -> ChatroomState -> m () +joinChatroomAs identity rstate = joinChatroomAsByStateData identity (head $ roomStateData rstate) + +joinChatroomAsByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => UnifiedIdentity -> Stored ChatroomStateData -> m () +joinChatroomAsByStateData identity lookupData = sendRawChatroomMessageByStateData lookupData (Just identity) Nothing Nothing False + +leaveChatroom + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => ChatroomState -> m () +leaveChatroom rstate = leaveChatroomByStateData (head $ roomStateData rstate) + +leaveChatroomByStateData + :: (MonadStorage m, MonadHead LocalState m, MonadError e m, FromErebosError e) + => Stored ChatroomStateData -> m () +leaveChatroomByStateData lookupData = sendRawChatroomMessageByStateData lookupData Nothing Nothing Nothing True + getMessagesSinceState :: ChatroomState -> ChatroomState -> [ChatMessage] -getMessagesSinceState cur old = takeWhile notOld (roomStateMessages cur) - where - notOld msg = cmsgData msg `notElem` roomStateMessageData old - -- TODO: parallel message threads +getMessagesSinceState cur old = threadToListSince (roomStateMessageData old) (roomStateMessageData cur) data ChatroomSetChange = AddedChatroom ChatroomState @@ -358,7 +439,7 @@ watchChatrooms h f = liftIO $ do return $ makeChatroomDiff lastList curList chatroomSetToList :: Set ChatroomState -> [(Stored ChatroomStateData, ChatroomState)] -chatroomSetToList = map (cmp &&& id) . fromSetBy (comparing cmp) +chatroomSetToList = map (cmp &&& id) . filter (not . roomStateDeleted) . fromSetBy (comparing cmp) where cmp :: ChatroomState -> Stored ChatroomStateData cmp = head . filterAncestors . concatMap storedRoots . toComponents @@ -428,8 +509,13 @@ instance Service ChatroomService where serviceHandler spacket = do let ChatroomService {..} = fromStored spacket + + previouslyUpdated <- psSendRoomUpdates <$> svcGet svcModify $ \s -> s { psSendRoomUpdates = True } + when (not previouslyUpdated) $ do + syncChatroomsToPeer . lookupSharedValue . lsShared . fromStored =<< getLocalHead + when chatRoomQuery $ do rooms <- listChatrooms replyPacket emptyPacket @@ -451,11 +537,9 @@ instance Service ChatroomService where -- update local state only if we got roomInfo not present there if roomInfo `notElem` prevRoom && roomInfo `elem` room then do - sdata <- mstore ChatroomStateData + sdata <- mstore emptyChatroomStateData { rsdPrev = prev , rsdRoom = room - , rsdSubscribe = Nothing - , rsdMessages = [] } storeSetAddComponent sdata set else return set @@ -495,10 +579,8 @@ instance Service ChatroomService where -- update local state only if subscribed and we got some new messages if roomStateSubscribe prev && messages /= prevMessages then do - sdata <- mstore ChatroomStateData + sdata <- mstore emptyChatroomStateData { rsdPrev = prevData - , rsdRoom = [] - , rsdSubscribe = Nothing , rsdMessages = messages } storeSetAddComponent sdata set diff --git a/src/Erebos/Contact.hs b/src/Erebos/Contact.hs index d90aa50..25239b9 100644 --- a/src/Erebos/Contact.hs +++ b/src/Erebos/Contact.hs @@ -28,7 +28,7 @@ import Erebos.PubKey import Erebos.Service import Erebos.Set import Erebos.State -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Merge data Contact = Contact @@ -155,13 +155,13 @@ instance PairingResult ContactAccepted where svcPrint $ "Contact failed" } -contactRequest :: (MonadIO m, MonadError String m) => Peer -> m () +contactRequest :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () contactRequest = pairingRequest @ContactAccepted Proxy -contactAccept :: (MonadIO m, MonadError String m) => Peer -> m () +contactAccept :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () contactAccept = pairingAccept @ContactAccepted Proxy -contactReject :: (MonadIO m, MonadError String m) => Peer -> m () +contactReject :: (MonadIO m, MonadError e m, FromErebosError e) => Peer -> m () contactReject = pairingReject @ContactAccepted Proxy finalizeContact :: MonadHead LocalState m => UnifiedIdentity -> m () diff --git a/src/Erebos/Conversation.hs b/src/Erebos/Conversation.hs index 94d2399..dee6faa 100644 --- a/src/Erebos/Conversation.hs +++ b/src/Erebos/Conversation.hs @@ -1,12 +1,15 @@ module Erebos.Conversation ( Message, messageFrom, + messageTime, messageText, messageUnread, formatMessage, Conversation, directMessageConversation, + chatroomConversation, + chatroomConversationByStateData, reloadConversation, lookupConversations, @@ -15,6 +18,7 @@ module Erebos.Conversation ( conversationHistory, sendMessage, + deleteConversation, ) where import Control.Monad.Except @@ -23,30 +27,45 @@ import Data.List import Data.Maybe import Data.Text (Text) import Data.Text qualified as T +import Data.Time.Format import Data.Time.LocalTime +import Erebos.Chatroom +import Erebos.DirectMessage import Erebos.Identity -import Erebos.Message hiding (formatMessage) import Erebos.State -import Erebos.Storage +import Erebos.Storable data Message = DirectMessageMessage DirectMessage Bool + | ChatroomMessage ChatMessage Bool messageFrom :: Message -> ComposedIdentity messageFrom (DirectMessageMessage msg _) = msgFrom msg +messageFrom (ChatroomMessage msg _) = cmsgFrom msg + +messageTime :: Message -> ZonedTime +messageTime (DirectMessageMessage msg _) = msgTime msg +messageTime (ChatroomMessage msg _) = cmsgTime msg messageText :: Message -> Maybe Text messageText (DirectMessageMessage msg _) = Just $ msgText msg +messageText (ChatroomMessage msg _) = cmsgText msg messageUnread :: Message -> Bool messageUnread (DirectMessageMessage _ unread) = unread +messageUnread (ChatroomMessage _ unread) = unread formatMessage :: TimeZone -> Message -> String -formatMessage tzone (DirectMessageMessage msg _) = formatDirectMessage tzone msg +formatMessage tzone msg = concat + [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ messageTime msg + , maybe "<unnamed>" T.unpack $ idName $ messageFrom msg + , maybe "" ((": "<>) . T.unpack) $ messageText msg + ] data Conversation = DirectMessageConversation DirectMessageThread + | ChatroomConversation ChatroomState directMessageConversation :: MonadHead LocalState m => ComposedIdentity -> m Conversation directMessageConversation peer = do @@ -54,8 +73,16 @@ directMessageConversation peer = do Just thread -> return $ DirectMessageConversation thread Nothing -> return $ DirectMessageConversation $ DirectMessageThread peer [] [] [] +chatroomConversation :: MonadHead LocalState m => ChatroomState -> m (Maybe Conversation) +chatroomConversation rstate = chatroomConversationByStateData (head $ roomStateData rstate) + +chatroomConversationByStateData :: MonadHead LocalState m => Stored ChatroomStateData -> m (Maybe Conversation) +chatroomConversationByStateData sdata = fmap ChatroomConversation <$> findChatroomByStateData sdata + reloadConversation :: MonadHead LocalState m => Conversation -> m Conversation reloadConversation (DirectMessageConversation thread) = directMessageConversation (msgPeer thread) +reloadConversation cur@(ChatroomConversation rstate) = + fromMaybe cur <$> chatroomConversation rstate lookupConversations :: MonadHead LocalState m => m [Conversation] lookupConversations = map DirectMessageConversation . toThreadList . lookupSharedValue . lsShared . fromStored <$> getLocalHead @@ -63,13 +90,21 @@ lookupConversations = map DirectMessageConversation . toThreadList . lookupShare conversationName :: Conversation -> Text conversationName (DirectMessageConversation thread) = fromMaybe (T.pack "<unnamed>") $ idName $ msgPeer thread +conversationName (ChatroomConversation rstate) = fromMaybe (T.pack "<unnamed>") $ roomName =<< roomStateRoom rstate conversationPeer :: Conversation -> Maybe ComposedIdentity conversationPeer (DirectMessageConversation thread) = Just $ msgPeer thread +conversationPeer (ChatroomConversation _) = Nothing conversationHistory :: Conversation -> [Message] conversationHistory (DirectMessageConversation thread) = map (\msg -> DirectMessageMessage msg False) $ threadToList thread +conversationHistory (ChatroomConversation rstate) = map (\msg -> ChatroomMessage msg False) $ roomStateMessages rstate + +sendMessage :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> Text -> m (Maybe Message) +sendMessage (DirectMessageConversation thread) text = fmap Just $ DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False +sendMessage (ChatroomConversation rstate) text = sendChatroomMessage rstate text >> return Nothing -sendMessage :: (MonadHead LocalState m, MonadError String m) => Conversation -> Text -> m Message -sendMessage (DirectMessageConversation thread) text = DirectMessageMessage <$> (fromStored <$> sendDirectMessage (msgPeer thread) text) <*> pure False +deleteConversation :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => Conversation -> m () +deleteConversation (DirectMessageConversation _) = throwOtherError "deleting direct message conversation is not supported" +deleteConversation (ChatroomConversation rstate) = deleteChatroomByStateData (head $ roomStateData rstate) diff --git a/src/Erebos/Message.hs b/src/Erebos/DirectMessage.hs index 5ef27f3..28d8085 100644 --- a/src/Erebos/Message.hs +++ b/src/Erebos/DirectMessage.hs @@ -1,4 +1,4 @@ -module Erebos.Message ( +module Erebos.DirectMessage ( DirectMessage(..), sendDirectMessage, @@ -13,12 +13,10 @@ module Erebos.Message ( messageThreadView, watchReceivedMessages, - formatMessage, formatDirectMessage, ) where import Control.Monad -import Control.Monad.Except import Control.Monad.Reader import Data.List @@ -33,7 +31,8 @@ import Erebos.Identity import Erebos.Network import Erebos.Service import Erebos.State -import Erebos.Storage +import Erebos.Storable +import Erebos.Storage.Head import Erebos.Storage.Merge data DirectMessage = DirectMessage @@ -157,7 +156,7 @@ findMsgProperty pid sel mss = concat $ flip findProperty mss $ \x -> do return $ sel x -sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m, MonadError String m) +sendDirectMessage :: (Foldable f, Applicative f, MonadHead LocalState m) => Identity f -> Text -> m (Stored DirectMessage) sendDirectMessage pid text = updateLocalHead $ \ls -> do let self = localIdentity $ fromStored ls @@ -259,10 +258,6 @@ watchReceivedMessages h f = do forM_ (map fromStored sms) $ \ms -> do mapM_ f $ filter (not . sameIdentity self . msgFrom . fromStored) $ msReceived ms -{-# DEPRECATED formatMessage "use formatDirectMessage instead" #-} -formatMessage :: TimeZone -> DirectMessage -> String -formatMessage = formatDirectMessage - formatDirectMessage :: TimeZone -> DirectMessage -> String formatDirectMessage tzone msg = concat [ formatTime defaultTimeLocale "[%H:%M] " $ utcToLocalTime tzone $ zonedTimeToUTC $ msgTime msg diff --git a/src/Erebos/Discovery.hs b/src/Erebos/Discovery.hs index 48df9c3..d900363 100644 --- a/src/Erebos/Discovery.hs +++ b/src/Erebos/Discovery.hs @@ -1,5 +1,8 @@ +{-# LANGUAGE CPP #-} + module Erebos.Discovery ( DiscoveryService(..), + DiscoveryAttributes(..), DiscoveryConnection(..) ) where @@ -8,54 +11,83 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader +import Data.IP qualified as IP import Data.Map.Strict (Map) -import qualified Data.Map.Strict as M +import Data.Map.Strict qualified as M import Data.Maybe import Data.Text (Text) -import qualified Data.Text as T +import Data.Text qualified as T +import Data.Word import Network.Socket +#ifdef ENABLE_ICE_SUPPORT import Erebos.ICE +#endif import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.Service -import Erebos.Storage +import Erebos.Storable -keepaliveSeconds :: Int -keepaliveSeconds = 20 +data DiscoveryService + = DiscoverySelf [ Text ] (Maybe Int) + | DiscoveryAcknowledged [ Text ] (Maybe Text) (Maybe Word16) (Maybe Text) (Maybe Word16) + | DiscoverySearch Ref + | DiscoveryResult Ref [ Text ] + | DiscoveryConnectionRequest DiscoveryConnection + | DiscoveryConnectionResponse DiscoveryConnection +data DiscoveryAttributes = DiscoveryAttributes + { discoveryStunPort :: Maybe Word16 + , discoveryStunServer :: Maybe Text + , discoveryTurnPort :: Maybe Word16 + , discoveryTurnServer :: Maybe Text + } -data DiscoveryService = DiscoverySelf Text Int - | DiscoveryAcknowledged Text - | DiscoverySearch Ref - | DiscoveryResult Ref (Maybe Text) - | DiscoveryConnectionRequest DiscoveryConnection - | DiscoveryConnectionResponse DiscoveryConnection +defaultDiscoveryAttributes :: DiscoveryAttributes +defaultDiscoveryAttributes = DiscoveryAttributes + { discoveryStunPort = Nothing + , discoveryStunServer = Nothing + , discoveryTurnPort = Nothing + , discoveryTurnServer = Nothing + } data DiscoveryConnection = DiscoveryConnection { dconnSource :: Ref , dconnTarget :: Ref , dconnAddress :: Maybe Text - , dconnIceSession :: Maybe IceRemoteInfo +#ifdef ENABLE_ICE_SUPPORT + , dconnIceInfo :: Maybe IceRemoteInfo +#else + , dconnIceInfo :: Maybe (Stored Object) +#endif } emptyConnection :: Ref -> Ref -> DiscoveryConnection -emptyConnection source target = DiscoveryConnection source target Nothing Nothing +emptyConnection dconnSource dconnTarget = DiscoveryConnection {..} + where + dconnAddress = Nothing + dconnIceInfo = Nothing instance Storable DiscoveryService where store' x = storeRec $ do case x of - DiscoverySelf addr priority -> do - storeText "self" addr - storeInt "priority" priority - DiscoveryAcknowledged addr -> do - storeText "ack" addr + DiscoverySelf addrs priority -> do + mapM_ (storeText "self") addrs + mapM_ (storeInt "priority") priority + DiscoveryAcknowledged addrs stunServer stunPort turnServer turnPort -> do + if null addrs then storeEmpty "ack" + else mapM_ (storeText "ack") addrs + storeMbText "stun-server" stunServer + storeMbInt "stun-port" stunPort + storeMbText "turn-server" turnServer + storeMbInt "turn-port" turnPort DiscoverySearch ref -> storeRawRef "search" ref DiscoveryResult ref addr -> do storeRawRef "result" ref - storeMbText "address" addr + mapM_ (storeText "address") addr DiscoveryConnectionRequest conn -> storeConnection "request" conn DiscoveryConnectionResponse conn -> storeConnection "response" conn @@ -64,18 +96,28 @@ instance Storable DiscoveryService where storeRawRef "source" $ dconnSource conn storeRawRef "target" $ dconnTarget conn storeMbText "address" $ dconnAddress conn - storeMbRef "ice-session" $ dconnIceSession conn + storeMbRef "ice-info" $ dconnIceInfo conn load' = loadRec $ msum - [ DiscoverySelf - <$> loadText "self" - <*> loadInt "priority" - , DiscoveryAcknowledged - <$> loadText "ack" + [ do + addrs <- loadTexts "self" + guard (not $ null addrs) + DiscoverySelf addrs + <$> loadMbInt "priority" + , do + addrs <- loadTexts "ack" + mbEmpty <- loadMbEmpty "ack" + guard (not (null addrs) || isJust mbEmpty) + DiscoveryAcknowledged + <$> pure addrs + <*> loadMbText "stun-server" + <*> loadMbInt "stun-port" + <*> loadMbText "turn-server" + <*> loadMbInt "turn-port" , DiscoverySearch <$> loadRawRef "search" , DiscoveryResult <$> loadRawRef "result" - <*> loadMbText "address" + <*> loadTexts "address" , loadConnection "request" DiscoveryConnectionRequest , loadConnection "response" DiscoveryConnectionResponse ] @@ -86,109 +128,180 @@ instance Storable DiscoveryService where <$> loadRawRef "source" <*> loadRawRef "target" <*> loadMbText "address" - <*> loadMbRef "ice-session" + <*> loadMbRef "ice-info" data DiscoveryPeer = DiscoveryPeer { dpPriority :: Int , dpPeer :: Maybe Peer - , dpAddress :: Maybe Text + , dpAddress :: [ Text ] +#ifdef ENABLE_ICE_SUPPORT , dpIceSession :: Maybe IceSession +#endif } instance Service DiscoveryService where - serviceID _ = mkServiceID "dd59c89c-69cc-4703-b75b-4ddcd4b3c23b" + serviceID _ = mkServiceID "dd59c89c-69cc-4703-b75b-4ddcd4b3c23c" + + type ServiceAttributes DiscoveryService = DiscoveryAttributes + defaultServiceAttributes _ = defaultDiscoveryAttributes + +#ifdef ENABLE_ICE_SUPPORT + type ServiceState DiscoveryService = Maybe IceConfig + emptyServiceState _ = Nothing +#endif type ServiceGlobalState DiscoveryService = Map RefDigest DiscoveryPeer emptyServiceGlobalState _ = M.empty serviceHandler msg = case fromStored msg of - DiscoverySelf addr priority -> do + DiscoverySelf addrs priority -> do pid <- asks svcPeerIdentity peer <- asks svcPeer let insertHelper new old | dpPriority new > dpPriority old = new | otherwise = old - mbaddr <- case words (T.unpack addr) of - [ipaddr, port] | DatagramAddress paddr <- peerAddress peer -> do + matchedAddrs <- fmap catMaybes $ forM addrs $ \addr -> if + | addr == T.pack "ICE" -> do + return $ Just addr + + | [ ipaddr, port ] <- words (T.unpack addr) + , DatagramAddress paddr <- peerAddress peer -> do saddr <- liftIO $ head <$> getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) return $ if paddr == addrAddress saddr then Just addr else Nothing - _ -> return Nothing + + | otherwise -> return Nothing + forM_ (idDataF =<< unfoldOwners pid) $ \s -> - svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) $ - DiscoveryPeer priority (Just peer) mbaddr Nothing - replyPacket $ DiscoveryAcknowledged $ fromMaybe (T.pack "ICE") mbaddr - - DiscoveryAcknowledged addr -> do - when (addr == T.pack "ICE") $ do - -- keep-alive packet from behind NAT - peer <- asks svcPeer - liftIO $ void $ forkIO $ do - threadDelay (keepaliveSeconds * 1000 * 1000) - res <- runExceptT $ sendToPeer peer $ DiscoverySelf addr 0 - case res of - Right _ -> return () - Left err -> putStrLn $ "Discovery: failed to send keep-alive: " ++ err + svcModifyGlobal $ M.insertWith insertHelper (refDigest $ storedRef s) DiscoveryPeer + { dpPriority = fromMaybe 0 priority + , dpPeer = Just peer + , dpAddress = addrs +#ifdef ENABLE_ICE_SUPPORT + , dpIceSession = Nothing +#endif + } + attrs <- asks svcAttributes + replyPacket $ DiscoveryAcknowledged matchedAddrs + (discoveryStunServer attrs) + (discoveryStunPort attrs) + (discoveryTurnServer attrs) + (discoveryTurnPort attrs) + + DiscoveryAcknowledged _ stunServer stunPort turnServer turnPort -> do +#ifdef ENABLE_ICE_SUPPORT + paddr <- asks (peerAddress . svcPeer) >>= return . \case + (DatagramAddress saddr) -> case IP.fromSockAddr saddr of + Just (IP.IPv6 ipv6, _) + | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6 + -> Just $ T.pack $ show (IP.toIPv4w ipv4) + Just (addr, _) + -> Just $ T.pack $ show addr + _ -> Nothing + _ -> Nothing + + let toIceServer Nothing Nothing = Nothing + toIceServer Nothing (Just port) = ( , port) <$> paddr + toIceServer (Just server) Nothing = Just ( server, 0 ) + toIceServer (Just server) (Just port) = Just ( server, port ) + + cfg <- liftIO $ iceCreateConfig + (toIceServer stunServer stunPort) + (toIceServer turnServer turnPort) + svcSet cfg +#endif + return () DiscoverySearch ref -> do - addr <- M.lookup (refDigest ref) <$> svcGetGlobal - replyPacket $ DiscoveryResult ref $ fromMaybe (T.pack "ICE") . dpAddress <$> addr + dpeer <- M.lookup (refDigest ref) <$> svcGetGlobal + replyPacket $ DiscoveryResult ref $ maybe [] dpAddress dpeer - DiscoveryResult ref Nothing -> do + DiscoveryResult ref [] -> do svcPrint $ "Discovery: " ++ show (refDigest ref) ++ " not found" - DiscoveryResult ref (Just addr) -> do + DiscoveryResult ref addrs -> do -- TODO: check if we really requested that server <- asks svcServer - if addr == T.pack "ICE" - then do - self <- svcSelf - peer <- asks svcPeer - ice <- liftIO $ iceCreate PjIceSessRoleControlling $ \ice -> do + self <- svcSelf + mbIceConfig <- svcGet + discoveryPeer <- asks svcPeer + let runAsService = runPeerService @DiscoveryService discoveryPeer + + liftIO $ void $ forkIO $ forM_ addrs $ \addr -> if + | addr == T.pack "ICE" +#ifdef ENABLE_ICE_SUPPORT + , Just config <- mbIceConfig + -> do + ice <- iceCreateSession config PjIceSessRoleControlling $ \ice -> do rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer peer $ - DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceSession = Just rinfo } + res <- runExceptT $ sendToPeer discoveryPeer $ + DiscoveryConnectionRequest (emptyConnection (storedRef $ idData self) ref) { dconnIceInfo = Just rinfo } case res of Right _ -> return () Left err -> putStrLn $ "Discovery: failed to send connection request: " ++ err - svcModifyGlobal $ M.insert (refDigest ref) $ - DiscoveryPeer 0 Nothing Nothing (Just ice) - else do - case words (T.unpack addr) of - [ipaddr, port] -> do - saddr <- liftIO $ head <$> - getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) - peer <- liftIO $ serverPeer server (addrAddress saddr) - svcModifyGlobal $ M.insert (refDigest ref) $ - DiscoveryPeer 0 (Just peer) Nothing Nothing + runAsService $ do + svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer + { dpPriority = 0 + , dpPeer = Nothing + , dpAddress = [] + , dpIceSession = Just ice + } +#else + -> do + return () +#endif + + | [ ipaddr, port ] <- words (T.unpack addr) -> do + saddr <- head <$> + getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) + peer <- serverPeer server (addrAddress saddr) + runAsService $ do + svcModifyGlobal $ M.insert (refDigest ref) DiscoveryPeer + { dpPriority = 0 + , dpPeer = Just peer + , dpAddress = [] +#ifdef ENABLE_ICE_SUPPORT + , dpIceSession = Nothing +#endif + } - _ -> svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr + | otherwise -> do + runAsService $ do + svcPrint $ "Discovery: invalid address in result: " ++ T.unpack addr DiscoveryConnectionRequest conn -> do self <- svcSelf let rconn = emptyConnection (dconnSource conn) (dconnTarget conn) if refDigest (dconnTarget conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) then do +#ifdef ENABLE_ICE_SUPPORT -- request for us, create ICE sesssion server <- asks svcServer peer <- asks svcPeer - liftIO $ void $ iceCreate PjIceSessRoleControlled $ \ice -> do - rinfo <- iceRemoteInfo ice - res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceSession = Just rinfo } - case res of - Right _ -> do - case dconnIceSession conn of - Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice - Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" - Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err + svcGet >>= \case + Just config -> do + liftIO $ void $ iceCreateSession config PjIceSessRoleControlled $ \ice -> do + rinfo <- iceRemoteInfo ice + res <- runExceptT $ sendToPeer peer $ DiscoveryConnectionResponse rconn { dconnIceInfo = Just rinfo } + case res of + Right _ -> do + case dconnIceInfo conn of + Just prinfo -> iceConnect ice prinfo $ void $ serverPeerIce server ice + Nothing -> putStrLn $ "Discovery: connection request without ICE remote info" + Left err -> putStrLn $ "Discovery: failed to send connection response: " ++ err + Nothing -> do + svcPrint $ "Discovery: ICE request from peer without ICE configuration" +#else + return () +#endif else do -- request to some of our peers, relay mbdp <- M.lookup (refDigest $ dconnTarget conn) <$> svcGetGlobal case mbdp of Nothing -> replyPacket $ DiscoveryConnectionResponse rconn - Just dp | Just addr <- dpAddress dp -> do + Just dp | addr : _ <- dpAddress dp -> do replyPacket $ DiscoveryConnectionResponse rconn { dconnAddress = Just addr } | Just dpeer <- dpPeer dp -> do sendToPeer dpeer $ DiscoveryConnectionRequest conn @@ -200,6 +313,7 @@ instance Service DiscoveryService where if refDigest (dconnSource conn) `elem` (map (refDigest . storedRef) $ idDataF =<< unfoldOwners self) then do -- response to our request, try to connect to the peer +#ifdef ENABLE_ICE_SUPPORT server <- asks svcServer if | Just addr <- dconnAddress conn , [ipaddr, port] <- words (T.unpack addr) -> do @@ -207,17 +321,37 @@ instance Service DiscoveryService where getAddrInfo (Just $ defaultHints { addrSocketType = Datagram }) (Just ipaddr) (Just port) peer <- liftIO $ serverPeer server (addrAddress saddr) svcModifyGlobal $ M.insert (refDigest $ dconnTarget conn) $ - DiscoveryPeer 0 (Just peer) Nothing Nothing + DiscoveryPeer 0 (Just peer) [] Nothing | Just dp <- M.lookup (refDigest $ dconnTarget conn) dpeers , Just ice <- dpIceSession dp - , Just rinfo <- dconnIceSession conn -> do + , Just rinfo <- dconnIceInfo conn -> do liftIO $ iceConnect ice rinfo $ void $ serverPeerIce server ice | otherwise -> svcPrint $ "Discovery: connection request failed" +#else + return () +#endif else do -- response to relayed request case M.lookup (refDigest $ dconnSource conn) dpeers of Just dp | Just dpeer <- dpPeer dp -> do sendToPeer dpeer $ DiscoveryConnectionResponse conn _ -> svcPrint $ "Discovery: failed to relay connection response" + + serviceNewPeer = do + server <- asks svcServer + peer <- asks svcPeer + + let addrToText saddr = do + ( addr, port ) <- IP.fromSockAddr saddr + Just $ T.pack $ show addr <> " " <> show port + addrs <- concat <$> sequence + [ catMaybes . map addrToText <$> liftIO (getServerAddresses server) +#ifdef ENABLE_ICE_SUPPORT + , return [ T.pack "ICE" ] +#endif + ] + + when (not $ null addrs) $ do + sendToPeer peer $ DiscoverySelf addrs Nothing diff --git a/src/Erebos/Error.hs b/src/Erebos/Error.hs new file mode 100644 index 0000000..3bb8736 --- /dev/null +++ b/src/Erebos/Error.hs @@ -0,0 +1,39 @@ +module Erebos.Error ( + ErebosError(..), + showErebosError, + + FromErebosError(..), + throwOtherError, +) where + +import Control.Monad.Except + + +data ErebosError + = ManyErrors [ ErebosError ] + | OtherError String + +showErebosError :: ErebosError -> String +showErebosError (ManyErrors errs) = unlines $ map showErebosError errs +showErebosError (OtherError str) = str + +instance Semigroup ErebosError where + ManyErrors [] <> b = b + a <> ManyErrors [] = a + ManyErrors a <> ManyErrors b = ManyErrors (a ++ b) + ManyErrors a <> b = ManyErrors (a ++ [ b ]) + a <> ManyErrors b = ManyErrors (a : b) + a@OtherError {} <> b@OtherError {} = ManyErrors [ a, b ] + +instance Monoid ErebosError where + mempty = ManyErrors [] + + +class FromErebosError e where + fromErebosError :: ErebosError -> e + +instance FromErebosError ErebosError where + fromErebosError = id + +throwOtherError :: (MonadError e m, FromErebosError e) => String -> m a +throwOtherError = throwError . fromErebosError . OtherError diff --git a/src/Erebos/ICE.chs b/src/Erebos/ICE.chs index 096ee0d..2c6f500 100644 --- a/src/Erebos/ICE.chs +++ b/src/Erebos/ICE.chs @@ -4,9 +4,11 @@ module Erebos.ICE ( IceSession, IceSessionRole(..), + IceConfig, IceRemoteInfo, - iceCreate, + iceCreateConfig, + iceCreateSession, iceDestroy, iceRemoteInfo, iceShow, @@ -17,35 +19,39 @@ module Erebos.ICE ( ) where import Control.Arrow -import Control.Concurrent.MVar +import Control.Concurrent import Control.Monad -import Control.Monad.Except import Control.Monad.Identity import Data.ByteString (ByteString, packCStringLen, useAsCString) -import qualified Data.ByteString.Lazy.Char8 as BLC +import Data.ByteString.Lazy.Char8 qualified as BLC import Data.ByteString.Unsafe import Data.Function import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.Text.Encoding as T -import qualified Data.Text.Read as T +import Data.Text qualified as T +import Data.Text.Encoding qualified as T +import Data.Text.Read qualified as T import Data.Void +import Data.Word import Foreign.C.String import Foreign.C.Types +import Foreign.ForeignPtr import Foreign.Marshal.Alloc import Foreign.Marshal.Array import Foreign.Ptr import Foreign.StablePtr import Erebos.Flow +import Erebos.Object +import Erebos.Storable import Erebos.Storage #include "pjproject.h" data IceSession = IceSession { isStrans :: PjIceStrans + , _isConfig :: IceConfig , isChan :: MVar (Either [ByteString] (Flow Void ByteString)) } @@ -111,19 +117,43 @@ instance StorableText IceCandidate where , icandPort = port , icandType = ctype } - _ -> throwError "failed to parse candidate" + _ -> throwOtherError "failed to parse candidate" {#enum pj_ice_sess_role as IceSessionRole {underscoreToCase} deriving (Show, Eq) #} +data PjIceStransCfg +newtype IceConfig = IceConfig (ForeignPtr PjIceStransCfg) + +foreign import ccall unsafe "pjproject.h &ice_cfg_free" + ice_cfg_free :: FunPtr (Ptr PjIceStransCfg -> IO ()) +foreign import ccall unsafe "pjproject.h ice_cfg_create" + ice_cfg_create :: CString -> Word16 -> CString -> Word16 -> IO (Ptr PjIceStransCfg) + +iceCreateConfig :: Maybe ( Text, Word16 ) -> Maybe ( Text, Word16 ) -> IO (Maybe IceConfig) +iceCreateConfig stun turn = + maybe ($ nullPtr) (withText . fst) stun $ \cstun -> + maybe ($ nullPtr) (withText . fst) turn $ \cturn -> do + cfg <- ice_cfg_create cstun (maybe 0 snd stun) cturn (maybe 0 snd turn) + if cfg == nullPtr + then return Nothing + else Just . IceConfig <$> newForeignPtr ice_cfg_free cfg + {#pointer *pj_ice_strans as ^ #} -iceCreate :: IceSessionRole -> (IceSession -> IO ()) -> IO IceSession -iceCreate role cb = do +iceCreateSession :: IceConfig -> IceSessionRole -> (IceSession -> IO ()) -> IO IceSession +iceCreateSession icfg@(IceConfig fcfg) role cb = do rec sptr <- newStablePtr sess - cbptr <- newStablePtr $ cb sess + cbptr <- newStablePtr $ do + -- The callback may be called directly from pj_ice_strans_create or later + -- from a different thread; make sure we use a different thread here + -- to avoid deadlock on accessing 'sess'. + forkIO $ cb sess sess <- IceSession - <$> {#call ice_create #} (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) + <$> (withForeignPtr fcfg $ \cfg -> + {#call ice_create #} (castPtr cfg) (fromIntegral $ fromEnum role) (castStablePtrToPtr sptr) (castStablePtrToPtr cbptr) + ) + <*> pure icfg <*> (newMVar $ Left []) return $ sess diff --git a/src/Erebos/ICE/pjproject.c b/src/Erebos/ICE/pjproject.c index bb06b1f..e79fb9d 100644 --- a/src/Erebos/ICE/pjproject.c +++ b/src/Erebos/ICE/pjproject.c @@ -12,7 +12,6 @@ static struct { pj_caching_pool cp; pj_pool_t * pool; - pj_ice_strans_cfg cfg; pj_sockaddr def_addr; } ice; @@ -31,9 +30,9 @@ static void ice_perror(const char * msg, pj_status_t status) fprintf(stderr, "ICE: %s: %s\n", msg, err); } -static int ice_worker_thread(void * unused) +static int ice_worker_thread(void * vcfg) { - PJ_UNUSED_ARG(unused); + pj_ice_strans_cfg * cfg = (pj_ice_strans_cfg *) vcfg; while (true) { pj_time_val max_timeout = { 0, 0 }; @@ -41,7 +40,7 @@ static int ice_worker_thread(void * unused) max_timeout.msec = 500; - pj_timer_heap_poll(ice.cfg.stun_cfg.timer_heap, &timeout); + pj_timer_heap_poll(cfg->stun_cfg.timer_heap, &timeout); pj_assert(timeout.sec >= 0 && timeout.msec >= 0); if (timeout.msec >= 1000) @@ -50,7 +49,7 @@ static int ice_worker_thread(void * unused) if (PJ_TIME_VAL_GT(timeout, max_timeout)) timeout = max_timeout; - int c = pj_ioqueue_poll(ice.cfg.stun_cfg.ioqueue, &timeout); + int c = pj_ioqueue_poll(cfg->stun_cfg.ioqueue, &timeout); if (c < 0) pj_thread_sleep(PJ_TIME_VAL_MSEC(timeout)); } @@ -105,7 +104,7 @@ static void ice_init(void) if (done) { pthread_mutex_unlock(&mutex); - goto exit; + return; } pj_log_set_level(1); @@ -125,54 +124,94 @@ static void ice_init(void) pj_caching_pool_init(&ice.cp, NULL, 0); - pj_ice_strans_cfg_default(&ice.cfg); - ice.cfg.stun_cfg.pf = &ice.cp.factory; - ice.pool = pj_pool_create(&ice.cp.factory, "ice", 512, 512, NULL); - if (pj_timer_heap_create(ice.pool, 100, - &ice.cfg.stun_cfg.timer_heap) != PJ_SUCCESS) { - fprintf(stderr, "pj_timer_heap_create failed\n"); - goto exit; +exit: + done = true; + pthread_mutex_unlock(&mutex); +} + +pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, + const char * turn_server, uint16_t turn_port ) +{ + ice_init(); + + pj_ice_strans_cfg * cfg = malloc( sizeof(pj_ice_strans_cfg) ); + pj_ice_strans_cfg_default( cfg ); + + cfg->stun_cfg.pf = &ice.cp.factory; + if( pj_timer_heap_create( ice.pool, 100, + &cfg->stun_cfg.timer_heap ) != PJ_SUCCESS ){ + fprintf( stderr, "pj_timer_heap_create failed\n" ); + goto fail; } - if (pj_ioqueue_create(ice.pool, 16, &ice.cfg.stun_cfg.ioqueue) != PJ_SUCCESS) { - fprintf(stderr, "pj_ioqueue_create failed\n"); - goto exit; + if( pj_ioqueue_create( ice.pool, 16, &cfg->stun_cfg.ioqueue ) != PJ_SUCCESS ){ + fprintf( stderr, "pj_ioqueue_create failed\n" ); + goto fail; } pj_thread_t * thread; - if (pj_thread_create(ice.pool, "ice", &ice_worker_thread, - NULL, 0, 0, &thread) != PJ_SUCCESS) { - fprintf(stderr, "pj_thread_create failed\n"); - goto exit; + if( pj_thread_create( ice.pool, NULL, &ice_worker_thread, + cfg, 0, 0, &thread ) != PJ_SUCCESS ){ + fprintf( stderr, "pj_thread_create failed\n" ); + goto fail; } - ice.cfg.af = pj_AF_INET(); - ice.cfg.opt.aggressive = PJ_TRUE; + cfg->af = pj_AF_INET(); + cfg->opt.aggressive = PJ_TRUE; - ice.cfg.stun.server.ptr = "discovery1.erebosprotocol.net"; - ice.cfg.stun.server.slen = strlen(ice.cfg.stun.server.ptr); - ice.cfg.stun.port = 29670; + if( stun_server ){ + cfg->stun.server.ptr = malloc( strlen( stun_server )); + pj_strcpy2( &cfg->stun.server, stun_server ); + if( stun_port ) + cfg->stun.port = stun_port; + } - ice.cfg.turn.server = ice.cfg.stun.server; - ice.cfg.turn.port = ice.cfg.stun.port; - ice.cfg.turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; - ice.cfg.turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - ice.cfg.turn.conn_type = PJ_TURN_TP_UDP; + if( turn_server ){ + cfg->turn.server.ptr = malloc( strlen( turn_server )); + pj_strcpy2( &cfg->turn.server, turn_server ); + if( turn_port ) + cfg->turn.port = turn_port; + cfg->turn.auth_cred.type = PJ_STUN_AUTH_CRED_STATIC; + cfg->turn.auth_cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + cfg->turn.conn_type = PJ_TURN_TP_UDP; + } -exit: - done = true; - pthread_mutex_unlock(&mutex); + return cfg; +fail: + ice_cfg_free( cfg ); + return NULL; } -pj_ice_strans * ice_create(pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb) +void ice_cfg_free( pj_ice_strans_cfg * cfg ) +{ + if( ! cfg ) + return; + + if( cfg->turn.server.ptr ) + free( cfg->turn.server.ptr ); + + if( cfg->stun.server.ptr ) + free( cfg->stun.server.ptr ); + + if( cfg->stun_cfg.ioqueue ) + pj_ioqueue_destroy( cfg->stun_cfg.ioqueue ); + + if( cfg->stun_cfg.timer_heap ) + pj_timer_heap_destroy( cfg->stun_cfg.timer_heap ); + + free( cfg ); +} + +pj_ice_strans * ice_create( const pj_ice_strans_cfg * cfg, pj_ice_sess_role role, + HsStablePtr sptr, HsStablePtr cb ) { ice_init(); pj_ice_strans * res; - struct user_data * udata = malloc(sizeof(struct user_data)); + struct user_data * udata = calloc( 1, sizeof( struct user_data )); udata->role = role; udata->sptr = sptr; udata->cb_init = cb; @@ -182,8 +221,8 @@ pj_ice_strans * ice_create(pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr .on_ice_complete = cb_on_ice_complete, }; - pj_status_t status = pj_ice_strans_create(NULL, &ice.cfg, 1, - udata, &icecb, &res); + pj_status_t status = pj_ice_strans_create( NULL, cfg, 1, + udata, &icecb, &res ); if (status != PJ_SUCCESS) ice_perror("error creating ice", status); @@ -213,7 +252,9 @@ ssize_t ice_encode_session(pj_ice_strans * strans, char * ufrag, char * pass, pj_str_t local_ufrag, local_pwd; pj_status_t status; - pj_ice_strans_get_ufrag_pwd(strans, &local_ufrag, &local_pwd, NULL, NULL); + status = pj_ice_strans_get_ufrag_pwd( strans, &local_ufrag, &local_pwd, NULL, NULL ); + if( status != PJ_SUCCESS ) + return -status; n = snprintf(ufrag, maxlen, "%.*s", (int) local_ufrag.slen, local_ufrag.ptr); if (n < 0 || n == maxlen) @@ -356,7 +397,7 @@ void ice_send(pj_ice_strans * strans, const char * data, size_t len) return; } - pj_status_t status = pj_ice_strans_sendto(strans, 1, data, len, + pj_status_t status = pj_ice_strans_sendto2(strans, 1, data, len, &ice.def_addr, pj_sockaddr_get_len(&ice.def_addr)); if (status != PJ_SUCCESS && status != PJ_EPENDING) ice_perror("error sending data", status); diff --git a/src/Erebos/ICE/pjproject.h b/src/Erebos/ICE/pjproject.h index e230e75..e4fcbdb 100644 --- a/src/Erebos/ICE/pjproject.h +++ b/src/Erebos/ICE/pjproject.h @@ -3,7 +3,12 @@ #include <pjnath.h> #include <HsFFI.h> -pj_ice_strans * ice_create(pj_ice_sess_role role, HsStablePtr sptr, HsStablePtr cb); +pj_ice_strans_cfg * ice_cfg_create( const char * stun_server, uint16_t stun_port, + const char * turn_server, uint16_t turn_port ); +void ice_cfg_free( pj_ice_strans_cfg * cfg ); + +pj_ice_strans * ice_create( const pj_ice_strans_cfg *, pj_ice_sess_role role, + HsStablePtr sptr, HsStablePtr cb ); void ice_destroy(pj_ice_strans * strans); ssize_t ice_encode_session(pj_ice_strans *, char * ufrag, char * pass, diff --git a/src/Erebos/Identity.hs b/src/Erebos/Identity.hs index 8761fde..a3f17b5 100644 --- a/src/Erebos/Identity.hs +++ b/src/Erebos/Identity.hs @@ -13,7 +13,7 @@ module Erebos.Identity ( createIdentity, validateIdentity, validateIdentityF, validateIdentityFE, validateExtendedIdentity, validateExtendedIdentityF, validateExtendedIdentityFE, - loadIdentity, loadUnifiedIdentity, + loadIdentity, loadMbIdentity, loadUnifiedIdentity, loadMbUnifiedIdentity, mergeIdentity, toUnifiedIdentity, toComposedIdentity, updateIdentity, updateOwners, @@ -35,14 +35,13 @@ import Data.Foldable import Data.Function import Data.List import Data.Maybe -import Data.Ord import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) import qualified Data.Text as T import Erebos.PubKey -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Merge import Erebos.Util @@ -281,10 +280,16 @@ validateExtendedIdentityFE mdata = do Just mk -> return mk loadIdentity :: String -> LoadRec ComposedIdentity -loadIdentity name = maybe (throwError "identity validation failed") return . validateExtendedIdentityF =<< loadRefs name +loadIdentity name = maybe (throwOtherError "identity validation failed") return . validateExtendedIdentityF =<< loadRefs name + +loadMbIdentity :: String -> LoadRec (Maybe ComposedIdentity) +loadMbIdentity name = return . validateExtendedIdentityF =<< loadRefs name loadUnifiedIdentity :: String -> LoadRec UnifiedIdentity -loadUnifiedIdentity name = maybe (throwError "identity validation failed") return . validateExtendedIdentity =<< loadRef name +loadUnifiedIdentity name = maybe (throwOtherError "identity validation failed") return . validateExtendedIdentity =<< loadRef name + +loadMbUnifiedIdentity :: String -> LoadRec (Maybe UnifiedIdentity) +loadMbUnifiedIdentity name = return . (validateExtendedIdentity =<<) =<< loadMbRef name gatherPrevious :: Set (Stored (Signed ExtendedIdentityData)) -> [Stored (Signed ExtendedIdentityData)] -> Set (Stored (Signed ExtendedIdentityData)) @@ -304,27 +309,20 @@ verifySignatures sidd = do throwError "signature verification failed" lookupProperty :: forall a m. Foldable m => (ExtendedIdentityData -> Maybe a) -> m (Stored (Signed ExtendedIdentityData)) -> Maybe a -lookupProperty sel topHeads = findResult filteredLayers - where findPropHeads :: Stored (Signed ExtendedIdentityData) -> [(Stored (Signed ExtendedIdentityData), a)] - findPropHeads sobj | Just x <- sel $ fromSigned sobj = [(sobj, x)] - | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj) - - propHeads :: [(Stored (Signed ExtendedIdentityData), a)] - propHeads = findPropHeads =<< toList topHeads - - historyLayers :: [Set (Stored (Signed ExtendedIdentityData))] - historyLayers = generations $ map fst propHeads +lookupProperty sel topHeads = findResult propHeads + where + findPropHeads :: Stored (Signed ExtendedIdentityData) -> [ Stored (Signed ExtendedIdentityData) ] + findPropHeads sobj | Just _ <- sel $ fromSigned sobj = [ sobj ] + | otherwise = findPropHeads =<< (eiddPrev $ fromSigned sobj) - filteredLayers :: [[(Stored (Signed ExtendedIdentityData), a)]] - filteredLayers = scanl (\cur obsolete -> filter ((`S.notMember` obsolete) . fst) cur) propHeads historyLayers + propHeads :: [ Stored (Signed ExtendedIdentityData) ] + propHeads = filterAncestors $ findPropHeads =<< toList topHeads - findResult ([(_, x)] : _) = Just x - findResult ([] : _) = Nothing - findResult [] = Nothing - findResult [xs] = Just $ snd $ minimumBy (comparing fst) xs - findResult (_:rest) = findResult rest + findResult :: [ Stored (Signed ExtendedIdentityData) ] -> Maybe a + findResult [] = Nothing + findResult xs = sel $ fromSigned $ minimum xs -mergeIdentity :: (MonadStorage m, MonadError String m, MonadIO m) => Identity f -> m UnifiedIdentity +mergeIdentity :: (MonadStorage m, MonadError e m, FromErebosError e, MonadIO m) => Identity f -> m UnifiedIdentity mergeIdentity idt | Just idt' <- toUnifiedIdentity idt = return idt' mergeIdentity idt@Identity {..} = do (owner, ownerData) <- case idOwner_ of @@ -385,8 +383,9 @@ updateOwners updates orig@Identity { idOwner_ = Just owner, idUpdates_ = cupdate updateOwners _ orig@Identity { idOwner_ = Nothing } = orig sameIdentity :: (Foldable m, Foldable m') => Identity m -> Identity m' -> Bool -sameIdentity x y = not $ S.null $ S.intersection (refset x) (refset y) - where refset idt = foldr S.insert (ancestors $ toList $ idDataF idt) (idDataF idt) +sameIdentity x y = intersectsSorted (roots x) (roots y) + where + roots idt = uniq $ sort $ concatMap storedRoots $ toList $ idDataF idt unfoldOwners :: (Foldable m) => Identity m -> [ComposedIdentity] diff --git a/src/Erebos/Network.hs b/src/Erebos/Network.hs index 41b6279..54658de 100644 --- a/src/Erebos/Network.hs +++ b/src/Erebos/Network.hs @@ -6,6 +6,7 @@ module Erebos.Network ( stopServer, getCurrentPeerList, getNextPeerChange, + getServerAddresses, ServerOptions(..), serverIdentity, defaultServerOptions, Peer, peerServer, peerStorage, @@ -19,7 +20,9 @@ module Erebos.Network ( #endif dropPeer, isPeerDropped, - sendToPeer, sendToPeerStored, sendToPeerWith, + sendToPeer, sendManyToPeer, + sendToPeerStored, sendManyToPeerStored, + sendToPeerWith, runPeerService, discoveryPort, @@ -44,20 +47,25 @@ import Data.Maybe import Data.Typeable import Data.Word +import Foreign.C.Types +import Foreign.Marshal.Alloc +import Foreign.Marshal.Array import Foreign.Ptr -import Foreign.Storable +import Foreign.Storable as F import GHC.Conc.Sync (unsafeIOToSTM) import Network.Socket hiding (ControlMessage) import qualified Network.Socket.ByteString as S -import Erebos.Channel +import Erebos.Error #ifdef ENABLE_ICE_SUPPORT import Erebos.ICE #endif import Erebos.Identity +import Erebos.Network.Channel import Erebos.Network.Protocol +import Erebos.Object.Internal import Erebos.PubKey import Erebos.Service import Erebos.State @@ -69,12 +77,16 @@ import Erebos.Storage.Merge discoveryPort :: PortNumber discoveryPort = 29665 +discoveryMulticastGroup :: HostAddress6 +discoveryMulticastGroup = tupleToHostAddress6 (0xff12, 0xb6a4, 0x6b1f, 0x0969, 0xcaee, 0xacc2, 0x5c93, 0x73e1) -- ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1 + announceIntervalSeconds :: Int announceIntervalSeconds = 60 data Server = Server { serverStorage :: Storage + , serverOptions :: ServerOptions , serverOrigHead :: Head LocalState , serverIdentity_ :: MVar UnifiedIdentity , serverThreads :: MVar [ThreadId] @@ -82,7 +94,7 @@ data Server = Server , serverRawPath :: SymFlow (PeerAddress, BC.ByteString) , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress) , serverDataResponse :: TQueue (Peer, Maybe PartialRef) - , serverIOActions :: TQueue (ExceptT String IO ()) + , serverIOActions :: TQueue (ExceptT ErebosError IO ()) , serverServices :: [SomeService] , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState) , serverPeers :: MVar (Map PeerAddress Peer) @@ -178,8 +190,8 @@ instance Ord PeerAddress where #endif -data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()]) - | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()]) +data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) + | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT ErebosError IO ()]) | PeerIdentityFull UnifiedIdentity peerIdentity :: MonadIO m => Peer -> m PeerIdentity @@ -221,7 +233,7 @@ forkServerThread server act = do return (t:ts) startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server -startServer opt serverOrigHead logd' serverServices = do +startServer serverOptions serverOrigHead logd' serverServices = do let serverStorage = headStorage serverOrigHead serverIdentity_ <- newMVar $ headLocalIdentity serverOrigHead serverThreads <- newMVar [] @@ -244,11 +256,9 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ dataResponseWorker server forkServerThread server $ forever $ do - either (atomically . logd) return =<< runExceptT =<< + either (atomically . logd . showErebosError) return =<< runExceptT =<< atomically (readTQueue serverIOActions) - broadcastAddreses <- getBroadcastAddresses discoveryPort - let open addr = do sock <- socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr) putMVar serverSocket sock @@ -259,9 +269,14 @@ startServer opt serverOrigHead logd' serverServices = do return sock loop sock = do - when (serverLocalDiscovery opt) $ forkServerThread server $ forever $ do - atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) broadcastAddreses - threadDelay $ announceIntervalSeconds * 1000 * 1000 + when (serverLocalDiscovery serverOptions) $ forkServerThread server $ do + announceAddreses <- fmap concat $ sequence $ + [ map (SockAddrInet6 discoveryPort 0 discoveryMulticastGroup) <$> joinMulticast sock + , getBroadcastAddresses discoveryPort + ] + forever $ do + atomically $ writeFlowBulk serverControlFlow $ map (SendAnnounce . DatagramAddress) announceAddreses + threadDelay $ announceIntervalSeconds * 1000 * 1000 let announceUpdate identity = do st <- derivePartialStorage serverStorage @@ -301,10 +316,11 @@ startServer opt serverOrigHead logd' serverServices = do forkServerThread server $ forever $ do (paddr, msg) <- readFlowIO serverRawPath - case paddr of - DatagramAddress addr -> void $ S.sendTo sock msg addr + handle (\(e :: IOException) -> atomically . logd $ "failed to send packet to " ++ show paddr ++ ": " ++ show e) $ do + case paddr of + DatagramAddress addr -> void $ S.sendTo sock msg addr #ifdef ENABLE_ICE_SUPPORT - PeerIceSession ice -> iceSend ice msg + PeerIceSession ice -> iceSend ice msg #endif forkServerThread server $ forever $ do @@ -365,7 +381,7 @@ startServer opt serverOrigHead logd' serverServices = do , addrFamily = AF_INET6 , addrSocketType = Datagram } - addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort opt) + addr:_ <- getAddrInfo (Just hints) Nothing (Just $ show $ serverPort serverOptions) bracket (open addr) close loop forkServerThread server $ forever $ do @@ -392,7 +408,7 @@ dataResponseWorker server = forever $ do Right ref -> do atomically (writeTVar tvar $ Right ref) forkServerThread server $ runExceptT (wrefAction wr ref) >>= \case - Left err -> atomically $ writeTQueue (serverErrorLog server) err + Left err -> atomically $ writeTQueue (serverErrorLog server) (showErebosError err) Right () -> return () return (Nothing, []) @@ -421,12 +437,18 @@ instance MonadFail PacketHandler where runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM () runPacketHandler secure peer@Peer {..} act = do let logd = writeTQueue $ serverErrorLog peerServer_ - runExceptT (flip execStateT (PacketHandlerState peer [] [] [] False) $ unPacketHandler act) >>= \case + runExceptT (flip execStateT (PacketHandlerState peer [] [] [] Nothing False) $ unPacketHandler act) >>= \case Left err -> do logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err Right ph -> do when (not $ null $ phHead ph) $ do - let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph) + body <- case phBodyStream ph of + Nothing -> return $ phBody ph + Just stream -> do + writeTQueue (serverIOActions peerServer_) $ void $ liftIO $ forkIO $ do + writeByteStringToStream stream $ BL.concat $ map lazyLoadBytes $ phBody ph + return [] + let packet = TransportPacket (TransportHeader $ phHead ph) body secreq = case (secure, phPlaintextReply ph) of (True, _) -> EncryptedOnly (False, False) -> PlaintextAllowed @@ -450,6 +472,7 @@ data PacketHandlerState = PacketHandlerState , phHead :: [TransportHeaderItem] , phAckedBy :: [TransportHeaderItem] , phBody :: [Ref] + , phBodyStream :: Maybe RawStreamWriter , phPlaintextReply :: Bool } @@ -462,6 +485,14 @@ addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy addBody :: Ref -> PacketHandler () addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph } +sendBodyAsStream :: PacketHandler () +sendBodyAsStream = do + gets phBodyStream >>= \case + Nothing -> do + stream <- openStream + modify $ \ph -> ph { phBodyStream = Just stream } + Just _ -> return () + keepPlaintextReply :: PacketHandler () keepPlaintextReply = modify $ \ph -> ph { phPlaintextReply = True } @@ -517,8 +548,12 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ finalizedChannel peer ch identity _ -> return () - Rejected dgst -> do - logd $ "rejected by peer: " ++ show dgst + Rejected dgst + | peerRequest : _ <- mapMaybe (\case TrChannelRequest d -> Just d; _ -> Nothing) headers + , peerRequest < dgst + -> return () -- Our request was rejected due to lower priority + + | otherwise -> logd $ "rejected by peer: " ++ show dgst DataRequest dgst | secure || dgst `elem` plaintextRefs -> do @@ -532,15 +567,11 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = -- otherwise lost the channel, so keep the reply plaintext as well. when (not secure) keepPlaintextReply - let bytes = lazyLoadBytes mref -- TODO: MTU - if (secure && BL.length bytes > 500) - then do - stream <- openStream - liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do - writeByteStringToStream stream bytes - else do - addBody $ mref + when (secure && BL.length (lazyLoadBytes mref) > 500) + sendBodyAsStream + + addBody $ mref | otherwise -> do logd $ "unauthorized data request for " ++ show dgst addHeader $ Rejected dgst @@ -556,7 +587,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = liftSTM $ writeTQueue (serverIOActions server) $ void $ liftIO $ forkIO $ do (runExcept <$> readObjectsFromStream (peerInStorage peer) streamReader) >>= \case Left err -> atomically $ writeTQueue (serverErrorLog server) $ - "failed to receive object from stream: " <> err + "failed to receive object from stream: " <> showErebosError err Right objs -> do forM_ objs $ \obj -> do pref <- storeObject (peerInStorage peer) obj @@ -593,9 +624,15 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = ChannelCookieWait {} -> return () ChannelCookieReceived {} -> process ChannelCookieConfirmed {} -> process - ChannelOurRequest our | dgst < refDigest (storedRef our) -> process - | otherwise -> reject - ChannelPeerRequest {} -> process + ChannelOurRequest our + | dgst < refDigest (storedRef our) -> process + | otherwise -> do + -- Reject peer channel request with lower priority + addHeader $ TrChannelRequest $ refDigest $ storedRef our + reject + ChannelPeerRequest prev + | dgst == wrDigest prev -> addHeader $ Acknowledged dgst + | otherwise -> process ChannelOurAccept {} -> reject ChannelEstablished {} -> process ChannelClosed {} -> return () @@ -632,7 +669,7 @@ handlePacket identity secure peer chanSvc svcs (TransportHeader headers) prefs = _ -> return () -withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m () +withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT ErebosError IO ()) -> m () withPeerIdentity peer act = liftIO $ atomically $ readTVar (peerIdentityVar peer) >>= \case PeerIdentityUnknown tvar -> modifyTVar' tvar (act:) PeerIdentityRef _ tvar -> modifyTVar' tvar (act:) @@ -647,12 +684,14 @@ setupChannel identity peer upid = do [ TrChannelRequest reqref , AnnounceSelf $ refDigest $ storedRef $ idData identity ] + let sendChannelRequest = do + sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ + TransportPacket (TransportHeader hitems) [storedRef req] + setPeerChannel peer $ ChannelOurRequest req liftIO $ atomically $ do getPeerChannel peer >>= \case - ChannelCookieConfirmed -> do - sendToPeerPlain peer [ Acknowledged reqref, Rejected reqref ] $ - TransportPacket (TransportHeader hitems) [storedRef req] - setPeerChannel peer $ ChannelOurRequest req + ChannelCookieReceived -> sendChannelRequest + ChannelCookieConfirmed -> sendChannelRequest _ -> return () handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback @@ -686,7 +725,7 @@ handleChannelAccept identity accref = do sendToPeerS peer [] $ TransportPacket (TransportHeader [Acknowledged $ refDigest accref]) [] finalizedChannel peer ch identity - Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) + Left dgst -> throwOtherError $ "missing accept data " ++ BC.unpack (showRefDigest dgst) finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM () @@ -806,10 +845,16 @@ isPeerDropped peer = liftIO $ atomically $ readTVar (peerState peer) >>= \case _ -> return False sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m () -sendToPeer peer packet = sendToPeerList peer [ServiceReply (Left packet) True] +sendToPeer peer = sendManyToPeer peer . (: []) + +sendManyToPeer :: (Service s, MonadIO m) => Peer -> [ s ] -> m () +sendManyToPeer peer = sendToPeerList peer . map (\part -> ServiceReply (Left part) True) sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m () -sendToPeerStored peer spacket = sendToPeerList peer [ServiceReply (Right spacket) True] +sendToPeerStored peer = sendManyToPeerStored peer . (: []) + +sendManyToPeerStored :: (Service s, MonadIO m) => Peer -> [ Stored s ] -> m () +sendManyToPeerStored peer = sendToPeerList peer . map (\part -> ServiceReply (Right part) True) sendToPeerList :: (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m () sendToPeerList peer parts = do @@ -838,7 +883,7 @@ sendToPeerS = sendToPeerS' EncryptedOnly sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM () sendToPeerPlain = sendToPeerS' PlaintextAllowed -sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m () +sendToPeerWith :: forall s m e. (Service s, MonadIO m, MonadError e m, FromErebosError e) => Peer -> (ServiceState s -> ExceptT ErebosError IO (Maybe s, ServiceState s)) -> m () sendToPeerWith peer fobj = do let sproxy = Proxy @s sid = serviceID sproxy @@ -853,7 +898,7 @@ sendToPeerWith peer fobj = do case res of Right (Just obj) -> sendToPeer peer obj Right Nothing -> return () - Left err -> throwError err + Left err -> throwError $ fromErebosError err lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s) @@ -912,8 +957,57 @@ runPeerServiceOn mbservice peer handler = liftIO $ do logd $ "unhandled service '" ++ show (toUUID svc) ++ "'" +foreign import ccall unsafe "Network/ifaddrs.h join_multicast" cJoinMulticast :: CInt -> Ptr CSize -> IO (Ptr Word32) +foreign import ccall unsafe "Network/ifaddrs.h local_addresses" cLocalAddresses :: Ptr CSize -> IO (Ptr InetAddress) foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32) -foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO () +foreign import ccall unsafe "stdlib.h free" cFree :: Ptr a -> IO () + +data InetAddress = InetAddress { fromInetAddress :: IP.IP } + +instance F.Storable InetAddress where + sizeOf _ = sizeOf (undefined :: CInt) + 16 + alignment _ = 8 + + peek ptr = (unpackFamily <$> peekByteOff ptr 0) >>= \case + AF_INET -> InetAddress . IP.IPv4 . IP.fromHostAddress <$> peekByteOff ptr (sizeOf (undefined :: CInt)) + AF_INET6 -> InetAddress . IP.IPv6 . IP.toIPv6b . map fromIntegral <$> peekArray 16 (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) + _ -> fail "InetAddress: unknown family" + + poke ptr (InetAddress addr) = case addr of + IP.IPv4 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET) + pokeByteOff ptr (sizeOf (undefined :: CInt)) (IP.toHostAddress ip) + IP.IPv6 ip -> do + pokeByteOff ptr 0 (packFamily AF_INET6) + pokeArray (ptr `plusPtr` sizeOf (undefined :: CInt) :: Ptr Word8) (map fromIntegral $ IP.fromIPv6b ip) + +joinMulticast :: Socket -> IO [ Word32 ] +joinMulticast sock = + withFdSocket sock $ \fd -> + alloca $ \pcount -> do + ptr <- cJoinMulticast fd pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- forM [ 0 .. count - 1 ] $ \i -> + peekElemOff ptr i + cFree ptr + return res + +getServerAddresses :: Server -> IO [ SockAddr ] +getServerAddresses Server {..} = do + alloca $ \pcount -> do + ptr <- cLocalAddresses pcount + if ptr == nullPtr + then do + return [] + else do + count <- fromIntegral <$> peek pcount + res <- peekArray count ptr + cFree ptr + return $ map (IP.toSockAddr . (, serverPort serverOptions ) . fromInetAddress) res getBroadcastAddresses :: PortNumber -> IO [SockAddr] getBroadcastAddresses port = do @@ -922,6 +1016,9 @@ getBroadcastAddresses port = do w <- peekElemOff ptr i if w == 0 then return [] else (SockAddrInet port w:) <$> parse (i + 1) - addrs <- parse 0 - cFree ptr - return addrs + if ptr == nullPtr + then return [] + else do + addrs <- parse 0 + cFree ptr + return addrs diff --git a/src/Erebos/Network.hs-boot b/src/Erebos/Network.hs-boot index 849bfc1..af77581 100644 --- a/src/Erebos/Network.hs-boot +++ b/src/Erebos/Network.hs-boot @@ -1,6 +1,6 @@ module Erebos.Network where -import Erebos.Storage +import Erebos.Object.Internal data Server data Peer diff --git a/src/Erebos/Channel.hs b/src/Erebos/Network/Channel.hs index 5f66637..d9679bd 100644 --- a/src/Erebos/Channel.hs +++ b/src/Erebos/Network/Channel.hs @@ -1,4 +1,4 @@ -module Erebos.Channel ( +module Erebos.Network.Channel ( Channel, ChannelRequest, ChannelRequestData(..), ChannelAccept, ChannelAcceptData(..), @@ -27,7 +27,7 @@ import Data.List import Erebos.Identity import Erebos.PubKey -import Erebos.Storage +import Erebos.Storable data Channel = Channel { chPeers :: [Stored (Signed IdentityData)] @@ -78,23 +78,23 @@ instance Storable ChannelAcceptData where keySize :: Int keySize = 32 -createChannelRequest :: (MonadStorage m, MonadIO m, MonadError String m) => UnifiedIdentity -> UnifiedIdentity -> m (Stored ChannelRequest) +createChannelRequest :: (MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => UnifiedIdentity -> UnifiedIdentity -> m (Stored ChannelRequest) createChannelRequest self peer = do (_, xpublic) <- liftIO . generateKeys =<< getStorage skey <- loadKey $ idKeyMessage self mstore =<< sign skey =<< mstore ChannelRequest { crPeers = sort [idData self, idData peer], crKey = xpublic } -acceptChannelRequest :: (MonadStorage m, MonadIO m, MonadError String m) => UnifiedIdentity -> UnifiedIdentity -> Stored ChannelRequest -> m (Stored ChannelAccept, Channel) +acceptChannelRequest :: (MonadStorage m, MonadIO m, MonadError e m, FromErebosError e) => UnifiedIdentity -> UnifiedIdentity -> Stored ChannelRequest -> m (Stored ChannelAccept, Channel) acceptChannelRequest self peer req = do case sequence $ map validateIdentity $ crPeers $ fromStored $ signedData $ fromStored req of - Nothing -> throwError $ "invalid peers in channel request" + Nothing -> throwOtherError $ "invalid peers in channel request" Just peers -> do when (not $ any (self `sameIdentity`) peers) $ - throwError $ "self identity missing in channel request peers" + throwOtherError $ "self identity missing in channel request peers" when (not $ any (peer `sameIdentity`) peers) $ - throwError $ "peer identity missing in channel request peers" + throwOtherError $ "peer identity missing in channel request peers" when (idKeyMessage peer `notElem` (map (sigKey . fromStored) $ signedSignature $ fromStored req)) $ - throwError $ "channel requent not signed by peer" + throwOtherError $ "channel requent not signed by peer" (xsecret, xpublic) <- liftIO . generateKeys =<< getStorage skey <- loadKey $ idKeyMessage self @@ -110,20 +110,20 @@ acceptChannelRequest self peer req = do return (acc, Channel {..}) -acceptedChannel :: (MonadIO m, MonadError String m) => UnifiedIdentity -> UnifiedIdentity -> Stored ChannelAccept -> m Channel +acceptedChannel :: (MonadIO m, MonadError e m, FromErebosError e) => UnifiedIdentity -> UnifiedIdentity -> Stored ChannelAccept -> m Channel acceptedChannel self peer acc = do let req = caRequest $ fromStored $ signedData $ fromStored acc case sequence $ map validateIdentity $ crPeers $ fromStored $ signedData $ fromStored req of - Nothing -> throwError $ "invalid peers in channel accept" + Nothing -> throwOtherError $ "invalid peers in channel accept" Just peers -> do when (not $ any (self `sameIdentity`) peers) $ - throwError $ "self identity missing in channel accept peers" + throwOtherError $ "self identity missing in channel accept peers" when (not $ any (peer `sameIdentity`) peers) $ - throwError $ "peer identity missing in channel accept peers" + throwOtherError $ "peer identity missing in channel accept peers" when (idKeyMessage peer `notElem` (map (sigKey . fromStored) $ signedSignature $ fromStored acc)) $ - throwError $ "channel accept not signed by peer" + throwOtherError $ "channel accept not signed by peer" when (idKeyMessage self `notElem` (map (sigKey . fromStored) $ signedSignature $ fromStored req)) $ - throwError $ "original channel request not signed by us" + throwOtherError $ "original channel request not signed by us" xsecret <- loadKey $ crKey $ fromStored $ signedData $ fromStored req let chPeers = crPeers $ fromStored $ signedData $ fromStored req @@ -137,23 +137,23 @@ acceptedChannel self peer acc = do return Channel {..} -channelEncrypt :: (ByteArray ba, MonadIO m, MonadError String m) => Channel -> ba -> m (ba, Word64) +channelEncrypt :: (ByteArray ba, MonadIO m, MonadError e m, FromErebosError e) => Channel -> ba -> m (ba, Word64) channelEncrypt Channel {..} plain = do count <- liftIO $ modifyMVar chCounterNextOut $ \c -> return (c + 1, c) let cbytes = convert $ BL.toStrict $ encode count nonce = nonce8 chNonceFixedOur cbytes state <- case initialize chKey =<< nonce of CryptoPassed state -> return state - CryptoFailed err -> throwError $ "failed to init chacha-poly1305 cipher: " <> show err + CryptoFailed err -> throwOtherError $ "failed to init chacha-poly1305 cipher: " <> show err let (ctext, state') = encrypt plain state tag = finalize state' return (BA.concat [ convert $ BA.drop 7 cbytes, ctext, convert tag ], count) -channelDecrypt :: (ByteArray ba, MonadIO m, MonadError String m) => Channel -> ba -> m (ba, Word64) +channelDecrypt :: (ByteArray ba, MonadIO m, MonadError e m, FromErebosError e) => Channel -> ba -> m (ba, Word64) channelDecrypt Channel {..} body = do when (BA.length body < 17) $ do - throwError $ "invalid encrypted data length" + throwOtherError $ "invalid encrypted data length" expectedCount <- liftIO $ readMVar chCounterNextIn let countByte = body `BA.index` 0 @@ -165,11 +165,11 @@ channelDecrypt Channel {..} body = do tag = BA.dropView body' blen state <- case initialize chKey =<< nonce of CryptoPassed state -> return state - CryptoFailed err -> throwError $ "failed to init chacha-poly1305 cipher: " <> show err + CryptoFailed err -> throwOtherError $ "failed to init chacha-poly1305 cipher: " <> show err let (plain, state') = decrypt (convert ctext) state when (not $ tag `BA.constEq` finalize state') $ do - throwError $ "tag validation falied" + throwOtherError $ "tag validation falied" liftIO $ modifyMVar_ chCounterNextIn $ return . max (guessedCount + 1) return (plain, guessedCount) diff --git a/src/Erebos/Network/Protocol.hs b/src/Erebos/Network/Protocol.hs index b0047fc..c340503 100644 --- a/src/Erebos/Network/Protocol.hs +++ b/src/Erebos/Network/Protocol.hs @@ -40,7 +40,17 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Trans +import Crypto.Cipher.ChaChaPoly1305 qualified as C +import Crypto.MAC.Poly1305 qualified as C (Auth(..), authTag) +import Crypto.Error +import Crypto.Random + +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put import Data.Bits +import Data.ByteArray (Bytes, ScrubbedBytes) +import Data.ByteArray qualified as BA import Data.ByteString (ByteString) import Data.ByteString qualified as B import Data.ByteString.Char8 qualified as BC @@ -51,14 +61,15 @@ import Data.Maybe import Data.Text (Text) import Data.Text qualified as T import Data.Void -import Data.Word import System.Clock -import Erebos.Channel import Erebos.Flow import Erebos.Identity +import Erebos.Network.Channel +import Erebos.Object import Erebos.Service +import Erebos.Storable import Erebos.Storage @@ -68,6 +79,9 @@ protocolVersion = T.pack "0.1" protocolVersions :: [Text] protocolVersions = [protocolVersion] +keepAliveInternal :: TimeSpec +keepAliveInternal = fromNanoSecs $ 30 * 10^(9 :: Int) + data TransportPacket a = TransportPacket TransportHeader [a] @@ -101,6 +115,35 @@ data SecurityRequirement = PlaintextOnly | EncryptedOnly deriving (Eq, Ord) +data ParsedCookie = ParsedCookie + { cookieNonce :: C.Nonce + , cookieValidity :: Word32 + , cookieContent :: ByteString + , cookieMac :: C.Auth + } + +instance Eq ParsedCookie where + (==) = (==) `on` (\c -> ( BA.convert (cookieNonce c) :: ByteString, cookieValidity c, cookieContent c, cookieMac c )) + +instance Show ParsedCookie where + show ParsedCookie {..} = show (nonce, cookieValidity, cookieContent, mac) + where C.Auth mac = cookieMac + nonce = BA.convert cookieNonce :: ByteString + +instance Binary ParsedCookie where + put ParsedCookie {..} = do + putByteString $ BA.convert cookieNonce + putWord32be cookieValidity + putByteString $ BA.convert cookieMac + putByteString cookieContent + + get = do + Just cookieNonce <- maybeCryptoError . C.nonce12 <$> getByteString 12 + cookieValidity <- getWord32be + Just cookieMac <- maybeCryptoError . C.authTag <$> getByteString 16 + cookieContent <- BL.toStrict <$> getRemainingLazyByteString + return ParsedCookie {..} + isHeaderItemAcknowledged :: TransportHeaderItem -> Bool isHeaderItemAcknowledged = \case Acknowledged {} -> False @@ -165,9 +208,12 @@ data GlobalState addr = (Eq addr, Show addr) => GlobalState , gNextUp :: TMVar (Connection addr, (Bool, TransportPacket PartialObject)) , gLog :: String -> STM () , gStorage :: PartialStorage + , gStartTime :: TimeSpec , gNowVar :: TVar TimeSpec , gNextTimeout :: TVar TimeSpec , gInitConfig :: Ref + , gCookieKey :: ScrubbedBytes + , gCookieStartTime :: Word32 } data Connection addr = Connection @@ -186,6 +232,7 @@ data Connection addr = Connection , cReservedPackets :: TVar Int , cSentPackets :: TVar [SentPacket] , cToAcknowledge :: TVar [Integer] + , cNextKeepAlive :: TVar (Maybe TimeSpec) , cInStreams :: TVar [(Word8, Stream)] , cOutStreams :: TVar [(Word8, Stream)] } @@ -276,7 +323,7 @@ connAddWriteStream conn@Connection {..} = do Right (ctext, counter) -> do let isAcked = True return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing Nothing | secure -> return Nothing | otherwise -> return $ Just (plain, plainAckedBy) @@ -355,16 +402,16 @@ readStreamToList stream = readFlowIO stream >>= \case StreamData sq bytes -> fmap ((sq, bytes) :) <$> readStreamToList stream StreamClosed sqEnd -> return (sqEnd, []) -readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except String [PartialObject]) +readObjectsFromStream :: PartialStorage -> RawStreamReader -> IO (Except ErebosError [PartialObject]) readObjectsFromStream st stream = do (seqEnd, list) <- readStreamToList stream let validate s ((s', bytes) : rest) | s == s' = (bytes : ) <$> validate (s + 1) rest | s > s' = validate s rest - | otherwise = throwError "missing object chunk" + | otherwise = throwOtherError "missing object chunk" validate s [] | s == seqEnd = return [] - | otherwise = throwError "content length mismatch" + | otherwise = throwOtherError "content length mismatch" return $ do content <- BL.fromChunks <$> validate 0 list deserializeObjects st content @@ -387,7 +434,7 @@ data WaitingRef = WaitingRef , wrefStatus :: TVar (Either [RefDigest] Ref) } -type WaitingRefCallback = ExceptT String IO () +type WaitingRefCallback = ExceptT ErebosError IO () wrDigest :: WaitingRef -> RefDigest wrDigest = refDigest . wrefPartial @@ -440,11 +487,14 @@ erebosNetworkProtocol initialIdentity gLog gDataFlow gControlFlow = do mStorage <- memoryStorage gStorage <- derivePartialStorage mStorage - startTime <- getTime Monotonic - gNowVar <- newTVarIO startTime - gNextTimeout <- newTVarIO startTime + gStartTime <- getTime Monotonic + gNowVar <- newTVarIO gStartTime + gNextTimeout <- newTVarIO gStartTime gInitConfig <- store mStorage $ (Rec [] :: Object) + gCookieKey <- getRandomBytes 32 + gCookieStartTime <- runGet getWord32host . BL.pack . BA.unpack @ScrubbedBytes <$> getRandomBytes 4 + let gs = GlobalState {..} let signalTimeouts = forever $ do @@ -487,6 +537,7 @@ newConnection cGlobalState@GlobalState {..} addr = do cReservedPackets <- newTVar 0 cSentPackets <- newTVar [] cToAcknowledge <- newTVar [] + cNextKeepAlive <- newTVar Nothing cInStreams <- newTVar [] cOutStreams <- newTVar [] let conn = Connection {..} @@ -520,7 +571,7 @@ processIncoming gs@GlobalState {..} = do let parse = case B.uncons msg of Just (b, enc) | b .&. 0xE0 == 0x80 -> do - ch <- maybe (throwError "unexpected encrypted packet") return mbch + ch <- maybe (throwOtherError "unexpected encrypted packet") return mbch (dec, counter) <- channelDecrypt ch enc case B.uncons dec of @@ -535,19 +586,20 @@ processIncoming gs@GlobalState {..} = do return $ Right (snum, seq8, content, counter) Just (_, _) -> do - throwError "unexpected stream header" + throwOtherError "unexpected stream header" Nothing -> do - throwError "empty decrypted content" + throwOtherError "empty decrypted content" | b .&. 0xE0 == 0x60 -> do objs <- deserialize msg return $ Left (False, objs, Nothing) - | otherwise -> throwError "invalid packet" + | otherwise -> throwOtherError "invalid packet" - Nothing -> throwError "empty packet" + Nothing -> throwOtherError "empty packet" + now <- getTime Monotonic runExceptT parse >>= \case Right (Left (secure, objs, mbcounter)) | hobj:content <- objs @@ -562,6 +614,7 @@ processIncoming gs@GlobalState {..} = do case mbup of Just up -> putTMVar gNextUp (conn, (secure, up)) Nothing -> return () + updateKeepAlive conn now processAcknowledgements gs conn items ioAfter Nothing -> return () @@ -571,8 +624,9 @@ processIncoming gs@GlobalState {..} = do gLog $ show objs Right (Right (snum, seq8, content, counter)) - | Just Connection {..} <- mbconn + | Just conn@Connection {..} <- mbconn -> atomically $ do + updateKeepAlive conn now (lookup snum <$> readTVar cInStreams) >>= \case Nothing -> gLog $ "unexpected stream number " ++ show snum @@ -594,7 +648,7 @@ processIncoming gs@GlobalState {..} = do atomically $ gLog $ show addr <> ": stream packet without connection" Left err -> do - atomically $ gLog $ show addr <> ": failed to parse packet: " <> err + atomically $ gLog $ show addr <> ": failed to parse packet: " <> showErebosError err processPacket :: GlobalState addr -> Either addr (Connection addr) -> Bool -> TransportPacket a -> IO (Maybe (Connection addr, Maybe (TransportPacket a))) processPacket gs@GlobalState {..} econn secure packet@(TransportPacket (TransportHeader header) _) = if @@ -694,11 +748,38 @@ generateCookieHeaders Connection {..} ch = catMaybes <$> sequence [ echoHeader, _ -> return Nothing createCookie :: GlobalState addr -> addr -> IO Cookie -createCookie GlobalState {} addr = return (Cookie $ BC.pack $ show addr) +createCookie GlobalState {..} addr = do + (nonceBytes :: Bytes) <- getRandomBytes 12 + validUntil <- (fromNanoSecs (60 * 10^(9 :: Int)) +) <$> getTime Monotonic + let validSecondsFromStart = fromIntegral $ toNanoSecs (validUntil - gStartTime) `div` (10^(9 :: Int)) + cookieValidity = validSecondsFromStart - gCookieStartTime + plainContent = BC.pack (show addr) + throwCryptoErrorIO $ do + cookieNonce <- C.nonce12 nonceBytes + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (cookieContent, st3) = C.encrypt plainContent st2 + cookieMac = C.finalize st3 + return $ Cookie $ BL.toStrict $ encode $ ParsedCookie {..} verifyCookie :: GlobalState addr -> addr -> Cookie -> IO Bool -verifyCookie GlobalState {} addr (Cookie cookie) = return $ show addr == BC.unpack cookie - +verifyCookie GlobalState {..} addr (Cookie cookie) = do + ctime <- getTime Monotonic + return $ fromMaybe False $ do + ( _, _, ParsedCookie {..} ) <- either (const Nothing) Just $ decodeOrFail $ BL.fromStrict cookie + maybeCryptoError $ do + st1 <- C.initialize gCookieKey cookieNonce + let st2 = C.finalizeAAD $ C.appendAAD (BL.toStrict $ runPut $ putWord32be cookieValidity) st1 + (plainContent, st3) = C.decrypt cookieContent st2 + mac = C.finalize st3 + + validSecondsFromStart = fromIntegral $ cookieValidity + gCookieStartTime + validUntil = gStartTime + fromNanoSecs (validSecondsFromStart * (10^(9 :: Int))) + return $ and + [ mac == cookieMac + , ctime <= validUntil + , show addr == BC.unpack plainContent + ] reservePacket :: Connection addr -> STM ReservedToSend reservePacket conn@Connection {..} = do @@ -713,7 +794,7 @@ reservePacket conn@Connection {..} = do return $ ReservedToSend Nothing (return ()) (atomically $ connClose conn) resendBytes :: Connection addr -> Maybe ReservedToSend -> SentPacket -> IO () -resendBytes Connection {..} reserved sp = do +resendBytes conn@Connection {..} reserved sp = do let GlobalState {..} = cGlobalState now <- getTime Monotonic atomically $ do @@ -726,6 +807,7 @@ resendBytes Connection {..} reserved sp = do , spRetryCount = spRetryCount sp + 1 } writeFlow gDataFlow (cAddress, spData sp) + updateKeepAlive conn now sendBytes :: Connection addr -> Maybe ReservedToSend -> ByteString -> IO () sendBytes conn reserved bs = resendBytes conn reserved @@ -738,6 +820,12 @@ sendBytes conn reserved bs = resendBytes conn reserved , spData = bs } +updateKeepAlive :: Connection addr -> TimeSpec -> STM () +updateKeepAlive Connection {..} now = do + let next = now + keepAliveInternal + writeTVar cNextKeepAlive $ Just next + + processOutgoing :: forall addr. GlobalState addr -> STM (IO ()) processOutgoing gs@GlobalState {..} = do @@ -777,11 +865,12 @@ processOutgoing gs@GlobalState {..} = do let onAck = sequence_ $ map (streamAccepted conn) $ catMaybes (map (\case StreamOpen n -> Just n; _ -> Nothing) hitems) - let mkPlain extraHeaders = - let header = TransportHeader $ map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems - in BL.concat $ - (serializeObject $ transportToObject gStorage header) - : map lazyLoadBytes content + let mkPlain extraHeaders + | combinedHeaderItems@(_:_) <- map AcknowledgedSingle acknowledge ++ extraHeaders ++ hitems = + BL.concat $ + (serializeObject $ transportToObject gStorage $ TransportHeader combinedHeaderItems) + : map lazyLoadBytes content + | otherwise = BL.empty let usePlaintext = do plain <- mkPlain <$> generateCookieHeaders conn channel @@ -793,7 +882,7 @@ processOutgoing gs@GlobalState {..} = do Right (ctext, counter) -> do let isAcked = any isHeaderItemAcknowledged hitems return $ Just (0x80 `B.cons` ctext, if isAcked then [ AcknowledgedSingle $ fromIntegral counter ] else []) - Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ err + Left err -> do atomically $ gLog $ "Failed to encrypt data: " ++ showErebosError err return Nothing mbs <- case (secure, mbch) of @@ -811,6 +900,13 @@ processOutgoing gs@GlobalState {..} = do sendBytes conn mbReserved' bs Nothing -> return () + let waitUntil :: TimeSpec -> TimeSpec -> STM () + waitUntil now till = do + nextTimeout <- readTVar gNextTimeout + if nextTimeout <= now || till < nextTimeout + then writeTVar gNextTimeout till + else retry + let retransmitPacket :: Connection addr -> STM (IO ()) retransmitPacket conn@Connection {..} = do now <- readTVar gNowVar @@ -819,11 +915,8 @@ processOutgoing gs@GlobalState {..} = do _ -> retry let nextTry = spTime sp + fromNanoSecs 1000000000 if | now < nextTry -> do - nextTimeout <- readTVar gNextTimeout - if nextTimeout <= now || nextTry < nextTimeout - then do writeTVar gNextTimeout nextTry - return $ return () - else retry + waitUntil now nextTry + return $ return () | spRetryCount sp < 2 -> do reserved <- reservePacket conn writeTVar cSentPackets rest @@ -863,11 +956,28 @@ processOutgoing gs@GlobalState {..} = do writeTVar gIdentity (nid, cur : past) return $ return () + let sendKeepAlive :: Connection addr -> STM (IO ()) + sendKeepAlive Connection {..} = do + readTVar cNextKeepAlive >>= \case + Nothing -> retry + Just next -> do + now <- readTVar gNowVar + if next <= now + then do + writeTVar cNextKeepAlive Nothing + identity <- fst <$> readTVar gIdentity + let header = TransportHeader [ AnnounceSelf $ refDigest $ storedRef $ idData identity ] + writeTQueue cSecureOutQueue (EncryptedOnly, TransportPacket header [], []) + else do + waitUntil now next + return $ return () + conns <- readTVar gConnections msum $ concat $ [ map retransmitPacket conns , map sendNextPacket conns , [ handleControlRequests ] + , map sendKeepAlive conns ] processAcknowledgements :: GlobalState addr -> Connection addr -> [TransportHeaderItem] -> STM (IO ()) diff --git a/src/Erebos/Network/ifaddrs.c b/src/Erebos/Network/ifaddrs.c index 37c3e00..ff4382a 100644 --- a/src/Erebos/Network/ifaddrs.c +++ b/src/Erebos/Network/ifaddrs.c @@ -1,11 +1,157 @@ #include "ifaddrs.h" +#include <errno.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifndef _WIN32 #include <arpa/inet.h> -#include <ifaddrs.h> #include <net/if.h> -#include <stdlib.h> -#include <sys/types.h> +#include <netinet/in.h> +#include <ifaddrs.h> #include <endian.h> +#include <sys/types.h> +#include <sys/socket.h> +#else +#include <winsock2.h> +#include <ws2ipdef.h> +#include <ws2tcpip.h> +#endif + +#define DISCOVERY_MULTICAST_GROUP "ff12:b6a4:6b1f:969:caee:acc2:5c93:73e1" + +uint32_t * join_multicast(int fd, size_t * count) +{ + size_t capacity = 16; + *count = 0; + uint32_t * interfaces = malloc(sizeof(uint32_t) * capacity); + +#ifdef _WIN32 + interfaces[0] = 0; + *count = 1; +#else + struct ifaddrs * addrs; + if (getifaddrs(&addrs) < 0) + return 0; + + for (struct ifaddrs * ifa = addrs; ifa; ifa = ifa->ifa_next) { + if( ifa->ifa_addr && ifa->ifa_addr->sa_family == AF_INET6 && + ! (ifa->ifa_flags & IFF_LOOPBACK) && + (ifa->ifa_flags & IFF_MULTICAST) && + ! IN6_IS_ADDR_LINKLOCAL( & ((struct sockaddr_in6 *) ifa->ifa_addr)->sin6_addr ) ){ + int idx = if_nametoindex(ifa->ifa_name); + + bool seen = false; + for (size_t i = 0; i < *count; i++) { + if (interfaces[i] == idx) { + seen = true; + break; + } + } + if (seen) + continue; + + if (*count + 1 >= capacity) { + capacity *= 2; + uint32_t * nret = realloc(interfaces, sizeof(uint32_t) * capacity); + if (nret) { + interfaces = nret; + } else { + free(interfaces); + *count = 0; + return NULL; + } + } + + interfaces[*count] = idx; + (*count)++; + } + } + + freeifaddrs(addrs); +#endif + + for (size_t i = 0; i < *count; i++) { + struct ipv6_mreq group; + group.ipv6mr_interface = interfaces[i]; + inet_pton(AF_INET6, DISCOVERY_MULTICAST_GROUP, &group.ipv6mr_multiaddr); + int ret = setsockopt(fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + (const void *) &group, sizeof(group)); + if (ret < 0) + fprintf(stderr, "IPV6_ADD_MEMBERSHIP failed: %s\n", strerror(errno)); + } + + return interfaces; +} + +static bool copy_local_address( struct InetAddress * dst, const struct sockaddr * src ) +{ + int family = src->sa_family; + + if( family == AF_INET ){ + struct in_addr * addr = & (( struct sockaddr_in * ) src)->sin_addr; + if (! ((ntohl( addr->s_addr ) & 0xff000000) == 0x7f000000) && // loopback + ! ((ntohl( addr->s_addr ) & 0xffff0000) == 0xa9fe0000) // link-local + ){ + dst->family = family; + memcpy( & dst->addr, addr, sizeof( * addr )); + return true; + } + } + + if( family == AF_INET6 ){ + struct in6_addr * addr = & (( struct sockaddr_in6 * ) src)->sin6_addr; + if (! IN6_IS_ADDR_LOOPBACK( addr ) && + ! IN6_IS_ADDR_LINKLOCAL( addr ) + ){ + dst->family = family; + memcpy( & dst->addr, addr, sizeof( * addr )); + return true; + } + } + + return false; +} + +#ifndef _WIN32 + +struct InetAddress * local_addresses( size_t * count ) +{ + struct ifaddrs * addrs; + if( getifaddrs( &addrs ) < 0 ) + return 0; + + * count = 0; + size_t capacity = 16; + struct InetAddress * ret = malloc( sizeof(* ret) * capacity ); + + for( struct ifaddrs * ifa = addrs; ifa; ifa = ifa->ifa_next ){ + if ( ifa->ifa_addr ){ + int family = ifa->ifa_addr->sa_family; + if( family == AF_INET || family == AF_INET6 ){ + if( (* count) >= capacity ){ + capacity *= 2; + struct InetAddress * nret = realloc( ret, sizeof(* ret) * capacity ); + if (nret) { + ret = nret; + } else { + free( ret ); + freeifaddrs( addrs ); + return 0; + } + } + + if( copy_local_address( & ret[ * count ], ifa->ifa_addr )) + (* count)++; + } + } + } + + freeifaddrs(addrs); + return ret; +} uint32_t * broadcast_addresses(void) { @@ -26,6 +172,7 @@ uint32_t * broadcast_addresses(void) ret = nret; } else { free(ret); + freeifaddrs(addrs); return 0; } } @@ -39,3 +186,94 @@ uint32_t * broadcast_addresses(void) ret[count] = 0; return ret; } + +#else // _WIN32 + +#include <winsock2.h> +#include <ws2tcpip.h> +#include <iptypes.h> +#include <iphlpapi.h> + +#pragma comment(lib, "ws2_32.lib") + +struct InetAddress * local_addresses( size_t * count ) +{ + * count = 0; + struct InetAddress * ret = NULL; + + ULONG bufsize = 15000; + IP_ADAPTER_ADDRESSES * buf = NULL; + + DWORD rv = 0; + + do { + buf = realloc( buf, bufsize ); + rv = GetAdaptersAddresses( AF_UNSPEC, 0, NULL, buf, & bufsize ); + + if( rv == ERROR_BUFFER_OVERFLOW ) + continue; + } while (0); + + if( rv == NO_ERROR ){ + size_t capacity = 16; + ret = malloc( sizeof( * ret ) * capacity ); + + for( IP_ADAPTER_ADDRESSES * cur = (IP_ADAPTER_ADDRESSES *) buf; + cur && (* count) < capacity; + cur = cur->Next ){ + + for( IP_ADAPTER_UNICAST_ADDRESS * curAddr = cur->FirstUnicastAddress; + curAddr && (* count) < capacity; + curAddr = curAddr->Next ){ + + if( copy_local_address( & ret[ * count ], curAddr->Address.lpSockaddr )) + (* count)++; + } + } + } + +cleanup: + free( buf ); + return ret; +} + +uint32_t * broadcast_addresses(void) +{ + uint32_t * ret = NULL; + SOCKET wsock = INVALID_SOCKET; + + struct WSAData wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) + return NULL; + + wsock = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0); + if (wsock == INVALID_SOCKET) + goto cleanup; + + INTERFACE_INFO InterfaceList[32]; + unsigned long nBytesReturned; + + if (WSAIoctl(wsock, SIO_GET_INTERFACE_LIST, 0, 0, + InterfaceList, sizeof(InterfaceList), + &nBytesReturned, 0, 0) == SOCKET_ERROR) + goto cleanup; + + int numInterfaces = nBytesReturned / sizeof(INTERFACE_INFO); + + size_t capacity = 16, count = 0; + ret = malloc(sizeof(uint32_t) * capacity); + + for (int i = 0; i < numInterfaces && count < capacity - 1; i++) + if (InterfaceList[i].iiFlags & IFF_BROADCAST) + ret[count++] = InterfaceList[i].iiBroadcastAddress.AddressIn.sin_addr.s_addr; + + ret[count] = 0; +cleanup: + if (wsock != INVALID_SOCKET) + closesocket(wsock); + WSACleanup(); + + return ret; +} + +#endif diff --git a/src/Erebos/Network/ifaddrs.h b/src/Erebos/Network/ifaddrs.h index 06d26ec..2ee45a7 100644 --- a/src/Erebos/Network/ifaddrs.h +++ b/src/Erebos/Network/ifaddrs.h @@ -1,3 +1,18 @@ +#include <stddef.h> #include <stdint.h> +#ifndef _WIN32 +#include <sys/socket.h> +#else +#include <winsock2.h> +#endif + +struct InetAddress +{ + int family; + uint8_t addr[16]; +} __attribute__((packed)); + +uint32_t * join_multicast(int fd, size_t * count); +struct InetAddress * local_addresses( size_t * count ); uint32_t * broadcast_addresses(void); diff --git a/src/Erebos/Object.hs b/src/Erebos/Object.hs new file mode 100644 index 0000000..26ca09f --- /dev/null +++ b/src/Erebos/Object.hs @@ -0,0 +1,22 @@ +{-| +Description: Core Erebos objects and references + +Data types and functions for working with "raw" Erebos objects and references. +-} + +module Erebos.Object ( + Object, PartialObject, Object'(..), + serializeObject, deserializeObject, deserializeObjects, + ioLoadObject, ioLoadBytes, + storeRawBytes, lazyLoadBytes, + + RecItem, RecItem'(..), + + Ref, PartialRef, RefDigest, + refDigest, + readRef, showRef, showRefDigest, + refDigestFromByteString, hashToRefDigest, + copyRef, partialRef, partialRefFromDigest, +) where + +import Erebos.Object.Internal diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs new file mode 100644 index 0000000..4bca49c --- /dev/null +++ b/src/Erebos/Object/Internal.hs @@ -0,0 +1,727 @@ +module Erebos.Object.Internal ( + Storage, PartialStorage, StorageCompleteness, + + Ref, PartialRef, RefDigest, + refDigest, + readRef, showRef, showRefDigest, + refDigestFromByteString, hashToRefDigest, + copyRef, partialRef, partialRefFromDigest, + + Object, PartialObject, Object'(..), RecItem, RecItem'(..), + serializeObject, deserializeObject, deserializeObjects, + ioLoadObject, ioLoadBytes, + storeRawBytes, lazyLoadBytes, + storeObject, + collectObjects, collectStoredObjects, + + MonadStorage(..), + + Storable(..), ZeroStorable(..), + StorableText(..), StorableDate(..), StorableUUID(..), + + Store, StoreRec, + evalStore, evalStoreObject, + storeBlob, storeRec, storeZero, + storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, + storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, + storeZRef, + storeRecItems, + + Load, LoadRec, + evalLoad, + loadCurrentRef, loadCurrentObject, + loadRecCurrentRef, loadRecItems, + + loadBlob, loadRec, loadZero, + loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, + loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, + loadTexts, loadBinaries, loadRefs, loadRawRefs, + loadZRef, + + Stored, + fromStored, storedRef, + wrappedStore, wrappedLoad, + copyStored, + unsafeMapStored, +) where + +import Control.Applicative +import Control.Monad +import Control.Monad.Except +import Control.Monad.Reader +import Control.Monad.Writer + +import Crypto.Hash + +import Data.Bifunctor +import Data.ByteString (ByteString) +import qualified Data.ByteArray as BA +import qualified Data.ByteString as B +import qualified Data.ByteString.Char8 as BC +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Lazy.Char8 as BLC +import Data.Char +import Data.Function +import Data.Maybe +import Data.Ratio +import Data.Set (Set) +import qualified Data.Set as S +import Data.Text (Text) +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Text.Encoding.Error +import Data.Time.Calendar +import Data.Time.Clock +import Data.Time.Format +import Data.Time.LocalTime +import Data.UUID (UUID) +import qualified Data.UUID as U + +import System.IO.Unsafe + +import Erebos.Error +import Erebos.Storage.Internal + + +zeroRef :: Storage' c -> Ref' c +zeroRef s = Ref s (RefDigest h) + where h = case digestFromByteString $ B.replicate (hashDigestSize $ digestAlgo h) 0 of + Nothing -> error $ "Failed to create zero hash" + Just h' -> h' + digestAlgo :: Digest a -> a + digestAlgo = undefined + +isZeroRef :: Ref' c -> Bool +isZeroRef (Ref _ h) = all (==0) $ BA.unpack h + + +refFromDigest :: Storage' c -> RefDigest -> IO (Maybe (Ref' c)) +refFromDigest st dgst = fmap (const $ Ref st dgst) <$> ioLoadBytesFromStorage st dgst + +readRef :: Storage -> ByteString -> IO (Maybe Ref) +readRef s b = + case readRefDigest b of + Nothing -> return Nothing + Just dgst -> refFromDigest s dgst + +copyRef' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Ref' c -> IO (c (Ref' c')) +copyRef' st ref'@(Ref _ dgst) = refFromDigest st dgst >>= \case Just ref -> return $ return ref + Nothing -> doCopy + where doCopy = do mbobj' <- ioLoadObject ref' + mbobj <- sequence $ copyObject' st <$> mbobj' + sequence $ unsafeStoreObject st <$> join mbobj + +copyRecItem' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> RecItem' c -> IO (c (RecItem' c')) +copyRecItem' st = \case + RecEmpty -> return $ return $ RecEmpty + RecInt x -> return $ return $ RecInt x + RecNum x -> return $ return $ RecNum x + RecText x -> return $ return $ RecText x + RecBinary x -> return $ return $ RecBinary x + RecDate x -> return $ return $ RecDate x + RecUUID x -> return $ return $ RecUUID x + RecRef x -> fmap RecRef <$> copyRef' st x + RecUnknown t x -> return $ return $ RecUnknown t x + +copyObject' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (c (Object' c')) +copyObject' _ (Blob bs) = return $ return $ Blob bs +copyObject' st (Rec rs) = fmap Rec . sequence <$> mapM (\( n, item ) -> fmap ( n, ) <$> copyRecItem' st item) rs +copyObject' _ ZeroObject = return $ return ZeroObject +copyObject' _ (UnknownObject otype content) = return $ return $ UnknownObject otype content + +copyRef :: forall c c' m. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => Storage' c' -> Ref' c -> m (LoadResult c (Ref' c')) +copyRef st ref' = liftIO $ returnLoadResult <$> copyRef' st ref' + +copyRecItem :: forall c c' m. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => Storage' c' -> RecItem' c -> m (LoadResult c (RecItem' c')) +copyRecItem st item' = liftIO $ returnLoadResult <$> copyRecItem' st item' + +copyObject :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (LoadResult c (Object' c')) +copyObject st obj' = returnLoadResult <$> copyObject' st obj' + +partialRef :: PartialStorage -> Ref -> PartialRef +partialRef st (Ref _ dgst) = Ref st dgst + +partialRefFromDigest :: PartialStorage -> RefDigest -> PartialRef +partialRefFromDigest st dgst = Ref st dgst + + +data Object' c + = Blob ByteString + | Rec [(ByteString, RecItem' c)] + | ZeroObject + | UnknownObject ByteString ByteString + deriving (Show) + +type Object = Object' Complete +type PartialObject = Object' Partial + +data RecItem' c + = RecEmpty + | RecInt Integer + | RecNum Rational + | RecText Text + | RecBinary ByteString + | RecDate ZonedTime + | RecUUID UUID + | RecRef (Ref' c) + | RecUnknown ByteString ByteString + deriving (Show) + +type RecItem = RecItem' Complete + +serializeObject :: Object' c -> BL.ByteString +serializeObject = \case + Blob cnt -> BL.fromChunks [BC.pack "blob ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt] + Rec rec -> let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec + in BL.fromChunks [BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n'] `BL.append` cnt + ZeroObject -> BL.empty + UnknownObject otype cnt -> BL.fromChunks [ otype, BC.singleton ' ', BC.pack (show $ B.length cnt), BC.singleton '\n', cnt ] + +-- |Serializes and stores object data without ony dependencies, so is safe only +-- if all the referenced objects are already stored or reference is partial. +unsafeStoreObject :: Storage' c -> Object' c -> IO (Ref' c) +unsafeStoreObject storage = \case + ZeroObject -> return $ zeroRef storage + obj -> unsafeStoreRawBytes storage $ serializeObject obj + +storeObject :: PartialStorage -> PartialObject -> IO PartialRef +storeObject = unsafeStoreObject + +storeRawBytes :: PartialStorage -> BL.ByteString -> IO PartialRef +storeRawBytes = unsafeStoreRawBytes + +serializeRecItem :: ByteString -> RecItem' c -> [ByteString] +serializeRecItem name (RecEmpty) = [name, BC.pack ":e", BC.singleton ' ', BC.singleton '\n'] +serializeRecItem name (RecInt x) = [name, BC.pack ":i", BC.singleton ' ', BC.pack (show x), BC.singleton '\n'] +serializeRecItem name (RecNum x) = [name, BC.pack ":n", BC.singleton ' ', BC.pack (showRatio x), BC.singleton '\n'] +serializeRecItem name (RecText x) = [name, BC.pack ":t", BC.singleton ' ', escaped, BC.singleton '\n'] + where escaped = BC.concatMap escape $ encodeUtf8 x + escape '\n' = BC.pack "\n\t" + escape c = BC.singleton c +serializeRecItem name (RecBinary x) = [name, BC.pack ":b ", showHex x, BC.singleton '\n'] +serializeRecItem name (RecDate x) = [name, BC.pack ":d", BC.singleton ' ', BC.pack (formatTime defaultTimeLocale "%s %z" x), BC.singleton '\n'] +serializeRecItem name (RecUUID x) = [name, BC.pack ":u", BC.singleton ' ', U.toASCIIBytes x, BC.singleton '\n'] +serializeRecItem name (RecRef x) = [name, BC.pack ":r ", showRef x, BC.singleton '\n'] +serializeRecItem name (RecUnknown t x) = [ name, BC.singleton ':', t, BC.singleton ' ', x, BC.singleton '\n' ] + +lazyLoadObject :: forall c. StorageCompleteness c => Ref' c -> LoadResult c (Object' c) +lazyLoadObject = returnLoadResult . unsafePerformIO . ioLoadObject + +ioLoadObject :: forall c. StorageCompleteness c => Ref' c -> IO (c (Object' c)) +ioLoadObject ref | isZeroRef ref = return $ return ZeroObject +ioLoadObject ref@(Ref st rhash) = do + file' <- ioLoadBytes ref + return $ do + file <- file' + let chash = hashToRefDigest file + when (chash /= rhash) $ error $ "Hash mismatch on object " ++ BC.unpack (showRef ref) {- TODO throw -} + return $ case runExcept $ unsafeDeserializeObject st file of + Left err -> error $ showErebosError err ++ ", ref " ++ BC.unpack (showRef ref) {- TODO throw -} + Right (x, rest) | BL.null rest -> x + | otherwise -> error $ "Superfluous content after " ++ BC.unpack (showRef ref) {- TODO throw -} + +lazyLoadBytes :: forall c. StorageCompleteness c => Ref' c -> LoadResult c BL.ByteString +lazyLoadBytes ref | isZeroRef ref = returnLoadResult (return BL.empty :: c BL.ByteString) +lazyLoadBytes ref = returnLoadResult $ unsafePerformIO $ ioLoadBytes ref + +unsafeDeserializeObject :: Storage' c -> BL.ByteString -> Except ErebosError (Object' c, BL.ByteString) +unsafeDeserializeObject _ bytes | BL.null bytes = return (ZeroObject, bytes) +unsafeDeserializeObject st bytes = + case BLC.break (=='\n') bytes of + (line, rest) | Just (otype, len) <- splitObjPrefix line -> do + let (content, next) = first BL.toStrict $ BL.splitAt (fromIntegral len) $ BL.drop 1 rest + guard $ B.length content == len + (,next) <$> case otype of + _ | otype == BC.pack "blob" -> return $ Blob content + | otype == BC.pack "rec" -> maybe (throwOtherError $ "malformed record item ") + (return . Rec) $ sequence $ map parseRecLine $ mergeCont [] $ BC.lines content + | otherwise -> return $ UnknownObject otype content + _ -> throwOtherError $ "malformed object" + where splitObjPrefix line = do + [otype, tlen] <- return $ BLC.words line + (len, rest) <- BLC.readInt tlen + guard $ BL.null rest + return (BL.toStrict otype, len) + + mergeCont cs (a:b:rest) | Just ('\t', b') <- BC.uncons b = mergeCont (b':BC.pack "\n":cs) (a:rest) + mergeCont cs (a:rest) = B.concat (a : reverse cs) : mergeCont [] rest + mergeCont _ [] = [] + + parseRecLine line = do + colon <- BC.elemIndex ':' line + space <- BC.elemIndex ' ' line + guard $ colon < space + let name = B.take colon line + itype = B.take (space-colon-1) $ B.drop (colon+1) line + content = B.drop (space+1) line + + let val = fromMaybe (RecUnknown itype content) $ + case BC.unpack itype of + "e" -> do guard $ B.null content + return RecEmpty + "i" -> do (num, rest) <- BC.readInteger content + guard $ B.null rest + return $ RecInt num + "n" -> RecNum <$> parseRatio content + "t" -> return $ RecText $ decodeUtf8With lenientDecode content + "b" -> RecBinary <$> readHex content + "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content) + "u" -> RecUUID <$> U.fromASCIIBytes content + "r" -> RecRef . Ref st <$> readRefDigest content + _ -> Nothing + return (name, val) + +deserializeObject :: PartialStorage -> BL.ByteString -> Except ErebosError (PartialObject, BL.ByteString) +deserializeObject = unsafeDeserializeObject + +deserializeObjects :: PartialStorage -> BL.ByteString -> Except ErebosError [PartialObject] +deserializeObjects _ bytes | BL.null bytes = return [] +deserializeObjects st bytes = do (obj, rest) <- deserializeObject st bytes + (obj:) <$> deserializeObjects st rest + + +collectObjects :: Object -> [Object] +collectObjects obj = obj : map fromStored (fst $ collectOtherStored S.empty obj) + +collectStoredObjects :: Stored Object -> [Stored Object] +collectStoredObjects obj = obj : (fst $ collectOtherStored S.empty $ fromStored obj) + +collectOtherStored :: Set RefDigest -> Object -> ([Stored Object], Set RefDigest) +collectOtherStored seen (Rec items) = foldr helper ([], seen) $ map snd items + where helper (RecRef ref) (xs, s) | r <- refDigest ref + , r `S.notMember` s + = let o = wrappedLoad ref + (xs', s') = collectOtherStored (S.insert r s) $ fromStored o + in ((o : xs') ++ xs, s') + helper _ (xs, s) = (xs, s) +collectOtherStored seen _ = ([], seen) + + +deriving instance StorableUUID HeadID +deriving instance StorableUUID HeadTypeID + + +class Monad m => MonadStorage m where + getStorage :: m Storage + mstore :: Storable a => a -> m (Stored a) + + default mstore :: MonadIO m => Storable a => a -> m (Stored a) + mstore x = do + st <- getStorage + wrappedStore st x + +instance MonadIO m => MonadStorage (ReaderT Storage m) where + getStorage = ask + + +class Storable a where + store' :: a -> Store + load' :: Load a + + store :: StorageCompleteness c => Storage' c -> a -> IO (Ref' c) + store st = evalStore st . store' + load :: Ref -> a + load = evalLoad load' + +class Storable a => ZeroStorable a where + fromZero :: Storage -> a + +data Store = StoreBlob ByteString + | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]]) + | StoreZero + | StoreUnknown ByteString ByteString + +evalStore :: StorageCompleteness c => Storage' c -> Store -> IO (Ref' c) +evalStore st = unsafeStoreObject st <=< evalStoreObject st + +evalStoreObject :: StorageCompleteness c => Storage' c -> Store -> IO (Object' c) +evalStoreObject _ (StoreBlob x) = return $ Blob x +evalStoreObject s (StoreRec f) = Rec . concat <$> sequence (f s) +evalStoreObject _ StoreZero = return ZeroObject +evalStoreObject _ (StoreUnknown otype content) = return $ UnknownObject otype content + +newtype StoreRecM c a = StoreRecM (ReaderT (Storage' c) (Writer [IO [(ByteString, RecItem' c)]]) a) + deriving (Functor, Applicative, Monad) + +type StoreRec c = StoreRecM c () + +newtype Load a = Load (ReaderT (Ref, Object) (Except ErebosError) a) + deriving (Functor, Applicative, Alternative, Monad, MonadPlus, MonadError ErebosError) + +evalLoad :: Load a -> Ref -> a +evalLoad (Load f) ref = either (error {- TODO throw -} . ((BC.unpack (showRef ref) ++ ": ") ++) . showErebosError) id $ + runExcept $ runReaderT f (ref, lazyLoadObject ref) + +loadCurrentRef :: Load Ref +loadCurrentRef = Load $ asks fst + +loadCurrentObject :: Load Object +loadCurrentObject = Load $ asks snd + +newtype LoadRec a = LoadRec (ReaderT (Ref, [(ByteString, RecItem)]) (Except ErebosError) a) + deriving (Functor, Applicative, Alternative, Monad, MonadPlus, MonadError ErebosError) + +loadRecCurrentRef :: LoadRec Ref +loadRecCurrentRef = LoadRec $ asks fst + +loadRecItems :: LoadRec [(ByteString, RecItem)] +loadRecItems = LoadRec $ asks snd + + +instance Storable Object where + store' (Blob bs) = StoreBlob bs + store' (Rec xs) = StoreRec $ \st -> return $ do + Rec xs' <- copyObject st (Rec xs) + return xs' + store' ZeroObject = StoreZero + store' (UnknownObject otype content) = StoreUnknown otype content + + load' = loadCurrentObject + + store st = unsafeStoreObject st <=< copyObject st + load = lazyLoadObject + +instance Storable ByteString where + store' = storeBlob + load' = loadBlob id + +instance Storable a => Storable [a] where + store' [] = storeZero + store' (x:xs) = storeRec $ do + storeRef "i" x + storeRef "n" xs + + load' = loadCurrentObject >>= \case + ZeroObject -> return [] + _ -> loadRec $ (:) + <$> loadRef "i" + <*> loadRef "n" + +instance Storable a => ZeroStorable [a] where + fromZero _ = [] + + +storeBlob :: ByteString -> Store +storeBlob = StoreBlob + +storeRec :: (forall c. StorageCompleteness c => StoreRec c) -> Store +storeRec sr = StoreRec $ do + let StoreRecM r = sr + execWriter . runReaderT r + +storeZero :: Store +storeZero = StoreZero + + +class StorableText a where + toText :: a -> Text + fromText :: MonadError ErebosError m => Text -> m a + +instance StorableText Text where + toText = id; fromText = return + +instance StorableText [Char] where + toText = T.pack; fromText = return . T.unpack + + +class StorableDate a where + toDate :: a -> ZonedTime + fromDate :: ZonedTime -> a + +instance StorableDate ZonedTime where + toDate = id; fromDate = id + +instance StorableDate UTCTime where + toDate = utcToZonedTime utc + fromDate = zonedTimeToUTC + +instance StorableDate Day where + toDate day = toDate $ UTCTime day 0 + fromDate = utctDay . fromDate + + +class StorableUUID a where + toUUID :: a -> UUID + fromUUID :: UUID -> a + +instance StorableUUID UUID where + toUUID = id; fromUUID = id + + +storeEmpty :: String -> StoreRec c +storeEmpty name = StoreRecM $ tell [return [(BC.pack name, RecEmpty)]] + +storeMbEmpty :: String -> Maybe () -> StoreRec c +storeMbEmpty name = maybe (return ()) (const $ storeEmpty name) + +storeInt :: Integral a => String -> a -> StoreRec c +storeInt name x = StoreRecM $ tell [return [(BC.pack name, RecInt $ toInteger x)]] + +storeMbInt :: Integral a => String -> Maybe a -> StoreRec c +storeMbInt name = maybe (return ()) (storeInt name) + +storeNum :: (Real a, Fractional a) => String -> a -> StoreRec c +storeNum name x = StoreRecM $ tell [return [(BC.pack name, RecNum $ toRational x)]] + +storeMbNum :: (Real a, Fractional a) => String -> Maybe a -> StoreRec c +storeMbNum name = maybe (return ()) (storeNum name) + +storeText :: StorableText a => String -> a -> StoreRec c +storeText name x = StoreRecM $ tell [return [(BC.pack name, RecText $ toText x)]] + +storeMbText :: StorableText a => String -> Maybe a -> StoreRec c +storeMbText name = maybe (return ()) (storeText name) + +storeBinary :: BA.ByteArrayAccess a => String -> a -> StoreRec c +storeBinary name x = StoreRecM $ tell [return [(BC.pack name, RecBinary $ BA.convert x)]] + +storeMbBinary :: BA.ByteArrayAccess a => String -> Maybe a -> StoreRec c +storeMbBinary name = maybe (return ()) (storeBinary name) + +storeDate :: StorableDate a => String -> a -> StoreRec c +storeDate name x = StoreRecM $ tell [return [(BC.pack name, RecDate $ toDate x)]] + +storeMbDate :: StorableDate a => String -> Maybe a -> StoreRec c +storeMbDate name = maybe (return ()) (storeDate name) + +storeUUID :: StorableUUID a => String -> a -> StoreRec c +storeUUID name x = StoreRecM $ tell [return [(BC.pack name, RecUUID $ toUUID x)]] + +storeMbUUID :: StorableUUID a => String -> Maybe a -> StoreRec c +storeMbUUID name = maybe (return ()) (storeUUID name) + +storeRef :: Storable a => StorageCompleteness c => String -> a -> StoreRec c +storeRef name x = StoreRecM $ do + s <- ask + tell $ (:[]) $ do + ref <- store s x + return [(BC.pack name, RecRef ref)] + +storeMbRef :: Storable a => StorageCompleteness c => String -> Maybe a -> StoreRec c +storeMbRef name = maybe (return ()) (storeRef name) + +storeRawRef :: StorageCompleteness c => String -> Ref -> StoreRec c +storeRawRef name ref = StoreRecM $ do + st <- ask + tell $ (:[]) $ do + ref' <- copyRef st ref + return [(BC.pack name, RecRef ref')] + +storeMbRawRef :: StorageCompleteness c => String -> Maybe Ref -> StoreRec c +storeMbRawRef name = maybe (return ()) (storeRawRef name) + +storeZRef :: (ZeroStorable a, StorageCompleteness c) => String -> a -> StoreRec c +storeZRef name x = StoreRecM $ do + s <- ask + tell $ (:[]) $ do + ref <- store s x + return $ if isZeroRef ref then [] + else [(BC.pack name, RecRef ref)] + +storeRecItems :: StorageCompleteness c => [ ( ByteString, RecItem ) ] -> StoreRec c +storeRecItems items = StoreRecM $ do + st <- ask + tell $ flip map items $ \( name, value ) -> do + value' <- copyRecItem st value + return [ ( name, value' ) ] + +loadBlob :: (ByteString -> a) -> Load a +loadBlob f = loadCurrentObject >>= \case + Blob x -> return $ f x + _ -> throwOtherError "Expecting blob" + +loadRec :: LoadRec a -> Load a +loadRec (LoadRec lrec) = loadCurrentObject >>= \case + Rec rs -> do + ref <- loadCurrentRef + either throwError return $ runExcept $ runReaderT lrec (ref, rs) + _ -> throwOtherError "Expecting record" + +loadZero :: a -> Load a +loadZero x = loadCurrentObject >>= \case + ZeroObject -> return x + _ -> throwOtherError "Expecting zero" + + +loadEmpty :: String -> LoadRec () +loadEmpty name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbEmpty name + +loadMbEmpty :: String -> LoadRec (Maybe ()) +loadMbEmpty name = listToMaybe . mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecEmpty ) | name' == bname + = Just () + p _ = Nothing + +loadInt :: Num a => String -> LoadRec a +loadInt name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbInt name + +loadMbInt :: Num a => String -> LoadRec (Maybe a) +loadMbInt name = listToMaybe . mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecInt x ) | name' == bname + = Just (fromInteger x) + p _ = Nothing + +loadNum :: (Real a, Fractional a) => String -> LoadRec a +loadNum name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbNum name + +loadMbNum :: (Real a, Fractional a) => String -> LoadRec (Maybe a) +loadMbNum name = listToMaybe . mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecNum x ) | name' == bname + = Just (fromRational x) + p _ = Nothing + +loadText :: StorableText a => String -> LoadRec a +loadText name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbText name + +loadMbText :: StorableText a => String -> LoadRec (Maybe a) +loadMbText name = listToMaybe <$> loadTexts name + +loadTexts :: StorableText a => String -> LoadRec [a] +loadTexts name = sequence . mapMaybe p =<< loadRecItems + where + bname = BC.pack name + p ( name', RecText x ) | name' == bname + = Just (fromText x) + p _ = Nothing + +loadBinary :: BA.ByteArray a => String -> LoadRec a +loadBinary name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbBinary name + +loadMbBinary :: BA.ByteArray a => String -> LoadRec (Maybe a) +loadMbBinary name = listToMaybe <$> loadBinaries name + +loadBinaries :: BA.ByteArray a => String -> LoadRec [a] +loadBinaries name = mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecBinary x ) | name' == bname + = Just (BA.convert x) + p _ = Nothing + +loadDate :: StorableDate a => String -> LoadRec a +loadDate name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbDate name + +loadMbDate :: StorableDate a => String -> LoadRec (Maybe a) +loadMbDate name = listToMaybe . mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecDate x ) | name' == bname + = Just (fromDate x) + p _ = Nothing + +loadUUID :: StorableUUID a => String -> LoadRec a +loadUUID name = maybe (throwOtherError $ "Missing record iteem '"++name++"'") return =<< loadMbUUID name + +loadMbUUID :: StorableUUID a => String -> LoadRec (Maybe a) +loadMbUUID name = listToMaybe . mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecUUID x ) | name' == bname + = Just (fromUUID x) + p _ = Nothing + +loadRawRef :: String -> LoadRec Ref +loadRawRef name = maybe (throwOtherError $ "Missing record item '"++name++"'") return =<< loadMbRawRef name + +loadMbRawRef :: String -> LoadRec (Maybe Ref) +loadMbRawRef name = listToMaybe <$> loadRawRefs name + +loadRawRefs :: String -> LoadRec [Ref] +loadRawRefs name = mapMaybe p <$> loadRecItems + where + bname = BC.pack name + p ( name', RecRef x ) | name' == bname = Just x + p _ = Nothing + +loadRef :: Storable a => String -> LoadRec a +loadRef name = load <$> loadRawRef name + +loadMbRef :: Storable a => String -> LoadRec (Maybe a) +loadMbRef name = fmap load <$> loadMbRawRef name + +loadRefs :: Storable a => String -> LoadRec [a] +loadRefs name = map load <$> loadRawRefs name + +loadZRef :: ZeroStorable a => String -> LoadRec a +loadZRef name = loadMbRef name >>= \case + Nothing -> do Ref st _ <- loadRecCurrentRef + return $ fromZero st + Just x -> return x + + +type Stored a = Stored' Complete a + +instance Storable a => Storable (Stored a) where + store st = copyRef st . storedRef + store' (Stored _ x) = store' x + load' = Stored <$> loadCurrentRef <*> load' + +instance ZeroStorable a => ZeroStorable (Stored a) where + fromZero st = Stored (zeroRef st) $ fromZero st + +fromStored :: Stored a -> a +fromStored (Stored _ x) = x + +storedRef :: Stored a -> Ref +storedRef (Stored ref _) = ref + +wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) +wrappedStore st x = do ref <- liftIO $ store st x + return $ Stored ref x + +wrappedLoad :: Storable a => Ref -> Stored a +wrappedLoad ref = Stored ref (load ref) + +copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => + Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a)) +copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref' + +-- |Passed function needs to preserve the object representation to be safe +unsafeMapStored :: (a -> b) -> Stored a -> Stored b +unsafeMapStored f (Stored ref x) = Stored ref (f x) + + +showRatio :: Rational -> String +showRatio r = case decimalRatio r of + Just (n, 1) -> show n + Just (n', d) -> let n = abs n' + in (if n' < 0 then "-" else "") ++ show (n `div` d) ++ "." ++ + (concatMap (show.(`mod` 10).snd) $ reverse $ takeWhile ((>1).fst) $ zip (iterate (`div` 10) d) (iterate (`div` 10) (n `mod` d))) + Nothing -> show (numerator r) ++ "/" ++ show (denominator r) + +decimalRatio :: Rational -> Maybe (Integer, Integer) +decimalRatio r = do + let n = numerator r + d = denominator r + (c2, d') = takeFactors 2 d + (c5, d'') = takeFactors 5 d' + guard $ d'' == 1 + let m = if c2 > c5 then 5 ^ (c2 - c5) + else 2 ^ (c5 - c2) + return (n * m, d * m) + +takeFactors :: Integer -> Integer -> (Integer, Integer) +takeFactors f n | n `mod` f == 0 = let (c, n') = takeFactors f (n `div` f) + in (c+1, n') + | otherwise = (0, n) + +parseRatio :: ByteString -> Maybe Rational +parseRatio bs = case BC.groupBy ((==) `on` isNumber) bs of + (m:xs) | m == BC.pack "-" -> negate <$> positive xs + xs -> positive xs + where positive = \case + [bx] -> fromInteger . fst <$> BC.readInteger bx + [bx, op, by] -> do + (x, _) <- BC.readInteger bx + (y, _) <- BC.readInteger by + case BC.unpack op of + "." -> return $ (x % 1) + (y % (10 ^ BC.length by)) + "/" -> return $ x % y + _ -> Nothing + _ -> Nothing diff --git a/src/Erebos/Pairing.hs b/src/Erebos/Pairing.hs index 2166e71..703afcd 100644 --- a/src/Erebos/Pairing.hs +++ b/src/Erebos/Pairing.hs @@ -27,10 +27,11 @@ import Data.Word import Erebos.Identity import Erebos.Network +import Erebos.Object import Erebos.PubKey import Erebos.Service import Erebos.State -import Erebos.Storage +import Erebos.Storable data PairingService a = PairingRequest (Stored (Signed IdentityData)) (Stored (Signed IdentityData)) RefDigest | PairingResponse Bytes @@ -48,7 +49,7 @@ data PairingState a = NoPairing data PairingFailureReason a = PairingUserRejected | PairingUnexpectedMessage (PairingState a) (PairingService a) - | PairingFailedOther String + | PairingFailedOther ErebosError data PairingAttributes a = PairingAttributes { pairingHookRequest :: ServiceHandler (PairingService a) () @@ -115,16 +116,16 @@ instance PairingResult a => Service (PairingService a) where serviceHandler spacket = ((,fromStored spacket) <$> svcGet) >>= \case (NoPairing, PairingRequest pdata sdata confirm) -> do - self <- maybe (throwError "failed to validate received identity") return $ validateIdentity sdata - self' <- maybe (throwError "failed to validate own identity") return . + self <- maybe (throwOtherError "failed to validate received identity") return $ validateIdentity sdata + self' <- maybe (throwOtherError "failed to validate own identity") return . validateExtendedIdentity . lsIdentity . fromStored =<< svcGetLocal when (not $ self `sameIdentity` self') $ do - throwError "pairing request to different identity" + throwOtherError "pairing request to different identity" - peer <- maybe (throwError "failed to validate received peer identity") return $ validateIdentity pdata + peer <- maybe (throwOtherError "failed to validate received peer identity") return $ validateIdentity pdata peer' <- asks $ svcPeerIdentity when (not $ peer `sameIdentity` peer') $ do - throwError "pairing request from different identity" + throwOtherError "pairing request from different identity" join $ asks $ pairingHookRequest . svcAttributes nonce <- liftIO $ getRandomBytes 32 @@ -166,7 +167,7 @@ instance PairingResult a => Service (PairingService a) where svcSet $ PairingDone Nothing -> do join $ asks $ pairingHookVerifyFailed . svcAttributes - throwError "" + throwOtherError "" x@(OurRequestReady, _) -> reject $ uncurry PairingUnexpectedMessage x (PeerRequest peer self nonce dgst, PairingRequestNonce pnonce) -> do @@ -203,22 +204,22 @@ confirmationNumber dgst = _ -> "" where len = 6 -pairingRequest :: forall a m proxy. (PairingResult a, MonadIO m, MonadError String m) => proxy a -> Peer -> m () +pairingRequest :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m () pairingRequest _ peer = do self <- liftIO $ serverIdentity $ peerServer peer nonce <- liftIO $ getRandomBytes 32 pid <- peerIdentity peer >>= \case PeerIdentityFull pid -> return pid - _ -> throwError "incomplete peer identity" + _ -> throwOtherError "incomplete peer identity" sendToPeerWith @(PairingService a) peer $ \case NoPairing -> return (Just $ PairingRequest (idData self) (idData pid) (nonceDigest self pid nonce BA.empty), OurRequest self pid nonce) - _ -> throwError "already in progress" + _ -> throwOtherError "already in progress" -pairingAccept :: forall a m proxy. (PairingResult a, MonadIO m, MonadError String m) => proxy a -> Peer -> m () +pairingAccept :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m () pairingAccept _ peer = runPeerService @(PairingService a) peer $ do svcGet >>= \case - NoPairing -> throwError $ "none in progress" - OurRequest {} -> throwError $ "waiting for peer" + NoPairing -> throwOtherError $ "none in progress" + OurRequest {} -> throwOtherError $ "waiting for peer" OurRequestConfirm Nothing -> do join $ asks $ pairingHookConfirmedResponse . svcAttributes svcSet OurRequestReady @@ -226,17 +227,17 @@ pairingAccept _ peer = runPeerService @(PairingService a) peer $ do join $ asks $ pairingHookAcceptedResponse . svcAttributes pairingFinalizeRequest verified svcSet PairingDone - OurRequestReady -> throwError $ "already accepted, waiting for peer" - PeerRequest {} -> throwError $ "waiting for peer" + OurRequestReady -> throwOtherError $ "already accepted, waiting for peer" + PeerRequest {} -> throwOtherError $ "waiting for peer" PeerRequestConfirm -> do join $ asks $ pairingHookAcceptedRequest . svcAttributes replyPacket . PairingAccept =<< pairingFinalizeResponse svcSet PairingDone - PairingDone -> throwError $ "already done" + PairingDone -> throwOtherError $ "already done" -pairingReject :: forall a m proxy. (PairingResult a, MonadIO m, MonadError String m) => proxy a -> Peer -> m () +pairingReject :: forall a m e proxy. (PairingResult a, MonadIO m, MonadError e m, FromErebosError e) => proxy a -> Peer -> m () pairingReject _ peer = runPeerService @(PairingService a) peer $ do svcGet >>= \case - NoPairing -> throwError $ "none in progress" - PairingDone -> throwError $ "already done" + NoPairing -> throwOtherError $ "none in progress" + PairingDone -> throwOtherError $ "already done" _ -> reject PairingUserRejected diff --git a/src/Erebos/PubKey.hs b/src/Erebos/PubKey.hs index 09a8e02..a2ee519 100644 --- a/src/Erebos/PubKey.hs +++ b/src/Erebos/PubKey.hs @@ -11,7 +11,6 @@ module Erebos.PubKey ( ) where import Control.Monad -import Control.Monad.Except import Crypto.Error import qualified Crypto.PubKey.Ed25519 as ED @@ -21,7 +20,7 @@ import Data.ByteArray import Data.ByteString (ByteString) import qualified Data.Text as T -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Key data PublicKey = PublicKey ED.PublicKey @@ -70,7 +69,7 @@ instance Storable PublicKey where load' = loadRec $ do ktype <- loadText "type" guard $ ktype == "ed25519" - maybe (throwError "Public key decoding failed") (return . PublicKey) . + maybe (throwOtherError "public key decoding failed") (return . PublicKey) . maybeCryptoError . (ED.publicKey :: ByteString -> CryptoFailable ED.PublicKey) =<< loadBinary "pubkey" @@ -82,7 +81,7 @@ instance Storable Signature where load' = loadRec $ Signature <$> loadRef "key" <*> loadSignature "sig" - where loadSignature = maybe (throwError "Signature decoding failed") return . + where loadSignature = maybe (throwOtherError "signature decoding failed") return . maybeCryptoError . (ED.signature :: ByteString -> CryptoFailable ED.Signature) <=< loadBinary instance Storable a => Storable (Signed a) where @@ -96,7 +95,7 @@ instance Storable a => Storable (Signed a) where forM_ sigs $ \sig -> do let PublicKey pubkey = fromStored $ sigKey $ fromStored sig when (not $ ED.verify pubkey (storedRef sdata) $ sigSignature $ fromStored sig) $ - throwError "signature verification failed" + throwOtherError "signature verification failed" return $ Signed sdata sigs sign :: MonadStorage m => SecretKey -> Stored a -> m (Signed a) @@ -148,7 +147,7 @@ instance Storable PublicKexKey where load' = loadRec $ do ktype <- loadText "type" guard $ ktype == "x25519" - maybe (throwError "public key decoding failed") (return . PublicKexKey) . + maybe (throwOtherError "public key decoding failed") (return . PublicKexKey) . maybeCryptoError . (CX.publicKey :: ScrubbedBytes -> CryptoFailable CX.PublicKey) =<< loadBinary "pubkey" diff --git a/src/Erebos/Service.hs b/src/Erebos/Service.hs index f8428d1..e95e700 100644 --- a/src/Erebos/Service.hs +++ b/src/Erebos/Service.hs @@ -35,9 +35,16 @@ import qualified Data.UUID as U import Erebos.Identity import {-# SOURCE #-} Erebos.Network import Erebos.State -import Erebos.Storage +import Erebos.Storable +import Erebos.Storage.Head + +class ( + Typeable s, Storable s, + Typeable (ServiceAttributes s), + Typeable (ServiceState s), + Typeable (ServiceGlobalState s) + ) => Service s where -class (Typeable s, Storable s, Typeable (ServiceState s), Typeable (ServiceGlobalState s)) => Service s where serviceID :: proxy s -> ServiceID serviceHandler :: Stored s -> ServiceHandler s () @@ -120,8 +127,8 @@ data ServiceHandlerState s = ServiceHandlerState , svcLocal :: Stored LocalState } -newtype ServiceHandler s a = ServiceHandler (ReaderT (ServiceInput s) (WriterT [ServiceReply s] (StateT (ServiceHandlerState s) (ExceptT String IO))) a) - deriving (Functor, Applicative, Monad, MonadReader (ServiceInput s), MonadWriter [ServiceReply s], MonadState (ServiceHandlerState s), MonadError String, MonadIO) +newtype ServiceHandler s a = ServiceHandler (ReaderT (ServiceInput s) (WriterT [ServiceReply s] (StateT (ServiceHandlerState s) (ExceptT ErebosError IO))) a) + deriving (Functor, Applicative, Monad, MonadReader (ServiceInput s), MonadWriter [ServiceReply s], MonadState (ServiceHandlerState s), MonadError ErebosError, MonadIO) instance MonadStorage (ServiceHandler s) where getStorage = asks $ peerStorage . svcPeer @@ -138,7 +145,7 @@ runServiceHandler h input svc global shandler = do ServiceHandler handler = shandler (runExceptT $ flip runStateT sstate $ execWriterT $ flip runReaderT input $ handler) >>= \case Left err -> do - svcPrintOp input $ "service failed: " ++ err + svcPrintOp input $ "service failed: " ++ showErebosError err return ([], (svc, global)) Right (rsp, sstate') | svcLocal sstate' == svcLocal sstate -> return (rsp, (svcValue sstate', svcGlobal sstate')) @@ -171,7 +178,7 @@ svcSetLocal :: Stored LocalState -> ServiceHandler s () svcSetLocal x = modify $ \st -> st { svcLocal = x } svcSelf :: ServiceHandler s UnifiedIdentity -svcSelf = maybe (throwError "failed to validate own identity") return . +svcSelf = maybe (throwOtherError "failed to validate own identity") return . validateExtendedIdentity . lsIdentity . fromStored =<< svcGetLocal svcPrint :: String -> ServiceHandler s () diff --git a/src/Erebos/Set.hs b/src/Erebos/Set.hs index c5edd56..270c0ba 100644 --- a/src/Erebos/Set.hs +++ b/src/Erebos/Set.hs @@ -19,7 +19,8 @@ import Data.Map qualified as M import Data.Maybe import Data.Ord -import Erebos.Storage +import Erebos.Object +import Erebos.Storable import Erebos.Storage.Merge import Erebos.Util diff --git a/src/Erebos/State.hs b/src/Erebos/State.hs index 324127a..a2ecb9e 100644 --- a/src/Erebos/State.hs +++ b/src/Erebos/State.hs @@ -1,13 +1,12 @@ module Erebos.State ( LocalState(..), - SharedState, SharedType(..), + SharedState(..), SharedType(..), SharedTypeID, mkSharedTypeID, + MonadStorage(..), MonadHead(..), updateLocalHead_, - loadLocalStateHead, - updateSharedState, updateSharedState_, lookupSharedValue, makeSharedStateUpdate, @@ -15,31 +14,28 @@ module Erebos.State ( headLocalIdentity, mergeSharedIdentity, - updateSharedIdentity, - interactiveIdentityUpdate, ) where import Control.Monad.Except import Control.Monad.Reader -import Data.Foldable -import Data.Maybe -import qualified Data.Text as T -import qualified Data.Text.IO as T +import Data.ByteString (ByteString) +import Data.ByteString.Char8 qualified as BC import Data.Typeable import Data.UUID (UUID) -import qualified Data.UUID as U - -import System.IO +import Data.UUID qualified as U import Erebos.Identity +import Erebos.Object import Erebos.PubKey -import Erebos.Storage +import Erebos.Storable +import Erebos.Storage.Head import Erebos.Storage.Merge data LocalState = LocalState { lsIdentity :: Stored (Signed ExtendedIdentityData) , lsShared :: [Stored SharedState] + , lsOther :: [ ( ByteString, RecItem ) ] } data SharedState = SharedState @@ -58,13 +54,16 @@ class Mergeable a => SharedType a where sharedTypeID :: proxy a -> SharedTypeID instance Storable LocalState where - store' st = storeRec $ do - storeRef "id" $ lsIdentity st - mapM_ (storeRef "shared") $ lsShared st + store' LocalState {..} = storeRec $ do + storeRef "id" lsIdentity + mapM_ (storeRef "shared") lsShared + storeRecItems lsOther - load' = loadRec $ LocalState - <$> loadRef "id" - <*> loadRefs "shared" + load' = loadRec $ do + lsIdentity <- loadRef "id" + lsShared <- loadRefs "shared" + lsOther <- filter ((`notElem` [ BC.pack "id", BC.pack "shared" ]) . fst) <$> loadRecItems + return LocalState {..} instance HeadType LocalState where headTypeID _ = mkHeadTypeID "1d7491a9-7bcb-4eaa-8f13-c8c4c4087e4e" @@ -98,34 +97,6 @@ instance (HeadType a, MonadIO m) => MonadHead a (ReaderT (Head a) m) where snd <$> updateHead h f -loadLocalStateHead :: MonadIO m => Storage -> m (Head LocalState) -loadLocalStateHead st = loadHeads st >>= \case - (h:_) -> return h - [] -> liftIO $ do - putStr "Name: " - hFlush stdout - name <- T.getLine - - putStr "Device: " - hFlush stdout - devName <- T.getLine - - owner <- if - | T.null name -> return Nothing - | otherwise -> Just <$> createIdentity st (Just name) Nothing - - identity <- createIdentity st (if T.null devName then Nothing else Just devName) owner - - shared <- wrappedStore st $ SharedState - { ssPrev = [] - , ssType = Just $ sharedTypeID @(Maybe ComposedIdentity) Proxy - , ssValue = [storedRef $ idExtData $ fromMaybe identity owner] - } - storeHead st $ LocalState - { lsIdentity = idExtData identity - , lsShared = [shared] - } - localIdentity :: LocalState -> UnifiedIdentity localIdentity ls = maybe (error "failed to verify local identity") (updateOwners $ maybe [] idExtDataF $ lookupSharedValue $ lsShared ls) @@ -163,39 +134,9 @@ makeSharedStateUpdate st val prev = liftIO $ wrappedStore st SharedState } -mergeSharedIdentity :: (MonadHead LocalState m, MonadError String m) => m UnifiedIdentity +mergeSharedIdentity :: (MonadHead LocalState m, MonadError e m, FromErebosError e) => m UnifiedIdentity mergeSharedIdentity = updateLocalHead $ updateSharedState $ \case Just cidentity -> do identity <- mergeIdentity cidentity return (Just $ toComposedIdentity identity, identity) - Nothing -> throwError "no existing shared identity" - -updateSharedIdentity :: (MonadHead LocalState m, MonadError String m) => m () -updateSharedIdentity = updateLocalHead_ $ updateSharedState_ $ \case - Just identity -> do - Just . toComposedIdentity <$> interactiveIdentityUpdate identity - Nothing -> throwError "no existing shared identity" - -interactiveIdentityUpdate :: (Foldable f, MonadStorage m, MonadIO m, MonadError String m) => Identity f -> m UnifiedIdentity -interactiveIdentityUpdate identity = do - let public = idKeyIdentity identity - - name <- liftIO $ do - T.putStr $ T.concat $ concat - [ [ T.pack "Name" ] - , case idName identity of - Just name -> [T.pack " [", name, T.pack "]"] - Nothing -> [] - , [ T.pack ": " ] - ] - hFlush stdout - T.getLine - - if | T.null name -> mergeIdentity identity - | otherwise -> do - secret <- loadKey public - maybe (throwError "created invalid identity") return . validateIdentity =<< - mstore =<< sign secret =<< mstore (emptyIdentityData public) - { iddPrev = toList $ idDataF identity - , iddName = Just name - } + Nothing -> throwOtherError "no existing shared identity" diff --git a/src/Erebos/Storable.hs b/src/Erebos/Storable.hs new file mode 100644 index 0000000..b0795f9 --- /dev/null +++ b/src/Erebos/Storable.hs @@ -0,0 +1,44 @@ +{-| +Description: Encoding custom types into Erebos objects + +Module provides the 'Storable' class for types that can be serialized to/from +Erebos objects, along with various helpers, mostly for encoding using records. + +The 'Stored' wrapper for objects actually encoded and stored in some storage is +defined here as well. +-} + +module Erebos.Storable ( + Storable(..), ZeroStorable(..), + StorableText(..), StorableDate(..), StorableUUID(..), + + Store, StoreRec, + storeBlob, storeRec, storeZero, + storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, + storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, + storeZRef, + storeRecItems, + + Load, LoadRec, + loadCurrentRef, loadCurrentObject, + loadRecCurrentRef, loadRecItems, + + loadBlob, loadRec, loadZero, + loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, + loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, + loadTexts, loadBinaries, loadRefs, loadRawRefs, + loadZRef, + + Stored, + fromStored, storedRef, + wrappedStore, wrappedLoad, + copyStored, + unsafeMapStored, + + Storage, MonadStorage(..), + + module Erebos.Error, +) where + +import Erebos.Error +import Erebos.Object.Internal diff --git a/src/Erebos/Storage.hs b/src/Erebos/Storage.hs index 2e6653a..f1cce84 100644 --- a/src/Erebos/Storage.hs +++ b/src/Erebos/Storage.hs @@ -1,1053 +1,29 @@ +{-| +Description: Working with storage and heads + +Provides functions for opening 'Storage' backed either by disk or memory. For +conveniance also function for working with 'Head's are reexported here. +-} + module Erebos.Storage ( - Storage, PartialStorage, StorageCompleteness, + Storage, PartialStorage, openStorage, memoryStorage, deriveEphemeralStorage, derivePartialStorage, - Ref, PartialRef, RefDigest, - refDigest, - readRef, showRef, showRefDigest, - refDigestFromByteString, hashToRefDigest, - copyRef, partialRef, partialRefFromDigest, - - Object, PartialObject, Object'(..), RecItem, RecItem'(..), - serializeObject, deserializeObject, deserializeObjects, - ioLoadObject, ioLoadBytes, - storeRawBytes, lazyLoadBytes, - storeObject, - collectObjects, collectStoredObjects, - - Head, HeadType(..), - HeadTypeID, mkHeadTypeID, + Head, HeadType, + HeadID, HeadTypeID, headId, headStorage, headRef, headObject, headStoredObject, loadHeads, loadHead, reloadHead, storeHead, replaceHead, updateHead, updateHead_, - loadHeadRaw, storeHeadRaw, replaceHeadRaw, WatchedHead, watchHead, watchHeadWith, unwatchHead, watchHeadRaw, MonadStorage(..), - - Storable(..), ZeroStorable(..), - StorableText(..), StorableDate(..), StorableUUID(..), - - Store, StoreRec, - evalStore, evalStoreObject, - storeBlob, storeRec, storeZero, - storeEmpty, storeInt, storeNum, storeText, storeBinary, storeDate, storeUUID, storeRef, storeRawRef, - storeMbEmpty, storeMbInt, storeMbNum, storeMbText, storeMbBinary, storeMbDate, storeMbUUID, storeMbRef, storeMbRawRef, - storeZRef, - - Load, LoadRec, - evalLoad, - loadCurrentRef, loadCurrentObject, - loadRecCurrentRef, loadRecItems, - - loadBlob, loadRec, loadZero, - loadEmpty, loadInt, loadNum, loadText, loadBinary, loadDate, loadUUID, loadRef, loadRawRef, - loadMbEmpty, loadMbInt, loadMbNum, loadMbText, loadMbBinary, loadMbDate, loadMbUUID, loadMbRef, loadMbRawRef, - loadTexts, loadBinaries, loadRefs, loadRawRefs, - loadZRef, - - Stored, - fromStored, storedRef, - wrappedStore, wrappedLoad, - copyStored, - unsafeMapStored, - - StoreInfo(..), makeStoreInfo, - - StoredHistory, - fromHistory, fromHistoryAt, storedFromHistory, storedHistoryList, - beginHistory, modifyHistory, ) where -import Control.Applicative -import Control.Concurrent -import Control.Exception -import Control.Monad -import Control.Monad.Except -import Control.Monad.Reader -import Control.Monad.Writer - -import Crypto.Hash - -import Data.Bifunctor -import Data.ByteString (ByteString) -import qualified Data.ByteArray as BA -import qualified Data.ByteString as B -import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL -import qualified Data.ByteString.Lazy.Char8 as BLC -import Data.Char -import Data.Function -import qualified Data.HashTable.IO as HT -import Data.List -import qualified Data.Map as M -import Data.Maybe -import Data.Ratio -import Data.Set (Set) -import qualified Data.Set as S -import Data.Text (Text) -import qualified Data.Text as T -import Data.Text.Encoding -import Data.Text.Encoding.Error -import Data.Time.Calendar -import Data.Time.Clock -import Data.Time.Format -import Data.Time.LocalTime -import Data.Typeable -import Data.UUID (UUID) -import qualified Data.UUID as U -import qualified Data.UUID.V4 as U - -import System.Directory -import System.FSNotify -import System.FilePath -import System.IO.Error -import System.IO.Unsafe - -import Erebos.Storage.Internal - - -type Storage = Storage' Complete -type PartialStorage = Storage' Partial - -storageVersion :: String -storageVersion = "0.1" - -openStorage :: FilePath -> IO Storage -openStorage path = modifyIOError annotate $ do - let versionFileName = "erebos-storage" - let versionPath = path </> versionFileName - let writeVersionFile = writeFile versionPath $ storageVersion <> "\n" - - doesDirectoryExist path >>= \case - True -> do - listDirectory path >>= \case - files@(_:_) - | versionFileName `elem` files -> do - readFile versionPath >>= \case - content | (ver:_) <- lines content, ver == storageVersion -> return () - | otherwise -> fail "unsupported storage version" - - | "objects" `notElem` files || "heads" `notElem` files -> do - fail "directory is neither empty, nor an existing erebos storage" - - _ -> writeVersionFile - False -> do - createDirectoryIfMissing True $ path - writeVersionFile - - createDirectoryIfMissing True $ path </> "objects" - createDirectoryIfMissing True $ path </> "heads" - watchers <- newMVar (Nothing, [], WatchList 1 []) - refgen <- newMVar =<< HT.new - refroots <- newMVar =<< HT.new - return $ Storage - { stBacking = StorageDir path watchers - , stParent = Nothing - , stRefGeneration = refgen - , stRefRoots = refroots - } - where - annotate e = annotateIOError e "failed to open storage" Nothing (Just path) - -memoryStorage' :: IO (Storage' c') -memoryStorage' = do - backing <- StorageMemory <$> newMVar [] <*> newMVar M.empty <*> newMVar M.empty <*> newMVar (WatchList 1 []) - refgen <- newMVar =<< HT.new - refroots <- newMVar =<< HT.new - return $ Storage - { stBacking = backing - , stParent = Nothing - , stRefGeneration = refgen - , stRefRoots = refroots - } - -memoryStorage :: IO Storage -memoryStorage = memoryStorage' - -deriveEphemeralStorage :: Storage -> IO Storage -deriveEphemeralStorage parent = do - st <- memoryStorage - return $ st { stParent = Just parent } - -derivePartialStorage :: Storage -> IO PartialStorage -derivePartialStorage parent = do - st <- memoryStorage' - return $ st { stParent = Just parent } - -type Ref = Ref' Complete -type PartialRef = Ref' Partial - -zeroRef :: Storage' c -> Ref' c -zeroRef s = Ref s (RefDigest h) - where h = case digestFromByteString $ B.replicate (hashDigestSize $ digestAlgo h) 0 of - Nothing -> error $ "Failed to create zero hash" - Just h' -> h' - digestAlgo :: Digest a -> a - digestAlgo = undefined - -isZeroRef :: Ref' c -> Bool -isZeroRef (Ref _ h) = all (==0) $ BA.unpack h - - -refFromDigest :: Storage' c -> RefDigest -> IO (Maybe (Ref' c)) -refFromDigest st dgst = fmap (const $ Ref st dgst) <$> ioLoadBytesFromStorage st dgst - -readRef :: Storage -> ByteString -> IO (Maybe Ref) -readRef s b = - case readRefDigest b of - Nothing -> return Nothing - Just dgst -> refFromDigest s dgst - -copyRef' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Ref' c -> IO (c (Ref' c')) -copyRef' st ref'@(Ref _ dgst) = refFromDigest st dgst >>= \case Just ref -> return $ return ref - Nothing -> doCopy - where doCopy = do mbobj' <- ioLoadObject ref' - mbobj <- sequence $ copyObject' st <$> mbobj' - sequence $ unsafeStoreObject st <$> join mbobj - -copyObject' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (c (Object' c')) -copyObject' _ (Blob bs) = return $ return $ Blob bs -copyObject' st (Rec rs) = fmap Rec . sequence <$> mapM copyItem rs - where copyItem :: (ByteString, RecItem' c) -> IO (c (ByteString, RecItem' c')) - copyItem (n, item) = fmap (n,) <$> case item of - RecEmpty -> return $ return $ RecEmpty - RecInt x -> return $ return $ RecInt x - RecNum x -> return $ return $ RecNum x - RecText x -> return $ return $ RecText x - RecBinary x -> return $ return $ RecBinary x - RecDate x -> return $ return $ RecDate x - RecUUID x -> return $ return $ RecUUID x - RecRef x -> fmap RecRef <$> copyRef' st x -copyObject' _ ZeroObject = return $ return ZeroObject - -copyRef :: forall c c' m. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => Storage' c' -> Ref' c -> m (LoadResult c (Ref' c')) -copyRef st ref' = liftIO $ returnLoadResult <$> copyRef' st ref' - -copyObject :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (LoadResult c (Object' c')) -copyObject st obj' = returnLoadResult <$> copyObject' st obj' - -partialRef :: PartialStorage -> Ref -> PartialRef -partialRef st (Ref _ dgst) = Ref st dgst - -partialRefFromDigest :: PartialStorage -> RefDigest -> PartialRef -partialRefFromDigest st dgst = Ref st dgst - - -data Object' c - = Blob ByteString - | Rec [(ByteString, RecItem' c)] - | ZeroObject - deriving (Show) - -type Object = Object' Complete -type PartialObject = Object' Partial - -data RecItem' c - = RecEmpty - | RecInt Integer - | RecNum Rational - | RecText Text - | RecBinary ByteString - | RecDate ZonedTime - | RecUUID UUID - | RecRef (Ref' c) - deriving (Show) - -type RecItem = RecItem' Complete - -serializeObject :: Object' c -> BL.ByteString -serializeObject = \case - Blob cnt -> BL.fromChunks [BC.pack "blob ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt] - Rec rec -> let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec - in BL.fromChunks [BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n'] `BL.append` cnt - ZeroObject -> BL.empty - --- |Serializes and stores object data without ony dependencies, so is safe only --- if all the referenced objects are already stored or reference is partial. -unsafeStoreObject :: Storage' c -> Object' c -> IO (Ref' c) -unsafeStoreObject storage = \case - ZeroObject -> return $ zeroRef storage - obj -> unsafeStoreRawBytes storage $ serializeObject obj - -storeObject :: PartialStorage -> PartialObject -> IO PartialRef -storeObject = unsafeStoreObject - -storeRawBytes :: PartialStorage -> BL.ByteString -> IO PartialRef -storeRawBytes = unsafeStoreRawBytes - -serializeRecItem :: ByteString -> RecItem' c -> [ByteString] -serializeRecItem name (RecEmpty) = [name, BC.pack ":e", BC.singleton ' ', BC.singleton '\n'] -serializeRecItem name (RecInt x) = [name, BC.pack ":i", BC.singleton ' ', BC.pack (show x), BC.singleton '\n'] -serializeRecItem name (RecNum x) = [name, BC.pack ":n", BC.singleton ' ', BC.pack (showRatio x), BC.singleton '\n'] -serializeRecItem name (RecText x) = [name, BC.pack ":t", BC.singleton ' ', escaped, BC.singleton '\n'] - where escaped = BC.concatMap escape $ encodeUtf8 x - escape '\n' = BC.pack "\n\t" - escape c = BC.singleton c -serializeRecItem name (RecBinary x) = [name, BC.pack ":b ", showHex x, BC.singleton '\n'] -serializeRecItem name (RecDate x) = [name, BC.pack ":d", BC.singleton ' ', BC.pack (formatTime defaultTimeLocale "%s %z" x), BC.singleton '\n'] -serializeRecItem name (RecUUID x) = [name, BC.pack ":u", BC.singleton ' ', U.toASCIIBytes x, BC.singleton '\n'] -serializeRecItem name (RecRef x) = [name, BC.pack ":r ", showRef x, BC.singleton '\n'] - -lazyLoadObject :: forall c. StorageCompleteness c => Ref' c -> LoadResult c (Object' c) -lazyLoadObject = returnLoadResult . unsafePerformIO . ioLoadObject - -ioLoadObject :: forall c. StorageCompleteness c => Ref' c -> IO (c (Object' c)) -ioLoadObject ref | isZeroRef ref = return $ return ZeroObject -ioLoadObject ref@(Ref st rhash) = do - file' <- ioLoadBytes ref - return $ do - file <- file' - let chash = hashToRefDigest file - when (chash /= rhash) $ error $ "Hash mismatch on object " ++ BC.unpack (showRef ref) {- TODO throw -} - return $ case runExcept $ unsafeDeserializeObject st file of - Left err -> error $ err ++ ", ref " ++ BC.unpack (showRef ref) {- TODO throw -} - Right (x, rest) | BL.null rest -> x - | otherwise -> error $ "Superfluous content after " ++ BC.unpack (showRef ref) {- TODO throw -} - -lazyLoadBytes :: forall c. StorageCompleteness c => Ref' c -> LoadResult c BL.ByteString -lazyLoadBytes ref | isZeroRef ref = returnLoadResult (return BL.empty :: c BL.ByteString) -lazyLoadBytes ref = returnLoadResult $ unsafePerformIO $ ioLoadBytes ref - -unsafeDeserializeObject :: Storage' c -> BL.ByteString -> Except String (Object' c, BL.ByteString) -unsafeDeserializeObject _ bytes | BL.null bytes = return (ZeroObject, bytes) -unsafeDeserializeObject st bytes = - case BLC.break (=='\n') bytes of - (line, rest) | Just (otype, len) <- splitObjPrefix line -> do - let (content, next) = first BL.toStrict $ BL.splitAt (fromIntegral len) $ BL.drop 1 rest - guard $ B.length content == len - (,next) <$> case otype of - _ | otype == BC.pack "blob" -> return $ Blob content - | otype == BC.pack "rec" -> maybe (throwError $ "Malformed record item ") - (return . Rec) $ sequence $ map parseRecLine $ mergeCont [] $ BC.lines content - | otherwise -> throwError $ "Unknown object type" - _ -> throwError $ "Malformed object" - where splitObjPrefix line = do - [otype, tlen] <- return $ BLC.words line - (len, rest) <- BLC.readInt tlen - guard $ BL.null rest - return (BL.toStrict otype, len) - - mergeCont cs (a:b:rest) | Just ('\t', b') <- BC.uncons b = mergeCont (b':BC.pack "\n":cs) (a:rest) - mergeCont cs (a:rest) = B.concat (a : reverse cs) : mergeCont [] rest - mergeCont _ [] = [] - - parseRecLine line = do - colon <- BC.elemIndex ':' line - space <- BC.elemIndex ' ' line - guard $ colon < space - let name = B.take colon line - itype = B.take (space-colon-1) $ B.drop (colon+1) line - content = B.drop (space+1) line - - val <- case BC.unpack itype of - "e" -> do guard $ B.null content - return RecEmpty - "i" -> do (num, rest) <- BC.readInteger content - guard $ B.null rest - return $ RecInt num - "n" -> RecNum <$> parseRatio content - "t" -> return $ RecText $ decodeUtf8With lenientDecode content - "b" -> RecBinary <$> readHex content - "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content) - "u" -> RecUUID <$> U.fromASCIIBytes content - "r" -> RecRef . Ref st <$> readRefDigest content - _ -> Nothing - return (name, val) - -deserializeObject :: PartialStorage -> BL.ByteString -> Except String (PartialObject, BL.ByteString) -deserializeObject = unsafeDeserializeObject - -deserializeObjects :: PartialStorage -> BL.ByteString -> Except String [PartialObject] -deserializeObjects _ bytes | BL.null bytes = return [] -deserializeObjects st bytes = do (obj, rest) <- deserializeObject st bytes - (obj:) <$> deserializeObjects st rest - - -collectObjects :: Object -> [Object] -collectObjects obj = obj : map fromStored (fst $ collectOtherStored S.empty obj) - -collectStoredObjects :: Stored Object -> [Stored Object] -collectStoredObjects obj = obj : (fst $ collectOtherStored S.empty $ fromStored obj) - -collectOtherStored :: Set RefDigest -> Object -> ([Stored Object], Set RefDigest) -collectOtherStored seen (Rec items) = foldr helper ([], seen) $ map snd items - where helper (RecRef ref) (xs, s) | r <- refDigest ref - , r `S.notMember` s - = let o = wrappedLoad ref - (xs', s') = collectOtherStored (S.insert r s) $ fromStored o - in ((o : xs') ++ xs, s') - helper _ (xs, s) = (xs, s) -collectOtherStored seen _ = ([], seen) - - -type Head = Head' Complete - -headId :: Head a -> HeadID -headId (Head uuid _) = uuid - -headStorage :: Head a -> Storage -headStorage = refStorage . headRef - -headRef :: Head a -> Ref -headRef (Head _ sx) = storedRef sx - -headObject :: Head a -> a -headObject (Head _ sx) = fromStored sx - -headStoredObject :: Head a -> Stored a -headStoredObject (Head _ sx) = sx - -deriving instance StorableUUID HeadID -deriving instance StorableUUID HeadTypeID - -mkHeadTypeID :: String -> HeadTypeID -mkHeadTypeID = maybe (error "Invalid head type ID") HeadTypeID . U.fromString - -class Storable a => HeadType a where - headTypeID :: proxy a -> HeadTypeID - - -headTypePath :: FilePath -> HeadTypeID -> FilePath -headTypePath spath (HeadTypeID tid) = spath </> "heads" </> U.toString tid - -headPath :: FilePath -> HeadTypeID -> HeadID -> FilePath -headPath spath tid (HeadID hid) = headTypePath spath tid </> U.toString hid - -loadHeads :: forall a m. MonadIO m => HeadType a => Storage -> m [Head a] -loadHeads s@(Storage { stBacking = StorageDir { dirPath = spath }}) = liftIO $ do - let hpath = headTypePath spath $ headTypeID @a Proxy - - files <- filterM (doesFileExist . (hpath </>)) =<< - handleJust (\e -> guard (isDoesNotExistError e)) (const $ return []) - (getDirectoryContents hpath) - fmap catMaybes $ forM files $ \hname -> do - case U.fromString hname of - Just hid -> do - (h:_) <- BC.lines <$> B.readFile (hpath </> hname) - Just ref <- readRef s h - return $ Just $ Head (HeadID hid) $ wrappedLoad ref - Nothing -> return Nothing -loadHeads Storage { stBacking = StorageMemory { memHeads = theads } } = liftIO $ do - let toHead ((tid, hid), ref) | tid == headTypeID @a Proxy = Just $ Head hid $ wrappedLoad ref - | otherwise = Nothing - catMaybes . map toHead <$> readMVar theads - -loadHead :: forall a m. (HeadType a, MonadIO m) => Storage -> HeadID -> m (Maybe (Head a)) -loadHead st hid = fmap (Head hid . wrappedLoad) <$> loadHeadRaw st (headTypeID @a Proxy) hid - -loadHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> m (Maybe Ref) -loadHeadRaw s@(Storage { stBacking = StorageDir { dirPath = spath }}) tid hid = liftIO $ do - handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ do - (h:_) <- BC.lines <$> B.readFile (headPath spath tid hid) - Just ref <- readRef s h - return $ Just ref -loadHeadRaw Storage { stBacking = StorageMemory { memHeads = theads } } tid hid = liftIO $ do - lookup (tid, hid) <$> readMVar theads - -reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) -reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid - -storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) -storeHead st obj = do - let tid = headTypeID @a Proxy - stored <- wrappedStore st obj - hid <- storeHeadRaw st tid (storedRef stored) - return $ Head hid stored - -storeHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> Ref -> m HeadID -storeHeadRaw st tid ref = liftIO $ do - hid <- HeadID <$> U.nextRandom - case stBacking st of - StorageDir { dirPath = spath } -> do - Right () <- writeFileChecked (headPath spath tid hid) Nothing $ - showRef ref `B.append` BC.singleton '\n' - return () - StorageMemory { memHeads = theads } -> do - modifyMVar_ theads $ return . (((tid, hid), ref) :) - return hid - -replaceHead :: forall a m. (HeadType a, MonadIO m) => Head a -> Stored a -> m (Either (Maybe (Head a)) (Head a)) -replaceHead prev@(Head hid pobj) stored' = liftIO $ do - let st = headStorage prev - tid = headTypeID @a Proxy - stored <- copyStored st stored' - bimap (fmap $ Head hid . wrappedLoad) (const $ Head hid stored) <$> - replaceHeadRaw st tid hid (storedRef pobj) (storedRef stored) - -replaceHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> HeadID -> Ref -> Ref -> m (Either (Maybe Ref) Ref) -replaceHeadRaw st tid hid prev new = liftIO $ do - case stBacking st of - StorageDir { dirPath = spath } -> do - let filename = headPath spath tid hid - showRefL r = showRef r `B.append` BC.singleton '\n' - - writeFileChecked filename (Just $ showRefL prev) (showRefL new) >>= \case - Left Nothing -> return $ Left Nothing - Left (Just bs) -> do Just oref <- readRef st $ BC.takeWhile (/='\n') bs - return $ Left $ Just oref - Right () -> return $ Right new - - StorageMemory { memHeads = theads, memWatchers = twatch } -> do - res <- modifyMVar theads $ \hs -> do - ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar twatch - return $ case partition ((==(tid, hid)) . fst) hs of - ([] , _ ) -> (hs, Left Nothing) - ((_, r):_, hs') | r == prev -> (((tid, hid), new) : hs', - Right (new, ws)) - | otherwise -> (hs, Left $ Just r) - case res of - Right (r, ws) -> mapM_ ($ r) ws >> return (Right r) - Left x -> return $ Left x - -updateHead :: (HeadType a, MonadIO m) => Head a -> (Stored a -> m (Stored a, b)) -> m (Maybe (Head a), b) -updateHead h f = do - (o, x) <- f $ headStoredObject h - replaceHead h o >>= \case - Right h' -> return (Just h', x) - Left Nothing -> return (Nothing, x) - Left (Just h') -> updateHead h' f - -updateHead_ :: (HeadType a, MonadIO m) => Head a -> (Stored a -> m (Stored a)) -> m (Maybe (Head a)) -updateHead_ h = fmap fst . updateHead h . (fmap (,()) .) - - -data WatchedHead = forall a. WatchedHead Storage WatchID (MVar a) - -watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHead -watchHead h = watchHeadWith h id - -watchHeadWith :: forall a b. (HeadType a, Eq b) => Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead -watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do - watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb - -watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead -watchHeadRaw st tid hid sel cb = do - memo <- newEmptyMVar - let addWatcher wl = (wl', WatchedHead st (wlNext wl) memo) - where wl' = wl { wlNext = wlNext wl + 1 - , wlList = WatchListItem - { wlID = wlNext wl - , wlHead = (tid, hid) - , wlFun = \r -> do - let x = sel r - modifyMVar_ memo $ \prev -> do - when (Just x /= prev) $ cb x - return $ Just x - } : wlList wl - } - - watched <- case stBacking st of - StorageDir { dirPath = spath, dirWatchers = mvar } -> modifyMVar mvar $ \(mbmanager, ilist, wl) -> do - manager <- maybe startManager return mbmanager - ilist' <- case tid `elem` ilist of - True -> return ilist - False -> do - void $ watchDir manager (headTypePath spath tid) (const True) $ \case - Added { eventPath = fpath } | Just ihid <- HeadID <$> U.fromString (takeFileName fpath) -> do - loadHeadRaw st tid ihid >>= \case - Just ref -> do - (_, _, iwl) <- readMVar mvar - mapM_ ($ ref) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList $ iwl - Nothing -> return () - _ -> return () - return $ tid : ilist - return $ first ( Just manager, ilist', ) $ addWatcher wl - - StorageMemory { memWatchers = mvar } -> modifyMVar mvar $ return . addWatcher - - cur <- fmap sel <$> loadHeadRaw st tid hid - maybe (return ()) cb cur - putMVar memo cur - - return watched - -unwatchHead :: WatchedHead -> IO () -unwatchHead (WatchedHead st wid _) = do - let delWatcher wl = wl { wlList = filter ((/=wid) . wlID) $ wlList wl } - case stBacking st of - StorageDir { dirWatchers = mvar } -> modifyMVar_ mvar $ return . second delWatcher - StorageMemory { memWatchers = mvar } -> modifyMVar_ mvar $ return . delWatcher - - -class Monad m => MonadStorage m where - getStorage :: m Storage - mstore :: Storable a => a -> m (Stored a) - - default mstore :: MonadIO m => Storable a => a -> m (Stored a) - mstore x = do - st <- getStorage - wrappedStore st x - -instance MonadIO m => MonadStorage (ReaderT Storage m) where - getStorage = ask - -instance MonadIO m => MonadStorage (ReaderT (Head a) m) where - getStorage = asks $ headStorage - - -class Storable a where - store' :: a -> Store - load' :: Load a - - store :: StorageCompleteness c => Storage' c -> a -> IO (Ref' c) - store st = evalStore st . store' - load :: Ref -> a - load = evalLoad load' - -class Storable a => ZeroStorable a where - fromZero :: Storage -> a - -data Store = StoreBlob ByteString - | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]]) - | StoreZero - -evalStore :: StorageCompleteness c => Storage' c -> Store -> IO (Ref' c) -evalStore st = unsafeStoreObject st <=< evalStoreObject st - -evalStoreObject :: StorageCompleteness c => Storage' c -> Store -> IO (Object' c) -evalStoreObject _ (StoreBlob x) = return $ Blob x -evalStoreObject s (StoreRec f) = Rec . concat <$> sequence (f s) -evalStoreObject _ StoreZero = return ZeroObject - -newtype StoreRecM c a = StoreRecM (ReaderT (Storage' c) (Writer [IO [(ByteString, RecItem' c)]]) a) - deriving (Functor, Applicative, Monad) - -type StoreRec c = StoreRecM c () - -newtype Load a = Load (ReaderT (Ref, Object) (Except String) a) - deriving (Functor, Applicative, Alternative, Monad, MonadPlus, MonadError String) - -evalLoad :: Load a -> Ref -> a -evalLoad (Load f) ref = either (error {- TODO throw -} . ((BC.unpack (showRef ref) ++ ": ")++)) id $ runExcept $ runReaderT f (ref, lazyLoadObject ref) - -loadCurrentRef :: Load Ref -loadCurrentRef = Load $ asks fst - -loadCurrentObject :: Load Object -loadCurrentObject = Load $ asks snd - -newtype LoadRec a = LoadRec (ReaderT (Ref, [(ByteString, RecItem)]) (Except String) a) - deriving (Functor, Applicative, Alternative, Monad, MonadPlus, MonadError String) - -loadRecCurrentRef :: LoadRec Ref -loadRecCurrentRef = LoadRec $ asks fst - -loadRecItems :: LoadRec [(ByteString, RecItem)] -loadRecItems = LoadRec $ asks snd - - -instance Storable Object where - store' (Blob bs) = StoreBlob bs - store' (Rec xs) = StoreRec $ \st -> return $ do - Rec xs' <- copyObject st (Rec xs) - return xs' - store' ZeroObject = StoreZero - - load' = loadCurrentObject - - store st = unsafeStoreObject st <=< copyObject st - load = lazyLoadObject - -instance Storable ByteString where - store' = storeBlob - load' = loadBlob id - -instance Storable a => Storable [a] where - store' [] = storeZero - store' (x:xs) = storeRec $ do - storeRef "i" x - storeRef "n" xs - - load' = loadCurrentObject >>= \case - ZeroObject -> return [] - _ -> loadRec $ (:) - <$> loadRef "i" - <*> loadRef "n" - -instance Storable a => ZeroStorable [a] where - fromZero _ = [] - - -storeBlob :: ByteString -> Store -storeBlob = StoreBlob - -storeRec :: (forall c. StorageCompleteness c => StoreRec c) -> Store -storeRec sr = StoreRec $ do - let StoreRecM r = sr - execWriter . runReaderT r - -storeZero :: Store -storeZero = StoreZero - - -class StorableText a where - toText :: a -> Text - fromText :: MonadError String m => Text -> m a - -instance StorableText Text where - toText = id; fromText = return - -instance StorableText [Char] where - toText = T.pack; fromText = return . T.unpack - - -class StorableDate a where - toDate :: a -> ZonedTime - fromDate :: ZonedTime -> a - -instance StorableDate ZonedTime where - toDate = id; fromDate = id - -instance StorableDate UTCTime where - toDate = utcToZonedTime utc - fromDate = zonedTimeToUTC - -instance StorableDate Day where - toDate day = toDate $ UTCTime day 0 - fromDate = utctDay . fromDate - - -class StorableUUID a where - toUUID :: a -> UUID - fromUUID :: UUID -> a - -instance StorableUUID UUID where - toUUID = id; fromUUID = id - - -storeEmpty :: String -> StoreRec c -storeEmpty name = StoreRecM $ tell [return [(BC.pack name, RecEmpty)]] - -storeMbEmpty :: String -> Maybe () -> StoreRec c -storeMbEmpty name = maybe (return ()) (const $ storeEmpty name) - -storeInt :: Integral a => String -> a -> StoreRec c -storeInt name x = StoreRecM $ tell [return [(BC.pack name, RecInt $ toInteger x)]] - -storeMbInt :: Integral a => String -> Maybe a -> StoreRec c -storeMbInt name = maybe (return ()) (storeInt name) - -storeNum :: (Real a, Fractional a) => String -> a -> StoreRec c -storeNum name x = StoreRecM $ tell [return [(BC.pack name, RecNum $ toRational x)]] - -storeMbNum :: (Real a, Fractional a) => String -> Maybe a -> StoreRec c -storeMbNum name = maybe (return ()) (storeNum name) - -storeText :: StorableText a => String -> a -> StoreRec c -storeText name x = StoreRecM $ tell [return [(BC.pack name, RecText $ toText x)]] - -storeMbText :: StorableText a => String -> Maybe a -> StoreRec c -storeMbText name = maybe (return ()) (storeText name) - -storeBinary :: BA.ByteArrayAccess a => String -> a -> StoreRec c -storeBinary name x = StoreRecM $ tell [return [(BC.pack name, RecBinary $ BA.convert x)]] - -storeMbBinary :: BA.ByteArrayAccess a => String -> Maybe a -> StoreRec c -storeMbBinary name = maybe (return ()) (storeBinary name) - -storeDate :: StorableDate a => String -> a -> StoreRec c -storeDate name x = StoreRecM $ tell [return [(BC.pack name, RecDate $ toDate x)]] - -storeMbDate :: StorableDate a => String -> Maybe a -> StoreRec c -storeMbDate name = maybe (return ()) (storeDate name) - -storeUUID :: StorableUUID a => String -> a -> StoreRec c -storeUUID name x = StoreRecM $ tell [return [(BC.pack name, RecUUID $ toUUID x)]] - -storeMbUUID :: StorableUUID a => String -> Maybe a -> StoreRec c -storeMbUUID name = maybe (return ()) (storeUUID name) - -storeRef :: Storable a => StorageCompleteness c => String -> a -> StoreRec c -storeRef name x = StoreRecM $ do - s <- ask - tell $ (:[]) $ do - ref <- store s x - return [(BC.pack name, RecRef ref)] - -storeMbRef :: Storable a => StorageCompleteness c => String -> Maybe a -> StoreRec c -storeMbRef name = maybe (return ()) (storeRef name) - -storeRawRef :: StorageCompleteness c => String -> Ref -> StoreRec c -storeRawRef name ref = StoreRecM $ do - st <- ask - tell $ (:[]) $ do - ref' <- copyRef st ref - return [(BC.pack name, RecRef ref')] - -storeMbRawRef :: StorageCompleteness c => String -> Maybe Ref -> StoreRec c -storeMbRawRef name = maybe (return ()) (storeRawRef name) - -storeZRef :: (ZeroStorable a, StorageCompleteness c) => String -> a -> StoreRec c -storeZRef name x = StoreRecM $ do - s <- ask - tell $ (:[]) $ do - ref <- store s x - return $ if isZeroRef ref then [] - else [(BC.pack name, RecRef ref)] - - -loadBlob :: (ByteString -> a) -> Load a -loadBlob f = loadCurrentObject >>= \case - Blob x -> return $ f x - _ -> throwError "Expecting blob" - -loadRec :: LoadRec a -> Load a -loadRec (LoadRec lrec) = loadCurrentObject >>= \case - Rec rs -> do - ref <- loadCurrentRef - either throwError return $ runExcept $ runReaderT lrec (ref, rs) - _ -> throwError "Expecting record" - -loadZero :: a -> Load a -loadZero x = loadCurrentObject >>= \case - ZeroObject -> return x - _ -> throwError "Expecting zero" - - -loadEmpty :: String -> LoadRec () -loadEmpty name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbEmpty name - -loadMbEmpty :: String -> LoadRec (Maybe ()) -loadMbEmpty name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecEmpty) -> return (Just ()) - Just _ -> throwError $ "Expecting type int of record item '"++name++"'" - -loadInt :: Num a => String -> LoadRec a -loadInt name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbInt name - -loadMbInt :: Num a => String -> LoadRec (Maybe a) -loadMbInt name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecInt x) -> return (Just $ fromInteger x) - Just _ -> throwError $ "Expecting type int of record item '"++name++"'" - -loadNum :: (Real a, Fractional a) => String -> LoadRec a -loadNum name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbNum name - -loadMbNum :: (Real a, Fractional a) => String -> LoadRec (Maybe a) -loadMbNum name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecNum x) -> return (Just $ fromRational x) - Just _ -> throwError $ "Expecting type number of record item '"++name++"'" - -loadText :: StorableText a => String -> LoadRec a -loadText name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbText name - -loadMbText :: StorableText a => String -> LoadRec (Maybe a) -loadMbText name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecText x) -> Just <$> fromText x - Just _ -> throwError $ "Expecting type text of record item '"++name++"'" - -loadTexts :: StorableText a => String -> LoadRec [a] -loadTexts name = do - items <- map snd . filter ((BC.pack name ==) . fst) <$> loadRecItems - forM items $ \case RecText x -> fromText x - _ -> throwError $ "Expecting type text of record item '"++name++"'" - -loadBinary :: BA.ByteArray a => String -> LoadRec a -loadBinary name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbBinary name - -loadMbBinary :: BA.ByteArray a => String -> LoadRec (Maybe a) -loadMbBinary name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecBinary x) -> return $ Just $ BA.convert x - Just _ -> throwError $ "Expecting type binary of record item '"++name++"'" - -loadBinaries :: BA.ByteArray a => String -> LoadRec [a] -loadBinaries name = do - items <- map snd . filter ((BC.pack name ==) . fst) <$> loadRecItems - forM items $ \case RecBinary x -> return $ BA.convert x - _ -> throwError $ "Expecting type binary of record item '"++name++"'" - -loadDate :: StorableDate a => String -> LoadRec a -loadDate name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbDate name - -loadMbDate :: StorableDate a => String -> LoadRec (Maybe a) -loadMbDate name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecDate x) -> return $ Just $ fromDate x - Just _ -> throwError $ "Expecting type date of record item '"++name++"'" - -loadUUID :: StorableUUID a => String -> LoadRec a -loadUUID name = maybe (throwError $ "Missing record iteem '"++name++"'") return =<< loadMbUUID name - -loadMbUUID :: StorableUUID a => String -> LoadRec (Maybe a) -loadMbUUID name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecUUID x) -> return $ Just $ fromUUID x - Just _ -> throwError $ "Expecting type UUID of record item '"++name++"'" - -loadRawRef :: String -> LoadRec Ref -loadRawRef name = maybe (throwError $ "Missing record item '"++name++"'") return =<< loadMbRawRef name - -loadMbRawRef :: String -> LoadRec (Maybe Ref) -loadMbRawRef name = (lookup (BC.pack name) <$> loadRecItems) >>= \case - Nothing -> return Nothing - Just (RecRef x) -> return (Just x) - Just _ -> throwError $ "Expecting type ref of record item '"++name++"'" - -loadRawRefs :: String -> LoadRec [Ref] -loadRawRefs name = do - items <- map snd . filter ((BC.pack name ==) . fst) <$> loadRecItems - forM items $ \case RecRef x -> return x - _ -> throwError $ "Expecting type ref of record item '"++name++"'" - -loadRef :: Storable a => String -> LoadRec a -loadRef name = load <$> loadRawRef name - -loadMbRef :: Storable a => String -> LoadRec (Maybe a) -loadMbRef name = fmap load <$> loadMbRawRef name - -loadRefs :: Storable a => String -> LoadRec [a] -loadRefs name = map load <$> loadRawRefs name - -loadZRef :: ZeroStorable a => String -> LoadRec a -loadZRef name = loadMbRef name >>= \case - Nothing -> do Ref st _ <- loadRecCurrentRef - return $ fromZero st - Just x -> return x - - -type Stored a = Stored' Complete a - -instance Storable a => Storable (Stored a) where - store st = copyRef st . storedRef - store' (Stored _ x) = store' x - load' = Stored <$> loadCurrentRef <*> load' - -instance ZeroStorable a => ZeroStorable (Stored a) where - fromZero st = Stored (zeroRef st) $ fromZero st - -fromStored :: Stored a -> a -fromStored (Stored _ x) = x - -storedRef :: Stored a -> Ref -storedRef (Stored ref _) = ref - -wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a) -wrappedStore st x = do ref <- liftIO $ store st x - return $ Stored ref x - -wrappedLoad :: Storable a => Ref -> Stored a -wrappedLoad ref = Stored ref (load ref) - -copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) => - Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a)) -copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref' - --- |Passed function needs to preserve the object representation to be safe -unsafeMapStored :: (a -> b) -> Stored a -> Stored b -unsafeMapStored f (Stored ref x) = Stored ref (f x) - - -data StoreInfo = StoreInfo - { infoDate :: ZonedTime - , infoNote :: Maybe Text - } - deriving (Show) - -makeStoreInfo :: IO StoreInfo -makeStoreInfo = StoreInfo - <$> getZonedTime - <*> pure Nothing - -storeInfoRec :: StoreInfo -> StoreRec c -storeInfoRec info = do - storeDate "date" $ infoDate info - storeMbText "note" $ infoNote info - -loadInfoRec :: LoadRec StoreInfo -loadInfoRec = StoreInfo - <$> loadDate "date" - <*> loadMbText "note" - - -data History a = History StoreInfo (Stored a) (Maybe (StoredHistory a)) - deriving (Show) - -type StoredHistory a = Stored (History a) - -instance Storable a => Storable (History a) where - store' (History si x prev) = storeRec $ do - storeInfoRec si - storeMbRef "prev" prev - storeRef "item" x - - load' = loadRec $ History - <$> loadInfoRec - <*> loadRef "item" - <*> loadMbRef "prev" - -fromHistory :: StoredHistory a -> a -fromHistory = fromStored . storedFromHistory - -fromHistoryAt :: ZonedTime -> StoredHistory a -> Maybe a -fromHistoryAt zat = fmap (fromStored . snd) . listToMaybe . dropWhile ((at<) . zonedTimeToUTC . fst) . storedHistoryTimedList - where at = zonedTimeToUTC zat - -storedFromHistory :: StoredHistory a -> Stored a -storedFromHistory sh = let History _ item _ = fromStored sh - in item - -storedHistoryList :: StoredHistory a -> [Stored a] -storedHistoryList = map snd . storedHistoryTimedList - -storedHistoryTimedList :: StoredHistory a -> [(ZonedTime, Stored a)] -storedHistoryTimedList sh = let History hinfo item prev = fromStored sh - in (infoDate hinfo, item) : maybe [] storedHistoryTimedList prev - -beginHistory :: Storable a => Storage -> StoreInfo -> a -> IO (StoredHistory a) -beginHistory st si x = do sx <- wrappedStore st x - wrappedStore st $ History si sx Nothing - -modifyHistory :: Storable a => StoreInfo -> (a -> a) -> StoredHistory a -> IO (StoredHistory a) -modifyHistory si f prev@(Stored (Ref st _) _) = do - sx <- wrappedStore st $ f $ fromHistory prev - wrappedStore st $ History si sx (Just prev) - - -showRatio :: Rational -> String -showRatio r = case decimalRatio r of - Just (n, 1) -> show n - Just (n', d) -> let n = abs n' - in (if n' < 0 then "-" else "") ++ show (n `div` d) ++ "." ++ - (concatMap (show.(`mod` 10).snd) $ reverse $ takeWhile ((>1).fst) $ zip (iterate (`div` 10) d) (iterate (`div` 10) (n `mod` d))) - Nothing -> show (numerator r) ++ "/" ++ show (denominator r) - -decimalRatio :: Rational -> Maybe (Integer, Integer) -decimalRatio r = do - let n = numerator r - d = denominator r - (c2, d') = takeFactors 2 d - (c5, d'') = takeFactors 5 d' - guard $ d'' == 1 - let m = if c2 > c5 then 5 ^ (c2 - c5) - else 2 ^ (c5 - c2) - return (n * m, d * m) - -takeFactors :: Integer -> Integer -> (Integer, Integer) -takeFactors f n | n `mod` f == 0 = let (c, n') = takeFactors f (n `div` f) - in (c+1, n') - | otherwise = (0, n) - -parseRatio :: ByteString -> Maybe Rational -parseRatio bs = case BC.groupBy ((==) `on` isNumber) bs of - (m:xs) | m == BC.pack "-" -> negate <$> positive xs - xs -> positive xs - where positive = \case - [bx] -> fromInteger . fst <$> BC.readInteger bx - [bx, op, by] -> do - (x, _) <- BC.readInteger bx - (y, _) <- BC.readInteger by - case BC.unpack op of - "." -> return $ (x % 1) + (y % (10 ^ BC.length by)) - "/" -> return $ x % y - _ -> Nothing - _ -> Nothing +import Erebos.Object.Internal +import Erebos.Storage.Disk +import Erebos.Storage.Head +import Erebos.Storage.Memory diff --git a/src/Erebos/Storage/Backend.hs b/src/Erebos/Storage/Backend.hs new file mode 100644 index 0000000..620d423 --- /dev/null +++ b/src/Erebos/Storage/Backend.hs @@ -0,0 +1,28 @@ +{-| +Description: Implement custom storage backend + +Exports type class, which can be used to create custom 'Storage' backend. +-} + +module Erebos.Storage.Backend ( + StorageBackend(..), + Complete, Partial, + Storage, PartialStorage, + newStorage, + + WatchID, startWatchID, nextWatchID, +) where + +import Control.Concurrent.MVar + +import Data.HashTable.IO qualified as HT + +import Erebos.Object.Internal +import Erebos.Storage.Internal + + +newStorage :: StorageBackend bck => bck -> IO (Storage' (BackendCompleteness bck)) +newStorage stBackend = do + stRefGeneration <- newMVar =<< HT.new + stRefRoots <- newMVar =<< HT.new + return Storage {..} diff --git a/src/Erebos/Storage/Disk.hs b/src/Erebos/Storage/Disk.hs new file mode 100644 index 0000000..370c584 --- /dev/null +++ b/src/Erebos/Storage/Disk.hs @@ -0,0 +1,230 @@ +module Erebos.Storage.Disk ( + openStorage, +) where + +import Codec.Compression.Zlib + +import Control.Arrow +import Control.Concurrent +import Control.Exception +import Control.Monad + +import Data.ByteArray qualified as BA +import Data.ByteString (ByteString) +import Data.ByteString qualified as B +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL +import Data.ByteString.Lazy.Char8 qualified as BLC +import Data.Function +import Data.List +import Data.Maybe +import Data.UUID qualified as U + +import System.Directory +import System.FSNotify +import System.FilePath +import System.IO +import System.IO.Error + +import Erebos.Object +import Erebos.Storage.Backend +import Erebos.Storage.Head +import Erebos.Storage.Internal +import Erebos.Storage.Platform + + +data DiskStorage = StorageDir + { dirPath :: FilePath + , dirWatchers :: MVar ( Maybe WatchManager, [ HeadTypeID ], WatchList ) + } + +instance Eq DiskStorage where + (==) = (==) `on` dirPath + +instance Show DiskStorage where + show StorageDir { dirPath = path } = "dir:" ++ path + +instance StorageBackend DiskStorage where + backendLoadBytes StorageDir {..} dgst = + handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ + Just . decompress . BL.fromChunks . (:[]) <$> (B.readFile $ refPath dirPath dgst) + backendStoreBytes StorageDir {..} dgst = writeFileOnce (refPath dirPath dgst) . compress + + + backendLoadHeads StorageDir {..} tid = do + let hpath = headTypePath dirPath tid + + files <- filterM (doesFileExist . (hpath </>)) =<< + handleJust (\e -> guard (isDoesNotExistError e)) (const $ return []) + (getDirectoryContents hpath) + fmap catMaybes $ forM files $ \hname -> do + case U.fromString hname of + Just hid -> do + content <- B.readFile (hpath </> hname) + return $ do + (h : _) <- Just (BC.lines content) + dgst <- readRefDigest h + Just $ ( HeadID hid, dgst ) + Nothing -> return Nothing + + backendLoadHead StorageDir {..} tid hid = do + handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ do + (h:_) <- BC.lines <$> B.readFile (headPath dirPath tid hid) + return $ readRefDigest h + + backendStoreHead StorageDir {..} tid hid dgst = do + Right () <- writeFileChecked (headPath dirPath tid hid) Nothing $ + showRefDigest dgst `B.append` BC.singleton '\n' + return () + + backendReplaceHead StorageDir {..} tid hid expected new = do + let filename = headPath dirPath tid hid + showDgstL r = showRefDigest r `B.append` BC.singleton '\n' + + writeFileChecked filename (Just $ showDgstL expected) (showDgstL new) >>= \case + Left Nothing -> return $ Left Nothing + Left (Just bs) -> do Just cur <- return $ readRefDigest $ BC.takeWhile (/='\n') bs + return $ Left $ Just cur + Right () -> return $ Right new + + backendWatchHead st@StorageDir {..} tid hid cb = do + modifyMVar dirWatchers $ \( mbmanager, ilist, wl ) -> do + manager <- maybe startManager return mbmanager + ilist' <- case tid `elem` ilist of + True -> return ilist + False -> do + void $ watchDir manager (headTypePath dirPath tid) (const True) $ \case + ev@Added {} | Just ihid <- HeadID <$> U.fromString (takeFileName (eventPath ev)) -> do + backendLoadHead st tid ihid >>= \case + Just dgst -> do + (_, _, iwl) <- readMVar dirWatchers + mapM_ ($ dgst) . map wlFun . filter ((== (tid, ihid)) . wlHead) . wlList $ iwl + Nothing -> return () + _ -> return () + return $ tid : ilist + return $ first ( Just manager, ilist', ) $ watchListAdd tid hid cb wl + + backendUnwatchHead StorageDir {..} wid = do + modifyMVar_ dirWatchers $ \( mbmanager, ilist, wl ) -> do + return ( mbmanager, ilist, watchListDel wid wl ) + + + backendListKeys StorageDir {..} = do + catMaybes . map (readRefDigest . BC.pack) <$> + listDirectory (keyDirPath dirPath) + + backendLoadKey StorageDir {..} dgst = do + tryIOError (BC.readFile (keyFilePath dirPath dgst)) >>= \case + Right kdata -> return $ Just $ BA.convert kdata + Left _ -> return Nothing + + backendStoreKey StorageDir {..} dgst key = do + writeFileOnce (keyFilePath dirPath dgst) (BL.fromStrict $ BA.convert key) + + backendRemoveKey StorageDir {..} dgst = do + void $ tryIOError (removeFile $ keyFilePath dirPath dgst) + + +storageVersion :: String +storageVersion = "0.1" + +openStorage :: FilePath -> IO Storage +openStorage path = modifyIOError annotate $ do + let versionFileName = "erebos-storage" + let versionPath = path </> versionFileName + let writeVersionFile = writeFileOnce versionPath $ BLC.pack $ storageVersion <> "\n" + + maybeVersion <- handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ + Just <$> readFile versionPath + version <- case maybeVersion of + Just versionContent -> do + return $ takeWhile (/= '\n') versionContent + + Nothing -> do + files <- handleJust (guard . isDoesNotExistError) (const $ return []) $ + listDirectory path + when (not $ or + [ null files + , versionFileName `elem` files + , (versionFileName ++ ".lock") `elem` files + , "objects" `elem` files && "heads" `elem` files + ]) $ do + fail "directory is neither empty, nor an existing erebos storage" + + createDirectoryIfMissing True $ path + writeVersionFile + takeWhile (/= '\n') <$> readFile versionPath + + when (version /= storageVersion) $ do + fail $ "unsupported storage version " <> version + + createDirectoryIfMissing True $ path </> "objects" + createDirectoryIfMissing True $ path </> "heads" + watchers <- newMVar ( Nothing, [], WatchList startWatchID [] ) + newStorage $ StorageDir path watchers + where + annotate e = annotateIOError e "failed to open storage" Nothing (Just path) + + +refPath :: FilePath -> RefDigest -> FilePath +refPath spath rdgst = intercalate "/" [ spath, "objects", BC.unpack alg, pref, rest ] + where (alg, dgst) = showRefDigestParts rdgst + (pref, rest) = splitAt 2 $ BC.unpack dgst + +headTypePath :: FilePath -> HeadTypeID -> FilePath +headTypePath spath (HeadTypeID tid) = spath </> "heads" </> U.toString tid + +headPath :: FilePath -> HeadTypeID -> HeadID -> FilePath +headPath spath tid (HeadID hid) = headTypePath spath tid </> U.toString hid + +keyDirPath :: FilePath -> FilePath +keyDirPath sdir = sdir </> "keys" + +keyFilePath :: FilePath -> RefDigest -> FilePath +keyFilePath sdir dgst = keyDirPath sdir </> (BC.unpack $ showRefDigest dgst) + + +openLockFile :: FilePath -> IO Handle +openLockFile path = do + createDirectoryIfMissing True (takeDirectory path) + retry 10 $ createFileExclusive path + where + retry :: Int -> IO a -> IO a + retry 0 act = act + retry n act = catchJust (\e -> if isAlreadyExistsError e then Just () else Nothing) + act (\_ -> threadDelay (100 * 1000) >> retry (n - 1) act) + +writeFileOnce :: FilePath -> BL.ByteString -> IO () +writeFileOnce file content = bracket (openLockFile locked) + hClose $ \h -> do + doesFileExist file >>= \case + True -> removeFile locked + False -> do BL.hPut h content + hClose h + renameFile locked file + where locked = file ++ ".lock" + +writeFileChecked :: FilePath -> Maybe ByteString -> ByteString -> IO (Either (Maybe ByteString) ()) +writeFileChecked file prev content = bracket (openLockFile locked) + hClose $ \h -> do + (prev,) <$> doesFileExist file >>= \case + (Nothing, True) -> do + current <- B.readFile file + removeFile locked + return $ Left $ Just current + (Nothing, False) -> do B.hPut h content + hClose h + renameFile locked file + return $ Right () + (Just expected, True) -> do + current <- B.readFile file + if current == expected then do B.hPut h content + hClose h + renameFile locked file + return $ return () + else do removeFile locked + return $ Left $ Just current + (Just _, False) -> do + removeFile locked + return $ Left Nothing + where locked = file ++ ".lock" diff --git a/src/Erebos/Storage/Head.hs b/src/Erebos/Storage/Head.hs new file mode 100644 index 0000000..8f8e009 --- /dev/null +++ b/src/Erebos/Storage/Head.hs @@ -0,0 +1,259 @@ +{-| +Description: Define, use and watch heads + +Provides data types and functions for reading, writing or watching `Head's. +Type class `HeadType' is used to define custom new `Head' types. +-} + +module Erebos.Storage.Head ( + -- * Head type and accessors + Head, HeadType(..), + HeadID, HeadTypeID, mkHeadTypeID, + headId, headStorage, headRef, headObject, headStoredObject, + + -- * Loading and storing heads + loadHeads, loadHead, reloadHead, + storeHead, replaceHead, updateHead, updateHead_, + loadHeadRaw, storeHeadRaw, replaceHeadRaw, + + -- * Watching heads + WatchedHead, + watchHead, watchHeadWith, unwatchHead, + watchHeadRaw, +) where + +import Control.Concurrent +import Control.Monad +import Control.Monad.Reader + +import Data.Bifunctor +import Data.Typeable +import Data.UUID qualified as U +import Data.UUID.V4 qualified as U + +import Erebos.Object +import Erebos.Storable +import Erebos.Storage.Backend +import Erebos.Storage.Internal + + +-- | Represents loaded Erebos storage head, along with the object it pointed to +-- at the time it was loaded. +-- +-- Each possible head type has associated unique ID, represented as +-- `HeadTypeID'. For each type, there can be multiple individual heads in given +-- storage, each also identified by unique ID (`HeadID'). +data Head a = Head HeadID (Stored a) + deriving (Eq, Show) + +-- | Instances of this class can be used as objects pointed to by heads in +-- Erebos storage. Each such type must be `Storable' and have a unique ID. +-- +-- To create a custom head type, generate a new UUID and assign it to the type using +-- `mkHeadTypeID': +-- +-- > instance HeadType MyType where +-- > headTypeID _ = mkHeadTypeID "86e8033d-c476-4f81-9b7c-fd36b9144475" +class Storable a => HeadType a where + headTypeID :: proxy a -> HeadTypeID + -- ^ Get the ID of the given head type; must be unique for each `HeadType' instance. + +instance MonadIO m => MonadStorage (ReaderT (Head a) m) where + getStorage = asks $ headStorage + + +-- | Get `HeadID' associated with given `Head'. +headId :: Head a -> HeadID +headId (Head uuid _) = uuid + +-- | Get storage from which the `Head' was loaded. +headStorage :: Head a -> Storage +headStorage = refStorage . headRef + +-- | Get `Ref' of the `Head'\'s associated object. +headRef :: Head a -> Ref +headRef (Head _ sx) = storedRef sx + +-- | Get the object the `Head' pointed to when it was loaded. +headObject :: Head a -> a +headObject (Head _ sx) = fromStored sx + +-- | Get the object the `Head' pointed to when it was loaded as a `Stored' value. +headStoredObject :: Head a -> Stored a +headStoredObject (Head _ sx) = sx + +-- | Create `HeadTypeID' from string representation of UUID. +mkHeadTypeID :: String -> HeadTypeID +mkHeadTypeID = maybe (error "Invalid head type ID") HeadTypeID . U.fromString + + +-- | Load all `Head's of type @a@ from storage. +loadHeads :: forall a m. MonadIO m => HeadType a => Storage -> m [Head a] +loadHeads st@Storage {..} = + map (uncurry Head . fmap (wrappedLoad . Ref st)) + <$> liftIO (backendLoadHeads stBackend (headTypeID @a Proxy)) + +-- | Try to load a `Head' of type @a@ from storage. +loadHead + :: forall a m. (HeadType a, MonadIO m) + => Storage -- ^ Storage from which to load the head + -> HeadID -- ^ ID of the particular head + -> m (Maybe (Head a)) -- ^ Head object, or `Nothing' if not found +loadHead st hid = fmap (Head hid . wrappedLoad) <$> loadHeadRaw st (headTypeID @a Proxy) hid + +-- | Try to load `Head' using a raw head and type IDs, getting `Ref' if found. +loadHeadRaw + :: forall m. MonadIO m + => Storage -- ^ Storage from which to load the head + -> HeadTypeID -- ^ ID of the head type + -> HeadID -- ^ ID of the particular head + -> m (Maybe Ref) -- ^ `Ref' pointing to the head object, or `Nothing' if not found +loadHeadRaw st@Storage {..} tid hid = do + fmap (Ref st) <$> liftIO (backendLoadHead stBackend tid hid) + +-- | Reload the given head from storage, returning `Head' with updated object, +-- or `Nothing' if there is no longer head with the particular ID in storage. +reloadHead :: (HeadType a, MonadIO m) => Head a -> m (Maybe (Head a)) +reloadHead (Head hid (Stored (Ref st _) _)) = loadHead st hid + +-- | Store a new `Head' of type 'a' in the storage. +storeHead :: forall a m. MonadIO m => HeadType a => Storage -> a -> m (Head a) +storeHead st obj = do + let tid = headTypeID @a Proxy + stored <- wrappedStore st obj + hid <- storeHeadRaw st tid (storedRef stored) + return $ Head hid stored + +-- | Store a new `Head' in the storage, using the raw `HeadTypeID' and `Ref', +-- the function returns the assigned `HeadID' of the new head. +storeHeadRaw :: forall m. MonadIO m => Storage -> HeadTypeID -> Ref -> m HeadID +storeHeadRaw Storage {..} tid ref = liftIO $ do + hid <- HeadID <$> U.nextRandom + backendStoreHead stBackend tid hid (refDigest ref) + return hid + +-- | Try to replace existing `Head' of type @a@ in the storage. Function fails +-- if the head value in storage changed after being loaded here; for automatic +-- retry see `updateHead'. +replaceHead + :: forall a m. (HeadType a, MonadIO m) + => Head a -- ^ Existing head, associated object is supposed to match the one in storage + -> Stored a -- ^ Intended new value + -> m (Either (Maybe (Head a)) (Head a)) + -- ^ + -- [@`Left' `Nothing'@]: + -- Nothing was stored – the head no longer exists in storage. + -- [@`Left' (`Just' h)@]: + -- Nothing was stored – the head value in storage does not match + -- the first parameter, but is @h@ instead. + -- [@`Right' h@]: + -- Head value was updated in storage, the new head is @h@ (which is + -- the same as first parameter with associated object replaced by + -- the second parameter). +replaceHead prev@(Head hid pobj) stored' = liftIO $ do + let st = headStorage prev + tid = headTypeID @a Proxy + stored <- copyStored st stored' + bimap (fmap $ Head hid . wrappedLoad) (const $ Head hid stored) <$> + replaceHeadRaw st tid hid (storedRef pobj) (storedRef stored) + +-- | Try to replace existing head using raw IDs and `Ref's. +replaceHeadRaw + :: forall m. MonadIO m + => Storage -- ^ Storage to use + -> HeadTypeID -- ^ ID of the head type + -> HeadID -- ^ ID of the particular head + -> Ref -- ^ Expected value in storage + -> Ref -- ^ Intended new value + -> m (Either (Maybe Ref) Ref) + -- ^ + -- [@`Left' `Nothing'@]: + -- Nothing was stored – the head no longer exists in storage. + -- [@`Left' (`Just' r)@]: + -- Nothing was stored – the head value in storage does not match + -- the expected value, but is @r@ instead. + -- [@`Right' r@]: + -- Head value was updated in storage, the new head value is @r@ + -- (which is the same as the indended value). +replaceHeadRaw st@Storage {..} tid hid prev new = liftIO $ do + _ <- copyRef st new + bimap (fmap $ Ref st) (Ref st) <$> backendReplaceHead stBackend tid hid (refDigest prev) (refDigest new) + +-- | Update existing existing `Head' of type @a@ in the storage, using a given +-- function. The update function may be called multiple times in case the head +-- content changes concurrently during evaluation. +updateHead + :: (HeadType a, MonadIO m) + => Head a -- ^ Existing head to be updated + -> (Stored a -> m ( Stored a, b )) + -- ^ Function that gets current value of the head and returns updated + -- value, along with a custom extra value to be returned from + -- `updateHead' call. The function may be called multiple times. + -> m ( Maybe (Head a), b ) + -- ^ First element contains either the new head as @`Just' h@, or + -- `Nothing' in case the head no longer exists in storage. Second + -- element is the value from last call to the update function. +updateHead h f = do + (o, x) <- f $ headStoredObject h + replaceHead h o >>= \case + Right h' -> return (Just h', x) + Left Nothing -> return (Nothing, x) + Left (Just h') -> updateHead h' f + +-- | Update existing existing `Head' of type @a@ in the storage, using a given +-- function. The update function may be called multiple times in case the head +-- content changes concurrently during evaluation. +updateHead_ + :: (HeadType a, MonadIO m) + => Head a -- ^ Existing head to be updated + -> (Stored a -> m (Stored a)) + -- ^ Function that gets current value of the head and returns updated + -- value; may be called multiple times. + -> m (Maybe (Head a)) + -- ^ The new head as @`Just' h@, or `Nothing' in case the head no + -- longer exists in storage. +updateHead_ h = fmap fst . updateHead h . (fmap (,()) .) + + +-- | Represents a handle of a watched head, which can be used to cancel the +-- watching. +data WatchedHead = forall a. WatchedHead Storage WatchID (MVar a) + +-- | Watch the given head. The callback will be called with the current head +-- value, and then again each time the head changes. +watchHead :: forall a. HeadType a => Head a -> (Head a -> IO ()) -> IO WatchedHead +watchHead h = watchHeadWith h id + +-- | Watch the given head using custom selector function. The callback will be +-- called with the value derived from current head state, and then again each +-- time the selected value changes according to its `Eq' instance. +watchHeadWith + :: forall a b. (HeadType a, Eq b) + => Head a -- ^ Head to watch + -> (Head a -> b) -- ^ Selector function + -> (b -> IO ()) -- ^ Callback + -> IO WatchedHead -- ^ Watched head handle +watchHeadWith (Head hid (Stored (Ref st _) _)) sel cb = do + watchHeadRaw st (headTypeID @a Proxy) hid (sel . Head hid . wrappedLoad) cb + +-- | Watch the given head using raw IDs and a selector from `Ref'. +watchHeadRaw :: forall b. Eq b => Storage -> HeadTypeID -> HeadID -> (Ref -> b) -> (b -> IO ()) -> IO WatchedHead +watchHeadRaw st@Storage {..} tid hid sel cb = do + memo <- newEmptyMVar + let cb' dgst = do + let x = sel (Ref st dgst) + modifyMVar_ memo $ \prev -> do + when (Just x /= prev) $ cb x + return $ Just x + wid <- backendWatchHead stBackend tid hid cb' + + cur <- fmap sel <$> loadHeadRaw st tid hid + maybe (return ()) cb cur + putMVar memo cur + + return $ WatchedHead st wid memo + +-- | Stop watching previously watched head. +unwatchHead :: WatchedHead -> IO () +unwatchHead (WatchedHead Storage {..} wid _) = do + backendUnwatchHead stBackend wid diff --git a/src/Erebos/Storage/Internal.hs b/src/Erebos/Storage/Internal.hs index d419a5e..6df1410 100644 --- a/src/Erebos/Storage/Internal.hs +++ b/src/Erebos/Storage/Internal.hs @@ -1,7 +1,5 @@ module Erebos.Storage.Internal where -import Codec.Compression.Zlib - import Control.Arrow import Control.Concurrent import Control.DeepSeq @@ -13,76 +11,145 @@ import Crypto.Hash import Data.Bits import Data.ByteArray (ByteArray, ByteArrayAccess, ScrubbedBytes) -import qualified Data.ByteArray as BA +import Data.ByteArray qualified as BA import Data.ByteString (ByteString) -import qualified Data.ByteString as B -import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL +import Data.ByteString qualified as B +import Data.ByteString.Char8 qualified as BC +import Data.ByteString.Lazy qualified as BL import Data.Char -import Data.Function +import Data.HashTable.IO qualified as HT import Data.Hashable -import qualified Data.HashTable.IO as HT import Data.Kind -import Data.List -import Data.Map (Map) -import qualified Data.Map as M +import Data.Typeable import Data.UUID (UUID) import Foreign.Storable (peek) -import System.Directory -import System.FSNotify (WatchManager) -import System.FilePath -import System.IO -import System.IO.Error import System.IO.Unsafe (unsafePerformIO) -import Erebos.Storage.Platform - -data Storage' c = Storage - { stBacking :: StorageBacking c - , stParent :: Maybe (Storage' Identity) +data Storage' c = forall bck. (StorageBackend bck, BackendCompleteness bck ~ c) => Storage + { stBackend :: bck , stRefGeneration :: MVar (HT.BasicHashTable RefDigest Generation) , stRefRoots :: MVar (HT.BasicHashTable RefDigest [RefDigest]) } +type Storage = Storage' Complete +type PartialStorage = Storage' Partial + instance Eq (Storage' c) where - (==) = (==) `on` (stBacking &&& stParent) + Storage { stBackend = b } == Storage { stBackend = b' } + | Just b'' <- cast b' = b == b'' + | otherwise = False instance Show (Storage' c) where - show st@(Storage { stBacking = StorageDir { dirPath = path }}) = "dir" ++ showParentStorage st ++ ":" ++ path - show st@(Storage { stBacking = StorageMemory {} }) = "mem" ++ showParentStorage st - -showParentStorage :: Storage' c -> String -showParentStorage Storage { stParent = Nothing } = "" -showParentStorage Storage { stParent = Just st } = "@" ++ show st - -data StorageBacking c - = StorageDir { dirPath :: FilePath - , dirWatchers :: MVar ( Maybe WatchManager, [ HeadTypeID ], WatchList c ) - } - | StorageMemory { memHeads :: MVar [((HeadTypeID, HeadID), Ref' c)] - , memObjs :: MVar (Map RefDigest BL.ByteString) - , memKeys :: MVar (Map RefDigest ScrubbedBytes) - , memWatchers :: MVar (WatchList c) - } - deriving (Eq) + show Storage { stBackend = b } = show b ++ showParentStorage b + +showParentStorage :: StorageBackend bck => bck -> String +showParentStorage bck + | Just (st :: Storage) <- cast (backendParent bck) = "@" ++ show st + | Just (st :: PartialStorage) <- cast (backendParent bck) = "@" ++ show st + | otherwise = "" + + +class (Eq bck, Show bck, Typeable bck, Typeable (BackendParent bck)) => StorageBackend bck where + type BackendCompleteness bck :: Type -> Type + type BackendCompleteness bck = Complete + + type BackendParent bck :: Type + type BackendParent bck = () + backendParent :: bck -> BackendParent bck + default backendParent :: BackendParent bck ~ () => bck -> BackendParent bck + backendParent _ = () + + + backendLoadBytes :: bck -> RefDigest -> IO (Maybe BL.ByteString) + default backendLoadBytes :: BackendParent bck ~ Storage => bck -> RefDigest -> IO (Maybe BL.ByteString) + backendLoadBytes bck = case backendParent bck of Storage { stBackend = bck' } -> backendLoadBytes bck' + + backendStoreBytes :: bck -> RefDigest -> BL.ByteString -> IO () + default backendStoreBytes :: BackendParent bck ~ Storage => bck -> RefDigest -> BL.ByteString -> IO () + backendStoreBytes bck = case backendParent bck of Storage { stBackend = bck' } -> backendStoreBytes bck' + + + backendLoadHeads :: bck -> HeadTypeID -> IO [ ( HeadID, RefDigest ) ] + default backendLoadHeads :: BackendParent bck ~ Storage => bck -> HeadTypeID -> IO [ ( HeadID, RefDigest ) ] + backendLoadHeads bck = case backendParent bck of Storage { stBackend = bck' } -> backendLoadHeads bck' + + backendLoadHead :: bck -> HeadTypeID -> HeadID -> IO (Maybe RefDigest) + default backendLoadHead :: BackendParent bck ~ Storage => bck -> HeadTypeID -> HeadID -> IO (Maybe RefDigest) + backendLoadHead bck = case backendParent bck of Storage { stBackend = bck' } -> backendLoadHead bck' + + backendStoreHead :: bck -> HeadTypeID -> HeadID -> RefDigest -> IO () + default backendStoreHead :: BackendParent bck ~ Storage => bck -> HeadTypeID -> HeadID -> RefDigest -> IO () + backendStoreHead bck = case backendParent bck of Storage { stBackend = bck' } -> backendStoreHead bck' + + backendReplaceHead :: bck -> HeadTypeID -> HeadID -> RefDigest -> RefDigest -> IO (Either (Maybe RefDigest) RefDigest) + default backendReplaceHead :: BackendParent bck ~ Storage => bck -> HeadTypeID -> HeadID -> RefDigest -> RefDigest -> IO (Either (Maybe RefDigest) RefDigest) + backendReplaceHead bck = case backendParent bck of Storage { stBackend = bck' } -> backendReplaceHead bck' + + backendWatchHead :: bck -> HeadTypeID -> HeadID -> (RefDigest -> IO ()) -> IO WatchID + default backendWatchHead :: BackendParent bck ~ Storage => bck -> HeadTypeID -> HeadID -> (RefDigest -> IO ()) -> IO WatchID + backendWatchHead bck = case backendParent bck of Storage { stBackend = bck' } -> backendWatchHead bck' + + backendUnwatchHead :: bck -> WatchID -> IO () + default backendUnwatchHead :: BackendParent bck ~ Storage => bck -> WatchID -> IO () + backendUnwatchHead bck = case backendParent bck of Storage { stBackend = bck' } -> backendUnwatchHead bck' + + + backendListKeys :: bck -> IO [ RefDigest ] + default backendListKeys :: BackendParent bck ~ Storage => bck -> IO [ RefDigest ] + backendListKeys bck = case backendParent bck of Storage { stBackend = bck' } -> backendListKeys bck' + + backendLoadKey :: bck -> RefDigest -> IO (Maybe ScrubbedBytes) + default backendLoadKey :: BackendParent bck ~ Storage => bck -> RefDigest -> IO (Maybe ScrubbedBytes) + backendLoadKey bck = case backendParent bck of Storage { stBackend = bck' } -> backendLoadKey bck' + + backendStoreKey :: bck -> RefDigest -> ScrubbedBytes -> IO () + default backendStoreKey :: BackendParent bck ~ Storage => bck -> RefDigest -> ScrubbedBytes -> IO () + backendStoreKey bck = case backendParent bck of Storage { stBackend = bck' } -> backendStoreKey bck' + + backendRemoveKey :: bck -> RefDigest -> IO () + default backendRemoveKey :: BackendParent bck ~ Storage => bck -> RefDigest -> IO () + backendRemoveKey bck = case backendParent bck of Storage { stBackend = bck' } -> backendRemoveKey bck' + + newtype WatchID = WatchID Int - deriving (Eq, Ord, Num) + deriving (Eq, Ord) + +startWatchID :: WatchID +startWatchID = WatchID 1 -data WatchList c = WatchList +nextWatchID :: WatchID -> WatchID +nextWatchID (WatchID n) = WatchID (n + 1) + +data WatchList = WatchList { wlNext :: WatchID - , wlList :: [WatchListItem c] + , wlList :: [ WatchListItem ] } -data WatchListItem c = WatchListItem +data WatchListItem = WatchListItem { wlID :: WatchID - , wlHead :: (HeadTypeID, HeadID) - , wlFun :: Ref' c -> IO () + , wlHead :: ( HeadTypeID, HeadID ) + , wlFun :: RefDigest -> IO () } +watchListAdd :: HeadTypeID -> HeadID -> (RefDigest -> IO ()) -> WatchList -> ( WatchList, WatchID ) +watchListAdd tid hid cb wl = ( wl', wlNext wl ) + where + wl' = wl + { wlNext = nextWatchID (wlNext wl) + , wlList = WatchListItem + { wlID = wlNext wl + , wlHead = (tid, hid) + , wlFun = cb + } : wlList wl + } + +watchListDel :: WatchID -> WatchList -> WatchList +watchListDel wid wl = wl { wlList = filter ((/= wid) . wlID) $ wlList wl } + newtype RefDigest = RefDigest (Digest Blake2b_256) deriving (Eq, Ord, NFData, ByteArrayAccess) @@ -92,6 +159,9 @@ instance Show RefDigest where data Ref' c = Ref (Storage' c) RefDigest +type Ref = Ref' Complete +type PartialRef = Ref' Partial + instance Eq (Ref' c) where Ref _ d1 == Ref _ d2 = d1 == d2 @@ -159,12 +229,11 @@ readHex = return . BA.concat <=< readHex' newtype Generation = Generation Int deriving (Eq, Show) -data Head' c a = Head HeadID (Stored' c a) - deriving (Eq, Show) - +-- | UUID of individual Erebos storage head. newtype HeadID = HeadID UUID deriving (Eq, Ord, Show) +-- | UUID of Erebos storage head type. newtype HeadTypeID = HeadTypeID UUID deriving (Eq, Ord) @@ -184,7 +253,7 @@ storedStorage (Stored (Ref st _) _) = st type Complete = Identity type Partial = Either RefDigest -class (Traversable compl, Monad compl) => StorageCompleteness compl where +class (Traversable compl, Monad compl, Typeable compl) => StorageCompleteness compl where type LoadResult compl a :: Type returnLoadResult :: compl a -> LoadResult compl a ioLoadBytes :: Ref' compl -> IO (compl BL.ByteString) @@ -201,71 +270,16 @@ instance StorageCompleteness Partial where ioLoadBytes (Ref st dgst) = maybe (Left dgst) Right <$> ioLoadBytesFromStorage st dgst unsafeStoreRawBytes :: Storage' c -> BL.ByteString -> IO (Ref' c) -unsafeStoreRawBytes st raw = do - let dgst = hashToRefDigest raw - case stBacking st of - StorageDir { dirPath = sdir } -> writeFileOnce (refPath sdir dgst) $ compress raw - StorageMemory { memObjs = tobjs } -> - dgst `deepseq` -- the TVar may be accessed when evaluating the data to be written - modifyMVar_ tobjs (return . M.insert dgst raw) +unsafeStoreRawBytes st@Storage {..} raw = do + dgst <- evaluate $ force $ hashToRefDigest raw + backendStoreBytes stBackend dgst raw return $ Ref st dgst ioLoadBytesFromStorage :: Storage' c -> RefDigest -> IO (Maybe BL.ByteString) -ioLoadBytesFromStorage st dgst = loadCurrent st >>= - \case Just bytes -> return $ Just bytes - Nothing | Just parent <- stParent st -> ioLoadBytesFromStorage parent dgst - | otherwise -> return Nothing - where loadCurrent Storage { stBacking = StorageDir { dirPath = spath } } = handleJust (guard . isDoesNotExistError) (const $ return Nothing) $ - Just . decompress . BL.fromChunks . (:[]) <$> (B.readFile $ refPath spath dgst) - loadCurrent Storage { stBacking = StorageMemory { memObjs = tobjs } } = M.lookup dgst <$> readMVar tobjs - -refPath :: FilePath -> RefDigest -> FilePath -refPath spath rdgst = intercalate "/" [spath, "objects", BC.unpack alg, pref, rest] - where (alg, dgst) = showRefDigestParts rdgst - (pref, rest) = splitAt 2 $ BC.unpack dgst - - -openLockFile :: FilePath -> IO Handle -openLockFile path = do - createDirectoryIfMissing True (takeDirectory path) - retry 10 $ createFileExclusive path - where - retry :: Int -> IO a -> IO a - retry 0 act = act - retry n act = catchJust (\e -> if isAlreadyExistsError e then Just () else Nothing) - act (\_ -> threadDelay (100 * 1000) >> retry (n - 1) act) - -writeFileOnce :: FilePath -> BL.ByteString -> IO () -writeFileOnce file content = bracket (openLockFile locked) - hClose $ \h -> do - doesFileExist file >>= \case - True -> removeFile locked - False -> do BL.hPut h content - hFlush h - renameFile locked file - where locked = file ++ ".lock" - -writeFileChecked :: FilePath -> Maybe ByteString -> ByteString -> IO (Either (Maybe ByteString) ()) -writeFileChecked file prev content = bracket (openLockFile locked) - hClose $ \h -> do - (prev,) <$> doesFileExist file >>= \case - (Nothing, True) -> do - current <- B.readFile file - removeFile locked - return $ Left $ Just current - (Nothing, False) -> do B.hPut h content - hFlush h - renameFile locked file - return $ Right () - (Just expected, True) -> do - current <- B.readFile file - if current == expected then do B.hPut h content - hFlush h - renameFile locked file - return $ return () - else do removeFile locked - return $ Left $ Just current - (Just _, False) -> do - removeFile locked - return $ Left Nothing - where locked = file ++ ".lock" +ioLoadBytesFromStorage Storage {..} dgst = + backendLoadBytes stBackend dgst >>= \case + Just bytes -> return $ Just bytes + Nothing + | Just (parent :: Storage) <- cast (backendParent stBackend) -> ioLoadBytesFromStorage parent dgst + | Just (parent :: PartialStorage) <- cast (backendParent stBackend) -> ioLoadBytesFromStorage parent dgst + | otherwise -> return Nothing diff --git a/src/Erebos/Storage/Key.hs b/src/Erebos/Storage/Key.hs index b6afc20..b615f16 100644 --- a/src/Erebos/Storage/Key.hs +++ b/src/Erebos/Storage/Key.hs @@ -4,21 +4,14 @@ module Erebos.Storage.Key ( moveKeys, ) where -import Control.Concurrent.MVar import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class import Data.ByteArray -import qualified Data.ByteString.Char8 as BC -import qualified Data.ByteString.Lazy as BL -import qualified Data.Map as M +import Data.Typeable -import System.Directory -import System.FilePath -import System.IO.Error - -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Internal class Storable pub => KeyPair sec pub | sec -> pub, pub -> sec where @@ -28,58 +21,32 @@ class Storable pub => KeyPair sec pub | sec -> pub, pub -> sec where keyFromData :: ScrubbedBytes -> Stored pub -> Maybe sec -keyFilePath :: KeyPair sec pub => FilePath -> Stored pub -> FilePath -keyFilePath sdir pkey = sdir </> "keys" </> (BC.unpack $ showRef $ storedRef pkey) - storeKey :: KeyPair sec pub => sec -> IO () storeKey key = do let spub = keyGetPublic key - case stBacking $ storedStorage spub of - StorageDir { dirPath = dir } -> writeFileOnce (keyFilePath dir spub) (BL.fromStrict $ convert $ keyGetData key) - StorageMemory { memKeys = kstore } -> modifyMVar_ kstore $ return . M.insert (refDigest $ storedRef spub) (keyGetData key) + case storedStorage spub of + Storage {..} -> backendStoreKey stBackend (refDigest $ storedRef spub) (keyGetData key) -loadKey :: (KeyPair sec pub, MonadIO m, MonadError String m) => Stored pub -> m sec -loadKey pub = maybe (throwError $ "secret key not found for " <> show (storedRef pub)) return =<< loadKeyMb pub +loadKey :: (KeyPair sec pub, MonadIO m, MonadError e m, FromErebosError e) => Stored pub -> m sec +loadKey pub = maybe (throwOtherError $ "secret key not found for " <> show (storedRef pub)) return =<< loadKeyMb pub -loadKeyMb :: (KeyPair sec pub, MonadIO m) => Stored pub -> m (Maybe sec) +loadKeyMb :: forall sec pub m. (KeyPair sec pub, MonadIO m) => Stored pub -> m (Maybe sec) loadKeyMb spub = liftIO $ run $ storedStorage spub where - run st = tryOneLevel (stBacking st) >>= \case - key@Just {} -> return key - Nothing | Just parent <- stParent st -> run parent - | otherwise -> return Nothing - tryOneLevel = \case - StorageDir { dirPath = dir } -> tryIOError (BC.readFile (keyFilePath dir spub)) >>= \case - Right kdata -> return $ keyFromData (convert kdata) spub - Left _ -> return Nothing - StorageMemory { memKeys = kstore } -> (flip keyFromData spub <=< M.lookup (refDigest $ storedRef spub)) <$> readMVar kstore + run :: Storage' c -> IO (Maybe sec) + run Storage {..} = backendLoadKey stBackend (refDigest $ storedRef spub) >>= \case + Just bytes -> return $ keyFromData bytes spub + Nothing + | Just (parent :: Storage) <- cast (backendParent stBackend) -> run parent + | Just (parent :: PartialStorage) <- cast (backendParent stBackend) -> run parent + | otherwise -> return Nothing moveKeys :: MonadIO m => Storage -> Storage -> m () -moveKeys from to = liftIO $ do - case (stBacking from, stBacking to) of - (StorageDir { dirPath = fromPath }, StorageDir { dirPath = toPath }) -> do - files <- listDirectory (fromPath </> "keys") - forM_ files $ \file -> do - renameFile (fromPath </> "keys" </> file) (toPath </> "keys" </> file) - - (StorageDir { dirPath = fromPath }, StorageMemory { memKeys = toKeys }) -> do - let move m file - | Just dgst <- readRefDigest (BC.pack file) = do - let path = fromPath </> "keys" </> file - key <- convert <$> BC.readFile path - removeFile path - return $ M.insert dgst key m - | otherwise = return m - files <- listDirectory (fromPath </> "keys") - modifyMVar_ toKeys $ \keys -> foldM move keys files - - (StorageMemory { memKeys = fromKeys }, StorageDir { dirPath = toPath }) -> do - modifyMVar_ fromKeys $ \keys -> do - forM_ (M.assocs keys) $ \(dgst, key) -> - writeFileOnce (toPath </> "keys" </> (BC.unpack $ showRefDigest dgst)) (BL.fromStrict $ convert key) - return M.empty - - (StorageMemory { memKeys = fromKeys }, StorageMemory { memKeys = toKeys }) -> do - modifyMVar_ fromKeys $ \fkeys -> do - modifyMVar_ toKeys $ return . M.union fkeys - return M.empty +moveKeys Storage { stBackend = from } Storage { stBackend = to } = liftIO $ do + keys <- backendListKeys from + forM_ keys $ \key -> do + backendLoadKey from key >>= \case + Just sec -> do + backendStoreKey to key sec + backendRemoveKey from key + Nothing -> return () diff --git a/src/Erebos/Storage/Memory.hs b/src/Erebos/Storage/Memory.hs new file mode 100644 index 0000000..677e8c5 --- /dev/null +++ b/src/Erebos/Storage/Memory.hs @@ -0,0 +1,101 @@ +module Erebos.Storage.Memory ( + memoryStorage, + deriveEphemeralStorage, + derivePartialStorage, +) where + +import Control.Concurrent.MVar + +import Data.ByteArray (ScrubbedBytes) +import Data.ByteString.Lazy qualified as BL +import Data.Function +import Data.Kind +import Data.List +import Data.Map (Map) +import Data.Map qualified as M +import Data.Maybe +import Data.Typeable + +import Erebos.Object +import Erebos.Storage.Backend +import Erebos.Storage.Head +import Erebos.Storage.Internal + + +data MemoryStorage p (c :: Type -> Type) = StorageMemory + { memParent :: p + , memHeads :: MVar [ (( HeadTypeID, HeadID ), RefDigest ) ] + , memObjs :: MVar (Map RefDigest BL.ByteString) + , memKeys :: MVar (Map RefDigest ScrubbedBytes) + , memWatchers :: MVar WatchList + } + +instance Eq (MemoryStorage p c) where + (==) = (==) `on` memObjs + +instance Show (MemoryStorage p c) where + show StorageMemory {} = "mem" + +instance (StorageCompleteness c, Typeable p) => StorageBackend (MemoryStorage p c) where + type BackendCompleteness (MemoryStorage p c) = c + type BackendParent (MemoryStorage p c) = p + backendParent = memParent + + backendLoadBytes StorageMemory {..} dgst = + M.lookup dgst <$> readMVar memObjs + + backendStoreBytes StorageMemory {..} dgst raw = + modifyMVar_ memObjs (return . M.insert dgst raw) + + + backendLoadHeads StorageMemory {..} tid = do + let toRes ( ( tid', hid ), dgst ) + | tid' == tid = Just ( hid, dgst ) + | otherwise = Nothing + catMaybes . map toRes <$> readMVar memHeads + + backendLoadHead StorageMemory {..} tid hid = + lookup (tid, hid) <$> readMVar memHeads + + backendStoreHead StorageMemory {..} tid hid dgst = + modifyMVar_ memHeads $ return . (( ( tid, hid ), dgst ) :) + + backendReplaceHead StorageMemory {..} tid hid expected new = do + res <- modifyMVar memHeads $ \hs -> do + ws <- map wlFun . filter ((==(tid, hid)) . wlHead) . wlList <$> readMVar memWatchers + return $ case partition ((==(tid, hid)) . fst) hs of + ( [] , _ ) -> ( hs, Left Nothing ) + (( _, dgst ) : _, hs' ) + | dgst == expected -> ((( tid, hid ), new ) : hs', Right ( new, ws )) + | otherwise -> ( hs, Left $ Just dgst ) + case res of + Right ( dgst, ws ) -> mapM_ ($ dgst) ws >> return (Right dgst) + Left x -> return $ Left x + + backendWatchHead StorageMemory {..} tid hid cb = modifyMVar memWatchers $ return . watchListAdd tid hid cb + + backendUnwatchHead StorageMemory {..} wid = modifyMVar_ memWatchers $ return . watchListDel wid + + + backendListKeys StorageMemory {..} = M.keys <$> readMVar memKeys + backendLoadKey StorageMemory {..} dgst = M.lookup dgst <$> readMVar memKeys + backendStoreKey StorageMemory {..} dgst key = modifyMVar_ memKeys $ return . M.insert dgst key + backendRemoveKey StorageMemory {..} dgst = modifyMVar_ memKeys $ return . M.delete dgst + + +memoryStorage' :: (StorageCompleteness c, Typeable p) => p -> IO (Storage' c) +memoryStorage' memParent = do + memHeads <- newMVar [] + memObjs <- newMVar M.empty + memKeys <- newMVar M.empty + memWatchers <- newMVar (WatchList startWatchID []) + newStorage $ StorageMemory {..} + +memoryStorage :: IO Storage +memoryStorage = memoryStorage' () + +deriveEphemeralStorage :: Storage -> IO Storage +deriveEphemeralStorage parent = memoryStorage' parent + +derivePartialStorage :: Storage -> IO PartialStorage +derivePartialStorage parent = memoryStorage' parent diff --git a/src/Erebos/Storage/Merge.hs b/src/Erebos/Storage/Merge.hs index 9d9db13..41725af 100644 --- a/src/Erebos/Storage/Merge.hs +++ b/src/Erebos/Storage/Merge.hs @@ -31,7 +31,8 @@ import Data.Set qualified as S import System.IO.Unsafe (unsafePerformIO) -import Erebos.Storage +import Erebos.Object +import Erebos.Storable import Erebos.Storage.Internal import Erebos.Util @@ -97,13 +98,16 @@ storedGeneration x = doLookup x +-- |Returns list of sets starting with the set of given objects and +-- intcrementally adding parents. generations :: Storable a => [Stored a] -> [Set (Stored a)] generations = unfoldr gen . (,S.empty) - where gen (hs, cur) = case filter (`S.notMember` cur) $ previous =<< hs of + where gen (hs, cur) = case filter (`S.notMember` cur) hs of [] -> Nothing added -> let next = foldr S.insert cur added - in Just (next, (added, next)) + in Just (next, (previous =<< added, next)) +-- |Returns set containing all given objects and their ancestors ancestors :: Storable a => [Stored a] -> Set (Stored a) ancestors = last . (S.empty:) . generations diff --git a/src/Erebos/Sync.hs b/src/Erebos/Sync.hs index 04b5f11..32e2e22 100644 --- a/src/Erebos/Sync.hs +++ b/src/Erebos/Sync.hs @@ -10,7 +10,7 @@ import Data.List import Erebos.Identity import Erebos.Service import Erebos.State -import Erebos.Storage +import Erebos.Storable import Erebos.Storage.Merge data SyncService = SyncPacket (Stored SharedState) |