refactor
Avoiding using a callback simplifies this and should make it easier to implement incremental checksumming, which will need to happen partly in writeRetrievedContent and partly in retrieveChunks.
This commit is contained in:
parent
48310f2d55
commit
381f203d1a
2 changed files with 88 additions and 78 deletions
|
@ -20,16 +20,19 @@ module Remote.Helper.Chunked (
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
import qualified Annex
|
||||||
import Utility.DataUnits
|
import Utility.DataUnits
|
||||||
import Types.StoreRetrieve
|
import Types.StoreRetrieve
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.ProposedAccepted
|
import Types.ProposedAccepted
|
||||||
import Logs.Chunk
|
import Logs.Chunk
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import Crypto (EncKey)
|
import Crypto
|
||||||
import Backend (isStableKey)
|
import Backend (isStableKey)
|
||||||
import Annex.SpecialRemote.Config
|
import Annex.SpecialRemote.Config
|
||||||
|
import qualified Utility.RawFilePath as R
|
||||||
|
|
||||||
|
import qualified Data.ByteString as S
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
data ChunkConfig
|
data ChunkConfig
|
||||||
|
@ -109,16 +112,19 @@ numChunks = pred . fromJust . fromKey keyChunkNum . fst . nextChunkKeyStream
|
||||||
- writes a whole L.ByteString at a time.
|
- writes a whole L.ByteString at a time.
|
||||||
-}
|
-}
|
||||||
storeChunks
|
storeChunks
|
||||||
:: UUID
|
:: LensGpgEncParams encc
|
||||||
|
=> UUID
|
||||||
-> ChunkConfig
|
-> ChunkConfig
|
||||||
-> EncKey
|
-> EncKey
|
||||||
-> Key
|
-> Key
|
||||||
-> FilePath
|
-> FilePath
|
||||||
-> MeterUpdate
|
-> MeterUpdate
|
||||||
|
-> Maybe (Cipher, EncKey)
|
||||||
|
-> encc
|
||||||
-> Storer
|
-> Storer
|
||||||
-> CheckPresent
|
-> CheckPresent
|
||||||
-> Annex ()
|
-> Annex ()
|
||||||
storeChunks u chunkconfig encryptor k f p storer checker =
|
storeChunks u chunkconfig encryptor k f p enc encc storer checker =
|
||||||
case chunkconfig of
|
case chunkconfig of
|
||||||
-- Only stable keys are safe to store chunked,
|
-- Only stable keys are safe to store chunked,
|
||||||
-- because an unstable key can have multiple different
|
-- because an unstable key can have multiple different
|
||||||
|
@ -129,9 +135,9 @@ storeChunks u chunkconfig encryptor k f p storer checker =
|
||||||
h <- liftIO $ openBinaryFile f ReadMode
|
h <- liftIO $ openBinaryFile f ReadMode
|
||||||
go chunksize h
|
go chunksize h
|
||||||
liftIO $ hClose h
|
liftIO $ hClose h
|
||||||
, storer k (FileContent f) p
|
, storechunk k (FileContent f) p
|
||||||
)
|
)
|
||||||
_ -> storer k (FileContent f) p
|
_ -> storechunk k (FileContent f) p
|
||||||
where
|
where
|
||||||
go chunksize h = do
|
go chunksize h = do
|
||||||
let chunkkeys = chunkKeyStream k chunksize
|
let chunkkeys = chunkKeyStream k chunksize
|
||||||
|
@ -152,7 +158,7 @@ storeChunks u chunkconfig encryptor k f p storer checker =
|
||||||
| otherwise = do
|
| otherwise = do
|
||||||
liftIO $ meterupdate' zeroBytesProcessed
|
liftIO $ meterupdate' zeroBytesProcessed
|
||||||
let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
|
let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
|
||||||
storer chunkkey (ByteContent chunk) meterupdate'
|
storechunk chunkkey (ByteContent chunk) meterupdate'
|
||||||
let bytesprocessed' = addBytesProcessed bytesprocessed (L.length chunk)
|
let bytesprocessed' = addBytesProcessed bytesprocessed (L.length chunk)
|
||||||
loop bytesprocessed' (splitchunk bs) chunkkeys'
|
loop bytesprocessed' (splitchunk bs) chunkkeys'
|
||||||
where
|
where
|
||||||
|
@ -163,6 +169,15 @@ storeChunks u chunkconfig encryptor k f p storer checker =
|
||||||
- in previous chunks. -}
|
- in previous chunks. -}
|
||||||
meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
|
meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
|
||||||
|
|
||||||
|
storechunk ck content meterupdate = case enc of
|
||||||
|
Nothing -> storer ck content meterupdate
|
||||||
|
(Just (cipher, enck)) -> do
|
||||||
|
cmd <- gpgCmd <$> Annex.getGitConfig
|
||||||
|
withBytes content $ \b ->
|
||||||
|
encrypt cmd encc cipher (feedBytes b) $
|
||||||
|
readBytes $ \encb ->
|
||||||
|
storer (enck ck) (ByteContent encb) meterupdate
|
||||||
|
|
||||||
{- Check if any of the chunk keys are present. If found, seek forward
|
{- Check if any of the chunk keys are present. If found, seek forward
|
||||||
- in the Handle, so it will be read starting at the first missing chunk.
|
- in the Handle, so it will be read starting at the first missing chunk.
|
||||||
- Returns the ChunkKeyStream truncated to start at the first missing
|
- Returns the ChunkKeyStream truncated to start at the first missing
|
||||||
|
@ -219,28 +234,31 @@ removeChunks remover u chunkconfig encryptor k = do
|
||||||
-
|
-
|
||||||
- When the remote is chunked, tries each of the options returned by
|
- When the remote is chunked, tries each of the options returned by
|
||||||
- chunkKeys until it finds one where the retriever successfully
|
- chunkKeys until it finds one where the retriever successfully
|
||||||
- gets the first chunked key. The content of that key, and any
|
- gets the first chunked key.
|
||||||
- other chunks in the list is fed to the sink.
|
|
||||||
-
|
-
|
||||||
- If retrival of one of the subsequent chunks throws an exception,
|
- If retrival of one of the subsequent chunks throws an exception,
|
||||||
- gives up. Note that partial data may have been written to the sink
|
- gives up. Note that partial data may have been written to the file
|
||||||
- in this case.
|
- in this case.
|
||||||
-
|
-
|
||||||
- Resuming is supported when using chunks. When the destination file
|
- Resuming is supported when using chunks. When the destination file
|
||||||
- already exists, it skips to the next chunked key that would be needed
|
- already exists, it skips to the next chunked key that would be needed
|
||||||
- to resume.
|
- to resume.
|
||||||
|
-
|
||||||
|
- Handles decrypting the content when encryption is used.
|
||||||
-}
|
-}
|
||||||
retrieveChunks
|
retrieveChunks
|
||||||
:: Retriever
|
:: LensGpgEncParams encc
|
||||||
|
=> Retriever
|
||||||
-> UUID
|
-> UUID
|
||||||
-> ChunkConfig
|
-> ChunkConfig
|
||||||
-> EncKey
|
-> EncKey
|
||||||
-> Key
|
-> Key
|
||||||
-> FilePath
|
-> FilePath
|
||||||
-> MeterUpdate
|
-> MeterUpdate
|
||||||
-> (Maybe Handle -> Maybe MeterUpdate -> ContentSource -> Annex ())
|
-> Maybe (Cipher, EncKey)
|
||||||
|
-> encc
|
||||||
-> Annex ()
|
-> Annex ()
|
||||||
retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
retrieveChunks retriever u chunkconfig encryptor basek dest basep enc encc
|
||||||
| noChunks chunkconfig =
|
| noChunks chunkconfig =
|
||||||
-- Optimisation: Try the unchunked key first, to avoid
|
-- Optimisation: Try the unchunked key first, to avoid
|
||||||
-- looking in the git-annex branch for chunk counts
|
-- looking in the git-annex branch for chunk counts
|
||||||
|
@ -271,7 +289,7 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
v <- tryNonAsync $
|
v <- tryNonAsync $
|
||||||
retriever (encryptor k) p $ \content ->
|
retriever (encryptor k) p $ \content ->
|
||||||
bracketIO (maybe opennew openresume offset) hClose $ \h -> do
|
bracketIO (maybe opennew openresume offset) hClose $ \h -> do
|
||||||
tosink (Just h) p content
|
retrieved (Just h) p content
|
||||||
let sz = toBytesProcessed $
|
let sz = toBytesProcessed $
|
||||||
fromMaybe 0 $ fromKey keyChunkSize k
|
fromMaybe 0 $ fromKey keyChunkSize k
|
||||||
getrest p h sz sz ks
|
getrest p h sz sz ks
|
||||||
|
@ -285,10 +303,10 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
getrest p h sz bytesprocessed (k:ks) = do
|
getrest p h sz bytesprocessed (k:ks) = do
|
||||||
let p' = offsetMeterUpdate p bytesprocessed
|
let p' = offsetMeterUpdate p bytesprocessed
|
||||||
liftIO $ p' zeroBytesProcessed
|
liftIO $ p' zeroBytesProcessed
|
||||||
retriever (encryptor k) p' $ tosink (Just h) p'
|
retriever (encryptor k) p' $ retrieved (Just h) p'
|
||||||
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
|
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
|
||||||
|
|
||||||
getunchunked = retriever (encryptor basek) basep $ tosink Nothing basep
|
getunchunked = retriever (encryptor basek) basep $ retrieved Nothing basep
|
||||||
|
|
||||||
opennew = openBinaryFile dest WriteMode
|
opennew = openBinaryFile dest WriteMode
|
||||||
|
|
||||||
|
@ -305,15 +323,61 @@ retrieveChunks retriever u chunkconfig encryptor basek dest basep sink
|
||||||
-
|
-
|
||||||
- However, if the Retriever generates a lazy ByteString,
|
- However, if the Retriever generates a lazy ByteString,
|
||||||
- it is not responsible for updating progress (often it cannot).
|
- it is not responsible for updating progress (often it cannot).
|
||||||
- Instead, the sink is passed a meter to update as it consumes
|
- Instead, writeRetrievedContent is passed a meter to update
|
||||||
- the ByteString.
|
- as it consumes the ByteString.
|
||||||
-}
|
-}
|
||||||
tosink h p content = sink h p' content
|
retrieved h p content = writeRetrievedContent dest enc encc h p' content
|
||||||
where
|
where
|
||||||
p'
|
p'
|
||||||
| isByteContent content = Just p
|
| isByteContent content = Just p
|
||||||
| otherwise = Nothing
|
| otherwise = Nothing
|
||||||
|
|
||||||
|
{- Writes retrieved file content into the provided Handle, decrypting it
|
||||||
|
- first if necessary.
|
||||||
|
-
|
||||||
|
- If the remote did not store the content using chunks, no Handle
|
||||||
|
- will be provided, and it's up to us to open the destination file.
|
||||||
|
-
|
||||||
|
- Note that when neither chunking nor encryption is used, and the remote
|
||||||
|
- provides FileContent, that file only needs to be renamed
|
||||||
|
- into place. (And it may even already be in the right place..)
|
||||||
|
-}
|
||||||
|
writeRetrievedContent
|
||||||
|
:: LensGpgEncParams encc
|
||||||
|
=> FilePath
|
||||||
|
-> Maybe (Cipher, EncKey)
|
||||||
|
-> encc
|
||||||
|
-> Maybe Handle
|
||||||
|
-> Maybe MeterUpdate
|
||||||
|
-> ContentSource
|
||||||
|
-> Annex ()
|
||||||
|
writeRetrievedContent dest enc encc mh mp content = case (enc, mh, content) of
|
||||||
|
(Nothing, Nothing, FileContent f)
|
||||||
|
| f == dest -> noop
|
||||||
|
| otherwise -> liftIO $ moveFile f dest
|
||||||
|
(Just (cipher, _), _, ByteContent b) -> do
|
||||||
|
cmd <- gpgCmd <$> Annex.getGitConfig
|
||||||
|
decrypt cmd encc cipher (feedBytes b) $
|
||||||
|
readBytes write
|
||||||
|
(Just (cipher, _), _, FileContent f) -> do
|
||||||
|
cmd <- gpgCmd <$> Annex.getGitConfig
|
||||||
|
withBytes content $ \b ->
|
||||||
|
decrypt cmd encc cipher (feedBytes b) $
|
||||||
|
readBytes write
|
||||||
|
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
||||||
|
(Nothing, _, FileContent f) -> do
|
||||||
|
withBytes content write
|
||||||
|
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
||||||
|
(Nothing, _, ByteContent b) -> write b
|
||||||
|
where
|
||||||
|
write b = case mh of
|
||||||
|
Just h -> liftIO $ b `streamto` h
|
||||||
|
Nothing -> liftIO $ bracket opendest hClose (b `streamto`)
|
||||||
|
streamto b h = case mp of
|
||||||
|
Just p -> meteredWrite p (S.hPut h) b
|
||||||
|
Nothing -> L.hPut h b
|
||||||
|
opendest = openBinaryFile dest WriteMode
|
||||||
|
|
||||||
{- Can resume when the chunk's offset is at or before the end of
|
{- Can resume when the chunk's offset is at or before the end of
|
||||||
- the dest file. -}
|
- the dest file. -}
|
||||||
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
|
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
|
||||||
|
@ -455,3 +519,7 @@ ensureChunksAreLogged :: UUID -> Key -> ChunkKeys -> Annex ()
|
||||||
ensureChunksAreLogged u k (SpeculativeChunkKeys (chunkmethod, chunkcount) _) =
|
ensureChunksAreLogged u k (SpeculativeChunkKeys (chunkmethod, chunkcount) _) =
|
||||||
chunksStored u k chunkmethod chunkcount
|
chunksStored u k chunkmethod chunkcount
|
||||||
ensureChunksAreLogged _ _ (ChunkKeys _) = return ()
|
ensureChunksAreLogged _ _ (ChunkKeys _) = return ()
|
||||||
|
|
||||||
|
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
||||||
|
withBytes (ByteContent b) a = a b
|
||||||
|
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)
|
||||||
|
|
|
@ -35,11 +35,9 @@ module Remote.Helper.Special (
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Annex
|
|
||||||
import Annex.SpecialRemote.Config
|
import Annex.SpecialRemote.Config
|
||||||
import Types.StoreRetrieve
|
import Types.StoreRetrieve
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Crypto
|
|
||||||
import Annex.UUID
|
import Annex.UUID
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
|
@ -51,7 +49,6 @@ import Messages.Progress
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import qualified Git.Construct
|
import qualified Git.Construct
|
||||||
import Git.Types
|
import Git.Types
|
||||||
import qualified Utility.RawFilePath as R
|
|
||||||
|
|
||||||
import qualified Data.ByteString as S
|
import qualified Data.ByteString as S
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
@ -213,25 +210,16 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr
|
||||||
storeKeyGen k p enc = sendAnnex k rollback $ \src ->
|
storeKeyGen k p enc = sendAnnex k rollback $ \src ->
|
||||||
displayprogress p k (Just src) $ \p' ->
|
displayprogress p k (Just src) $ \p' ->
|
||||||
storeChunks (uuid baser) chunkconfig enck k src p'
|
storeChunks (uuid baser) chunkconfig enck k src p'
|
||||||
(storechunk enc)
|
enc encr storer checkpresent
|
||||||
checkpresent
|
|
||||||
where
|
where
|
||||||
rollback = void $ removeKey encr k
|
rollback = void $ removeKey encr k
|
||||||
enck = maybe id snd enc
|
enck = maybe id snd enc
|
||||||
|
|
||||||
storechunk Nothing k content p = storer k content p
|
|
||||||
storechunk (Just (cipher, enck)) k content p = do
|
|
||||||
cmd <- gpgCmd <$> Annex.getGitConfig
|
|
||||||
withBytes content $ \b ->
|
|
||||||
encrypt cmd encr cipher (feedBytes b) $
|
|
||||||
readBytes $ \encb ->
|
|
||||||
storer (enck k) (ByteContent encb) p
|
|
||||||
|
|
||||||
-- call retriever to get chunks; decrypt them; stream to dest file
|
-- call retriever to get chunks; decrypt them; stream to dest file
|
||||||
retrieveKeyFileGen k dest p enc = do
|
retrieveKeyFileGen k dest p enc = do
|
||||||
displayprogress p k Nothing $ \p' ->
|
displayprogress p k Nothing $ \p' ->
|
||||||
retrieveChunks retriever (uuid baser) chunkconfig
|
retrieveChunks retriever (uuid baser) chunkconfig
|
||||||
enck k dest p' (sink dest enc encr)
|
enck k dest p' enc encr
|
||||||
return UnVerified
|
return UnVerified
|
||||||
where
|
where
|
||||||
enck = maybe id snd enc
|
enck = maybe id snd enc
|
||||||
|
@ -253,52 +241,6 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr
|
||||||
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a)
|
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a)
|
||||||
| otherwise = a p
|
| otherwise = a p
|
||||||
|
|
||||||
{- Sink callback for retrieveChunks. Stores the file content into the
|
|
||||||
- provided Handle, decrypting it first if necessary.
|
|
||||||
-
|
|
||||||
- If the remote did not store the content using chunks, no Handle
|
|
||||||
- will be provided, and it's up to us to open the destination file.
|
|
||||||
-
|
|
||||||
- Note that when neither chunking nor encryption is used, and the remote
|
|
||||||
- provides FileContent, that file only needs to be renamed
|
|
||||||
- into place. (And it may even already be in the right place..)
|
|
||||||
-}
|
|
||||||
sink
|
|
||||||
:: LensGpgEncParams c
|
|
||||||
=> FilePath
|
|
||||||
-> Maybe (Cipher, EncKey)
|
|
||||||
-> c
|
|
||||||
-> Maybe Handle
|
|
||||||
-> Maybe MeterUpdate
|
|
||||||
-> ContentSource
|
|
||||||
-> Annex ()
|
|
||||||
sink dest enc c mh mp content = case (enc, mh, content) of
|
|
||||||
(Nothing, Nothing, FileContent f)
|
|
||||||
| f == dest -> noop
|
|
||||||
| otherwise -> liftIO $ moveFile f dest
|
|
||||||
(Just (cipher, _), _, ByteContent b) -> do
|
|
||||||
cmd <- gpgCmd <$> Annex.getGitConfig
|
|
||||||
decrypt cmd c cipher (feedBytes b) $
|
|
||||||
readBytes write
|
|
||||||
(Just (cipher, _), _, FileContent f) -> do
|
|
||||||
cmd <- gpgCmd <$> Annex.getGitConfig
|
|
||||||
withBytes content $ \b ->
|
|
||||||
decrypt cmd c cipher (feedBytes b) $
|
|
||||||
readBytes write
|
|
||||||
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
|
||||||
(Nothing, _, FileContent f) -> do
|
|
||||||
withBytes content write
|
|
||||||
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
|
|
||||||
(Nothing, _, ByteContent b) -> write b
|
|
||||||
where
|
|
||||||
write b = case mh of
|
|
||||||
Just h -> liftIO $ b `streamto` h
|
|
||||||
Nothing -> liftIO $ bracket opendest hClose (b `streamto`)
|
|
||||||
streamto b h = case mp of
|
|
||||||
Just p -> meteredWrite p (S.hPut h) b
|
|
||||||
Nothing -> L.hPut h b
|
|
||||||
opendest = openBinaryFile dest WriteMode
|
|
||||||
|
|
||||||
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
||||||
withBytes (ByteContent b) a = a b
|
withBytes (ByteContent b) a = a b
|
||||||
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)
|
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue