Merge branch 'p2pflagday'
This commit is contained in:
commit
e43aaa22be
16 changed files with 85 additions and 431 deletions
|
@ -396,8 +396,7 @@ store' repo r rsyncopts accessmethod
|
|||
then fileStorer $ \k f p -> do
|
||||
oh <- mkOutputHandler
|
||||
ok <- Ssh.rsyncHelper oh (Just p)
|
||||
=<< Ssh.rsyncParamsRemote False r Upload k f
|
||||
(AssociatedFile Nothing)
|
||||
=<< Ssh.rsyncParamsRemote r Upload k f
|
||||
unless ok $
|
||||
giveup "rsync failed"
|
||||
else storersync
|
||||
|
@ -418,9 +417,8 @@ retrieve' repo r rsyncopts accessmethod
|
|||
sink =<< liftIO (L.readFile $ gCryptLocation repo k)
|
||||
| Git.repoIsSsh repo = if accessShell r
|
||||
then fileRetriever $ \f k p -> do
|
||||
ps <- Ssh.rsyncParamsRemote False r Download k
|
||||
ps <- Ssh.rsyncParamsRemote r Download k
|
||||
(fromRawFilePath f)
|
||||
(AssociatedFile Nothing)
|
||||
oh <- mkOutputHandler
|
||||
unlessM (Ssh.rsyncHelper oh (Just p) ps) $
|
||||
giveup "rsync failed"
|
||||
|
|
177
Remote/Git.hs
177
Remote/Git.hs
|
@ -48,7 +48,6 @@ import Logs.Location
|
|||
import Utility.Metered
|
||||
import Utility.Env
|
||||
import Utility.Batch
|
||||
import Utility.SimpleProtocol
|
||||
import Remote.Helper.Git
|
||||
import Remote.Helper.Messages
|
||||
import Remote.Helper.ExportImport
|
||||
|
@ -70,7 +69,6 @@ import qualified Utility.RawFilePath as R
|
|||
#endif
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.MSampleVar
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.ByteString as S
|
||||
import Network.URI
|
||||
|
@ -413,9 +411,7 @@ inAnnex' repo rmt st@(State connpool duc _ _ _) key
|
|||
( return True
|
||||
, giveup "not found"
|
||||
)
|
||||
checkremote =
|
||||
let fallback = Ssh.inAnnex repo key
|
||||
in P2PHelper.checkpresent (Ssh.runProto rmt connpool (cantCheck rmt) fallback) key
|
||||
checkremote = P2PHelper.checkpresent (Ssh.runProto rmt connpool (cantCheck rmt)) key
|
||||
checklocal = ifM duc
|
||||
( guardUsable repo (cantCheck repo) $
|
||||
maybe (cantCheck repo) return
|
||||
|
@ -458,9 +454,7 @@ dropKey' repo r st@(State connpool duc _ _ _) key
|
|||
, giveup "remote does not have expected annex.uuid value"
|
||||
)
|
||||
| Git.repoIsHttp repo = giveup "dropping from http remote not supported"
|
||||
| otherwise = commitOnCleanup repo r st $ do
|
||||
let fallback = Ssh.dropKey' repo key
|
||||
P2PHelper.remove (Ssh.runProto r connpool (return False) fallback) key
|
||||
| otherwise = P2PHelper.remove (Ssh.runProto r connpool (return False)) key
|
||||
|
||||
lockKey :: Remote -> State -> Key -> (VerifiedCopy -> Annex r) -> Annex r
|
||||
lockKey r st key callback = do
|
||||
|
@ -482,63 +476,20 @@ lockKey' repo r st@(State connpool duc _ _ _) key callback
|
|||
)
|
||||
| Git.repoIsSsh repo = do
|
||||
showLocking r
|
||||
let withconn = Ssh.withP2PSshConnection r connpool fallback
|
||||
let withconn = Ssh.withP2PSshConnection r connpool failedlock
|
||||
P2PHelper.lock withconn Ssh.runProtoConn (uuid r) key callback
|
||||
| otherwise = failedlock
|
||||
where
|
||||
fallback = withNullHandle $ \nullh -> do
|
||||
Just (cmd, params) <- Ssh.git_annex_shell ConsumeStdin
|
||||
repo "lockcontent"
|
||||
[Param $ serializeKey key] []
|
||||
let p = (proc cmd (toCommand params))
|
||||
{ std_in = CreatePipe
|
||||
, std_out = CreatePipe
|
||||
, std_err = UseHandle nullh
|
||||
}
|
||||
bracketIO (createProcess p) cleanupProcess fallback'
|
||||
|
||||
fallback' (Just hin, Just hout, Nothing, p) = do
|
||||
v <- liftIO $ tryIO $ getProtocolLine hout
|
||||
let signaldone = void $ tryNonAsync $ liftIO $ mapM_ tryNonAsync
|
||||
[ hPutStrLn hout ""
|
||||
, hFlush hout
|
||||
, hClose hin
|
||||
, hClose hout
|
||||
, void $ waitForProcess p
|
||||
]
|
||||
let checkexited = not . isJust <$> getProcessExitCode p
|
||||
case v of
|
||||
Left _exited -> do
|
||||
showNote "lockcontent failed"
|
||||
liftIO $ do
|
||||
hClose hin
|
||||
hClose hout
|
||||
void $ waitForProcess p
|
||||
failedlock
|
||||
Right l
|
||||
| l == Just Ssh.contentLockedMarker -> bracket_
|
||||
noop
|
||||
signaldone
|
||||
(withVerifiedCopy LockedCopy r checkexited callback)
|
||||
| otherwise -> do
|
||||
showNote "lockcontent failed"
|
||||
signaldone
|
||||
failedlock
|
||||
fallback' _ = error "internal"
|
||||
|
||||
failedlock = giveup "can't lock content"
|
||||
|
||||
{- Tries to copy a key's content from a remote's annex to a file. -}
|
||||
copyFromRemote :: Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
copyFromRemote = copyFromRemote' False
|
||||
|
||||
copyFromRemote' :: Bool -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
copyFromRemote' forcersync r st key file dest meterupdate vc = do
|
||||
copyFromRemote r st key file dest meterupdate vc = do
|
||||
repo <- getRepo r
|
||||
copyFromRemote'' repo forcersync r st key file dest meterupdate vc
|
||||
copyFromRemote'' repo r st key file dest meterupdate vc
|
||||
|
||||
copyFromRemote'' :: Git.Repo -> Bool -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest meterupdate vc
|
||||
copyFromRemote'' :: Git.Repo -> Remote -> State -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
copyFromRemote'' repo r st@(State connpool _ _ _ _) key file dest meterupdate vc
|
||||
| Git.repoIsHttp repo = do
|
||||
iv <- startVerifyKeyContentIncrementally vc key
|
||||
gc <- Annex.getGitConfig
|
||||
|
@ -566,90 +517,12 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
|
|||
then return v
|
||||
else giveup "failed to retrieve content from remote"
|
||||
Nothing -> giveup "content is not present in remote"
|
||||
| Git.repoIsSsh repo = if forcersync
|
||||
then do
|
||||
(ok, v) <- fallback meterupdate
|
||||
if ok
|
||||
then return v
|
||||
else giveup "failed to retrieve content from remote"
|
||||
else P2PHelper.retrieve
|
||||
| Git.repoIsSsh repo =
|
||||
P2PHelper.retrieve
|
||||
(gitconfig r)
|
||||
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
|
||||
(Ssh.runProto r connpool (return (False, UnVerified)))
|
||||
key file dest meterupdate vc
|
||||
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
||||
where
|
||||
fallback p = unVerified $ feedprogressback $ \p' -> do
|
||||
oh <- mkOutputHandlerQuiet
|
||||
Ssh.rsyncHelper oh (Just (combineMeterUpdate p' p))
|
||||
=<< Ssh.rsyncParamsRemote False r Download key dest file
|
||||
|
||||
{- Feed local rsync's progress info back to the remote,
|
||||
- by forking a feeder thread that runs
|
||||
- git-annex-shell transferinfo at the same time
|
||||
- git-annex-shell sendkey is running.
|
||||
-
|
||||
- To avoid extra password prompts, this is only done when ssh
|
||||
- connection caching is supported.
|
||||
- Note that it actually waits for rsync to indicate
|
||||
- progress before starting transferinfo, in order
|
||||
- to ensure ssh connection caching works and reuses
|
||||
- the connection set up for the sendkey.
|
||||
-
|
||||
- Also note that older git-annex-shell does not support
|
||||
- transferinfo, so stderr is dropped and failure ignored.
|
||||
-}
|
||||
feedprogressback a = ifM (isJust <$> sshCacheDir)
|
||||
( feedprogressback' a
|
||||
, a $ const noop
|
||||
)
|
||||
feedprogressback' a = do
|
||||
u <- getUUID
|
||||
let AssociatedFile afile = file
|
||||
let fields = (Fields.remoteUUID, fromUUID u)
|
||||
: maybe [] (\f -> [(Fields.associatedFile, fromRawFilePath f)]) afile
|
||||
Just (cmd, params) <- Ssh.git_annex_shell ConsumeStdin
|
||||
repo "transferinfo"
|
||||
[Param $ serializeKey key] fields
|
||||
v <- liftIO (newEmptySV :: IO (MSampleVar Integer))
|
||||
pv <- liftIO $ newEmptyMVar
|
||||
tid <- liftIO $ forkIO $ void $ tryIO $ do
|
||||
bytes <- readSV v
|
||||
p <- createProcess $
|
||||
(proc cmd (toCommand params))
|
||||
{ std_in = CreatePipe
|
||||
, std_err = CreatePipe
|
||||
}
|
||||
putMVar pv p
|
||||
hClose $ stderrHandle p
|
||||
let h = stdinHandle p
|
||||
let send b = do
|
||||
hPrint h b
|
||||
hFlush h
|
||||
send bytes
|
||||
forever $
|
||||
send =<< readSV v
|
||||
let feeder = \n -> do
|
||||
meterupdate n
|
||||
writeSV v (fromBytesProcessed n)
|
||||
|
||||
-- It can easily take 0.3 seconds to clean up after
|
||||
-- the transferinfo, and all that's involved is shutting
|
||||
-- down the process and associated thread cleanly. So,
|
||||
-- do it in the background.
|
||||
let cleanup = forkIO $ do
|
||||
void $ tryIO $ killThread tid
|
||||
void $ tryNonAsync $
|
||||
maybe noop (void . waitForProcess . processHandle)
|
||||
=<< tryTakeMVar pv
|
||||
|
||||
let forcestop = do
|
||||
void $ tryIO $ killThread tid
|
||||
void $ tryNonAsync $
|
||||
maybe noop cleanupProcess
|
||||
=<< tryTakeMVar pv
|
||||
|
||||
bracketIO noop (const cleanup) (const $ a feeder)
|
||||
`onException` liftIO forcestop
|
||||
|
||||
copyFromRemoteCheap :: State -> Git.Repo -> Maybe (Key -> AssociatedFile -> FilePath -> Annex ())
|
||||
#ifndef mingw32_HOST_OS
|
||||
|
@ -681,9 +554,9 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
|
|||
copylocal =<< Annex.Content.prepSendAnnex' key
|
||||
, giveup "remote does not have expected annex.uuid value"
|
||||
)
|
||||
| Git.repoIsSsh repo = commitOnCleanup repo r st $
|
||||
| Git.repoIsSsh repo =
|
||||
P2PHelper.store (gitconfig r)
|
||||
(Ssh.runProto r connpool (return False) . copyremotefallback)
|
||||
(Ssh.runProto r connpool (return False))
|
||||
key file meterupdate
|
||||
|
||||
| otherwise = giveup "copying to non-ssh repo not supported"
|
||||
|
@ -715,16 +588,6 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
|
|||
)
|
||||
unless res $
|
||||
giveup "failed to send content to remote"
|
||||
copyremotefallback p = either (const False) id
|
||||
<$> tryNonAsync (copyremotefallback' p)
|
||||
copyremotefallback' p = Annex.Content.sendAnnex key noop $ \object -> do
|
||||
-- This is too broad really, but recvkey normally
|
||||
-- verifies content anyway, so avoid complicating
|
||||
-- it with a local sendAnnex check and rollback.
|
||||
let unlocked = True
|
||||
oh <- mkOutputHandlerQuiet
|
||||
Ssh.rsyncHelper oh (Just p)
|
||||
=<< Ssh.rsyncParamsRemote unlocked r Upload key object file
|
||||
|
||||
fsckOnRemote :: Git.Repo -> [CommandParam] -> Annex (IO Bool)
|
||||
fsckOnRemote r params
|
||||
|
@ -816,21 +679,7 @@ commitOnCleanup repo r st a = go `after` a
|
|||
| not $ Git.repoIsUrl repo = onLocalFast st $
|
||||
doQuietSideAction $
|
||||
Annex.Branch.commit =<< Annex.Branch.commitMessage
|
||||
| otherwise = do
|
||||
Just (shellcmd, shellparams) <-
|
||||
Ssh.git_annex_shell NoConsumeStdin
|
||||
repo "commit" [] []
|
||||
|
||||
-- Throw away stderr, since the remote may not
|
||||
-- have a new enough git-annex shell to
|
||||
-- support committing.
|
||||
liftIO $ void $ catchMaybeIO $ withNullHandle $ \nullh ->
|
||||
let p = (proc shellcmd (toCommand shellparams))
|
||||
{ std_out = UseHandle nullh
|
||||
, std_err = UseHandle nullh
|
||||
}
|
||||
in withCreateProcess p $ \_ _ _ ->
|
||||
forceSuccessProcess p
|
||||
| otherwise = noop
|
||||
|
||||
wantHardLink :: Annex Bool
|
||||
wantHardLink = (annexHardLink <$> Annex.getGitConfig)
|
||||
|
|
|
@ -31,22 +31,22 @@ type ProtoConnRunner c = forall a. P2P.Proto a -> ClosableConnection c -> Annex
|
|||
-- the pool when done.
|
||||
type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a
|
||||
|
||||
store :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
|
||||
store :: RemoteGitConfig -> ProtoRunner Bool -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
|
||||
store gc runner k af p = do
|
||||
let sizer = KeySizer k (fmap (toRawFilePath . fst) <$> prepSendAnnex k)
|
||||
let bwlimit = remoteAnnexBwLimit gc
|
||||
metered (Just p) sizer bwlimit $ \_ p' ->
|
||||
runner p' (P2P.put k af p') >>= \case
|
||||
runner (P2P.put k af p') >>= \case
|
||||
Just True -> return ()
|
||||
Just False -> giveup "Transfer failed"
|
||||
Nothing -> remoteUnavail
|
||||
|
||||
retrieve :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
retrieve :: RemoteGitConfig -> (ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||
retrieve gc runner k af dest p verifyconfig = do
|
||||
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
||||
let bwlimit = remoteAnnexBwLimit gc
|
||||
metered (Just p) k bwlimit $ \m p' ->
|
||||
runner p' (P2P.get dest k iv af m p') >>= \case
|
||||
runner (P2P.get dest k iv af m p') >>= \case
|
||||
Just (True, v) -> return v
|
||||
Just (False, _) -> giveup "Transfer failed"
|
||||
Nothing -> remoteUnavail
|
||||
|
|
|
@ -130,20 +130,14 @@ rsyncHelper oh m params = do
|
|||
|
||||
{- Generates rsync parameters that ssh to the remote and asks it
|
||||
- to either receive or send the key's content. -}
|
||||
rsyncParamsRemote :: Bool -> Remote -> Direction -> Key -> FilePath -> AssociatedFile -> Annex [CommandParam]
|
||||
rsyncParamsRemote unlocked r direction key file (AssociatedFile afile) = do
|
||||
rsyncParamsRemote :: Remote -> Direction -> Key -> FilePath -> Annex [CommandParam]
|
||||
rsyncParamsRemote r direction key file = do
|
||||
u <- getUUID
|
||||
let fields = (Fields.remoteUUID, fromUUID u)
|
||||
: (Fields.unlocked, if unlocked then "1" else "")
|
||||
-- Send direct field for unlocked content, for backwards
|
||||
-- compatability.
|
||||
: (Fields.direct, if unlocked then "1" else "")
|
||||
: maybe [] (\f -> [(Fields.associatedFile, fromRawFilePath f)]) afile
|
||||
repo <- getRepo r
|
||||
Just (shellcmd, shellparams) <- git_annex_shell ConsumeStdin repo
|
||||
(if direction == Download then "sendkey" else "recvkey")
|
||||
[ Param $ serializeKey key ]
|
||||
fields
|
||||
[(Fields.remoteUUID, fromUUID u)]
|
||||
-- Convert the ssh command into rsync command line.
|
||||
let eparam = rsyncShell (Param shellcmd:shellparams)
|
||||
o <- rsyncParams r direction
|
||||
|
@ -183,11 +177,6 @@ rsyncParams r direction = do
|
|||
| otherwise = remoteAnnexRsyncUploadOptions gc
|
||||
gc = gitconfig r
|
||||
|
||||
-- Used by git-annex-shell lockcontent to indicate the content is
|
||||
-- successfully locked.
|
||||
contentLockedMarker :: String
|
||||
contentLockedMarker = "OK"
|
||||
|
||||
-- A connection over ssh to git-annex shell speaking the P2P protocol.
|
||||
type P2PSshConnection = P2P.ClosableConnection
|
||||
(P2P.RunState, P2P.P2PConnection, ProcessHandle, TVar StderrHandlerState)
|
||||
|
@ -320,9 +309,9 @@ newStderrHandler errh ph = do
|
|||
|
||||
-- Runs a P2P Proto action on a remote when it supports that,
|
||||
-- otherwise the fallback action.
|
||||
runProto :: Remote -> P2PSshConnectionPool -> Annex a -> Annex a -> P2P.Proto a -> Annex (Maybe a)
|
||||
runProto r connpool badproto fallback proto = Just <$>
|
||||
(getP2PSshConnection r connpool >>= maybe fallback go)
|
||||
runProto :: Remote -> P2PSshConnectionPool -> Annex a -> P2P.Proto a -> Annex (Maybe a)
|
||||
runProto r connpool onerr proto = Just <$>
|
||||
(getP2PSshConnection r connpool >>= maybe onerr go)
|
||||
where
|
||||
go c = do
|
||||
(c', v) <- runProtoConn proto c
|
||||
|
@ -330,9 +319,7 @@ runProto r connpool badproto fallback proto = Just <$>
|
|||
Just res -> do
|
||||
liftIO $ storeP2PSshConnection connpool c'
|
||||
return res
|
||||
-- Running the proto failed, either due to a protocol
|
||||
-- error or a network error.
|
||||
Nothing -> badproto
|
||||
Nothing -> onerr
|
||||
|
||||
runProtoConn :: P2P.Proto a -> P2PSshConnection -> Annex (P2PSshConnection, Maybe a)
|
||||
runProtoConn _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing)
|
||||
|
|
|
@ -55,8 +55,8 @@ chainGen addr r u rc gc rs = do
|
|||
{ uuid = u
|
||||
, cost = cst
|
||||
, name = Git.repoDescribe r
|
||||
, storeKey = store gc (const protorunner)
|
||||
, retrieveKeyFile = retrieve gc (const protorunner)
|
||||
, storeKey = store gc protorunner
|
||||
, retrieveKeyFile = retrieve gc protorunner
|
||||
, retrieveKeyFileCheap = Nothing
|
||||
, retrievalSecurityPolicy = RetrievalAllKeysSecure
|
||||
, removeKey = remove protorunner
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue