incremental checksum on download from ssh or p2p
Checksum as content is received from a remote git-annex repository, rather than doing it in a second pass. Not tested at all yet, but I imagine it will work! Not implemented for any special remotes, and also not implemented for copies from local remotes. It may be that, for local remotes, it will suffice to use rsync, rely on its checksumming, and simply return Verified. (It would still make a checksumming pass when cp is used for COW, I guess.)
This commit is contained in:
parent
ed684f651e
commit
62e152f210
17 changed files with 118 additions and 52 deletions
|
@ -26,10 +26,10 @@ module Annex.Content.Presence (
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
|
import Annex.Verify
|
||||||
import Annex.LockPool
|
import Annex.LockPool
|
||||||
import Annex.WorkerPool
|
import Annex.WorkerPool
|
||||||
import Types.Remote (unVerified, Verification(..), RetrievalSecurityPolicy(..))
|
import Types.Remote (unVerified, Verification(..), RetrievalSecurityPolicy(..))
|
||||||
import qualified Types.Remote
|
|
||||||
import qualified Types.Backend
|
import qualified Types.Backend
|
||||||
import qualified Backend
|
import qualified Backend
|
||||||
import qualified Database.Keys
|
import qualified Database.Keys
|
||||||
|
@ -231,16 +231,3 @@ warnUnverifiableInsecure k = warning $ unwords
|
||||||
]
|
]
|
||||||
where
|
where
|
||||||
kv = decodeBS (formatKeyVariety (fromKey keyVariety k))
|
kv = decodeBS (formatKeyVariety (fromKey keyVariety k))
|
||||||
|
|
||||||
data VerifyConfig = AlwaysVerify | NoVerify | RemoteVerify Remote | DefaultVerify
|
|
||||||
|
|
||||||
shouldVerify :: VerifyConfig -> Annex Bool
|
|
||||||
shouldVerify AlwaysVerify = return True
|
|
||||||
shouldVerify NoVerify = return False
|
|
||||||
shouldVerify DefaultVerify = annexVerify <$> Annex.getGitConfig
|
|
||||||
shouldVerify (RemoteVerify r) =
|
|
||||||
(shouldVerify DefaultVerify
|
|
||||||
<&&> pure (remoteAnnexVerify (Types.Remote.gitconfig r)))
|
|
||||||
-- Export remotes are not key/value stores, so always verify
|
|
||||||
-- content from them even when verification is disabled.
|
|
||||||
<||> Types.Remote.isExportSupported r
|
|
||||||
|
|
25
Annex/Verify.hs
Normal file
25
Annex/Verify.hs
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
{- verification
|
||||||
|
-
|
||||||
|
- Copyright 2010-2021 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Annex.Verify where
|
||||||
|
|
||||||
|
import Annex.Common
|
||||||
|
import qualified Annex
|
||||||
|
import qualified Types.Remote
|
||||||
|
|
||||||
|
data VerifyConfig = AlwaysVerify | NoVerify | RemoteVerify Remote | DefaultVerify
|
||||||
|
|
||||||
|
shouldVerify :: VerifyConfig -> Annex Bool
|
||||||
|
shouldVerify AlwaysVerify = return True
|
||||||
|
shouldVerify NoVerify = return False
|
||||||
|
shouldVerify DefaultVerify = annexVerify <$> Annex.getGitConfig
|
||||||
|
shouldVerify (RemoteVerify r) =
|
||||||
|
(shouldVerify DefaultVerify
|
||||||
|
<&&> pure (remoteAnnexVerify (Types.Remote.gitconfig r)))
|
||||||
|
-- Export remotes are not key/value stores, so always verify
|
||||||
|
-- content from them even when verification is disabled.
|
||||||
|
<||> Types.Remote.isExportSupported r
|
15
Backend.hs
15
Backend.hs
|
@ -1,6 +1,6 @@
|
||||||
{- git-annex key/value backends
|
{- git-annex key/value backends
|
||||||
-
|
-
|
||||||
- Copyright 2010-2020 Joey Hess <id@joeyh.name>
|
- Copyright 2010-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -17,11 +17,13 @@ module Backend (
|
||||||
isStableKey,
|
isStableKey,
|
||||||
isCryptographicallySecure,
|
isCryptographicallySecure,
|
||||||
isVerifiable,
|
isVerifiable,
|
||||||
|
startVerifyKeyContentIncrementally,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Annex.CheckAttr
|
import Annex.CheckAttr
|
||||||
|
import Annex.Verify
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import Types.KeySource
|
import Types.KeySource
|
||||||
import qualified Types.Backend as B
|
import qualified Types.Backend as B
|
||||||
|
@ -127,3 +129,14 @@ isCryptographicallySecure k = maybe False (`B.isCryptographicallySecure` k)
|
||||||
isVerifiable :: Key -> Annex Bool
|
isVerifiable :: Key -> Annex Bool
|
||||||
isVerifiable k = maybe False (isJust . B.verifyKeyContent)
|
isVerifiable k = maybe False (isJust . B.verifyKeyContent)
|
||||||
<$> maybeLookupBackendVariety (fromKey keyVariety k)
|
<$> maybeLookupBackendVariety (fromKey keyVariety k)
|
||||||
|
|
||||||
|
startVerifyKeyContentIncrementally :: VerifyConfig -> Key -> Annex (Maybe B.IncrementalVerifier)
|
||||||
|
startVerifyKeyContentIncrementally verifyconfig k =
|
||||||
|
ifM (shouldVerify verifyconfig)
|
||||||
|
( maybeLookupBackendVariety (fromKey keyVariety k) >>= \case
|
||||||
|
Just b -> case B.verifyKeyContentIncrementally b of
|
||||||
|
Just v -> Just <$> v k
|
||||||
|
Nothing -> return Nothing
|
||||||
|
Nothing -> return Nothing
|
||||||
|
, return Nothing
|
||||||
|
)
|
||||||
|
|
|
@ -145,8 +145,7 @@ sameCheckSum key s
|
||||||
expected = decodeBS (keyHash key)
|
expected = decodeBS (keyHash key)
|
||||||
|
|
||||||
checkKeyChecksumIncremental :: Hash -> Key -> Annex IncrementalVerifier
|
checkKeyChecksumIncremental :: Hash -> Key -> Annex IncrementalVerifier
|
||||||
checkKeyChecksumIncremental hash key = liftIO $
|
checkKeyChecksumIncremental hash key = liftIO $ (snd $ hasher hash) key
|
||||||
(\h -> snd h key) (hasher hash)
|
|
||||||
|
|
||||||
keyHash :: Key -> S.ByteString
|
keyHash :: Key -> S.ByteString
|
||||||
keyHash = fst . splitKeyNameExtension
|
keyHash = fst . splitKeyNameExtension
|
||||||
|
|
|
@ -24,6 +24,8 @@ git-annex (8.20210128) UNRELEASED; urgency=medium
|
||||||
* Include libkqueue.h file needed to build the assistant on BSDs.
|
* Include libkqueue.h file needed to build the assistant on BSDs.
|
||||||
* Tahoe: Avoid verifying hash after download, since tahoe does sufficient
|
* Tahoe: Avoid verifying hash after download, since tahoe does sufficient
|
||||||
verification itself.
|
verification itself.
|
||||||
|
* Checksum as content is received from a remote git-annex repository,
|
||||||
|
rather than doing it in a second pass.
|
||||||
|
|
||||||
-- Joey Hess <id@joeyh.name> Thu, 28 Jan 2021 12:34:32 -0400
|
-- Joey Hess <id@joeyh.name> Thu, 28 Jan 2021 12:34:32 -0400
|
||||||
|
|
||||||
|
|
53
P2P/Annex.hs
53
P2P/Annex.hs
|
@ -1,6 +1,6 @@
|
||||||
{- P2P protocol, Annex implementation
|
{- P2P protocol, Annex implementation
|
||||||
-
|
-
|
||||||
- Copyright 2016-2018 Joey Hess <id@joeyh.name>
|
- Copyright 2016-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -23,9 +23,11 @@ import P2P.IO
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Types.NumCopies
|
import Types.NumCopies
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
import Types.Backend (IncrementalVerifier(..))
|
||||||
|
|
||||||
import Control.Monad.Free
|
import Control.Monad.Free
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
import qualified Data.ByteString as S
|
||||||
|
|
||||||
-- Full interpreter for Proto, that can receive and send objects.
|
-- Full interpreter for Proto, that can receive and send objects.
|
||||||
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
|
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
|
||||||
|
@ -77,7 +79,7 @@ runLocal runst runner a = case a of
|
||||||
let runtransfer ti =
|
let runtransfer ti =
|
||||||
Right <$> transfer download' k af Nothing (\p ->
|
Right <$> transfer download' k af Nothing (\p ->
|
||||||
logStatusAfter k $ getViaTmp rsp DefaultVerify k af $ \tmp ->
|
logStatusAfter k $ getViaTmp rsp DefaultVerify k af $ \tmp ->
|
||||||
storefile (fromRawFilePath tmp) o l getb validitycheck p ti)
|
storefile (fromRawFilePath tmp) o l getb Nothing validitycheck p ti)
|
||||||
let fallback = return $ Left $
|
let fallback = return $ Left $
|
||||||
ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
|
ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
|
||||||
checktransfer runtransfer fallback
|
checktransfer runtransfer fallback
|
||||||
|
@ -85,10 +87,10 @@ runLocal runst runner a = case a of
|
||||||
Left e -> return $ Left $ ProtoFailureException e
|
Left e -> return $ Left $ ProtoFailureException e
|
||||||
Right (Left e) -> return $ Left e
|
Right (Left e) -> return $ Left e
|
||||||
Right (Right ok) -> runner (next ok)
|
Right (Right ok) -> runner (next ok)
|
||||||
StoreContentTo dest o l getb validitycheck next -> do
|
StoreContentTo dest incrementalverifier o l getb validitycheck next -> do
|
||||||
v <- tryNonAsync $ do
|
v <- tryNonAsync $ do
|
||||||
let runtransfer ti = Right
|
let runtransfer ti = Right
|
||||||
<$> storefile dest o l getb validitycheck nullMeterUpdate ti
|
<$> storefile dest o l getb incrementalverifier validitycheck nullMeterUpdate ti
|
||||||
let fallback = return $ Left $
|
let fallback = return $ Left $
|
||||||
ProtoFailureMessage "transfer failed"
|
ProtoFailureMessage "transfer failed"
|
||||||
checktransfer runtransfer fallback
|
checktransfer runtransfer fallback
|
||||||
|
@ -154,15 +156,40 @@ runLocal runst runner a = case a of
|
||||||
-- a client.
|
-- a client.
|
||||||
Client _ -> ta nullMeterUpdate
|
Client _ -> ta nullMeterUpdate
|
||||||
|
|
||||||
storefile dest (Offset o) (Len l) getb validitycheck p ti = do
|
resumefromoffset o incrementalverifier p h
|
||||||
let p' = offsetMeterUpdate p (toBytesProcessed o)
|
| o /= 0 = do
|
||||||
|
p' <- case incrementalverifier of
|
||||||
|
Just iv -> do
|
||||||
|
go iv o
|
||||||
|
return p
|
||||||
|
_ -> return $ offsetMeterUpdate p (toBytesProcessed o)
|
||||||
|
-- Make sure the handle is seeked to the offset.
|
||||||
|
-- (Reading the file probably left it there
|
||||||
|
-- when that was done, but let's be sure.)
|
||||||
|
hSeek h AbsoluteSeek o
|
||||||
|
return p'
|
||||||
|
| otherwise = return p
|
||||||
|
where
|
||||||
|
go iv n
|
||||||
|
| n == 0 = return ()
|
||||||
|
| n > fromIntegral defaultChunkSize = do
|
||||||
|
updateIncremental iv =<< S.hGet h defaultChunkSize
|
||||||
|
go iv (n - fromIntegral defaultChunkSize)
|
||||||
|
| otherwise =
|
||||||
|
updateIncremental iv =<< S.hGet h (fromIntegral n)
|
||||||
|
|
||||||
|
storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti = do
|
||||||
v <- runner getb
|
v <- runner getb
|
||||||
case v of
|
case v of
|
||||||
Right b -> do
|
Right b -> do
|
||||||
liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
|
liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
|
||||||
when (o /= 0) $
|
p' <- resumefromoffset o incrementalverifier p h
|
||||||
hSeek h AbsoluteSeek o
|
let writechunk = case incrementalverifier of
|
||||||
meteredWrite p' h b
|
Nothing -> \c -> S.hPut h c
|
||||||
|
Just iv -> \c -> do
|
||||||
|
S.hPut h c
|
||||||
|
updateIncremental iv c
|
||||||
|
meteredWrite p' writechunk b
|
||||||
indicatetransferred ti
|
indicatetransferred ti
|
||||||
|
|
||||||
rightsize <- do
|
rightsize <- do
|
||||||
|
@ -170,8 +197,12 @@ runLocal runst runner a = case a of
|
||||||
return (toInteger sz == l + o)
|
return (toInteger sz == l + o)
|
||||||
|
|
||||||
runner validitycheck >>= \case
|
runner validitycheck >>= \case
|
||||||
Right (Just Valid) ->
|
Right (Just Valid) -> case incrementalverifier of
|
||||||
return (rightsize, UnVerified)
|
Just iv -> ifM (liftIO (finalizeIncremental iv) <&&> pure rightsize)
|
||||||
|
( return (True, Verified)
|
||||||
|
, return (False, UnVerified)
|
||||||
|
)
|
||||||
|
Nothing -> return (rightsize, UnVerified)
|
||||||
Right (Just Invalid) | l == 0 ->
|
Right (Just Invalid) | l == 0 ->
|
||||||
-- Special case, for when
|
-- Special case, for when
|
||||||
-- content was not
|
-- content was not
|
||||||
|
|
|
@ -259,7 +259,7 @@ debugMessage conn prefix m = do
|
||||||
-- connection. False is returned to indicate this problem.
|
-- connection. False is returned to indicate this problem.
|
||||||
sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
|
sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
|
||||||
sendExactly (Len n) b h p = do
|
sendExactly (Len n) b h p = do
|
||||||
sent <- meteredWrite' p h (L.take (fromIntegral n) b)
|
sent <- meteredWrite' p (B.hPut h) (L.take (fromIntegral n) b)
|
||||||
return (fromBytesProcessed sent == n)
|
return (fromBytesProcessed sent == n)
|
||||||
|
|
||||||
receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
|
receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
-
|
-
|
||||||
- See doc/design/p2p_protocol.mdwn
|
- See doc/design/p2p_protocol.mdwn
|
||||||
-
|
-
|
||||||
- Copyright 2016-2020 Joey Hess <id@joeyh.name>
|
- Copyright 2016-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -18,6 +18,7 @@ import Types (Annex)
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import Types.UUID
|
import Types.UUID
|
||||||
import Types.Remote (Verification(..), unVerified)
|
import Types.Remote (Verification(..), unVerified)
|
||||||
|
import Types.Backend (IncrementalVerifier(..))
|
||||||
import Utility.AuthToken
|
import Utility.AuthToken
|
||||||
import Utility.Applicative
|
import Utility.Applicative
|
||||||
import Utility.PartialPrelude
|
import Utility.PartialPrelude
|
||||||
|
@ -266,7 +267,7 @@ data LocalF c
|
||||||
-- Note: The ByteString may not contain the entire remaining content
|
-- Note: The ByteString may not contain the entire remaining content
|
||||||
-- of the key. Only once the temp file size == Len has the whole
|
-- of the key. Only once the temp file size == Len has the whole
|
||||||
-- content been transferred.
|
-- content been transferred.
|
||||||
| StoreContentTo FilePath Offset Len (Proto L.ByteString) (Proto (Maybe Validity)) ((Bool, Verification) -> c)
|
| StoreContentTo FilePath (Maybe IncrementalVerifier) Offset Len (Proto L.ByteString) (Proto (Maybe Validity)) ((Bool, Verification) -> c)
|
||||||
-- ^ Like StoreContent, but stores the content to a temp file.
|
-- ^ Like StoreContent, but stores the content to a temp file.
|
||||||
| SetPresent Key UUID c
|
| SetPresent Key UUID c
|
||||||
| CheckContentPresent Key (Bool -> c)
|
| CheckContentPresent Key (Bool -> c)
|
||||||
|
@ -351,13 +352,13 @@ remove key = do
|
||||||
net $ sendMessage (REMOVE key)
|
net $ sendMessage (REMOVE key)
|
||||||
checkSuccess
|
checkSuccess
|
||||||
|
|
||||||
get :: FilePath -> Key -> AssociatedFile -> Meter -> MeterUpdate -> Proto (Bool, Verification)
|
get :: FilePath -> Key -> Maybe IncrementalVerifier -> AssociatedFile -> Meter -> MeterUpdate -> Proto (Bool, Verification)
|
||||||
get dest key af m p =
|
get dest key iv af m p =
|
||||||
receiveContent (Just m) p sizer storer $ \offset ->
|
receiveContent (Just m) p sizer storer $ \offset ->
|
||||||
GET offset (ProtoAssociatedFile af) key
|
GET offset (ProtoAssociatedFile af) key
|
||||||
where
|
where
|
||||||
sizer = fileSize dest
|
sizer = fileSize dest
|
||||||
storer = storeContentTo dest
|
storer = storeContentTo dest iv
|
||||||
|
|
||||||
put :: Key -> AssociatedFile -> MeterUpdate -> Proto Bool
|
put :: Key -> AssociatedFile -> MeterUpdate -> Proto Bool
|
||||||
put key af p = do
|
put key af p = do
|
||||||
|
|
|
@ -167,7 +167,7 @@ store r buprepo = byteStorer $ \k b p -> do
|
||||||
}
|
}
|
||||||
else cmd
|
else cmd
|
||||||
feeder = \h -> do
|
feeder = \h -> do
|
||||||
meteredWrite p h b
|
meteredWrite p (S.hPut h) b
|
||||||
hClose h
|
hClose h
|
||||||
in withCreateProcess cmd' (go feeder cmd')
|
in withCreateProcess cmd' (go feeder cmd')
|
||||||
where
|
where
|
||||||
|
|
|
@ -557,6 +557,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
|
||||||
then return v
|
then return v
|
||||||
else giveup "failed to retrieve content from remote"
|
else giveup "failed to retrieve content from remote"
|
||||||
else P2PHelper.retrieve
|
else P2PHelper.retrieve
|
||||||
|
(Annex.Content.RemoteVerify r)
|
||||||
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
|
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
|
||||||
key file dest meterupdate
|
key file dest meterupdate
|
||||||
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
||||||
|
|
|
@ -9,6 +9,7 @@ module Remote.Glacier (remote, jobList, checkSaneGlacierCommand) where
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
|
import qualified Data.ByteString as S
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
@ -168,7 +169,7 @@ store' r k b p = go =<< glacierEnv c gc u
|
||||||
{ std_in = CreatePipe }
|
{ std_in = CreatePipe }
|
||||||
in liftIO $ withCreateProcess cmd (go' cmd)
|
in liftIO $ withCreateProcess cmd (go' cmd)
|
||||||
go' cmd (Just hin) _ _ pid = do
|
go' cmd (Just hin) _ _ pid = do
|
||||||
meteredWrite p hin b
|
meteredWrite p (S.hPut hin) b
|
||||||
hClose hin
|
hClose hin
|
||||||
forceSuccessProcess cmd pid
|
forceSuccessProcess cmd pid
|
||||||
go' _ _ _ _ _ = error "internal"
|
go' _ _ _ _ _ = error "internal"
|
||||||
|
|
|
@ -11,6 +11,7 @@ import Annex.Common
|
||||||
import Remote.Helper.Chunked
|
import Remote.Helper.Chunked
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
|
||||||
|
import qualified Data.ByteString as S
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
{- This is an extension that's added to the usual file (or whatever)
|
{- This is an extension that's added to the usual file (or whatever)
|
||||||
|
@ -117,4 +118,4 @@ meteredWriteFileChunks :: MeterUpdate -> FilePath -> [v] -> (v -> IO L.ByteStrin
|
||||||
meteredWriteFileChunks meterupdate dest chunks feeder =
|
meteredWriteFileChunks meterupdate dest chunks feeder =
|
||||||
withBinaryFile dest WriteMode $ \h ->
|
withBinaryFile dest WriteMode $ \h ->
|
||||||
forM_ chunks $
|
forM_ chunks $
|
||||||
meteredWrite meterupdate h <=< feeder
|
meteredWrite meterupdate (S.hPut h) <=< feeder
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- Helpers for remotes using the git-annex P2P protocol.
|
{- Helpers for remotes using the git-annex P2P protocol.
|
||||||
-
|
-
|
||||||
- Copyright 2016-2020 Joey Hess <id@joeyh.name>
|
- Copyright 2016-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -17,6 +17,7 @@ import Annex.Content
|
||||||
import Messages.Progress
|
import Messages.Progress
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import Types.NumCopies
|
import Types.NumCopies
|
||||||
|
import Backend
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
|
||||||
|
@ -39,10 +40,11 @@ store runner k af p = do
|
||||||
Just False -> giveup "transfer failed"
|
Just False -> giveup "transfer failed"
|
||||||
Nothing -> remoteUnavail
|
Nothing -> remoteUnavail
|
||||||
|
|
||||||
retrieve :: (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex Verification
|
retrieve :: VerifyConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex Verification
|
||||||
retrieve runner k af dest p =
|
retrieve verifyconfig runner k af dest p = do
|
||||||
|
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
||||||
metered (Just p) k $ \m p' ->
|
metered (Just p) k $ \m p' ->
|
||||||
runner p' (P2P.get dest k af m p') >>= \case
|
runner p' (P2P.get dest k iv af m p') >>= \case
|
||||||
Just (True, v) -> return v
|
Just (True, v) -> return v
|
||||||
Just (False, _) -> giveup "transfer failed"
|
Just (False, _) -> giveup "transfer failed"
|
||||||
Nothing -> remoteUnavail
|
Nothing -> remoteUnavail
|
||||||
|
|
|
@ -295,7 +295,7 @@ sink dest enc c mh mp content = case (enc, mh, content) of
|
||||||
Just h -> liftIO $ b `streamto` h
|
Just h -> liftIO $ b `streamto` h
|
||||||
Nothing -> liftIO $ bracket opendest hClose (b `streamto`)
|
Nothing -> liftIO $ bracket opendest hClose (b `streamto`)
|
||||||
streamto b h = case mp of
|
streamto b h = case mp of
|
||||||
Just p -> meteredWrite p h b
|
Just p -> meteredWrite p (S.hPut h) b
|
||||||
Nothing -> L.hPut h b
|
Nothing -> L.hPut h b
|
||||||
opendest = openBinaryFile dest WriteMode
|
opendest = openBinaryFile dest WriteMode
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ module Remote.P2P (
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import qualified P2P.Protocol as P2P
|
import qualified P2P.Protocol as P2P
|
||||||
|
import qualified Annex.Content
|
||||||
import P2P.Address
|
import P2P.Address
|
||||||
import P2P.Annex
|
import P2P.Annex
|
||||||
import P2P.IO
|
import P2P.IO
|
||||||
|
@ -56,7 +57,7 @@ chainGen addr r u rc gc rs = do
|
||||||
, cost = cst
|
, cost = cst
|
||||||
, name = Git.repoDescribe r
|
, name = Git.repoDescribe r
|
||||||
, storeKey = store (const protorunner)
|
, storeKey = store (const protorunner)
|
||||||
, retrieveKeyFile = retrieve (const protorunner)
|
, retrieveKeyFile = retrieve (Annex.Content.RemoteVerify this) (const protorunner)
|
||||||
, retrieveKeyFileCheap = Nothing
|
, retrieveKeyFileCheap = Nothing
|
||||||
, retrievalSecurityPolicy = RetrievalAllKeysSecure
|
, retrievalSecurityPolicy = RetrievalAllKeysSecure
|
||||||
, removeKey = remove protorunner
|
, removeKey = remove protorunner
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- Metered IO and actions
|
{- Metered IO and actions
|
||||||
-
|
-
|
||||||
- Copyright 2012-2020 Joey Hess <id@joeyh.name>
|
- Copyright 2012-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- License: BSD-2-clause
|
- License: BSD-2-clause
|
||||||
-}
|
-}
|
||||||
|
@ -118,23 +118,24 @@ withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
|
||||||
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
|
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
|
||||||
hGetContentsMetered h meterupdate >>= a
|
hGetContentsMetered h meterupdate >>= a
|
||||||
|
|
||||||
{- Writes a ByteString to a Handle, updating a meter as it's written. -}
|
{- Calls the action repeatedly with chunks from the lazy ByteString.
|
||||||
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
|
- Updates the meter after each chunk is processed. -}
|
||||||
meteredWrite meterupdate h = void . meteredWrite' meterupdate h
|
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
|
||||||
|
meteredWrite meterupdate a = void . meteredWrite' meterupdate a
|
||||||
|
|
||||||
meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed
|
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
|
||||||
meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks
|
meteredWrite' meterupdate a = go zeroBytesProcessed . L.toChunks
|
||||||
where
|
where
|
||||||
go sofar [] = return sofar
|
go sofar [] = return sofar
|
||||||
go sofar (c:cs) = do
|
go sofar (c:cs) = do
|
||||||
S.hPut h c
|
a c
|
||||||
let !sofar' = addBytesProcessed sofar $ S.length c
|
let !sofar' = addBytesProcessed sofar $ S.length c
|
||||||
meterupdate sofar'
|
meterupdate sofar'
|
||||||
go sofar' cs
|
go sofar' cs
|
||||||
|
|
||||||
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
|
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
|
||||||
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
|
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
|
||||||
meteredWrite meterupdate h b
|
meteredWrite meterupdate (S.hPut h) b
|
||||||
|
|
||||||
{- Applies an offset to a MeterUpdate. This can be useful when
|
{- Applies an offset to a MeterUpdate. This can be useful when
|
||||||
- performing a sequence of actions, such as multiple meteredWriteFiles,
|
- performing a sequence of actions, such as multiple meteredWriteFiles,
|
||||||
|
|
|
@ -671,9 +671,10 @@ Executable git-annex
|
||||||
Annex.UpdateInstead
|
Annex.UpdateInstead
|
||||||
Annex.UUID
|
Annex.UUID
|
||||||
Annex.Url
|
Annex.Url
|
||||||
|
Annex.VariantFile
|
||||||
Annex.VectorClock
|
Annex.VectorClock
|
||||||
Annex.VectorClock.Utility
|
Annex.VectorClock.Utility
|
||||||
Annex.VariantFile
|
Annex.Verify
|
||||||
Annex.Version
|
Annex.Version
|
||||||
Annex.View
|
Annex.View
|
||||||
Annex.View.ViewedFile
|
Annex.View.ViewedFile
|
||||||
|
|
Loading…
Reference in a new issue