git-annex/Remote/Helper/Chunked.hs
2021-04-27 15:23:56 -04:00

525 lines
17 KiB
Haskell

{- git-annex chunked remotes
-
- Copyright 2014-2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Remote.Helper.Chunked (
ChunkSize,
ChunkConfig(..),
noChunks,
describeChunkConfig,
chunkConfigParsers,
getChunkConfig,
storeChunks,
removeChunks,
retrieveChunks,
checkPresentChunks,
chunkField,
) where
import Annex.Common
import qualified Annex
import Utility.DataUnits
import Types.StoreRetrieve
import Types.Remote
import Types.ProposedAccepted
import Logs.Chunk
import Utility.Metered
import Crypto
import Backend (isStableKey)
import Annex.SpecialRemote.Config
import qualified Utility.RawFilePath as R
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
data ChunkConfig
= NoChunks
| UnpaddedChunks ChunkSize
| LegacyChunks ChunkSize
deriving (Show)
describeChunkConfig :: ChunkConfig -> String
describeChunkConfig NoChunks = "none"
describeChunkConfig (UnpaddedChunks sz) = describeChunkSize sz ++ " chunks"
describeChunkConfig (LegacyChunks sz) = describeChunkSize sz ++ " chunks (old style)"
describeChunkSize :: ChunkSize -> String
describeChunkSize sz = roughSize storageUnits False (fromIntegral sz)
noChunks :: ChunkConfig -> Bool
noChunks NoChunks = True
noChunks _ = False
chunkConfigParsers :: [RemoteConfigFieldParser]
chunkConfigParsers =
[ optionalStringParser chunksizeField HiddenField -- deprecated
, optionalStringParser chunkField
(FieldDesc "size of chunks (eg, 1MiB)")
]
getChunkConfig :: ParsedRemoteConfig -> ChunkConfig
getChunkConfig c =
case getRemoteConfigValue chunksizeField c of
Nothing -> case getRemoteConfigValue chunkField c of
Nothing -> NoChunks
Just v -> readsz UnpaddedChunks v chunkField
Just v -> readsz LegacyChunks v chunksizeField
where
readsz mk v f = case readSize dataUnits v of
Just size
| size == 0 -> NoChunks
| size > 0 -> mk (fromInteger size)
_ -> giveup $ "bad configuration " ++ fromProposedAccepted f ++ "=" ++ v
-- An infinite stream of chunk keys, starting from chunk 1.
newtype ChunkKeyStream = ChunkKeyStream [Key]
chunkKeyStream :: Key -> ChunkSize -> ChunkKeyStream
chunkKeyStream basek chunksize = ChunkKeyStream $ map mk [1..]
where
mk chunknum = alterKey sizedk $ \d -> d
{ keyChunkNum = Just chunknum }
sizedk = alterKey basek $ \d -> d
{ keyChunkSize = Just (toInteger chunksize) }
nextChunkKeyStream :: ChunkKeyStream -> (Key, ChunkKeyStream)
nextChunkKeyStream (ChunkKeyStream (k:l)) = (k, ChunkKeyStream l)
nextChunkKeyStream (ChunkKeyStream []) = error "expected infinite ChunkKeyStream"
takeChunkKeyStream :: ChunkCount -> ChunkKeyStream -> [Key]
takeChunkKeyStream n (ChunkKeyStream l) = genericTake n l
-- Number of chunks already consumed from the stream.
numChunks :: ChunkKeyStream -> Integer
numChunks = pred . fromJust . fromKey keyChunkNum . fst . nextChunkKeyStream
{- Splits up the key's content into chunks, passing each chunk to
- the storer action, along with a corresponding chunk key and a
- progress meter update callback.
-
- To support resuming, the checker is used to find the first missing
- chunk key. Storing starts from that chunk.
-
- This buffers each chunk in memory, so can use a lot of memory
- with a large ChunkSize.
- More optimal versions of this can be written, that rely
- on L.toChunks to split the lazy bytestring into chunks (typically
- smaller than the ChunkSize), and eg, write those chunks to a Handle.
- But this is the best that can be done with the storer interface that
- writes a whole L.ByteString at a time.
-}
storeChunks
:: LensGpgEncParams encc
=> UUID
-> ChunkConfig
-> EncKey
-> Key
-> FilePath
-> MeterUpdate
-> Maybe (Cipher, EncKey)
-> encc
-> Storer
-> CheckPresent
-> Annex ()
storeChunks u chunkconfig encryptor k f p enc encc storer checker =
case chunkconfig of
-- Only stable keys are safe to store chunked,
-- because an unstable key can have multiple different
-- objects, and mixing up chunks from them would be
-- possible without this check.
(UnpaddedChunks chunksize) -> ifM (isStableKey k)
( do
h <- liftIO $ openBinaryFile f ReadMode
go chunksize h
liftIO $ hClose h
, storechunk k (FileContent f) p
)
_ -> storechunk k (FileContent f) p
where
go chunksize h = do
let chunkkeys = chunkKeyStream k chunksize
(chunkkeys', startpos) <- seekResume h encryptor chunkkeys checker
b <- liftIO $ L.hGetContents h
gochunks p startpos chunksize b chunkkeys'
gochunks :: MeterUpdate -> BytesProcessed -> ChunkSize -> L.ByteString -> ChunkKeyStream -> Annex ()
gochunks meterupdate startpos chunksize = loop startpos . splitchunk
where
splitchunk = L.splitAt chunksize
loop bytesprocessed (chunk, bs) chunkkeys
| L.null chunk && numchunks > 0 = do
-- Once all chunks are successfully
-- stored, update the chunk log.
chunksStored u k (FixedSizeChunks chunksize) numchunks
| otherwise = do
liftIO $ meterupdate' zeroBytesProcessed
let (chunkkey, chunkkeys') = nextChunkKeyStream chunkkeys
storechunk chunkkey (ByteContent chunk) meterupdate'
let bytesprocessed' = addBytesProcessed bytesprocessed (L.length chunk)
loop bytesprocessed' (splitchunk bs) chunkkeys'
where
numchunks = numChunks chunkkeys
{- The MeterUpdate that is passed to the action
- storing a chunk is offset, so that it reflects
- the total bytes that have already been stored
- in previous chunks. -}
meterupdate' = offsetMeterUpdate meterupdate bytesprocessed
storechunk ck content meterupdate = case enc of
Nothing -> storer ck content meterupdate
(Just (cipher, enck)) -> do
cmd <- gpgCmd <$> Annex.getGitConfig
withBytes content $ \b ->
encrypt cmd encc cipher (feedBytes b) $
readBytes $ \encb ->
storer (enck ck) (ByteContent encb) meterupdate
{- Check if any of the chunk keys are present. If found, seek forward
- in the Handle, so it will be read starting at the first missing chunk.
- Returns the ChunkKeyStream truncated to start at the first missing
- chunk, and the number of bytes skipped due to resuming.
-
- As an optimisation, if the file fits into a single chunk, there's no need
- to check if that chunk is present -- we know it's not, because otherwise
- the whole file would be present and there would be no reason to try to
- store it.
-}
seekResume
:: Handle
-> EncKey
-> ChunkKeyStream
-> CheckPresent
-> Annex (ChunkKeyStream, BytesProcessed)
seekResume h encryptor chunkkeys checker = do
sz <- liftIO (hFileSize h)
if sz <= fromMaybe 0 (fromKey keyChunkSize $ fst $ nextChunkKeyStream chunkkeys)
then return (chunkkeys, zeroBytesProcessed)
else check 0 chunkkeys sz
where
check pos cks sz
| pos >= sz = do
-- All chunks are already stored!
liftIO $ hSeek h AbsoluteSeek sz
return (cks, toBytesProcessed sz)
| otherwise = do
v <- tryNonAsync (checker (encryptor k))
case v of
Right True ->
check pos' cks' sz
_ -> do
when (pos > 0) $
liftIO $ hSeek h AbsoluteSeek pos
return (cks, toBytesProcessed pos)
where
(k, cks') = nextChunkKeyStream cks
pos' = pos + fromMaybe 0 (fromKey keyChunkSize k)
{- Removes all chunks of a key from a remote, by calling a remover
- action on each.
-
- This action may be called on a chunked key. It will simply remove it.
-}
removeChunks :: Remover -> UUID -> ChunkConfig -> EncKey -> Key -> Annex ()
removeChunks remover u chunkconfig encryptor k = do
ls <- map chunkKeyList <$> chunkKeys u chunkconfig k
mapM_ (remover . encryptor) (concat ls)
let chunksizes = catMaybes $ map (fromKey keyChunkSize <=< headMaybe) ls
forM_ chunksizes $ chunksRemoved u k . FixedSizeChunks . fromIntegral
{- Retrieves a key from a remote, using a retriever action.
-
- When the remote is chunked, tries each of the options returned by
- chunkKeys until it finds one where the retriever successfully
- gets the first chunked key.
-
- If retrival of one of the subsequent chunks throws an exception,
- gives up. Note that partial data may have been written to the file
- in this case.
-
- Resuming is supported when using chunks. When the destination file
- already exists, it skips to the next chunked key that would be needed
- to resume.
-
- Handles decrypting the content when encryption is used.
-}
retrieveChunks
:: LensGpgEncParams encc
=> Retriever
-> UUID
-> ChunkConfig
-> EncKey
-> Key
-> FilePath
-> MeterUpdate
-> Maybe (Cipher, EncKey)
-> encc
-> Annex ()
retrieveChunks retriever u chunkconfig encryptor basek dest basep enc encc
| noChunks chunkconfig =
-- Optimisation: Try the unchunked key first, to avoid
-- looking in the git-annex branch for chunk counts
-- that are likely not there.
getunchunked `catchNonAsync`
(\e -> go (Just e) =<< chunkKeysOnly u chunkconfig basek)
| otherwise = go Nothing =<< chunkKeys u chunkconfig basek
where
go pe cks = do
let ls = map chunkKeyList cks
currsize <- liftIO $ catchMaybeIO $ getFileSize (toRawFilePath dest)
let ls' = maybe ls (setupResume ls) currsize
if any null ls'
then noop -- dest is already complete
else firstavail pe currsize ls'
firstavail Nothing _ [] = giveup "unable to determine the chunks to use for this remote"
firstavail (Just e) _ [] = throwM e
firstavail pe currsize ([]:ls) = firstavail pe currsize ls
firstavail _ currsize ((k:ks):ls)
| k == basek = getunchunked
`catchNonAsync` (\e -> firstavail (Just e) currsize ls)
| otherwise = do
let offset = resumeOffset currsize k
let p = maybe basep
(offsetMeterUpdate basep . toBytesProcessed)
offset
v <- tryNonAsync $
retriever (encryptor k) p $ \content ->
bracketIO (maybe opennew openresume offset) hClose $ \h -> do
retrieved (Just h) p content
let sz = toBytesProcessed $
fromMaybe 0 $ fromKey keyChunkSize k
getrest p h sz sz ks
case v of
Left e
| null ls -> throwM e
| otherwise -> firstavail (Just e) currsize ls
Right r -> return r
getrest _ _ _ _ [] = noop
getrest p h sz bytesprocessed (k:ks) = do
let p' = offsetMeterUpdate p bytesprocessed
liftIO $ p' zeroBytesProcessed
retriever (encryptor k) p' $ retrieved (Just h) p'
getrest p h sz (addBytesProcessed bytesprocessed sz) ks
getunchunked = retriever (encryptor basek) basep $ retrieved Nothing basep
opennew = openBinaryFile dest WriteMode
-- Open the file and seek to the start point in order to resume.
openresume startpoint = do
-- ReadWriteMode allows seeking; AppendMode does not.
h <- openBinaryFile dest ReadWriteMode
hSeek h AbsoluteSeek startpoint
return h
{- Progress meter updating is a bit tricky: If the Retriever
- populates a file, it is responsible for updating progress
- as the file is being retrieved.
-
- However, if the Retriever generates a lazy ByteString,
- it is not responsible for updating progress (often it cannot).
- Instead, writeRetrievedContent is passed a meter to update
- as it consumes the ByteString.
-}
retrieved h p content = writeRetrievedContent dest enc encc h p' content
where
p'
| isByteContent content = Just p
| otherwise = Nothing
{- Writes retrieved file content into the provided Handle, decrypting it
- first if necessary.
-
- If the remote did not store the content using chunks, no Handle
- will be provided, and it's up to us to open the destination file.
-
- Note that when neither chunking nor encryption is used, and the remote
- provides FileContent, that file only needs to be renamed
- into place. (And it may even already be in the right place..)
-}
writeRetrievedContent
:: LensGpgEncParams encc
=> FilePath
-> Maybe (Cipher, EncKey)
-> encc
-> Maybe Handle
-> Maybe MeterUpdate
-> ContentSource
-> Annex ()
writeRetrievedContent dest enc encc mh mp content = case (enc, mh, content) of
(Nothing, Nothing, FileContent f)
| f == dest -> noop
| otherwise -> liftIO $ moveFile f dest
(Just (cipher, _), _, ByteContent b) -> do
cmd <- gpgCmd <$> Annex.getGitConfig
decrypt cmd encc cipher (feedBytes b) $
readBytes write
(Just (cipher, _), _, FileContent f) -> do
cmd <- gpgCmd <$> Annex.getGitConfig
withBytes content $ \b ->
decrypt cmd encc cipher (feedBytes b) $
readBytes write
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
(Nothing, _, FileContent f) -> do
withBytes content write
liftIO $ removeWhenExistsWith R.removeLink (toRawFilePath f)
(Nothing, _, ByteContent b) -> write b
where
write b = case mh of
Just h -> liftIO $ b `streamto` h
Nothing -> liftIO $ bracket opendest hClose (b `streamto`)
streamto b h = case mp of
Just p -> meteredWrite p (S.hPut h) b
Nothing -> L.hPut h b
opendest = openBinaryFile dest WriteMode
{- Can resume when the chunk's offset is at or before the end of
- the dest file. -}
resumeOffset :: Maybe Integer -> Key -> Maybe Integer
resumeOffset Nothing _ = Nothing
resumeOffset currsize k
| offset <= currsize = offset
| otherwise = Nothing
where
offset = chunkKeyOffset k
{- Drops chunks that are already present in a file, based on its size.
- Keeps any non-chunk keys.
-}
setupResume :: [[Key]] -> Integer -> [[Key]]
setupResume ls currsize = map dropunneeded ls
where
dropunneeded [] = []
dropunneeded l@(k:_) = case fromKey keyChunkSize k of
Just chunksize | chunksize > 0 ->
genericDrop (currsize `div` chunksize) l
_ -> l
{- Checks if a key is present in a remote. This requires any one
- of the lists of options returned by chunkKeys to all check out
- as being present using the checker action.
-
- Throws an exception if the remote is not accessible.
-}
checkPresentChunks
:: CheckPresent
-> UUID
-> ChunkConfig
-> EncKey
-> Key
-> Annex Bool
checkPresentChunks checker u chunkconfig encryptor basek
| noChunks chunkconfig = do
-- Optimisation: Try the unchunked key first, to avoid
-- looking in the git-annex branch for chunk counts
-- that are likely not there.
v <- check basek
let getchunkkeys = chunkKeysOnly u chunkconfig basek
case v of
Right True -> return True
Left e -> checklists (Just e) =<< getchunkkeys
_ -> checklists Nothing =<< getchunkkeys
| otherwise = checklists Nothing =<< chunkKeys u chunkconfig basek
where
checklists Nothing [] = return False
checklists (Just deferrederror) [] = throwM deferrederror
checklists d (ck:cks)
| not (null l) = do
v <- checkchunks l
case v of
Left e -> checklists (Just e) cks
Right True -> do
ensureChunksAreLogged u basek ck
return True
Right False -> checklists Nothing cks
| otherwise = checklists d cks
where
l = chunkKeyList ck
checkchunks :: [Key] -> Annex (Either SomeException Bool)
checkchunks [] = return (Right True)
checkchunks (k:ks) = do
v <- check k
case v of
Right True -> checkchunks ks
Right False -> return $ Right False
Left e -> return $ Left e
check = tryNonAsync . checker . encryptor
data ChunkKeys
= ChunkKeys [Key]
| SpeculativeChunkKeys (ChunkMethod, ChunkCount) [Key]
chunkKeyList :: ChunkKeys -> [Key]
chunkKeyList (ChunkKeys l) = l
chunkKeyList (SpeculativeChunkKeys _ l) = l
{- A key can be stored in a remote unchunked, or as a list of chunked keys.
- This can be the case whether or not the remote is currently configured
- to use chunking.
-
- It's even possible for a remote to have the same key stored multiple
- times with different chunk sizes!
-
- This finds all possible lists of keys that might be on the remote that
- can be combined to get back the requested key, in order from most to
- least likely to exist.
-
- Speculatively tries chunks using the ChunkConfig last of all
- (when that's not the same as the recorded chunks). This can help
- recover from data loss, where the chunk log didn't make it out,
- though only as long as the ChunkConfig is unchanged.
-}
chunkKeys :: UUID -> ChunkConfig -> Key -> Annex [ChunkKeys]
chunkKeys = chunkKeys' False
{- Same as chunkKeys, but excluding the unchunked key. -}
chunkKeysOnly :: UUID -> ChunkConfig -> Key -> Annex [ChunkKeys]
chunkKeysOnly = chunkKeys' True
chunkKeys' :: Bool -> UUID -> ChunkConfig -> Key -> Annex [ChunkKeys]
chunkKeys' onlychunks u chunkconfig k = do
recorded <- getCurrentChunks u k
let recordedl = map (ChunkKeys . toChunkList k) recorded
return $ addspeculative recorded $ if onlychunks
then recordedl
else if noChunks chunkconfig
then ChunkKeys [k] : recordedl
else recordedl ++ [ChunkKeys [k]]
where
addspeculative recorded l = case chunkconfig of
NoChunks -> l
UnpaddedChunks chunksz -> case fromKey keySize k of
Nothing -> l
Just keysz ->
let (d, m) = keysz `divMod` fromIntegral chunksz
chunkcount = d + if m == 0 then 0 else 1
v = (FixedSizeChunks chunksz, chunkcount)
in if v `elem` recorded
then l
else l ++ [SpeculativeChunkKeys v (toChunkList k v)]
LegacyChunks _ -> l
toChunkList :: Key -> (ChunkMethod, ChunkCount) -> [Key]
toChunkList k (FixedSizeChunks chunksize, chunkcount) =
takeChunkKeyStream chunkcount $ chunkKeyStream k chunksize
toChunkList _ (UnknownChunks _, _) = []
{- When chunkKeys provided a speculative chunk list, and that has been
- verified to be present, use this to log it in the chunk log. This way,
- a later change to the chunk size of the remote won't prevent accessing
- the chunks. -}
ensureChunksAreLogged :: UUID -> Key -> ChunkKeys -> Annex ()
ensureChunksAreLogged u k (SpeculativeChunkKeys (chunkmethod, chunkcount) _) =
chunksStored u k chunkmethod chunkcount
ensureChunksAreLogged _ _ (ChunkKeys _) = return ()
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
withBytes (ByteContent b) a = a b
withBytes (FileContent f) a = a =<< liftIO (L.readFile f)