better handling of finishing up incomplete incremental verify
Now it's run in VerifyStage. I thought about keeping the file handle open, and resuming reading where tailVerify left off. But that risks leaking open file handles, until the GC closes them, if the deferred verification does not get resumed. Since that could perhaps happen if there's an exception somewhere, I decided that was too unsafe. Instead, re-open the file, seek, and resume. Sponsored-by: Dartmouth College's DANDI project
This commit is contained in:
parent
e0b7f391bd
commit
c4aba8e032
7 changed files with 114 additions and 88 deletions
129
Annex/Verify.hs
129
Annex/Verify.hs
|
@ -37,7 +37,6 @@ import Control.Concurrent.STM
|
|||
import qualified System.INotify as INotify
|
||||
import qualified Data.ByteString as S
|
||||
import qualified System.FilePath.ByteString as P
|
||||
import Data.Time.Clock.POSIX
|
||||
#endif
|
||||
|
||||
data VerifyConfig = AlwaysVerify | NoVerify | RemoteVerify Remote | DefaultVerify
|
||||
|
@ -82,23 +81,69 @@ verifyKeyContentPostRetrieval rsp v verification k f = case (rsp, verification)
|
|||
, return True
|
||||
)
|
||||
(_, MustVerify) -> verify
|
||||
(_, IncompleteVerify _) -> ifM (shouldVerify v)
|
||||
( verify
|
||||
, return True
|
||||
)
|
||||
where
|
||||
verify = enteringStage VerifyStage $ verifyKeyContent k f
|
||||
verify = enteringStage VerifyStage $
|
||||
case verification of
|
||||
IncompleteVerify iv -> resumeVerifyKeyContent k f iv
|
||||
_ -> verifyKeyContent k f
|
||||
|
||||
verifyKeyContent :: Key -> RawFilePath -> Annex Bool
|
||||
verifyKeyContent k f = verifysize <&&> verifycontent
|
||||
where
|
||||
verifysize = case fromKey keySize k of
|
||||
Nothing -> return True
|
||||
Just size -> do
|
||||
size' <- liftIO $ catchDefaultIO 0 $ getFileSize f
|
||||
return (size' == size)
|
||||
verifycontent = Backend.maybeLookupBackendVariety (fromKey keyVariety k) >>= \case
|
||||
verifyKeyContent k f = verifyKeySize k f <&&> verifyKeyContent' k f
|
||||
|
||||
verifyKeyContent' :: Key -> RawFilePath -> Annex Bool
|
||||
verifyKeyContent' k f =
|
||||
Backend.maybeLookupBackendVariety (fromKey keyVariety k) >>= \case
|
||||
Nothing -> return True
|
||||
Just b -> case Types.Backend.verifyKeyContent b of
|
||||
Nothing -> return True
|
||||
Just verifier -> verifier k f
|
||||
|
||||
resumeVerifyKeyContent :: Key -> RawFilePath -> IncrementalVerifier -> Annex Bool
|
||||
resumeVerifyKeyContent k f iv = liftIO (positionIncremental iv) >>= \case
|
||||
Nothing -> fallback
|
||||
Just endpos -> do
|
||||
fsz <- liftIO $ catchDefaultIO 0 $ getFileSize f
|
||||
if fsz < endpos
|
||||
then fallback
|
||||
else case fromKey keySize k of
|
||||
Just size | fsz /= size -> return False
|
||||
_ -> go fsz endpos
|
||||
where
|
||||
fallback = verifyKeyContent k f
|
||||
|
||||
go fsz endpos
|
||||
| fsz == endpos =
|
||||
liftIO $ catchDefaultIO False $
|
||||
finalizeIncremental iv
|
||||
| otherwise = do
|
||||
showAction (descVerify iv)
|
||||
liftIO $ catchDefaultIO False $
|
||||
withBinaryFile (fromRawFilePath f) ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek endpos
|
||||
feedincremental h
|
||||
finalizeIncremental iv
|
||||
|
||||
feedincremental h = do
|
||||
b <- S.hGetSome h chunk
|
||||
if S.null b
|
||||
then return ()
|
||||
else do
|
||||
updateIncremental iv b
|
||||
feedincremental h
|
||||
|
||||
chunk = 65536
|
||||
|
||||
verifyKeySize :: Key -> RawFilePath -> Annex Bool
|
||||
verifyKeySize k f = case fromKey keySize k of
|
||||
Just size -> do
|
||||
size' <- liftIO $ catchDefaultIO 0 $ getFileSize f
|
||||
return (size' == size)
|
||||
Nothing -> return True
|
||||
|
||||
warnUnverifiableInsecure :: Key -> Annex ()
|
||||
warnUnverifiableInsecure k = warning $ unwords
|
||||
[ "Getting " ++ kv ++ " keys with this remote is not secure;"
|
||||
|
@ -127,7 +172,9 @@ startVerifyKeyContentIncrementally verifyconfig k =
|
|||
-- | Reads the file as it grows, and feeds it to the incremental verifier.
|
||||
--
|
||||
-- The TMVar must start out empty, and be filled once whatever is
|
||||
-- writing to the file finishes.
|
||||
-- writing to the file finishes. Once the writer finishes, this returns
|
||||
-- quickly. It may not feed the entire content of the file to the
|
||||
-- incremental verifier.
|
||||
--
|
||||
-- The file does not need to exist yet when this is called. It will wait
|
||||
-- for the file to appear before opening it and starting verification.
|
||||
|
@ -155,22 +202,12 @@ startVerifyKeyContentIncrementally verifyconfig k =
|
|||
-- and if the disk is slow, the reader may never catch up to the writer,
|
||||
-- and the disk cache may never speed up reads. So this should only be
|
||||
-- used when there's not a better way to incrementally verify.
|
||||
--
|
||||
-- If the writer gets far ahead, this can still need to do a significant
|
||||
-- amount off work once the writer is finished. That could lead to a long
|
||||
-- pause with no indication to the user about what is being done. To deal
|
||||
-- with this problem, it will do at most half a second of work after the
|
||||
-- writer has finished. If there is more work still to do, it returns an IO
|
||||
-- action that will complete the work. This way, the caller can display
|
||||
-- something appropriate while that is running.
|
||||
tailVerify :: IncrementalVerifier -> RawFilePath -> TMVar () -> IO (Maybe (IO ()))
|
||||
tailVerify :: IncrementalVerifier -> RawFilePath -> TMVar () -> IO ()
|
||||
#if WITH_INOTIFY
|
||||
tailVerify iv f finished =
|
||||
tryNonAsync go >>= \case
|
||||
Right r -> return r
|
||||
Left _ -> do
|
||||
failIncremental iv
|
||||
return Nothing
|
||||
Left _ -> failIncremental iv
|
||||
where
|
||||
-- Watch the directory containing the file, and wait for
|
||||
-- the file to be modified. It's possible that the file already
|
||||
|
@ -196,19 +233,15 @@ tailVerify iv f finished =
|
|||
let stop w = do
|
||||
cleanup w
|
||||
failIncremental iv
|
||||
return Nothing
|
||||
waitopen modified >>= \case
|
||||
Nothing -> stop wd
|
||||
Just h -> do
|
||||
cleanup wd
|
||||
wf <- inotifyfilechange i signalmodified
|
||||
tryNonAsync (follow h modified) >>= \case
|
||||
Left _ -> do
|
||||
hClose h
|
||||
stop wf
|
||||
Right r -> do
|
||||
cleanup wf
|
||||
return r
|
||||
Left _ -> stop wf
|
||||
Right () -> cleanup wf
|
||||
hClose h
|
||||
|
||||
waitopen modified = do
|
||||
v <- atomically $
|
||||
|
@ -233,47 +266,19 @@ tailVerify iv f finished =
|
|||
-- or until we're told it is done being
|
||||
-- written.
|
||||
cont <- atomically $
|
||||
((const (follow h modified))
|
||||
(const (follow h modified)
|
||||
<$> takeTMVar modified)
|
||||
`orElse`
|
||||
((const (finish h =<< getPOSIXTime))
|
||||
(const (return ())
|
||||
<$> takeTMVar finished)
|
||||
cont
|
||||
else do
|
||||
updateIncremental iv b
|
||||
atomically (tryTakeTMVar finished) >>= \case
|
||||
Nothing -> follow h modified
|
||||
Just () -> finish h =<< getPOSIXTime
|
||||
|
||||
-- We've been told the file is done being written to, but we
|
||||
-- may not have reached the end of it yet.
|
||||
finish h starttime = do
|
||||
b <- S.hGet h chunk
|
||||
if S.null b
|
||||
then do
|
||||
hClose h
|
||||
return Nothing
|
||||
else do
|
||||
updateIncremental iv b
|
||||
now <- getPOSIXTime
|
||||
if now - starttime > 0.5
|
||||
then return $ Just $
|
||||
tryNonAsync (deferredfinish h) >>= \case
|
||||
Right () -> noop
|
||||
Left _ -> failIncremental iv
|
||||
else finish h starttime
|
||||
|
||||
deferredfinish h = do
|
||||
b <- S.hGet h chunk
|
||||
if S.null b
|
||||
then hClose h
|
||||
else do
|
||||
updateIncremental iv b
|
||||
deferredfinish h
|
||||
Just () -> return ()
|
||||
|
||||
chunk = 65536
|
||||
#else
|
||||
tailVerify iv _ _ = do
|
||||
failIncremental iv
|
||||
return Nothing
|
||||
tailVerify iv _ _ = failIncremental iv
|
||||
#endif
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
-}
|
||||
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Backend.Hash (
|
||||
backends,
|
||||
|
@ -280,19 +281,25 @@ md5Hasher = mkHasher md5 md5_context
|
|||
|
||||
mkIncrementalVerifier :: HashAlgorithm h => Context h -> Key -> IO IncrementalVerifier
|
||||
mkIncrementalVerifier ctx key = do
|
||||
v <- newIORef (Just ctx)
|
||||
v <- newIORef (Just (ctx, 0))
|
||||
return $ IncrementalVerifier
|
||||
{ updateIncremental = \b ->
|
||||
modifyIORef' v $ \case
|
||||
Just ctx' -> Just (hashUpdate ctx' b)
|
||||
(Just (ctx', n)) ->
|
||||
let !ctx'' = hashUpdate ctx' b
|
||||
!n' = n + fromIntegral (S.length b)
|
||||
in (Just (ctx'', n'))
|
||||
Nothing -> Nothing
|
||||
, finalizeIncremental =
|
||||
readIORef v >>= \case
|
||||
Just ctx' -> do
|
||||
(Just (ctx', _)) -> do
|
||||
let digest = hashFinalize ctx'
|
||||
return $ sameCheckSum key (show digest)
|
||||
Nothing -> return False
|
||||
, failIncremental = writeIORef v Nothing
|
||||
, positionIncremental = readIORef v >>= \case
|
||||
Just (_, n) -> return (Just n)
|
||||
Nothing -> return Nothing
|
||||
, descVerify = descChecksum
|
||||
}
|
||||
|
||||
|
|
|
@ -269,7 +269,7 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
-- that are likely not there.
|
||||
tryNonAsync getunchunked >>= \case
|
||||
Right r -> finalize r
|
||||
Left e -> go (Just e)
|
||||
Left e -> go (Just e)
|
||||
=<< chunkKeysOnly u chunkconfig basek
|
||||
| otherwise = go Nothing
|
||||
=<< chunkKeys u chunkconfig basek
|
||||
|
@ -279,7 +279,8 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
currsize <- liftIO $ catchMaybeIO $ getFileSize (toRawFilePath dest)
|
||||
let ls' = maybe ls (setupResume ls) currsize
|
||||
if any null ls'
|
||||
then finalize Nothing -- dest is already complete
|
||||
-- dest is already complete
|
||||
then finalize (Right Nothing)
|
||||
else finalize =<< firstavail pe currsize ls'
|
||||
|
||||
firstavail Nothing _ [] = giveup "unable to determine the chunks to use for this remote"
|
||||
|
@ -306,7 +307,7 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
| otherwise -> firstavail (Just e) currsize ls
|
||||
Right r -> return r
|
||||
|
||||
getrest _ _ iv _ _ [] = return iv
|
||||
getrest _ _ iv _ _ [] = return (Right iv)
|
||||
getrest p h iv sz bytesprocessed (k:ks) = do
|
||||
let p' = offsetMeterUpdate p bytesprocessed
|
||||
liftIO $ p' zeroBytesProcessed
|
||||
|
@ -317,14 +318,21 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
getunchunked = do
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
case enc of
|
||||
Just _ -> retriever (encryptor basek) basep Nothing $
|
||||
retrieved iv Nothing basep
|
||||
Just _ -> do
|
||||
retriever (encryptor basek) basep Nothing $
|
||||
retrieved iv Nothing basep
|
||||
return (Right iv)
|
||||
-- Not chunked and not encrypted, so ask the
|
||||
-- retriever to incrementally verify when it
|
||||
-- retrieves to a file.
|
||||
Nothing -> retriever (encryptor basek) basep iv $
|
||||
retrieved iv Nothing basep
|
||||
return iv
|
||||
-- retrieves to a file. It may not finish
|
||||
-- passing the whole file content to the
|
||||
-- incremental verifier though.
|
||||
Nothing -> do
|
||||
retriever (encryptor basek) basep iv $
|
||||
retrieved iv Nothing basep
|
||||
return $ case iv of
|
||||
Nothing -> Right iv
|
||||
Just iv' -> Left (IncompleteVerify iv')
|
||||
|
||||
opennew = do
|
||||
iv <- startVerifyKeyContentIncrementally vc basek
|
||||
|
@ -357,12 +365,13 @@ retrieveChunks retriever u vc chunkconfig encryptor basek dest basep enc encc
|
|||
| isByteContent content = Just p
|
||||
| otherwise = Nothing
|
||||
|
||||
finalize Nothing = return UnVerified
|
||||
finalize (Just iv) =
|
||||
finalize (Right Nothing) = return UnVerified
|
||||
finalize (Right (Just iv)) =
|
||||
ifM (liftIO $ finalizeIncremental iv)
|
||||
( return Verified
|
||||
, return UnVerified
|
||||
)
|
||||
finalize (Left v) = return v
|
||||
|
||||
{- Writes retrieved file content to the provided Handle, decrypting it
|
||||
- first if necessary.
|
||||
|
|
|
@ -113,7 +113,8 @@ byteRetriever a k _m _miv callback = a k (callback . ByteContent)
|
|||
-- A Retriever that writes the content of a Key to a provided file.
|
||||
-- The action is responsible for updating the progress meter as it
|
||||
-- retrieves data. The incremental verifier is updated in the background as
|
||||
-- the action writes to the file.
|
||||
-- the action writes to the file, but may not be updated with the entire
|
||||
-- content of the file.
|
||||
fileRetriever :: (FilePath -> Key -> MeterUpdate -> Annex ()) -> Retriever
|
||||
fileRetriever a k m miv callback = do
|
||||
f <- prepTmp k
|
||||
|
@ -123,13 +124,10 @@ fileRetriever a k m miv callback = do
|
|||
Just iv -> do
|
||||
finished <- liftIO newEmptyTMVarIO
|
||||
t <- liftIO $ async $ tailVerify iv f finished
|
||||
retrieve
|
||||
liftIO $ atomically $ putTMVar finished ()
|
||||
liftIO (wait t) >>= \case
|
||||
Nothing -> noop
|
||||
Just deferredverify -> do
|
||||
showAction (descVerify iv)
|
||||
liftIO deferredverify
|
||||
let finishtail = do
|
||||
liftIO $ atomically $ putTMVar finished ()
|
||||
liftIO (wait t)
|
||||
retrieve `finally` finishtail
|
||||
pruneTmpWorkDirBefore f (callback . FileContent . fromRawFilePath)
|
||||
|
||||
{- The base Remote that is provided to specialRemote needs to have
|
||||
|
|
|
@ -52,6 +52,10 @@ data IncrementalVerifier = IncrementalVerifier
|
|||
-- if the hash verified.
|
||||
, failIncremental :: IO ()
|
||||
-- ^ Call if the incremental verification needs to fail.
|
||||
, positionIncremental :: IO (Maybe Integer)
|
||||
-- ^ Returns the number of bytes that have been fed to this
|
||||
-- incremental verifier so far. (Nothing if failIncremental was
|
||||
-- called.)
|
||||
, descVerify :: String
|
||||
-- ^ A description of what is done to verify the content.
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
-
|
||||
- Most things should not need this, using Types instead
|
||||
-
|
||||
- Copyright 2011-2020 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2011-2021 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -41,6 +41,7 @@ import Types.NumCopies
|
|||
import Types.Export
|
||||
import Types.Import
|
||||
import Types.RemoteConfig
|
||||
import Types.Backend (IncrementalVerifier)
|
||||
import Config.Cost
|
||||
import Utility.Metered
|
||||
import Git.Types (RemoteName)
|
||||
|
@ -203,7 +204,9 @@ data Verification
|
|||
| MustVerify
|
||||
-- ^ Content likely to have been altered during transfer,
|
||||
-- verify even if verification is normally disabled
|
||||
deriving (Show)
|
||||
| IncompleteVerify IncrementalVerifier
|
||||
-- ^ Content was partially verified during transfer, but
|
||||
-- the verification is not complete.
|
||||
|
||||
unVerified :: Monad m => m a -> m (a, Verification)
|
||||
unVerified a = do
|
||||
|
|
|
@ -35,8 +35,8 @@ type Storer = Key -> ContentSource -> MeterUpdate -> Annex ()
|
|||
--
|
||||
-- When it retrieves FileContent, it is responsible for updating the
|
||||
-- MeterUpdate. And when the IncrementalVerifier is passed to it,
|
||||
-- and it retrieves FileContent, it should feed the content to the
|
||||
-- verifier before running the callback.
|
||||
-- and it retrieves FileContent, it can feed some or all of the file's
|
||||
-- content to the verifier before running the callback.
|
||||
-- This should not be done when it retrieves ByteContent.
|
||||
type Retriever = forall a.
|
||||
Key -> MeterUpdate -> Maybe IncrementalVerifier
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue