finish up basic chunked remote groundwork
Chunk retrieval and reassembly, removal, and checking if all necessary chunks are present. This commit was sponsored by Damien Raude-Morvan.
This commit is contained in:
parent
904859d676
commit
d4d68f57e5
2 changed files with 125 additions and 20 deletions
|
@ -5,14 +5,16 @@
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Remote.Helper.Chunked
|
module Remote.Helper.Chunked (
|
||||||
( ChunkSize
|
ChunkSize,
|
||||||
, ChunkConfig(..)
|
ChunkConfig(..),
|
||||||
, chunkConfig
|
chunkConfig,
|
||||||
, storeChunks
|
storeChunks,
|
||||||
, chunkKeys
|
chunkKeys,
|
||||||
, meteredWriteFileChunks
|
removeChunks,
|
||||||
) where
|
retrieveChunks,
|
||||||
|
hasKeyChunks,
|
||||||
|
) where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import Utility.DataUnits
|
import Utility.DataUnits
|
||||||
|
@ -21,6 +23,7 @@ import Types.Key
|
||||||
import Logs.Chunk.Pure (ChunkSize, ChunkCount)
|
import Logs.Chunk.Pure (ChunkSize, ChunkCount)
|
||||||
import Logs.Chunk
|
import Logs.Chunk
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
import Crypto (EncKey)
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
import qualified Data.ByteString as S
|
import qualified Data.ByteString as S
|
||||||
|
@ -69,8 +72,10 @@ numChunks = pred . fromJust . keyChunkNum . fst . nextChunkKeyStream
|
||||||
-
|
-
|
||||||
- Note that the storer action is responsible for catching any
|
- Note that the storer action is responsible for catching any
|
||||||
- exceptions it may encounter.
|
- exceptions it may encounter.
|
||||||
|
-
|
||||||
|
- This action may be called on a chunked key. It will simply store it.
|
||||||
-}
|
-}
|
||||||
storeChunks :: UUID -> ChunkConfig -> Key -> FilePath -> MeterUpdate -> (Key -> L.ByteString -> MeterUpdate -> Annex Bool) -> Annex Bool
|
storeChunks :: UUID -> ChunkConfig -> Key -> FilePath -> MeterUpdate -> (Key -> L.ByteString -> MeterUpdate -> IO Bool) -> Annex Bool
|
||||||
storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
||||||
either (\e -> liftIO (print e) >> return False) (go meterupdate)
|
either (\e -> liftIO (print e) >> return False) (go meterupdate)
|
||||||
=<< (liftIO $ tryIO $ L.readFile f)
|
=<< (liftIO $ tryIO $ L.readFile f)
|
||||||
|
@ -78,7 +83,7 @@ storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
||||||
go meterupdate b = case chunkconfig of
|
go meterupdate b = case chunkconfig of
|
||||||
(UnpaddedChunks chunksize) | not (isChunkKey k) ->
|
(UnpaddedChunks chunksize) | not (isChunkKey k) ->
|
||||||
gochunks meterupdate chunksize b (chunkKeyStream k chunksize)
|
gochunks meterupdate chunksize b (chunkKeyStream k chunksize)
|
||||||
_ -> storer k b meterupdate
|
_ -> liftIO $ storer k b meterupdate
|
||||||
|
|
||||||
gochunks :: MeterUpdate -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
|
gochunks :: MeterUpdate -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
|
||||||
gochunks meterupdate chunksize lb =
|
gochunks meterupdate chunksize lb =
|
||||||
|
@ -107,7 +112,7 @@ storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
||||||
|
|
||||||
storechunk bytesprocessed sz bs c chunkkeys = do
|
storechunk bytesprocessed sz bs c chunkkeys = do
|
||||||
let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
|
let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
|
||||||
ifM (storer chunkkey (L.fromChunks $ reverse c) meterupdate')
|
ifM (liftIO $ storer chunkkey (L.fromChunks $ reverse c) meterupdate')
|
||||||
( do
|
( do
|
||||||
let bytesprocessed' = addBytesProcessed bytesprocessed (chunksize - sz)
|
let bytesprocessed' = addBytesProcessed bytesprocessed (chunksize - sz)
|
||||||
loop bytesprocessed' chunksize bs [] chunkkeys'
|
loop bytesprocessed' chunksize bs [] chunkkeys'
|
||||||
|
@ -129,19 +134,109 @@ storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
||||||
- requested key.
|
- requested key.
|
||||||
-}
|
-}
|
||||||
chunkKeys :: UUID -> ChunkConfig -> Key -> Annex [[Key]]
|
chunkKeys :: UUID -> ChunkConfig -> Key -> Annex [[Key]]
|
||||||
chunkKeys u (UnpaddedChunks _) k = do
|
chunkKeys u (UnpaddedChunks _) k | not (isChunkKey k) = do
|
||||||
chunklists <- map (toChunkList k) <$> getCurrentChunks u k
|
chunklists <- map (toChunkList k) <$> getCurrentChunks u k
|
||||||
return ([k]:chunklists)
|
-- Probably using the chunklists, but the unchunked
|
||||||
|
-- key could be present.
|
||||||
|
return (chunklists ++ [[k]])
|
||||||
chunkKeys _ _ k = pure [[k]]
|
chunkKeys _ _ k = pure [[k]]
|
||||||
|
|
||||||
toChunkList :: Key -> (ChunkSize, ChunkCount) -> [Key]
|
toChunkList :: Key -> (ChunkSize, ChunkCount) -> [Key]
|
||||||
toChunkList k (chunksize, chunkcount) = takeChunkKeyStream chunkcount $
|
toChunkList k (chunksize, chunkcount) = takeChunkKeyStream chunkcount $
|
||||||
chunkKeyStream k chunksize
|
chunkKeyStream k chunksize
|
||||||
|
|
||||||
{- Writes a series of chunks to a file. The feeder is called to get
|
{- Removes all chunks of a key from a remote, by calling a remover
|
||||||
- each chunk. -}
|
- action on each. The remover action should succeed even if asked to
|
||||||
meteredWriteFileChunks :: MeterUpdate -> FilePath -> [v] -> (v -> IO L.ByteString) -> IO ()
|
- remove a key that is not present on the remote.
|
||||||
meteredWriteFileChunks meterupdate dest chunks feeder =
|
-
|
||||||
withBinaryFile dest WriteMode $ \h ->
|
- This action may be called on a chunked key. It will simply remove it.
|
||||||
forM_ chunks $
|
-}
|
||||||
meteredWrite meterupdate h <=< feeder
|
removeChunks :: (Key -> Annex Bool) -> UUID -> ChunkConfig -> EncKey -> Key -> Annex Bool
|
||||||
|
removeChunks remover u chunkconfig encryptor k = do
|
||||||
|
ls <- chunkKeys u chunkconfig k
|
||||||
|
ok <- and <$> mapM (remover . encryptor) (concat ls)
|
||||||
|
when ok $
|
||||||
|
case chunkconfig of
|
||||||
|
(UnpaddedChunks _) | not (isChunkKey k) -> do
|
||||||
|
let chunksizes = catMaybes $ map (keyChunkSize <=< headMaybe) ls
|
||||||
|
forM_ chunksizes $ chunksRemoved u k . fromIntegral
|
||||||
|
_ -> noop
|
||||||
|
return ok
|
||||||
|
|
||||||
|
{- Retrieves a key from a remote, using a retriever action that
|
||||||
|
- streams it to a ByteString.
|
||||||
|
-
|
||||||
|
- When the remote is chunked, tries each of the options returned by
|
||||||
|
- chunkKeys until it finds one where the retriever successfully
|
||||||
|
- gets the first key in the list. The content of that key, and any
|
||||||
|
- other chunks in the list is fed to the sink.
|
||||||
|
-
|
||||||
|
- If retrival of one of the subsequent chunks throws an exception,
|
||||||
|
- gives up and returns False. Note that partial data may have been
|
||||||
|
- written to the sink in this case.
|
||||||
|
-}
|
||||||
|
retrieveChunks
|
||||||
|
:: (Key -> IO L.ByteString)
|
||||||
|
-> UUID
|
||||||
|
-> ChunkConfig
|
||||||
|
-> EncKey
|
||||||
|
-> Key
|
||||||
|
-> MeterUpdate
|
||||||
|
-> (MeterUpdate -> L.ByteString -> IO ())
|
||||||
|
-> Annex Bool
|
||||||
|
retrieveChunks retriever u chunkconfig encryptor basek basep sink = do
|
||||||
|
ls <- chunkKeys u chunkconfig basek
|
||||||
|
liftIO $ flip catchNonAsync giveup (firstavail ls)
|
||||||
|
where
|
||||||
|
giveup e = print e >> return False
|
||||||
|
|
||||||
|
firstavail [] = return False
|
||||||
|
firstavail ([]:ls) = firstavail ls
|
||||||
|
firstavail ((k:ks):ls) = do
|
||||||
|
v <- tryNonAsync $ retriever (encryptor k)
|
||||||
|
case v of
|
||||||
|
Left e
|
||||||
|
| null ls -> giveup e
|
||||||
|
| otherwise -> firstavail ls
|
||||||
|
Right b -> do
|
||||||
|
sink basep b
|
||||||
|
let sz = toBytesProcessed $
|
||||||
|
fromMaybe 0 $ keyChunkSize k
|
||||||
|
getrest sz sz ks
|
||||||
|
|
||||||
|
getrest _ _ [] = return True
|
||||||
|
getrest sz bytesprocessed (k:ks) = do
|
||||||
|
let p = offsetMeterUpdate basep bytesprocessed
|
||||||
|
sink p =<< retriever (encryptor k)
|
||||||
|
getrest sz (addBytesProcessed bytesprocessed sz) ks
|
||||||
|
|
||||||
|
{- Checks if a key is present in a remote. This requires any one
|
||||||
|
- of the lists of options returned by chunkKeys to all check out
|
||||||
|
- as being present using the checker action.
|
||||||
|
-}
|
||||||
|
hasKeyChunks
|
||||||
|
:: (Key -> Annex (Either String Bool))
|
||||||
|
-> UUID
|
||||||
|
-> ChunkConfig
|
||||||
|
-> EncKey
|
||||||
|
-> Key
|
||||||
|
-> Annex (Either String Bool)
|
||||||
|
hasKeyChunks checker u chunkconfig encryptor basek =
|
||||||
|
checklists impossible =<< chunkKeys u chunkconfig basek
|
||||||
|
where
|
||||||
|
checklists lastfailmsg [] = return $ Left lastfailmsg
|
||||||
|
checklists _ (l:ls)
|
||||||
|
| not (null l) =
|
||||||
|
either (`checklists` ls) (return . Right)
|
||||||
|
=<< checkchunks l
|
||||||
|
| otherwise = checklists impossible ls
|
||||||
|
|
||||||
|
checkchunks :: [Key] -> Annex (Either String Bool)
|
||||||
|
checkchunks [] = return (Right True)
|
||||||
|
checkchunks (k:ks) = do
|
||||||
|
v <- checker (encryptor k)
|
||||||
|
if v == Right True
|
||||||
|
then checkchunks ks
|
||||||
|
else return v
|
||||||
|
|
||||||
|
impossible = "no recorded chunks"
|
||||||
|
|
|
@ -9,6 +9,7 @@ module Remote.Helper.Chunked.Legacy where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import Remote.Helper.Chunked
|
import Remote.Helper.Chunked
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
import qualified Control.Exception as E
|
import qualified Control.Exception as E
|
||||||
|
@ -114,3 +115,12 @@ storeChunked chunksize dests storer content = either onerr return
|
||||||
let (chunk, b') = L.splitAt sz b
|
let (chunk, b') = L.splitAt sz b
|
||||||
storer d chunk
|
storer d chunk
|
||||||
storechunks sz (d:useddests) ds b'
|
storechunks sz (d:useddests) ds b'
|
||||||
|
|
||||||
|
{- Writes a series of chunks to a file. The feeder is called to get
|
||||||
|
- each chunk.
|
||||||
|
-}
|
||||||
|
meteredWriteFileChunks :: MeterUpdate -> FilePath -> [v] -> (v -> IO L.ByteString) -> IO ()
|
||||||
|
meteredWriteFileChunks meterupdate dest chunks feeder =
|
||||||
|
withBinaryFile dest WriteMode $ \h ->
|
||||||
|
forM_ chunks $
|
||||||
|
meteredWrite meterupdate h <=< feeder
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue