summaryrefslogtreecommitdiff
path: root/src/Erebos/Object
diff options
context:
space:
mode:
Diffstat (limited to 'src/Erebos/Object')
-rw-r--r--src/Erebos/Object/Deferred.hs108
-rw-r--r--src/Erebos/Object/Internal.hs164
2 files changed, 208 insertions, 64 deletions
diff --git a/src/Erebos/Object/Deferred.hs b/src/Erebos/Object/Deferred.hs
new file mode 100644
index 0000000..31ff0f9
--- /dev/null
+++ b/src/Erebos/Object/Deferred.hs
@@ -0,0 +1,108 @@
+module Erebos.Object.Deferred (
+ Deferred,
+ DeferredSize(..),
+ DeferredResult(..),
+
+ deferredRef,
+ deferredLoad,
+ deferredWait,
+ deferredCheck,
+
+ deferLoadWithServer,
+) where
+
+import Control.Concurrent.MVar
+import Control.Monad.IO.Class
+
+import Data.Word
+
+import Erebos.Identity
+import Erebos.Network
+import Erebos.Object
+import Erebos.Storable
+
+
+-- | Deffered value, which can be loaded on request. Holds a reference (digest)
+-- to an object and information about suitable network peers, from which the
+-- data can be requested.
+data Deferred a = Deferred
+ { deferredRef_ :: RefDigest
+ , deferredSize :: DeferredSize
+ , deferredServer :: Server
+ , deferredPeers :: [ RefDigest ]
+ , deferredStatus :: MVar (Maybe (MVar (DeferredResult a)))
+ }
+
+-- | Size constraint for the deferred object.
+data DeferredSize
+ = DeferredExactSize Word64 -- ^ Component size of the referred data must be exactly the given value.
+ | DeferredMaximumSize Word64 -- ^ Component size of the referred data must not exceed the given value.
+
+-- | Result of the deferred load request.
+data DeferredResult a
+ = DeferredLoaded (Stored a) -- ^ Deferred object was sucessfully loaded.
+ | DeferredInvalid -- ^ Deferred object was (partially) loaded, but failed to meet the size constraint or was an invalid object.
+ | DeferredFailed -- ^ Failure to load the object, e.g. no suitable peer was found.
+
+-- | Get the digest of the deferred object.
+deferredRef :: Deferred a -> RefDigest
+deferredRef = deferredRef_
+
+-- | Request the deferred object to be loaded. Does nothing if that was already
+-- requested before. The result can be received using `deferredWait` or
+-- `deferredCheck` functions.
+deferredLoad :: (Storable a, MonadIO m) => Deferred a -> m ()
+deferredLoad Deferred {..} = liftIO $ do
+ modifyMVar_ deferredStatus $ \case
+ Nothing -> do
+ mvar <- newEmptyMVar
+ let matchPeer peer =
+ getPeerIdentity peer >>= \case
+ PeerIdentityFull pid -> do
+ return $ any (`elem` identityDigests pid) deferredPeers
+ _ -> return False
+
+ liftIO (findPeer deferredServer matchPeer) >>= \case
+ Just peer -> do
+ let bound = case deferredSize of
+ DeferredExactSize s -> s
+ DeferredMaximumSize s -> s
+
+ checkSize ref = case deferredSize of
+ DeferredExactSize s -> componentSize ref == s
+ DeferredMaximumSize s -> componentSize ref <= s
+
+ requestDataFromPeer peer deferredRef_ bound $ liftIO . \case
+ DataRequestFulfilled ref
+ | checkSize ref -> putMVar mvar $ DeferredLoaded $ wrappedLoad ref
+ | otherwise -> putMVar mvar DeferredInvalid
+ DataRequestRejected -> putMVar mvar DeferredFailed
+ DataRequestBrokenBound -> putMVar mvar DeferredInvalid
+
+ Nothing -> putMVar mvar DeferredFailed
+ return $ Just mvar
+ cur@Just {} -> return cur
+
+-- | Wait for a `Deferred` value to be loaded and return the result. Requests
+-- the value to be loaded if that was not already done.
+deferredWait :: (Storable a, MonadIO m) => Deferred a -> m (DeferredResult a)
+deferredWait d@Deferred {..} = liftIO $ readMVar deferredStatus >>= \case
+ Nothing -> deferredLoad d >> deferredWait d
+ Just mvar -> readMVar mvar
+
+-- | Check if a `Deferred` value has already been loaded and return it in
+-- `Just` if so, otherwise return `Nothing`. Requests the value to be loaded if
+-- that was not already done.
+deferredCheck :: (Storable a, MonadIO m) => Deferred a -> m (Maybe (DeferredResult a))
+deferredCheck d@Deferred {..} = liftIO $ readMVar deferredStatus >>= \case
+ Nothing -> deferredLoad d >> deferredCheck d
+ Just mvar -> tryReadMVar mvar
+
+deferLoadWithServer :: (Storable a, MonadIO m) => RefDigest -> DeferredSize -> Server -> [ RefDigest ] -> m (Deferred a)
+deferLoadWithServer deferredRef_ deferredSize deferredServer deferredPeers = do
+ deferredStatus <- liftIO $ newMVar Nothing
+ return Deferred {..}
+
+
+identityDigests :: Foldable f => Identity f -> [ RefDigest ]
+identityDigests pid = map (refDigest . storedRef) $ idDataF =<< unfoldOwners pid
diff --git a/src/Erebos/Object/Internal.hs b/src/Erebos/Object/Internal.hs
index 6111d2a..1e65321 100644
--- a/src/Erebos/Object/Internal.hs
+++ b/src/Erebos/Object/Internal.hs
@@ -2,8 +2,9 @@ module Erebos.Object.Internal (
Storage, PartialStorage, StorageCompleteness,
Ref, PartialRef, RefDigest,
- refDigest,
- readRef, showRef, showRefDigest,
+ refDigest, refFromDigest,
+ readRef, showRef,
+ readRefDigest, showRefDigest,
refDigestFromByteString, hashToRefDigest,
copyRef, partialRef, partialRefFromDigest,
@@ -54,33 +55,35 @@ import Control.Monad.Writer
import Crypto.Hash
import Data.Bifunctor
+import Data.ByteArray qualified as BA
import Data.ByteString (ByteString)
-import qualified Data.ByteArray as BA
-import qualified Data.ByteString as B
-import qualified Data.ByteString.Char8 as BC
-import qualified Data.ByteString.Lazy as BL
-import qualified Data.ByteString.Lazy.Char8 as BLC
+import Data.ByteString qualified as B
+import Data.ByteString.Char8 qualified as BC
+import Data.ByteString.Lazy qualified as BL
+import Data.ByteString.Lazy.Char8 qualified as BLC
import Data.Char
import Data.Function
import Data.Maybe
import Data.Ratio
import Data.Set (Set)
-import qualified Data.Set as S
+import Data.Set qualified as S
import Data.Text (Text)
-import qualified Data.Text as T
+import Data.Text qualified as T
import Data.Text.Encoding
import Data.Text.Encoding.Error
import Data.Time.Calendar
import Data.Time.Clock
import Data.Time.Format
import Data.Time.LocalTime
-import Data.UUID (UUID)
-import qualified Data.UUID as U
+import Data.Word
import System.IO.Unsafe
import Erebos.Error
import Erebos.Storage.Internal
+import Erebos.UUID (UUID)
+import Erebos.UUID qualified as U
+import Erebos.Util
zeroRef :: Storage' c -> Ref' c
@@ -127,6 +130,7 @@ copyRecItem' st = \case
copyObject' :: forall c c'. (StorageCompleteness c, StorageCompleteness c') => Storage' c' -> Object' c -> IO (c (Object' c'))
copyObject' _ (Blob bs) = return $ return $ Blob bs
copyObject' st (Rec rs) = fmap Rec . sequence <$> mapM (\( n, item ) -> fmap ( n, ) <$> copyRecItem' st item) rs
+copyObject' _ (OnDemand size dgst) = return $ return $ OnDemand size dgst
copyObject' _ ZeroObject = return $ return ZeroObject
copyObject' _ (UnknownObject otype content) = return $ return $ UnknownObject otype content
@@ -148,7 +152,8 @@ partialRefFromDigest st dgst = Ref st dgst
data Object' c
= Blob ByteString
- | Rec [(ByteString, RecItem' c)]
+ | Rec [ ( ByteString, RecItem' c ) ]
+ | OnDemand Word64 RefDigest
| ZeroObject
| UnknownObject ByteString ByteString
deriving (Show)
@@ -174,8 +179,12 @@ type RecItem = RecItem' Complete
serializeObject :: Object' c -> BL.ByteString
serializeObject = \case
Blob cnt -> BL.fromChunks [BC.pack "blob ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt]
- Rec rec -> let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec
- in BL.fromChunks [BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n'] `BL.append` cnt
+ Rec rec ->
+ let cnt = BL.fromChunks $ concatMap (uncurry serializeRecItem) rec
+ in BL.fromChunks [ BC.pack "rec ", BC.pack (show $ BL.length cnt), BC.singleton '\n' ] `BL.append` cnt
+ OnDemand size dgst ->
+ let cnt = BC.unlines [ BC.pack (show size), showRefDigest dgst ]
+ in BL.fromChunks [ BC.pack "ondemand ", BC.pack (show $ B.length cnt), BC.singleton '\n', cnt ]
ZeroObject -> BL.empty
UnknownObject otype cnt -> BL.fromChunks [ otype, BC.singleton ' ', BC.pack (show $ B.length cnt), BC.singleton '\n', cnt ]
@@ -234,46 +243,72 @@ unsafeDeserializeObject st bytes =
(line, rest) | Just (otype, len) <- splitObjPrefix line -> do
let (content, next) = first BL.toStrict $ BL.splitAt (fromIntegral len) $ BL.drop 1 rest
guard $ B.length content == len
- (,next) <$> case otype of
- _ | otype == BC.pack "blob" -> return $ Blob content
- | otype == BC.pack "rec" -> maybe (throwOtherError $ "malformed record item ")
- (return . Rec) $ sequence $ map parseRecLine $ mergeCont [] $ BC.lines content
- | otherwise -> return $ UnknownObject otype content
+ (, next) <$> if
+ | otype == BC.pack "blob"
+ -> return $ Blob content
+ | otype == BC.pack "rec"
+ , Just ritems <- parseRecordBody st content
+ -> return $ Rec ritems
+ | otype == BC.pack "ondemand"
+ , Just ondemand <- parseOnDemand st content
+ -> return ondemand
+ | otherwise
+ -> return $ UnknownObject otype content
_ -> throwOtherError $ "malformed object"
- where splitObjPrefix line = do
- [otype, tlen] <- return $ BLC.words line
- (len, rest) <- BLC.readInt tlen
- guard $ BL.null rest
- return (BL.toStrict otype, len)
-
- mergeCont cs (a:b:rest) | Just ('\t', b') <- BC.uncons b = mergeCont (b':BC.pack "\n":cs) (a:rest)
- mergeCont cs (a:rest) = B.concat (a : reverse cs) : mergeCont [] rest
- mergeCont _ [] = []
-
- parseRecLine line = do
- colon <- BC.elemIndex ':' line
- space <- BC.elemIndex ' ' line
- guard $ colon < space
- let name = B.take colon line
- itype = B.take (space-colon-1) $ B.drop (colon+1) line
- content = B.drop (space+1) line
-
- let val = fromMaybe (RecUnknown itype content) $
- case BC.unpack itype of
- "e" -> do guard $ B.null content
- return RecEmpty
- "i" -> do (num, rest) <- BC.readInteger content
- guard $ B.null rest
- return $ RecInt num
- "n" -> RecNum <$> parseRatio content
- "t" -> return $ RecText $ decodeUtf8With lenientDecode content
- "b" -> RecBinary <$> readHex content
- "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content)
- "u" -> RecUUID <$> U.fromASCIIBytes content
- "r" -> RecRef . Ref st <$> readRefDigest content
- "w" -> RecWeak <$> readRefDigest content
- _ -> Nothing
- return (name, val)
+ where
+ splitObjPrefix line = do
+ [ otype, tlen ] <- return $ BLC.words line
+ ( len, rest ) <- BLC.readInt tlen
+ guard $ BL.null rest
+ return ( BL.toStrict otype, len )
+
+parseRecordBody :: Storage' c -> ByteString -> Maybe [ ( ByteString, RecItem' c ) ]
+parseRecordBody _ body | B.null body = Just []
+parseRecordBody st body = do
+ colon <- BC.elemIndex ':' body
+ space <- BC.elemIndex ' ' $ B.drop (colon + 1) body
+ let name = B.take colon body
+ itype = B.take space $ B.drop (colon + 1) body
+ ( content, remainingBody ) <- parseTabEscapedLines $ B.drop (space + colon + 2) body
+
+ let val = fromMaybe (RecUnknown itype content) $
+ case BC.unpack itype of
+ "e" -> do guard $ B.null content
+ return RecEmpty
+ "i" -> do ( num, rest ) <- BC.readInteger content
+ guard $ B.null rest
+ return $ RecInt num
+ "n" -> RecNum <$> parseRatio content
+ "t" -> return $ RecText $ decodeUtf8With lenientDecode content
+ "b" -> RecBinary <$> readHex content
+ "d" -> RecDate <$> parseTimeM False defaultTimeLocale "%s %z" (BC.unpack content)
+ "u" -> RecUUID <$> U.fromASCIIBytes content
+ "r" -> RecRef . Ref st <$> readRefDigest content
+ "w" -> RecWeak <$> readRefDigest content
+ _ -> Nothing
+ (( name, val ) :) <$> parseRecordBody st remainingBody
+
+-- Split given ByteString on the first newline not preceded by tab; replace
+-- "\t\n" in the first part with "\n".
+parseTabEscapedLines :: ByteString -> Maybe ( ByteString, ByteString )
+parseTabEscapedLines = parseLines []
+ where
+ parseLines linesReversed cur = do
+ newline <- BC.elemIndex '\n' cur
+ case ( BC.length cur > newline + 1, BC.index cur (newline + 1) ) of
+ ( True, '\t' ) -> parseLines (B.take (newline + 1) cur : linesReversed) (B.drop (newline + 2) cur)
+ _ -> Just ( BC.concat $ reverse $ B.take newline cur : linesReversed, B.drop (newline + 1) cur )
+
+parseOnDemand :: Storage' c -> ByteString -> Maybe (Object' c)
+parseOnDemand _ body = do
+ newline1 <- BC.elemIndex '\n' body
+ newline2 <- BC.elemIndex '\n' $ B.drop (newline1 + 1) body
+ guard (newline1 + newline2 + 2 == B.length body)
+ ( size, sizeRest ) <- BC.readInt (B.take newline1 body)
+ guard (B.null sizeRest)
+ dgst <- readRefDigest $ B.take newline2 $ B.drop (newline1 + 1) body
+ return $ OnDemand (fromIntegral size) dgst
+
deserializeObject :: PartialStorage -> BL.ByteString -> Except ErebosError (PartialObject, BL.ByteString)
deserializeObject = unsafeDeserializeObject
@@ -330,10 +365,12 @@ class Storable a where
class Storable a => ZeroStorable a where
fromZero :: Storage -> a
-data Store = StoreBlob ByteString
- | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]])
- | StoreZero
- | StoreUnknown ByteString ByteString
+data Store
+ = StoreBlob ByteString
+ | StoreRec (forall c. StorageCompleteness c => Storage' c -> [IO [(ByteString, RecItem' c)]])
+ | StoreOnDemand Word64 RefDigest
+ | StoreZero
+ | StoreUnknown ByteString ByteString
evalStore :: StorageCompleteness c => Storage' c -> Store -> IO (Ref' c)
evalStore st = unsafeStoreObject st <=< evalStoreObject st
@@ -341,6 +378,7 @@ evalStore st = unsafeStoreObject st <=< evalStoreObject st
evalStoreObject :: StorageCompleteness c => Storage' c -> Store -> IO (Object' c)
evalStoreObject _ (StoreBlob x) = return $ Blob x
evalStoreObject s (StoreRec f) = Rec . concat <$> sequence (f s)
+evalStoreObject _ (StoreOnDemand size dgst) = return $ OnDemand size dgst
evalStoreObject _ StoreZero = return ZeroObject
evalStoreObject _ (StoreUnknown otype content) = return $ UnknownObject otype content
@@ -377,6 +415,7 @@ instance Storable Object where
store' (Rec xs) = StoreRec $ \st -> return $ do
Rec xs' <- copyObject st (Rec xs)
return xs'
+ store' (OnDemand size dgst) = StoreOnDemand size dgst
store' ZeroObject = StoreZero
store' (UnknownObject otype content) = StoreUnknown otype content
@@ -701,8 +740,6 @@ loadRawWeaks name = mapMaybe p <$> loadRecItems
-type Stored a = Stored' Complete a
-
instance Storable a => Storable (Stored a) where
store st = copyRef st . storedRef
store' (Stored _ x) = store' x
@@ -712,10 +749,10 @@ instance ZeroStorable a => ZeroStorable (Stored a) where
fromZero st = Stored (zeroRef st) $ fromZero st
fromStored :: Stored a -> a
-fromStored (Stored _ x) = x
+fromStored = storedObject'
storedRef :: Stored a -> Ref
-storedRef (Stored ref _) = ref
+storedRef = storedRef'
wrappedStore :: MonadIO m => Storable a => Storage -> a -> m (Stored a)
wrappedStore st x = do ref <- liftIO $ store st x
@@ -724,9 +761,8 @@ wrappedStore st x = do ref <- liftIO $ store st x
wrappedLoad :: Storable a => Ref -> Stored a
wrappedLoad ref = Stored ref (load ref)
-copyStored :: forall c c' m a. (StorageCompleteness c, StorageCompleteness c', MonadIO m) =>
- Storage' c' -> Stored' c a -> m (LoadResult c (Stored' c' a))
-copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (flip Stored x) <$> copyRef' st ref'
+copyStored :: forall m a. MonadIO m => Storage -> Stored a -> m (Stored a)
+copyStored st (Stored ref' x) = liftIO $ returnLoadResult . fmap (\r -> Stored r x) <$> copyRef' st ref'
-- |Passed function needs to preserve the object representation to be safe
unsafeMapStored :: (a -> b) -> Stored a -> Stored b