resume interrupted chunked downloads

Leverage the new chunked remotes to automatically resume downloads.
Sort of like rsync, although of course not as efficient since this
needs to start at a chunk boundry.

But, unlike rsync, this method will work for S3, WebDAV, external
special remotes, etc, etc. Only directory special remotes so far,
but many more soon!

This implementation will also properly handle starting a download
from one remote, interrupting, and resuming from another one, and so on.

(Resuming interrupted chunked uploads is similarly doable, although
slightly more expensive.)

This commit was sponsored by Thomas Djärv.
This commit is contained in:
Joey Hess 2014-07-27 18:52:42 -04:00
parent 13bbb61a51
commit 9d4a766cd7
6 changed files with 88 additions and 32 deletions

View file

@ -1,6 +1,6 @@
{- git-annex chunked remotes
-
- Copyright 2012-2014 Joey Hess <joey@kitenet.net>
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@ -27,6 +27,7 @@ import Annex.Exception
import qualified Data.ByteString.Lazy as L
import qualified Data.Map as M
import Control.Exception
data ChunkConfig
= NoChunks
@ -147,12 +148,16 @@ removeChunks remover u chunkconfig encryptor k = do
-
- When the remote is chunked, tries each of the options returned by
- chunkKeys until it finds one where the retriever successfully
- gets the first key in the list. The content of that key, and any
- gets the first chunked key. The content of that key, and any
- other chunks in the list is fed to the sink.
-
- If retrival of one of the subsequent chunks throws an exception,
- gives up and returns False. Note that partial data may have been
- written to the sink in this case.
-
- Resuming is supported when using chunks. When the destination file
- already exists, it skips to the next chunked key that would be needed
- to resume.
-}
retrieveChunks
:: (Key -> IO L.ByteString)
@ -160,43 +165,88 @@ retrieveChunks
-> ChunkConfig
-> EncKey
-> Key
-> FilePath
-> MeterUpdate
-> (MeterUpdate -> L.ByteString -> IO ())
-> (Handle -> MeterUpdate -> L.ByteString -> IO ())
-> Annex Bool
retrieveChunks retriever u chunkconfig encryptor basek basep sink
retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
| noChunks chunkconfig =
-- Optimisation: Try the unchunked key first, to avoid
-- looking in the git-annex branch for chunk counts.
liftIO (retriever (encryptor basek) >>= sink basep >> return True)
`catchNonAsyncAnnex`
const (go =<< chunkKeysOnly u basek)
-- looking in the git-annex branch for chunk counts
-- that are likely not there.
getunchunked `catchNonAsyncAnnex`
const (go =<< chunkKeysOnly u basek)
| otherwise = go =<< chunkKeys u chunkconfig basek
where
go ls = liftIO $ firstavail ls `catchNonAsync` giveup
go ls = liftIO $ do
currsize <- catchMaybeIO $
toInteger . fileSize <$> getFileStatus dest
let ls' = maybe ls (setupResume ls) currsize
firstavail currsize ls' `catchNonAsync` giveup
giveup e = do
warningIO (show e)
return False
firstavail [] = return False
firstavail ([]:ls) = firstavail ls
firstavail ((k:ks):ls) = do
firstavail _ [] = return False
firstavail currsize ([]:ls) = firstavail currsize ls
firstavail currsize ((k:ks):ls) = do
v <- tryNonAsync $ retriever (encryptor k)
case v of
Left e
| null ls -> giveup e
| otherwise -> firstavail ls
| otherwise -> firstavail currsize ls
Right b -> do
sink basep b
let sz = toBytesProcessed $
fromMaybe 0 $ keyChunkSize k
getrest sz sz ks
let offset = resumeOffset currsize k
let p = maybe basep
(offsetMeterUpdate basep . toBytesProcessed)
offset
bracket (maybe opennew openresume offset) hClose $ \h -> do
sink h p b
let sz = toBytesProcessed $
fromMaybe 0 $ keyChunkSize k
getrest p h sz sz ks
getrest _ _ [] = return True
getrest sz bytesprocessed (k:ks) = do
let p = offsetMeterUpdate basep bytesprocessed
sink p =<< retriever (encryptor k)
getrest sz (addBytesProcessed bytesprocessed sz) ks
getrest _ _ _ _ [] = return True
getrest p h sz bytesprocessed (k:ks) = do
let p' = offsetMeterUpdate p bytesprocessed
sink h p' =<< retriever (encryptor k)
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
getunchunked = liftIO $ bracket opennew hClose $ \h -> do
retriever (encryptor basek) >>= sink h basep
return True
opennew = openBinaryFile dest WriteMode
-- Open the file and seek to the start point in order to resume.
openresume startpoint = do
-- ReadWriteMode allows seeking; AppendMode does not.
h <- openBinaryFile dest ReadWriteMode
hSeek h AbsoluteSeek startpoint
return h
{- Can resume when the chunk's offset is at or before the end of
- the dest file. -}
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
resumeOffset Nothing _ = Nothing
resumeOffset currsize k
| offset <= currsize = offset
| otherwise = Nothing
where
offset = chunkKeyOffset k
{- Drops chunks that are already present in a file, based on its size.
- Keeps any non-chunk keys.
-}
setupResume :: [[Key]] -> Integer -> [[Key]]
setupResume ls currsize = map dropunneeded ls
where
dropunneeded [] = []
dropunneeded l@(k:_) = case keyChunkSize k of
Just chunksize | chunksize > 0 ->
genericDrop (currsize `div` chunksize) l
_ -> l
{- Checks if a key is present in a remote. This requires any one
- of the lists of options returned by chunkKeys to all check out
@ -212,7 +262,8 @@ hasKeyChunks
hasKeyChunks checker u chunkconfig encryptor basek
| noChunks chunkconfig =
-- Optimisation: Try the unchunked key first, to avoid
-- looking in the git-annex branch for chunk counts.
-- looking in the git-annex branch for chunk counts
-- that are likely not there.
ifM ((Right True ==) <$> checker (encryptor basek))
( return (Right True)
, checklists impossible =<< chunkKeysOnly u basek