incremental hashing for fileRetriever
It uses tailVerify to hash the file while it's being written. This is able to sometimes avoid a separate checksum step. Although if the file gets written quickly enough, tailVerify may not see it get created before the write finishes, and the checksum still happens. Testing with the directory special remote, incremental checksumming did not happen. But then I disabled the copy CoW probing, and it did work. What's going on with that is the CoW probe creates an empty file on failure, then deletes it, and then the file is created again. tailVerify will open the first, empty file, and so fails to read the content that gets written to the file that replaces it. The directory special remote really ought to be able to avoid needing to use tailVerify, and while other special remotes could do things that cause similar problems, they probably don't. And if they do, it just means the checksum doesn't get done incrementally. Sponsored-by: Dartmouth College's DANDI project
This commit is contained in:
parent
ff2dc5eb18
commit
dadbb510f6
8 changed files with 80 additions and 49 deletions
|
@ -267,8 +267,7 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
-- Optimisation: Try the unchunked key first, to avoid
|
||||
-- looking in the git-annex branch for chunk counts
|
||||
-- that are likely not there.
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
tryNonAsync (getunchunked iv) >>= \case
|
||||
tryNonAsync getunchunked >>= \case
|
||||
Right r -> finalize r
|
||||
Left e -> go (Just e)
|
||||
=<< chunkKeysOnly u chunkconfig basek
|
||||
|
@ -287,22 +286,20 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
firstavail (Just e) _ [] = throwM e
|
||||
firstavail pe currsize ([]:ls) = firstavail pe currsize ls
|
||||
firstavail _ currsize ((k:ks):ls)
|
||||
| k == basek = do
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
getunchunked iv
|
||||
`catchNonAsync` (\e -> firstavail (Just e) currsize ls)
|
||||
| k == basek = getunchunked
|
||||
`catchNonAsync` (\e -> firstavail (Just e) currsize ls)
|
||||
| otherwise = do
|
||||
let offset = resumeOffset currsize k
|
||||
let p = maybe basep
|
||||
(offsetMeterUpdate basep . toBytesProcessed)
|
||||
offset
|
||||
v <- tryNonAsync $
|
||||
retriever (encryptor k) p $ \content ->
|
||||
retriever (encryptor k) p Nothing $ \content ->
|
||||
bracket (maybe opennew openresume offset) (liftIO . hClose . fst) $ \(h, iv) -> do
|
||||
iv' <- retrieved iv (Just h) p content
|
||||
retrieved iv (Just h) p content
|
||||
let sz = toBytesProcessed $
|
||||
fromMaybe 0 $ fromKey keyChunkSize k
|
||||
getrest p h iv' sz sz ks
|
||||
getrest p h iv sz sz ks
|
||||
case v of
|
||||
Left e
|
||||
| null ls -> throwM e
|
||||
|
@ -313,12 +310,21 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
getrest p h iv sz bytesprocessed (k:ks) = do
|
||||
let p' = offsetMeterUpdate p bytesprocessed
|
||||
liftIO $ p' zeroBytesProcessed
|
||||
iv' <- retriever (encryptor k) p' $
|
||||
retriever (encryptor k) p' Nothing $
|
||||
retrieved iv (Just h) p'
|
||||
getrest p h iv' sz (addBytesProcessed bytesprocessed sz) ks
|
||||
getrest p h iv sz (addBytesProcessed bytesprocessed sz) ks
|
||||
|
||||
getunchunked iv = retriever (encryptor basek) basep $
|
||||
retrieved iv Nothing basep
|
||||
getunchunked = do
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
case enc of
|
||||
Just _ -> retriever (encryptor basek) basep Nothing $
|
||||
retrieved iv Nothing basep
|
||||
-- Not chunked and not encrypted, so ask the
|
||||
-- retriever to incrementally verify when it
|
||||
-- retrieves to a file.
|
||||
Nothing -> retriever (encryptor basek) basep iv $
|
||||
retrieved iv Nothing basep
|
||||
return iv
|
||||
|
||||
opennew = do
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
|
@ -365,13 +371,11 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
- will be provided, and instead the content will be written to the
|
||||
- dest file.
|
||||
-
|
||||
- The IncrementalVerifier is updated as the file content is read.
|
||||
-
|
||||
- Note that when neither chunking nor encryption is used, and the remote
|
||||
- provides FileContent, that file only needs to be renamed
|
||||
- into place. (And it may even already be in the right place..)
|
||||
-
|
||||
- The IncrementalVerifier is updated as the file content is read.
|
||||
- If it was not able to be updated, due to the file not needing to be read,
|
||||
- Nothing will be returned.
|
||||
-}
|
||||
writeRetrievedContent
|
||||
:: LensGpgEncParams encc
|
||||
|
@ -382,32 +386,25 @@ writeRetrievedContent
|
|||
-> Maybe MeterUpdate
|
||||
-> ContentSource
|
||||
-> Maybe IncrementalVerifier
|
||||
-> Annex (Maybe IncrementalVerifier)
|
||||
-> Annex ()
|
||||
writeRetrievedContent dest enc encc mh mp content miv = case (enc, mh, content) of
|
||||
(Nothing, Nothing, FileContent f)
|
||||
| f == dest -> return Nothing
|
||||
| otherwise -> do
|
||||
liftIO $ moveFile f dest
|
||||
return Nothing
|
||||
| f == dest -> noop
|
||||
| otherwise -> liftIO $ moveFile f dest
|
||||
(Just (cipher, _), _, ByteContent b) -> do
|
||||
cmd <- gpgCmd <$> Annex.getGitConfig
|
||||
decrypt cmd encc cipher (feedBytes b) $
|
||||
readBytes write
|
||||
return miv
|
||||
(Just (cipher, _), _, FileContent f) -> do
|
||||
cmd <- gpgCmd <$> Annex.getGitConfig
|
||||
withBytes content $ \b ->
|
||||
decrypt cmd encc cipher (feedBytes b) $
|
||||
readBytes write
|
||||
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
||||
return miv
|
||||
(Nothing, _, FileContent f) -> do
|
||||
withBytes content write
|
||||
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
||||
return miv
|
||||
(Nothing, _, ByteContent b) -> do
|
||||
write b
|
||||
return miv
|
||||
(Nothing, _, ByteContent b) -> write b
|
||||
where
|
||||
write b = case mh of
|
||||
Just h -> liftIO $ write' b h
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{- helpers for special remotes
|
||||
-
|
||||
- Copyright 2011-2020 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2011-2021 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -39,6 +39,7 @@ import Annex.Common
|
|||
import Annex.SpecialRemote.Config
|
||||
import Types.StoreRetrieve
|
||||
import Types.Remote
|
||||
import Annex.Verify
|
||||
import Annex.UUID
|
||||
import Config
|
||||
import Config.Cost
|
||||
|
@ -54,6 +55,8 @@ import Git.Types
|
|||
import qualified Data.ByteString as S
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.Map as M
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.Async
|
||||
|
||||
{- Special remotes don't have a configured url, so Git.Repo does not
|
||||
- automatically generate remotes for them. This looks for a different
|
||||
|
@ -101,19 +104,33 @@ fileStorer a k (ByteContent b) m = withTmp k $ \f -> do
|
|||
byteStorer :: (Key -> L.ByteString -> MeterUpdate -> Annex ()) -> Storer
|
||||
byteStorer a k c m = withBytes c $ \b -> a k b m
|
||||
|
||||
-- A Retriever that writes the content of a Key to a provided file.
|
||||
-- It is responsible for updating the progress meter as it retrieves data.
|
||||
fileRetriever :: (FilePath -> Key -> MeterUpdate -> Annex ()) -> Retriever
|
||||
fileRetriever a k m callback = do
|
||||
f <- prepTmp k
|
||||
a (fromRawFilePath f) k m
|
||||
pruneTmpWorkDirBefore f (callback . FileContent . fromRawFilePath)
|
||||
|
||||
-- A Retriever that generates a lazy ByteString containing the Key's
|
||||
-- content, and passes it to a callback action which will fully consume it
|
||||
-- before returning.
|
||||
byteRetriever :: (Key -> (L.ByteString -> Annex a) -> Annex a) -> Key -> MeterUpdate -> (ContentSource -> Annex a) -> Annex a
|
||||
byteRetriever a k _m callback = a k (callback . ByteContent)
|
||||
byteRetriever :: (Key -> (L.ByteString -> Annex a) -> Annex a) -> Key -> MeterUpdate -> Maybe IncrementalVerifier -> (ContentSource -> Annex a) -> Annex a
|
||||
byteRetriever a k _m _miv callback = a k (callback . ByteContent)
|
||||
|
||||
-- A Retriever that writes the content of a Key to a provided file.
|
||||
-- The action is responsible for updating the progress meter as it
|
||||
-- retrieves data. The incremental verifier is updated in the background as
|
||||
-- the action writes to the file.
|
||||
fileRetriever :: (FilePath -> Key -> MeterUpdate -> Annex ()) -> Retriever
|
||||
fileRetriever a k m miv callback = do
|
||||
f <- prepTmp k
|
||||
let retrieve = a (fromRawFilePath f) k m
|
||||
case miv of
|
||||
Nothing -> retrieve
|
||||
Just iv -> do
|
||||
finished <- liftIO newEmptyTMVarIO
|
||||
t <- liftIO $ async $ tailVerify iv f finished
|
||||
retrieve
|
||||
liftIO $ atomically $ putTMVar finished ()
|
||||
liftIO (wait t) >>= \case
|
||||
Nothing -> noop
|
||||
Just deferredverify -> do
|
||||
showAction (descVerify iv)
|
||||
liftIO deferredverify
|
||||
pruneTmpWorkDirBefore f (callback . FileContent . fromRawFilePath)
|
||||
|
||||
{- The base Remote that is provided to specialRemote needs to have
|
||||
- storeKey, retrieveKeyFile, removeKey, and checkPresent methods,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue