Remote.Git storeKey works with annex+http urls
Does not yet update progress meter.
This commit is contained in:
parent
0280e2dd5e
commit
b3915b88ba
3 changed files with 76 additions and 48 deletions
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
module P2P.Http.Client (
|
module P2P.Http.Client (
|
||||||
module P2P.Http.Client,
|
module P2P.Http.Client,
|
||||||
|
module P2P.Http.Types,
|
||||||
Validity(..),
|
Validity(..),
|
||||||
) where
|
) where
|
||||||
|
|
||||||
|
@ -25,6 +26,7 @@ import Annex.UUID
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import P2P.Http
|
import P2P.Http
|
||||||
import P2P.Http.Url
|
import P2P.Http.Url
|
||||||
|
import P2P.Http.Types
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import P2P.Protocol hiding (Offset, Bypass, auth)
|
import P2P.Protocol hiding (Offset, Bypass, auth)
|
||||||
import Annex.Concurrent
|
import Annex.Concurrent
|
||||||
|
@ -286,20 +288,16 @@ clientGetTimestamp clientenv (ProtocolVersion ver) su cu bypass auth =
|
||||||
|
|
||||||
#ifdef WITH_SERVANT
|
#ifdef WITH_SERVANT
|
||||||
clientPut
|
clientPut
|
||||||
:: ClientEnv
|
:: MeterUpdate
|
||||||
-> ProtocolVersion
|
-> Key
|
||||||
-> B64Key
|
|
||||||
-> B64UUID ServerSide
|
|
||||||
-> B64UUID ClientSide
|
|
||||||
-> [B64UUID Bypass]
|
|
||||||
-> Maybe Auth
|
|
||||||
-> Maybe Offset
|
-> Maybe Offset
|
||||||
-> AssociatedFile
|
-> AssociatedFile
|
||||||
-> FilePath
|
-> FilePath
|
||||||
-> FileSize
|
-> FileSize
|
||||||
-> Annex Bool
|
-> Annex Bool
|
||||||
-> Annex PutResultPlus
|
-- ^ Called after sending the file to check if it's valid.
|
||||||
clientPut clientenv (ProtocolVersion ver) k su cu bypass auth moffset af contentfile contentfilesize validitycheck = do
|
-> ClientAction PutResultPlus
|
||||||
|
clientPut meterupdate k moffset af contentfile contentfilesize validitycheck clientenv (ProtocolVersion ver) su cu bypass auth = do
|
||||||
checkv <- liftIO newEmptyTMVarIO
|
checkv <- liftIO newEmptyTMVarIO
|
||||||
checkresultv <- liftIO newEmptyTMVarIO
|
checkresultv <- liftIO newEmptyTMVarIO
|
||||||
let checker = do
|
let checker = do
|
||||||
|
@ -314,10 +312,10 @@ clientPut clientenv (ProtocolVersion ver) k su cu bypass auth moffset af content
|
||||||
Left err -> do
|
Left err -> do
|
||||||
void $ liftIO $ atomically $ tryPutTMVar checkv ()
|
void $ liftIO $ atomically $ tryPutTMVar checkv ()
|
||||||
join $ liftIO (wait checkerthread)
|
join $ liftIO (wait checkerthread)
|
||||||
throwM err
|
return (Left err)
|
||||||
Right res -> do
|
Right res -> do
|
||||||
join $ liftIO (wait checkerthread)
|
join $ liftIO (wait checkerthread)
|
||||||
return res
|
return (Right res)
|
||||||
where
|
where
|
||||||
stream h checkv checkresultv = S.SourceT $ \a -> do
|
stream h checkv checkresultv = S.SourceT $ \a -> do
|
||||||
bl <- L.hGetContents h
|
bl <- L.hGetContents h
|
||||||
|
@ -365,12 +363,14 @@ clientPut clientenv (ProtocolVersion ver) k su cu bypass auth moffset af content
|
||||||
offset = case moffset of
|
offset = case moffset of
|
||||||
Nothing -> 0
|
Nothing -> 0
|
||||||
Just (Offset o) -> fromIntegral o
|
Just (Offset o) -> fromIntegral o
|
||||||
|
|
||||||
|
bk = B64Key k
|
||||||
|
|
||||||
cli src = case ver of
|
cli src = case ver of
|
||||||
3 -> v3 su V3 len k cu bypass baf moffset src auth
|
3 -> v3 su V3 len bk cu bypass baf moffset src auth
|
||||||
2 -> v2 su V2 len k cu bypass baf moffset src auth
|
2 -> v2 su V2 len bk cu bypass baf moffset src auth
|
||||||
1 -> plus <$> v1 su V1 len k cu bypass baf moffset src auth
|
1 -> plus <$> v1 su V1 len bk cu bypass baf moffset src auth
|
||||||
0 -> plus <$> v0 su V0 len k cu bypass baf moffset src auth
|
0 -> plus <$> v0 su V0 len bk cu bypass baf moffset src auth
|
||||||
_ -> error "unsupported protocol version"
|
_ -> error "unsupported protocol version"
|
||||||
|
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
|
@ -379,29 +379,24 @@ clientPut clientenv (ProtocolVersion ver) k su cu bypass auth moffset af content
|
||||||
_ :<|>
|
_ :<|>
|
||||||
_ :<|>
|
_ :<|>
|
||||||
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
|
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
|
||||||
|
#else
|
||||||
|
clientPut _ _ _ _ _ _ _ = ()
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef WITH_SERVANT
|
#ifdef WITH_SERVANT
|
||||||
clientPutOffset
|
clientPutOffset
|
||||||
:: ClientEnv
|
:: Key
|
||||||
-> ProtocolVersion
|
-> ClientAction PutOffsetResultPlus
|
||||||
-> B64Key
|
clientPutOffset k clientenv (ProtocolVersion ver) su cu bypass auth
|
||||||
-> B64UUID ServerSide
|
| ver == 0 = return (Right (PutOffsetResultPlus (Offset 0)))
|
||||||
-> B64UUID ClientSide
|
| otherwise = liftIO $ withClientM cli clientenv return
|
||||||
-> [B64UUID Bypass]
|
|
||||||
-> Maybe Auth
|
|
||||||
-> IO PutOffsetResultPlus
|
|
||||||
clientPutOffset clientenv (ProtocolVersion ver) k su cu bypass auth
|
|
||||||
| ver == 0 = return (PutOffsetResultPlus (Offset 0))
|
|
||||||
| otherwise =
|
|
||||||
withClientM cli clientenv $ \case
|
|
||||||
Left err -> throwM err
|
|
||||||
Right res -> return res
|
|
||||||
where
|
where
|
||||||
|
bk = B64Key k
|
||||||
|
|
||||||
cli = case ver of
|
cli = case ver of
|
||||||
3 -> v3 su V3 k cu bypass auth
|
3 -> v3 su V3 bk cu bypass auth
|
||||||
2 -> v2 su V2 k cu bypass auth
|
2 -> v2 su V2 bk cu bypass auth
|
||||||
1 -> plus <$> v1 su V1 k cu bypass auth
|
1 -> plus <$> v1 su V1 bk cu bypass auth
|
||||||
_ -> error "unsupported protocol version"
|
_ -> error "unsupported protocol version"
|
||||||
|
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
|
@ -411,6 +406,8 @@ clientPutOffset clientenv (ProtocolVersion ver) k su cu bypass auth
|
||||||
_ :<|>
|
_ :<|>
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
v3 :<|> v2 :<|> v1 :<|> _ = client p2pHttpAPI
|
v3 :<|> v2 :<|> v1 :<|> _ = client p2pHttpAPI
|
||||||
|
#else
|
||||||
|
clientPutOffset _ = ()
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef WITH_SERVANT
|
#ifdef WITH_SERVANT
|
||||||
|
|
|
@ -539,7 +539,7 @@ copyFromRemote r st key file dest meterupdate vc = do
|
||||||
|
|
||||||
copyFromRemote'' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
copyFromRemote'' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||||
copyFromRemote'' repo r st@(State connpool _ _ _ _) key af dest meterupdate vc
|
copyFromRemote'' repo r st@(State connpool _ _ _ _) key af dest meterupdate vc
|
||||||
| isP2PHttp r = p2phttp
|
| isP2PHttp r = copyp2phttp
|
||||||
| Git.repoIsHttp repo = verifyKeyContentIncrementally vc key $ \iv -> do
|
| Git.repoIsHttp repo = verifyKeyContentIncrementally vc key $ \iv -> do
|
||||||
gc <- Annex.getGitConfig
|
gc <- Annex.getGitConfig
|
||||||
ok <- Url.withUrlOptionsPromptingCreds $
|
ok <- Url.withUrlOptionsPromptingCreds $
|
||||||
|
@ -574,7 +574,7 @@ copyFromRemote'' repo r st@(State connpool _ _ _ _) key af dest meterupdate vc
|
||||||
bwlimit = remoteAnnexBwLimitDownload (gitconfig r)
|
bwlimit = remoteAnnexBwLimitDownload (gitconfig r)
|
||||||
<|> remoteAnnexBwLimit (gitconfig r)
|
<|> remoteAnnexBwLimit (gitconfig r)
|
||||||
|
|
||||||
p2phttp = verifyKeyContentIncrementally vc key $ \iv -> do
|
copyp2phttp = verifyKeyContentIncrementally vc key $ \iv -> do
|
||||||
startsz <- liftIO $ tryWhenExists $
|
startsz <- liftIO $ tryWhenExists $
|
||||||
getFileSize (toRawFilePath dest)
|
getFileSize (toRawFilePath dest)
|
||||||
bracketIO (openBinaryFile dest ReadWriteMode) (hClose) $ \h -> do
|
bracketIO (openBinaryFile dest ReadWriteMode) (hClose) $ \h -> do
|
||||||
|
@ -614,9 +614,10 @@ copyToRemote r st key af o meterupdate = do
|
||||||
|
|
||||||
copyToRemote' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> Maybe FilePath -> MeterUpdate -> Annex ()
|
copyToRemote' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> Maybe FilePath -> MeterUpdate -> Annex ()
|
||||||
copyToRemote' repo r st@(State connpool duc _ _ _) key af o meterupdate
|
copyToRemote' repo r st@(State connpool duc _ _ _) key af o meterupdate
|
||||||
|
| isP2PHttp r = prepsendwith copyp2phttp
|
||||||
| not $ Git.repoIsUrl repo = ifM duc
|
| not $ Git.repoIsUrl repo = ifM duc
|
||||||
( guardUsable repo (giveup "cannot access remote") $ commitOnCleanup repo r st $
|
( guardUsable repo (giveup "cannot access remote") $ commitOnCleanup repo r st $
|
||||||
copylocal =<< Annex.Content.prepSendAnnex' key o
|
prepsendwith copylocal
|
||||||
, giveup "remote does not have expected annex.uuid value"
|
, giveup "remote does not have expected annex.uuid value"
|
||||||
)
|
)
|
||||||
| Git.repoIsSsh repo =
|
| Git.repoIsSsh repo =
|
||||||
|
@ -624,18 +625,24 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key af o meterupdate
|
||||||
(Ssh.runProto r connpool (return Nothing))
|
(Ssh.runProto r connpool (return Nothing))
|
||||||
key af o meterupdate
|
key af o meterupdate
|
||||||
|
|
||||||
| otherwise = giveup "copying to non-ssh repo not supported"
|
| otherwise = giveup "copying to this remote is not supported"
|
||||||
where
|
where
|
||||||
copylocal Nothing = giveup "content not available"
|
prepsendwith a = Annex.Content.prepSendAnnex' key o >>= \case
|
||||||
copylocal (Just (object, sz, check)) = do
|
Nothing -> giveup "content not available"
|
||||||
|
Just v -> a v
|
||||||
|
|
||||||
|
bwlimit = remoteAnnexBwLimitUpload (gitconfig r)
|
||||||
|
<|> remoteAnnexBwLimit (gitconfig r)
|
||||||
|
|
||||||
|
failedsend = giveup "failed to send content to remote"
|
||||||
|
|
||||||
|
copylocal (object, sz, check) = do
|
||||||
-- The check action is going to be run in
|
-- The check action is going to be run in
|
||||||
-- the remote's Annex, but it needs access to the local
|
-- the remote's Annex, but it needs access to the local
|
||||||
-- Annex monad's state.
|
-- Annex monad's state.
|
||||||
checkio <- Annex.withCurrentState check
|
checkio <- Annex.withCurrentState check
|
||||||
u <- getUUID
|
u <- getUUID
|
||||||
hardlink <- wantHardLink
|
hardlink <- wantHardLink
|
||||||
let bwlimit = remoteAnnexBwLimitUpload (gitconfig r)
|
|
||||||
<|> remoteAnnexBwLimit (gitconfig r)
|
|
||||||
-- run copy from perspective of remote
|
-- run copy from perspective of remote
|
||||||
res <- onLocalFast st $ ifM (Annex.Content.inAnnex key)
|
res <- onLocalFast st $ ifM (Annex.Content.inAnnex key)
|
||||||
( return True
|
( return True
|
||||||
|
@ -651,7 +658,28 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key af o meterupdate
|
||||||
copier object (fromRawFilePath dest) key p' checksuccess verify
|
copier object (fromRawFilePath dest) key p' checksuccess verify
|
||||||
)
|
)
|
||||||
unless res $
|
unless res $
|
||||||
giveup "failed to send content to remote"
|
failedsend
|
||||||
|
|
||||||
|
copyp2phttp (object, sz, check) =
|
||||||
|
let check' = check >>= \case
|
||||||
|
Just s -> do
|
||||||
|
warning (UnquotedString s)
|
||||||
|
return False
|
||||||
|
Nothing -> return True
|
||||||
|
in p2pHttpClient r (const $ pure $ PutOffsetResultPlus (Offset 0)) (clientPutOffset key) >>= \case
|
||||||
|
PutOffsetResultPlus offset ->
|
||||||
|
metered (Just meterupdate) key bwlimit $ \_ p -> do
|
||||||
|
res <- p2pHttpClient r giveup $
|
||||||
|
clientPut p key (Just offset) af object sz check'
|
||||||
|
case res of
|
||||||
|
PutResultPlus False _ ->
|
||||||
|
failedsend
|
||||||
|
PutResultPlus True fanoutuuids ->
|
||||||
|
storefanout fanoutuuids
|
||||||
|
PutOffsetResultAlreadyHavePlus fanoutuuids ->
|
||||||
|
storefanout fanoutuuids
|
||||||
|
|
||||||
|
storefanout = P2PHelper.storeFanout key (uuid r) . map fromB64UUID
|
||||||
|
|
||||||
fsckOnRemote :: Git.Repo -> [CommandParam] -> Annex (IO Bool)
|
fsckOnRemote :: Git.Repo -> [CommandParam] -> Annex (IO Bool)
|
||||||
fsckOnRemote r params
|
fsckOnRemote r params
|
||||||
|
|
|
@ -42,16 +42,19 @@ store remoteuuid gc runner k af o p = do
|
||||||
let bwlimit = remoteAnnexBwLimitUpload gc <|> remoteAnnexBwLimit gc
|
let bwlimit = remoteAnnexBwLimitUpload gc <|> remoteAnnexBwLimit gc
|
||||||
metered (Just p) sizer bwlimit $ \_ p' ->
|
metered (Just p) sizer bwlimit $ \_ p' ->
|
||||||
runner (P2P.put k af p') >>= \case
|
runner (P2P.put k af p') >>= \case
|
||||||
Just (Just fanoutuuids) -> do
|
Just (Just fanoutuuids) ->
|
||||||
-- Storing on the remote can cause it
|
storeFanout k remoteuuid fanoutuuids
|
||||||
-- to be stored on additional UUIDs,
|
|
||||||
-- so record those.
|
|
||||||
forM_ fanoutuuids $ \u ->
|
|
||||||
when (u /= remoteuuid) $
|
|
||||||
logChange k u InfoPresent
|
|
||||||
Just Nothing -> giveup "Transfer failed"
|
Just Nothing -> giveup "Transfer failed"
|
||||||
Nothing -> remoteUnavail
|
Nothing -> remoteUnavail
|
||||||
|
|
||||||
|
storeFanout :: Key -> UUID -> [UUID] -> Annex ()
|
||||||
|
storeFanout k remoteuuid us =
|
||||||
|
-- Storing on the remote can cause it to be stored on additional UUIDs,
|
||||||
|
-- so record those.
|
||||||
|
forM_ us $ \u ->
|
||||||
|
when (u /= remoteuuid) $
|
||||||
|
logChange k u InfoPresent
|
||||||
|
|
||||||
retrieve :: RemoteGitConfig -> (ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
retrieve :: RemoteGitConfig -> (ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||||
retrieve gc runner k af dest p verifyconfig = do
|
retrieve gc runner k af dest p verifyconfig = do
|
||||||
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue