allow Retriever action to update the progress meter
Needed for eg, Remote.External. Generally, any Retriever that stores content in a file is responsible for updating the meter, while ones that procude a lazy bytestring cannot update the meter, so are not asked to.
This commit is contained in:
parent
1d263e1e7e
commit
47e522979c
5 changed files with 46 additions and 29 deletions
|
@ -264,7 +264,10 @@ prepTmp key = do
|
||||||
createAnnexDirectory (parentDir tmp)
|
createAnnexDirectory (parentDir tmp)
|
||||||
return tmp
|
return tmp
|
||||||
|
|
||||||
{- Creates a temp file, runs an action on it, and cleans up the temp file. -}
|
{- Creates a temp file for a key, runs an action on it, and cleans up
|
||||||
|
- the temp file. If the action throws an exception, the temp file is
|
||||||
|
- left behind, which allows for resuming.
|
||||||
|
-}
|
||||||
withTmp :: Key -> (FilePath -> Annex a) -> Annex a
|
withTmp :: Key -> (FilePath -> Annex a) -> Annex a
|
||||||
withTmp key action = do
|
withTmp key action = do
|
||||||
tmp <- prepTmp key
|
tmp <- prepTmp key
|
||||||
|
|
|
@ -137,8 +137,8 @@ store d chunkconfig k b p = liftIO $ do
|
||||||
|
|
||||||
retrieve :: FilePath -> ChunkConfig -> Preparer Retriever
|
retrieve :: FilePath -> ChunkConfig -> Preparer Retriever
|
||||||
retrieve d (LegacyChunks _) = Legacy.retrieve locations d
|
retrieve d (LegacyChunks _) = Legacy.retrieve locations d
|
||||||
retrieve d _ = simplyPrepare $ byteRetriever $
|
retrieve d _ = simplyPrepare $ byteRetriever $ \k ->
|
||||||
\k -> liftIO $ L.readFile =<< getLocation d k
|
liftIO $ L.readFile =<< getLocation d k
|
||||||
|
|
||||||
retrieveCheap :: FilePath -> ChunkConfig -> Key -> FilePath -> Annex Bool
|
retrieveCheap :: FilePath -> ChunkConfig -> Key -> FilePath -> Annex Bool
|
||||||
-- no cheap retrieval possible for chunks
|
-- no cheap retrieval possible for chunks
|
||||||
|
|
|
@ -221,7 +221,7 @@ retrieveChunks
|
||||||
-> Key
|
-> Key
|
||||||
-> FilePath
|
-> FilePath
|
||||||
-> MeterUpdate
|
-> MeterUpdate
|
||||||
-> (Handle -> MeterUpdate -> L.ByteString -> IO ())
|
-> (Handle -> Maybe MeterUpdate -> L.ByteString -> IO ())
|
||||||
-> Annex Bool
|
-> Annex Bool
|
||||||
retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
| noChunks chunkconfig =
|
| noChunks chunkconfig =
|
||||||
|
@ -245,18 +245,18 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
firstavail _ [] = return False
|
firstavail _ [] = return False
|
||||||
firstavail currsize ([]:ls) = firstavail currsize ls
|
firstavail currsize ([]:ls) = firstavail currsize ls
|
||||||
firstavail currsize ((k:ks):ls) = do
|
firstavail currsize ((k:ks):ls) = do
|
||||||
v <- tryNonAsyncAnnex $ retriever (encryptor k)
|
let offset = resumeOffset currsize k
|
||||||
|
let p = maybe basep
|
||||||
|
(offsetMeterUpdate basep . toBytesProcessed)
|
||||||
|
offset
|
||||||
|
v <- tryNonAsyncAnnex $ retriever (encryptor k) p
|
||||||
case v of
|
case v of
|
||||||
Left e
|
Left e
|
||||||
| null ls -> giveup e
|
| null ls -> giveup e
|
||||||
| otherwise -> firstavail currsize ls
|
| otherwise -> firstavail currsize ls
|
||||||
Right content -> do
|
Right content -> do
|
||||||
let offset = resumeOffset currsize k
|
|
||||||
let p = maybe basep
|
|
||||||
(offsetMeterUpdate basep . toBytesProcessed)
|
|
||||||
offset
|
|
||||||
bracketIO (maybe opennew openresume offset) hClose $ \h -> do
|
bracketIO (maybe opennew openresume offset) hClose $ \h -> do
|
||||||
withBytes content $ liftIO . sink h p
|
tosink h p content
|
||||||
let sz = toBytesProcessed $
|
let sz = toBytesProcessed $
|
||||||
fromMaybe 0 $ keyChunkSize k
|
fromMaybe 0 $ keyChunkSize k
|
||||||
getrest p h sz sz ks
|
getrest p h sz sz ks
|
||||||
|
@ -264,13 +264,11 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
getrest _ _ _ _ [] = return True
|
getrest _ _ _ _ [] = return True
|
||||||
getrest p h sz bytesprocessed (k:ks) = do
|
getrest p h sz bytesprocessed (k:ks) = do
|
||||||
let p' = offsetMeterUpdate p bytesprocessed
|
let p' = offsetMeterUpdate p bytesprocessed
|
||||||
content <- retriever (encryptor k)
|
tosink h p' =<< retriever (encryptor k) p'
|
||||||
withBytes content $ liftIO . sink h p'
|
|
||||||
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
|
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
|
||||||
|
|
||||||
getunchunked = bracketIO opennew hClose $ \h -> do
|
getunchunked = bracketIO opennew hClose $ \h -> do
|
||||||
content <- retriever (encryptor basek)
|
tosink h basep =<< retriever (encryptor basek) basep
|
||||||
withBytes content $ liftIO . sink h basep
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
opennew = openBinaryFile dest WriteMode
|
opennew = openBinaryFile dest WriteMode
|
||||||
|
@ -282,6 +280,19 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
hSeek h AbsoluteSeek startpoint
|
hSeek h AbsoluteSeek startpoint
|
||||||
return h
|
return h
|
||||||
|
|
||||||
|
{- Progress meter updating is a bit tricky: If the Retriever
|
||||||
|
- populates a file, it is responsible for updating progress
|
||||||
|
- as the file is being retrieved.
|
||||||
|
-
|
||||||
|
- However, if the Retriever generates a lazy ByteString,
|
||||||
|
- it is not responsible for updating progress (often it cannot).
|
||||||
|
- Instead, the sink is passed a meter to update as it consumes
|
||||||
|
- the ByteString. -}
|
||||||
|
tosink h p (ByteContent b) = liftIO $
|
||||||
|
sink h (Just p) b
|
||||||
|
tosink h _ (FileContent f) = liftIO $
|
||||||
|
sink h Nothing =<< L.readFile f
|
||||||
|
|
||||||
{- Can resume when the chunk's offset is at or before the end of
|
{- Can resume when the chunk's offset is at or before the end of
|
||||||
- the dest file. -}
|
- the dest file. -}
|
||||||
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
|
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
|
||||||
|
|
|
@ -14,6 +14,7 @@ module Remote.Helper.ChunkedEncryptable (
|
||||||
Storer,
|
Storer,
|
||||||
Retriever,
|
Retriever,
|
||||||
simplyPrepare,
|
simplyPrepare,
|
||||||
|
ContentSource,
|
||||||
checkPrepare,
|
checkPrepare,
|
||||||
fileStorer,
|
fileStorer,
|
||||||
byteStorer,
|
byteStorer,
|
||||||
|
@ -36,6 +37,8 @@ import Remote.Helper.Encryptable as X
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
import Annex.Exception
|
import Annex.Exception
|
||||||
|
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
simplyPrepare :: helper -> Preparer helper
|
simplyPrepare :: helper -> Preparer helper
|
||||||
simplyPrepare helper _ a = a $ Just helper
|
simplyPrepare helper _ a = a $ Just helper
|
||||||
|
|
||||||
|
@ -101,8 +104,10 @@ chunkedEncryptableRemote c preparestorer prepareretriever baser = encr
|
||||||
retrieveChunks retriever (uuid baser) chunkconfig
|
retrieveChunks retriever (uuid baser) chunkconfig
|
||||||
enck k dest p' sink
|
enck k dest p' sink
|
||||||
go Nothing = return False
|
go Nothing = return False
|
||||||
sink h p' b = do
|
sink h mp b = do
|
||||||
let write = meteredWrite p' h
|
let write = case mp of
|
||||||
|
Just p' -> meteredWrite p' h
|
||||||
|
Nothing -> L.hPut h
|
||||||
case enc of
|
case enc of
|
||||||
Nothing -> write b
|
Nothing -> write b
|
||||||
Just (cipher, _) ->
|
Just (cipher, _) ->
|
||||||
|
|
|
@ -10,8 +10,8 @@
|
||||||
module Types.StoreRetrieve where
|
module Types.StoreRetrieve where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
import Annex.Content
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import Utility.Tmp
|
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
|
@ -30,25 +30,23 @@ type Storer = Key -> ContentSource -> MeterUpdate -> Annex Bool
|
||||||
|
|
||||||
-- Action that retrieves a Key's content from a remote.
|
-- Action that retrieves a Key's content from a remote.
|
||||||
-- Throws exception if key is not present, or remote is not accessible.
|
-- Throws exception if key is not present, or remote is not accessible.
|
||||||
type Retriever = Key -> Annex ContentSource
|
type Retriever = Key -> MeterUpdate -> Annex ContentSource
|
||||||
|
|
||||||
fileStorer :: (Key -> FilePath -> MeterUpdate -> Annex Bool) -> Storer
|
fileStorer :: (Key -> FilePath -> MeterUpdate -> Annex Bool) -> Storer
|
||||||
fileStorer a k (FileContent f) m = a k f m
|
fileStorer a k (FileContent f) m = a k f m
|
||||||
fileStorer a k (ByteContent b) m = withTmpFile "tmpXXXXXX" $ \f h -> do
|
fileStorer a k (ByteContent b) m = withTmp k $ \tmp -> do
|
||||||
liftIO $ do
|
liftIO $ L.writeFile tmp b
|
||||||
L.hPut h b
|
a k tmp m
|
||||||
hClose h
|
|
||||||
a k f m
|
|
||||||
|
|
||||||
byteStorer :: (Key -> L.ByteString -> MeterUpdate -> Annex Bool) -> Storer
|
byteStorer :: (Key -> L.ByteString -> MeterUpdate -> Annex Bool) -> Storer
|
||||||
byteStorer a k c m = withBytes c $ \b -> a k b m
|
byteStorer a k c m = withBytes c $ \b -> a k b m
|
||||||
|
|
||||||
|
fileRetriever :: (Key -> MeterUpdate -> Annex FilePath) -> Retriever
|
||||||
|
fileRetriever a k m = FileContent <$> a k m
|
||||||
|
|
||||||
|
byteRetriever :: (Key -> Annex L.ByteString) -> Retriever
|
||||||
|
byteRetriever a k _m = ByteContent <$> a k
|
||||||
|
|
||||||
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
||||||
withBytes (ByteContent b) a = a b
|
withBytes (ByteContent b) a = a b
|
||||||
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)
|
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)
|
||||||
|
|
||||||
fileRetriever :: (Key -> Annex FilePath) -> Retriever
|
|
||||||
fileRetriever a k = FileContent <$> a k
|
|
||||||
|
|
||||||
byteRetriever :: (Key -> Annex L.ByteString) -> Retriever
|
|
||||||
byteRetriever a k = ByteContent <$> a k
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue