resume interrupted chunked uploads
Leverage the new chunked remotes to automatically resume uploads. Sort of like rsync, although of course not as efficient since this needs to start at a chunk boundry. But, unlike rsync, this method will work for S3, WebDAV, external special remotes, etc, etc. Only directory special remotes so far, but many more soon! This implementation will also allow starting an upload from one repository, interrupting it, and then resuming the upload to the same remote from an entirely different repository. Note that I added a comment that storeKey should atomically move the content into place once it's all received. This was already an undocumented requirement -- it's necessary for hasKey to work reliably. This resume code just uses hasKey to find the first chunk that's missing. Note that if there are two uploads of the same key to the same chunked remote, one might resume at the point the other had gotten to, but both will then redundantly upload. As before. In the non-resume case, this adds one hasKey call per storeKey, and only if the remote is configured to use chunks. Future work: Try to eliminate that hasKey. Notice that eg, `git annex copy --to` checks if the key is present before sending it, so is already running hasKey.. which could perhaps be cached and reused. However, this additional overhead is not very large compared with transferring an entire large file, and the ability to resume is certianly worth it. There is an optimisation in place for small files, that avoids trying to resume if the whole file fits within one chunk. This commit was sponsored by Georg Bauer.
This commit is contained in:
parent
153ace4524
commit
58f727afdd
3 changed files with 80 additions and 21 deletions
|
@ -73,6 +73,9 @@ numChunks = pred . fromJust . keyChunkNum . fst . nextChunkKeyStream
|
|||
- the storer action, along with a corresponding chunk key and a
|
||||
- progress meter update callback.
|
||||
-
|
||||
- To support resuming, the checker is used to find the first missing
|
||||
- chunk key. Storing starts from that chunk.
|
||||
-
|
||||
- This buffers each chunk in memory, so can use a lot of memory
|
||||
- with a large ChunkSize.
|
||||
- More optimal versions of this can be written, that rely
|
||||
|
@ -88,18 +91,31 @@ storeChunks
|
|||
-> FilePath
|
||||
-> MeterUpdate
|
||||
-> (Key -> L.ByteString -> MeterUpdate -> IO Bool)
|
||||
-> (Key -> Annex (Either String Bool))
|
||||
-> Annex Bool
|
||||
storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
||||
either (\e -> warning (show e) >> return False) (go meterupdate)
|
||||
=<< (liftIO $ tryIO $ L.readFile f)
|
||||
storeChunks u chunkconfig k f p storer checker = bracketIO open close go
|
||||
where
|
||||
go meterupdate b = case chunkconfig of
|
||||
(UnpaddedChunks chunksize) ->
|
||||
gochunks meterupdate chunksize b (chunkKeyStream k chunksize)
|
||||
_ -> liftIO $ storer k b meterupdate
|
||||
open = tryIO $ openBinaryFile f ReadMode
|
||||
|
||||
gochunks :: MeterUpdate -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
|
||||
gochunks meterupdate chunksize = loop zeroBytesProcessed . splitchunk
|
||||
close (Right h) = hClose h
|
||||
close (Left _) = noop
|
||||
|
||||
go (Left e) = do
|
||||
warning (show e)
|
||||
return False
|
||||
go (Right h) = metered (Just p) k $ \meterupdate ->
|
||||
case chunkconfig of
|
||||
(UnpaddedChunks chunksize) -> do
|
||||
let chunkkeys = chunkKeyStream k chunksize
|
||||
(chunkkeys', startpos) <- seekResume h chunkkeys checker
|
||||
b <- liftIO $ L.hGetContents h
|
||||
gochunks meterupdate startpos chunksize b chunkkeys'
|
||||
_ -> liftIO $ do
|
||||
b <- L.hGetContents h
|
||||
storer k b meterupdate
|
||||
|
||||
gochunks :: MeterUpdate -> BytesProcessed -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex Bool
|
||||
gochunks meterupdate startpos chunksize = loop startpos . splitchunk
|
||||
where
|
||||
splitchunk = L.splitAt chunksize
|
||||
|
||||
|
@ -125,6 +141,45 @@ storeChunks u chunkconfig k f p storer = metered (Just p) k $ \meterupdate ->
|
|||
- in previous chunks. -}
|
||||
meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
|
||||
|
||||
{- Check if any of the chunk keys are present. If found, seek forward
|
||||
- in the Handle, so it will be read starting at the first missing chunk.
|
||||
- Returns the ChunkKeyStream truncated to start at the first missing
|
||||
- chunk, and the number of bytes skipped due to resuming.
|
||||
-
|
||||
- As an optimisation, if the file fits into a single chunk, there's no need
|
||||
- to check if that chunk is present -- we know it's not, because otherwise
|
||||
- the whole file would be present and there would be no reason to try to
|
||||
- store it.
|
||||
-}
|
||||
seekResume
|
||||
:: Handle
|
||||
-> ChunkKeyStream
|
||||
-> (Key -> Annex (Either String Bool))
|
||||
-> Annex (ChunkKeyStream, BytesProcessed)
|
||||
seekResume h chunkkeys checker = do
|
||||
sz <- liftIO (hFileSize h)
|
||||
if sz <= fromMaybe 0 (keyChunkSize $ fst $ nextChunkKeyStream chunkkeys)
|
||||
then return (chunkkeys, zeroBytesProcessed)
|
||||
else check 0 chunkkeys sz
|
||||
where
|
||||
check pos cks sz
|
||||
| pos >= sz = do
|
||||
-- All chunks are already stored!
|
||||
liftIO $ hSeek h AbsoluteSeek sz
|
||||
return (cks', toBytesProcessed sz)
|
||||
| otherwise = do
|
||||
v <- checker k
|
||||
case v of
|
||||
Right True ->
|
||||
check pos' cks' sz
|
||||
_ -> do
|
||||
when (pos > 0) $
|
||||
liftIO $ hSeek h AbsoluteSeek pos
|
||||
return (cks, toBytesProcessed pos)
|
||||
where
|
||||
(k, cks') = nextChunkKeyStream cks
|
||||
pos' = pos + fromMaybe 0 (keyChunkSize k)
|
||||
|
||||
{- Removes all chunks of a key from a remote, by calling a remover
|
||||
- action on each.
|
||||
-
|
||||
|
|
|
@ -60,19 +60,19 @@ chunkedEncryptableRemote
|
|||
-> Preparer Retriever
|
||||
-> Remote
|
||||
-> Remote
|
||||
chunkedEncryptableRemote c preparestorer prepareretriever r = encr
|
||||
chunkedEncryptableRemote c preparestorer prepareretriever baser = encr
|
||||
where
|
||||
encr = r
|
||||
encr = baser
|
||||
{ storeKey = \k _f p -> cip >>= storeKeyGen k p
|
||||
, retrieveKeyFile = \k _f d p -> cip >>= retrieveKeyFileGen k d p
|
||||
, retrieveKeyFileCheap = \k d -> cip >>= maybe
|
||||
(retrieveKeyFileCheap r k d)
|
||||
(retrieveKeyFileCheap baser k d)
|
||||
(\_ -> return False)
|
||||
, removeKey = \k -> cip >>= removeKeyGen k
|
||||
, hasKey = \k -> cip >>= hasKeyGen k
|
||||
, cost = maybe
|
||||
(cost r)
|
||||
(const $ cost r + encryptedRemoteCostAdj)
|
||||
(cost baser)
|
||||
(const $ cost baser + encryptedRemoteCostAdj)
|
||||
(extractCipher c)
|
||||
}
|
||||
cip = cipherKey c
|
||||
|
@ -87,8 +87,9 @@ chunkedEncryptableRemote c preparestorer prepareretriever r = encr
|
|||
where
|
||||
go (Just storer) = sendAnnex k rollback $ \src ->
|
||||
metered (Just p) k $ \p' ->
|
||||
storeChunks (uuid r) chunkconfig k src p' $
|
||||
storechunk storer
|
||||
storeChunks (uuid baser) chunkconfig k src p'
|
||||
(storechunk storer)
|
||||
(hasKey baser)
|
||||
go Nothing = return False
|
||||
rollback = void $ removeKey encr k
|
||||
storechunk storer k' b p' = case enc of
|
||||
|
@ -103,7 +104,8 @@ chunkedEncryptableRemote c preparestorer prepareretriever r = encr
|
|||
safely $ prepareretriever k $ safely . go
|
||||
where
|
||||
go (Just retriever) = metered (Just p) k $ \p' ->
|
||||
retrieveChunks retriever (uuid r) chunkconfig enck k dest p' sink
|
||||
retrieveChunks retriever (uuid baser) chunkconfig
|
||||
enck k dest p' sink
|
||||
go Nothing = return False
|
||||
sink h p' b = do
|
||||
let write = meteredWrite p' h
|
||||
|
@ -114,15 +116,15 @@ chunkedEncryptableRemote c preparestorer prepareretriever r = encr
|
|||
readBytes write
|
||||
enck = maybe id snd enc
|
||||
|
||||
removeKeyGen k enc = removeChunks remover (uuid r) chunkconfig enck k
|
||||
removeKeyGen k enc = removeChunks remover (uuid baser) chunkconfig enck k
|
||||
where
|
||||
enck = maybe id snd enc
|
||||
remover = removeKey r
|
||||
remover = removeKey baser
|
||||
|
||||
hasKeyGen k enc = hasKeyChunks checker (uuid r) chunkconfig enck k
|
||||
hasKeyGen k enc = hasKeyChunks checker (uuid baser) chunkconfig enck k
|
||||
where
|
||||
enck = maybe id snd enc
|
||||
checker = hasKey r
|
||||
checker = hasKey baser
|
||||
|
||||
{- The base Remote that is provided to chunkedEncryptableRemote
|
||||
- needs to have storeKey and retreiveKeyFile methods, but they are
|
||||
|
|
|
@ -57,6 +57,8 @@ data RemoteA a = Remote {
|
|||
-- Remotes have a use cost; higher is more expensive
|
||||
cost :: Cost,
|
||||
-- Transfers a key's contents from disk to the remote.
|
||||
-- The key should not appear to be present on the remote until
|
||||
-- all of its contents have been transferred.
|
||||
storeKey :: Key -> AssociatedFile -> MeterUpdate -> a Bool,
|
||||
-- Retrieves a key's contents to a file.
|
||||
-- (The MeterUpdate does not need to be used if it retrieves
|
||||
|
|
Loading…
Reference in a new issue