incremental verification for retrieval from import remotes
Sponsored-by: Dartmouth College's Datalad project
This commit is contained in:
parent
2f2701137d
commit
e8a601aa24
12 changed files with 129 additions and 83 deletions
|
@ -1,6 +1,6 @@
|
|||
{- Copying files.
|
||||
-
|
||||
- Copyright 2011-2021 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2011-2022 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -79,40 +79,47 @@ data CopyMethod = CopiedCoW | Copied
|
|||
- (eg when isStableKey is false), and doing this avoids getting a
|
||||
- corrupted file in such cases.
|
||||
-}
|
||||
fileCopier :: CopyCoWTried -> FilePath -> FilePath -> MeterUpdate -> Maybe IncrementalVerifier -> Annex CopyMethod
|
||||
fileCopier :: CopyCoWTried -> FilePath -> FilePath -> MeterUpdate -> Maybe IncrementalVerifier -> IO CopyMethod
|
||||
#ifdef mingw32_HOST_OS
|
||||
fileCopier _ src dest meterupdate iv = docopy
|
||||
#else
|
||||
fileCopier copycowtried src dest meterupdate iv =
|
||||
ifM (liftIO $ tryCopyCoW copycowtried src dest meterupdate)
|
||||
ifM (tryCopyCoW copycowtried src dest meterupdate)
|
||||
( do
|
||||
liftIO $ maybe noop unableIncrementalVerifier iv
|
||||
maybe noop unableIncrementalVerifier iv
|
||||
return CopiedCoW
|
||||
, docopy
|
||||
)
|
||||
#endif
|
||||
where
|
||||
dest' = toRawFilePath dest
|
||||
|
||||
docopy = do
|
||||
-- The file might have had the write bit removed,
|
||||
-- so make sure we can write to it.
|
||||
void $ liftIO $ tryIO $ allowWrite dest'
|
||||
void $ tryIO $ allowWrite dest'
|
||||
|
||||
liftIO $ withBinaryFile dest ReadWriteMode $ \hdest ->
|
||||
withBinaryFile src ReadMode $ \hsrc -> do
|
||||
sofar <- compareexisting hdest hsrc zeroBytesProcessed
|
||||
docopy' hdest hsrc sofar
|
||||
withBinaryFile src ReadMode $ \hsrc ->
|
||||
fileContentCopier hsrc dest meterupdate iv
|
||||
|
||||
-- Copy src mode and mtime.
|
||||
mode <- liftIO $ fileMode <$> getFileStatus src
|
||||
mtime <- liftIO $ utcTimeToPOSIXSeconds <$> getModificationTime src
|
||||
liftIO $ setFileMode dest mode
|
||||
liftIO $ touch dest' mtime False
|
||||
mode <- fileMode <$> getFileStatus src
|
||||
mtime <- utcTimeToPOSIXSeconds <$> getModificationTime src
|
||||
setFileMode dest mode
|
||||
touch dest' mtime False
|
||||
|
||||
return Copied
|
||||
|
||||
docopy' hdest hsrc sofar = do
|
||||
dest' = toRawFilePath dest
|
||||
|
||||
{- Copies content from a handle to a destination file. Does not
|
||||
- use copy-on-write, and does not copy file mode and mtime.
|
||||
-}
|
||||
fileContentCopier :: Handle -> FilePath -> MeterUpdate -> Maybe IncrementalVerifier -> IO ()
|
||||
fileContentCopier hsrc dest meterupdate iv =
|
||||
withBinaryFile dest ReadWriteMode $ \hdest -> do
|
||||
sofar <- compareexisting hdest zeroBytesProcessed
|
||||
docopy hdest sofar
|
||||
where
|
||||
docopy hdest sofar = do
|
||||
s <- S.hGet hsrc defaultChunkSize
|
||||
if s == S.empty
|
||||
then return ()
|
||||
|
@ -121,12 +128,12 @@ fileCopier copycowtried src dest meterupdate iv =
|
|||
S.hPut hdest s
|
||||
maybe noop (flip updateIncrementalVerifier s) iv
|
||||
meterupdate sofar'
|
||||
docopy' hdest hsrc sofar'
|
||||
docopy hdest sofar'
|
||||
|
||||
-- Leaves hdest and hsrc seeked to wherever the two diverge,
|
||||
-- so typically hdest will be seeked to end, and hsrc to the same
|
||||
-- position.
|
||||
compareexisting hdest hsrc sofar = do
|
||||
compareexisting hdest sofar = do
|
||||
s <- S.hGet hdest defaultChunkSize
|
||||
if s == S.empty
|
||||
then return sofar
|
||||
|
@ -137,7 +144,7 @@ fileCopier copycowtried src dest meterupdate iv =
|
|||
maybe noop (flip updateIncrementalVerifier s) iv
|
||||
let sofar' = addBytesProcessed sofar (S.length s)
|
||||
meterupdate sofar'
|
||||
compareexisting hdest hsrc sofar'
|
||||
compareexisting hdest sofar'
|
||||
else do
|
||||
seekbefore hdest s
|
||||
seekbefore hsrc s'
|
||||
|
|
|
@ -597,14 +597,14 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
getcontent k = do
|
||||
let af = AssociatedFile (Just f)
|
||||
let downloader p' tmpfile = do
|
||||
k' <- Remote.retrieveExportWithContentIdentifier
|
||||
_ <- Remote.retrieveExportWithContentIdentifier
|
||||
ia loc cid (fromRawFilePath tmpfile)
|
||||
(pure k)
|
||||
(Left k)
|
||||
(combineMeterUpdate p' p)
|
||||
ok <- moveAnnex k' af tmpfile
|
||||
ok <- moveAnnex k af tmpfile
|
||||
when ok $
|
||||
logStatus k InfoPresent
|
||||
return (Just (k', ok))
|
||||
return (Just (k, ok))
|
||||
checkDiskSpaceToGet k Nothing $
|
||||
notifyTransfer Download af $
|
||||
download' (Remote.uuid remote) k af Nothing stdRetry $ \p' ->
|
||||
|
@ -615,9 +615,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
-- need to retrieve this file.
|
||||
doimportsmall cidmap db loc cid sz p = do
|
||||
let downloader tmpfile = do
|
||||
k <- Remote.retrieveExportWithContentIdentifier
|
||||
(k, _) <- Remote.retrieveExportWithContentIdentifier
|
||||
ia loc cid (fromRawFilePath tmpfile)
|
||||
(mkkey tmpfile)
|
||||
(Right (mkkey tmpfile))
|
||||
p
|
||||
case keyGitSha k of
|
||||
Just sha -> do
|
||||
|
@ -638,9 +638,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
dodownload cidmap db (loc, (cid, sz)) f matcher = do
|
||||
let af = AssociatedFile (Just f)
|
||||
let downloader tmpfile p = do
|
||||
k <- Remote.retrieveExportWithContentIdentifier
|
||||
(k, _) <- Remote.retrieveExportWithContentIdentifier
|
||||
ia loc cid (fromRawFilePath tmpfile)
|
||||
(mkkey tmpfile)
|
||||
(Right (mkkey tmpfile))
|
||||
p
|
||||
case keyGitSha k of
|
||||
Nothing -> do
|
||||
|
|
|
@ -5,6 +5,9 @@ git-annex (10.20220505) UNRELEASED; urgency=medium
|
|||
data units. Note that the short form is "Mbit" not "Mb" because
|
||||
that differs from "MB" only in case, and git-annex parses units
|
||||
case-insensitively.
|
||||
* Special remotes using exporttree=yes and/or importtree=yes now
|
||||
checksum content while it is being retrieved, instead of in a separate
|
||||
pass at the end.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Thu, 05 May 2022 15:08:07 -0400
|
||||
|
||||
|
|
|
@ -338,15 +338,23 @@ listImportableContentsM serial adir c = adbfind >>= \case
|
|||
-- connection is resonably fast, it's probably as good as
|
||||
-- git's handling of similar situations with files being modified while
|
||||
-- it's updating the working tree for a merge.
|
||||
retrieveExportWithContentIdentifierM :: AndroidSerial -> AndroidPath -> ExportLocation -> ContentIdentifier -> FilePath -> Annex Key -> MeterUpdate -> Annex Key
|
||||
retrieveExportWithContentIdentifierM serial adir loc cid dest mkkey _p = do
|
||||
retrieve' serial src dest
|
||||
retrieveExportWithContentIdentifierM :: AndroidSerial -> AndroidPath -> ExportLocation -> ContentIdentifier -> FilePath -> Either Key (Annex Key) -> MeterUpdate -> Annex (Key, Verification)
|
||||
retrieveExportWithContentIdentifierM serial adir loc cid dest gk _p = do
|
||||
case gk of
|
||||
Right mkkey -> do
|
||||
go
|
||||
k <- mkkey
|
||||
currcid <- getExportContentIdentifier serial adir loc
|
||||
if currcid == Right (Just cid)
|
||||
then return k
|
||||
else giveup "the file on the android device has changed"
|
||||
return (k, UnVerified)
|
||||
Left k -> do
|
||||
v <- verifyKeyContentIncrementally DefaultVerify k
|
||||
(\iv -> tailVerify iv (toRawFilePath dest) go)
|
||||
return (k, v)
|
||||
where
|
||||
go = do
|
||||
retrieve' serial src dest
|
||||
currcid <- getExportContentIdentifier serial adir loc
|
||||
when (currcid /= Right (Just cid)) $
|
||||
giveup "the file on the android device has changed"
|
||||
src = androidExportLocation adir loc
|
||||
|
||||
storeExportWithContentIdentifierM :: AndroidSerial -> AndroidPath -> FilePath -> Key -> ExportLocation -> [ContentIdentifier] -> MeterUpdate -> Annex ContentIdentifier
|
||||
|
|
|
@ -29,6 +29,7 @@ import Utility.Metered
|
|||
import Logs.Export
|
||||
import qualified Remote.Helper.ThirdPartyPopulated as ThirdPartyPopulated
|
||||
import Utility.Env
|
||||
import Annex.Verify
|
||||
|
||||
import Data.Either
|
||||
import Text.Read
|
||||
|
@ -370,10 +371,20 @@ checkPresentExportWithContentIdentifierM borgrepo _ loc _ = prompt $ liftIO $ do
|
|||
, giveup $ "Unable to access borg repository " ++ locBorgRepo borgrepo
|
||||
)
|
||||
|
||||
retrieveExportWithContentIdentifierM :: BorgRepo -> ImportLocation -> ContentIdentifier -> FilePath -> Annex Key -> MeterUpdate -> Annex Key
|
||||
retrieveExportWithContentIdentifierM borgrepo loc _ dest mkk _ = do
|
||||
retrieveExportWithContentIdentifierM :: BorgRepo -> ImportLocation -> ContentIdentifier -> FilePath -> Either Key (Annex Key) -> MeterUpdate -> Annex (Key, Verification)
|
||||
retrieveExportWithContentIdentifierM borgrepo loc _ dest gk _ = do
|
||||
showOutput
|
||||
prompt $ withOtherTmp $ \othertmp -> liftIO $ do
|
||||
case gk of
|
||||
Right mkkey -> do
|
||||
go
|
||||
k <- mkkey
|
||||
return (k, UnVerified)
|
||||
Left k -> do
|
||||
v <- verifyKeyContentIncrementally DefaultVerify k
|
||||
(\iv -> tailVerify iv (toRawFilePath dest) go)
|
||||
return (k, v)
|
||||
where
|
||||
go = prompt $ withOtherTmp $ \othertmp -> liftIO $ do
|
||||
-- borgrepo could be relative, and borg has to be run
|
||||
-- in the temp directory to get it to write there
|
||||
absborgrepo <- absBorgRepo borgrepo
|
||||
|
@ -398,6 +409,5 @@ retrieveExportWithContentIdentifierM borgrepo loc _ dest mkk _ = do
|
|||
-- combine with </>
|
||||
moveFile (fromRawFilePath othertmp </> fromRawFilePath archivefile) dest
|
||||
removeDirectoryRecursive (fromRawFilePath othertmp)
|
||||
mkk
|
||||
where
|
||||
|
||||
(archivename, archivefile) = extractImportLocation loc
|
||||
|
|
|
@ -198,9 +198,9 @@ storeKeyM d chunkconfig cow k c m =
|
|||
(fromRawFilePath destdir)
|
||||
in byteStorer go k c m
|
||||
NoChunks ->
|
||||
let go _k src p = do
|
||||
let go _k src p = liftIO $ do
|
||||
void $ fileCopier cow src tmpf p Nothing
|
||||
liftIO $ finalizeStoreGeneric d tmpdir destdir
|
||||
finalizeStoreGeneric d tmpdir destdir
|
||||
in fileStorer go k c m
|
||||
_ ->
|
||||
let go _k b p = liftIO $ do
|
||||
|
@ -242,7 +242,7 @@ retrieveKeyFileM :: RawFilePath -> ChunkConfig -> CopyCoWTried -> Retriever
|
|||
retrieveKeyFileM d (LegacyChunks _) _ = Legacy.retrieve locations d
|
||||
retrieveKeyFileM d NoChunks cow = fileRetriever' $ \dest k p iv -> do
|
||||
src <- liftIO $ fromRawFilePath <$> getLocation d k
|
||||
void $ fileCopier cow src (fromRawFilePath dest) p iv
|
||||
void $ liftIO $ fileCopier cow src (fromRawFilePath dest) p iv
|
||||
retrieveKeyFileM d _ _ = byteRetriever $ \k sink ->
|
||||
sink =<< liftIO (L.readFile . fromRawFilePath =<< getLocation d k)
|
||||
|
||||
|
@ -315,12 +315,12 @@ storeExportM d cow src _k loc p = do
|
|||
viaTmp go (fromRawFilePath dest) ()
|
||||
where
|
||||
dest = exportPath d loc
|
||||
go tmp () = void $ fileCopier cow src tmp p Nothing
|
||||
go tmp () = void $ liftIO $ fileCopier cow src tmp p Nothing
|
||||
|
||||
retrieveExportM :: RawFilePath -> CopyCoWTried -> Key -> ExportLocation -> FilePath -> MeterUpdate -> Annex Verification
|
||||
retrieveExportM d cow k loc dest p =
|
||||
verifyKeyContentIncrementally AlwaysVerify k $ \iv ->
|
||||
void $ fileCopier cow src dest p iv
|
||||
void $ liftIO $ fileCopier cow src dest p iv
|
||||
where
|
||||
src = fromRawFilePath $ exportPath d loc
|
||||
|
||||
|
@ -413,25 +413,31 @@ importKeyM ii dir loc cid sz p = do
|
|||
, inodeCache = Nothing
|
||||
}
|
||||
|
||||
retrieveExportWithContentIdentifierM :: IgnoreInodes -> RawFilePath -> CopyCoWTried -> ExportLocation -> ContentIdentifier -> FilePath -> Annex Key -> MeterUpdate -> Annex Key
|
||||
retrieveExportWithContentIdentifierM ii dir cow loc cid dest mkkey p =
|
||||
precheck docopy
|
||||
retrieveExportWithContentIdentifierM :: IgnoreInodes -> RawFilePath -> CopyCoWTried -> ExportLocation -> ContentIdentifier -> FilePath -> Either Key (Annex Key) -> MeterUpdate -> Annex (Key, Verification)
|
||||
retrieveExportWithContentIdentifierM ii dir cow loc cid dest gk p =
|
||||
case gk of
|
||||
Right mkkey -> do
|
||||
go Nothing
|
||||
k <- mkkey
|
||||
return (k, UnVerified)
|
||||
Left k -> do
|
||||
v <- verifyKeyContentIncrementally DefaultVerify k go
|
||||
return (k, v)
|
||||
where
|
||||
f = exportPath dir loc
|
||||
f' = fromRawFilePath f
|
||||
|
||||
docopy = ifM (liftIO $ tryCopyCoW cow f' dest p)
|
||||
( do
|
||||
k <- mkkey
|
||||
postcheckcow (return k)
|
||||
, docopynoncow
|
||||
go iv = precheck (docopy iv)
|
||||
|
||||
docopy iv = ifM (liftIO $ tryCopyCoW cow f' dest p)
|
||||
( postcheckcow (liftIO $ maybe noop unableIncrementalVerifier iv)
|
||||
, docopynoncow iv
|
||||
)
|
||||
|
||||
docopynoncow = do
|
||||
docopynoncow iv = do
|
||||
#ifndef mingw32_HOST_OS
|
||||
let open = do
|
||||
-- Need a duplicate fd for the post check, since
|
||||
-- hGetContentsMetered closes its handle.
|
||||
-- Need a duplicate fd for the post check.
|
||||
fd <- openFd f' ReadOnly Nothing defaultFileFlags
|
||||
dupfd <- dup fd
|
||||
h <- fdToHandle fd
|
||||
|
@ -445,12 +451,11 @@ retrieveExportWithContentIdentifierM ii dir cow loc cid dest mkkey p =
|
|||
let close = hClose
|
||||
bracketIO open close $ \h -> do
|
||||
#endif
|
||||
liftIO $ hGetContentsMetered h p >>= L.writeFile dest
|
||||
k <- mkkey
|
||||
liftIO $ fileContentCopier h dest p iv
|
||||
#ifndef mingw32_HOST_OS
|
||||
postchecknoncow dupfd (return k)
|
||||
postchecknoncow dupfd (return ())
|
||||
#else
|
||||
postchecknoncow (return k)
|
||||
postchecknoncow (return ())
|
||||
#endif
|
||||
|
||||
-- Check before copy, to avoid expensive copy of wrong file
|
||||
|
@ -500,7 +505,7 @@ storeExportWithContentIdentifierM ii dir cow src _k loc overwritablecids p = do
|
|||
liftIO $ createDirectoryUnder dir (toRawFilePath destdir)
|
||||
withTmpFileIn destdir template $ \tmpf tmph -> do
|
||||
liftIO $ hClose tmph
|
||||
void $ fileCopier cow src tmpf p Nothing
|
||||
void $ liftIO $ fileCopier cow src tmpf p Nothing
|
||||
let tmpf' = toRawFilePath tmpf
|
||||
resetAnnexFilePerm tmpf'
|
||||
liftIO (getFileStatus tmpf) >>= liftIO . mkContentIdentifier ii tmpf' >>= \case
|
||||
|
|
|
@ -715,7 +715,7 @@ mkFileCopier remotewanthardlink (State _ _ copycowtried _ _) = do
|
|||
where
|
||||
copier src dest k p check verifyconfig = do
|
||||
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
||||
fileCopier copycowtried src dest p iv >>= \case
|
||||
liftIO (fileCopier copycowtried src dest p iv) >>= \case
|
||||
Copied -> ifM check
|
||||
( finishVerifyKeyContentIncrementally iv
|
||||
, do
|
||||
|
|
|
@ -360,8 +360,7 @@ adjustExportImport' isexport isimport r rs = do
|
|||
getkeycids ciddbv k >>= \case
|
||||
(cid:_) -> do
|
||||
l <- getfirstexportloc dbv k
|
||||
void $ retrieveExportWithContentIdentifier (importActions r) l cid dest (pure k) p
|
||||
return UnVerified
|
||||
snd <$> retrieveExportWithContentIdentifier (importActions r) l cid dest (Left k) p
|
||||
-- In case a content identifier is somehow missing,
|
||||
-- try this instead.
|
||||
[] -> if isexport
|
||||
|
|
19
Remote/S3.hs
19
Remote/S3.hs
|
@ -649,13 +649,23 @@ mkImportableContentsVersioned info = build . groupfiles
|
|||
| otherwise =
|
||||
i : removemostrecent mtime rest
|
||||
|
||||
retrieveExportWithContentIdentifierS3 :: S3HandleVar -> Remote -> RemoteStateHandle -> S3Info -> ExportLocation -> ContentIdentifier -> FilePath -> Annex Key -> MeterUpdate -> Annex Key
|
||||
retrieveExportWithContentIdentifierS3 hv r rs info loc cid dest mkkey p = withS3Handle hv $ \case
|
||||
retrieveExportWithContentIdentifierS3 :: S3HandleVar -> Remote -> RemoteStateHandle -> S3Info -> ExportLocation -> ContentIdentifier -> FilePath -> Either Key (Annex Key) -> MeterUpdate -> Annex (Key, Verification)
|
||||
retrieveExportWithContentIdentifierS3 hv r rs info loc cid dest gk p =
|
||||
case gk of
|
||||
Right _mkkey -> do
|
||||
k <- go Nothing
|
||||
return (k, UnVerified)
|
||||
Left k -> do
|
||||
v <- verifyKeyContentIncrementally DefaultVerify k
|
||||
(void . go)
|
||||
return (k, v)
|
||||
where
|
||||
go iv = withS3Handle hv $ \case
|
||||
Just h -> do
|
||||
rewritePreconditionException $ retrieveHelper' h dest p Nothing $
|
||||
rewritePreconditionException $ retrieveHelper' h dest p iv $
|
||||
limitGetToContentIdentifier cid $
|
||||
S3.getObject (bucket info) o
|
||||
k <- mkkey
|
||||
k <- either return id gk
|
||||
case extractContentIdentifier cid o of
|
||||
Right vid -> do
|
||||
vids <- getS3VersionID rs k
|
||||
|
@ -664,7 +674,6 @@ retrieveExportWithContentIdentifierS3 hv r rs info loc cid dest mkkey p = withS3
|
|||
Left _ -> noop
|
||||
return k
|
||||
Nothing -> giveup $ needS3Creds (uuid r)
|
||||
where
|
||||
o = T.pack $ bucketExportLocation info loc
|
||||
|
||||
{- Catch exception getObject returns when a precondition is not met,
|
||||
|
|
|
@ -346,10 +346,11 @@ data ImportActions a = ImportActions
|
|||
-> ContentIdentifier
|
||||
-- file to write content to
|
||||
-> FilePath
|
||||
-- callback that generates a key from the downloaded content
|
||||
-> a Key
|
||||
-- Either the key, or when it's not yet known, a callback
|
||||
-- that generates a key from the downloaded content.
|
||||
-> Either Key (a Key)
|
||||
-> MeterUpdate
|
||||
-> a Key
|
||||
-> a (Key, Verification)
|
||||
-- Exports content to an ExportLocation, and returns the
|
||||
-- ContentIdentifier corresponding to the content it stored.
|
||||
--
|
||||
|
|
|
@ -12,3 +12,5 @@ If needed example, here is http://datasets.datalad.org/allen-brain-observatory/v
|
|||
|
||||
[[!meta author=yoh]]
|
||||
[[!tag projects/dandi]]
|
||||
|
||||
> [[done]] --[[Joey]]
|
||||
|
|
|
@ -6,4 +6,6 @@
|
|||
Update: incremental hashing is also now done for all export remotes.
|
||||
Only import (and export+import) remotes don't support incremental hashing
|
||||
now.
|
||||
|
||||
Update 2: Now also done for import remotes. All done!
|
||||
"""]]
|
||||
|
|
Loading…
Reference in a new issue