use P2P protocol for drop

Not yet used for everything else, but this is enough to
verify that it works, and do some benchmarking.

Some bugfixes included, which got it working. Also fallback to old
actions has been verified to work correctly.

Benchmarked dropping one thousand files from a ssh remote on localhost.
Using the old git-annex	40.867 seconds.
With the P2P protocol	9.905 seconds!

This commit was sponsored by Jochen Bartl on Patreon.
This commit is contained in:
Joey Hess 2018-03-08 16:21:16 -04:00
parent 16af259209
commit 6a59bc4845
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 39 additions and 30 deletions

View file

@ -14,25 +14,22 @@ import qualified P2P.Protocol as P2P
import qualified Annex import qualified Annex
import Annex.UUID import Annex.UUID
import qualified CmdLine.GitAnnexShell.Checks as Checks import qualified CmdLine.GitAnnexShell.Checks as Checks
import qualified CmdLine.GitAnnexShell.Fields as Fields
cmd :: Command cmd :: Command
cmd = noMessages $ command "p2pstdio" SectionPlumbing cmd = noMessages $ command "p2pstdio" SectionPlumbing
"communicate in P2P protocol over stdio" "communicate in P2P protocol over stdio"
paramNothing (withParams seek) paramUUID (withParams seek)
seek :: CmdParams -> CommandSeek seek :: CmdParams -> CommandSeek
seek = withNothing start seek [u] = commandAction $ start $ toUUID u
seek _ = giveup "missing UUID parameter"
start :: CommandStart start :: UUID -> CommandStart
start = do start theiruuid = do
servermode <- liftIO $ servermode <- liftIO $
Checks.checkEnvSet Checks.readOnlyEnv >>= return . \case Checks.checkEnvSet Checks.readOnlyEnv >>= return . \case
True -> P2P.ServeReadOnly True -> P2P.ServeReadOnly
False -> P2P.ServeReadWrite False -> P2P.ServeReadWrite
theiruuid <- Fields.getField Fields.remoteUUID >>= \case
Nothing -> giveup "missing remoteuuid field"
Just u -> return (toUUID u)
myuuid <- getUUID myuuid <- getUUID
conn <- stdioP2PConnection <$> Annex.gitRepo conn <- stdioP2PConnection <$> Annex.gitRepo
let server = do let server = do

View file

@ -54,10 +54,11 @@ import Remote.Helper.Export
import qualified Remote.Helper.Ssh as Ssh import qualified Remote.Helper.Ssh as Ssh
import qualified Remote.GCrypt import qualified Remote.GCrypt
import qualified Remote.P2P import qualified Remote.P2P
import P2P.Address import qualified Remote.Helper.P2P as P2PHelper
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import qualified P2P.Annex as P2P import qualified P2P.Annex as P2P
import qualified P2P.IO as P2P import qualified P2P.IO as P2P
import P2P.Address
import Annex.Path import Annex.Path
import Creds import Creds
import Messages.Progress import Messages.Progress
@ -150,11 +151,12 @@ gen r u c gc
| otherwise = case repoP2PAddress r of | otherwise = case repoP2PAddress r of
Nothing -> do Nothing -> do
duc <- mkDeferredUUIDCheck r u gc duc <- mkDeferredUUIDCheck r u gc
go duc <$> remoteCost gc defcst connpool <- Ssh.mkP2PSshConnectionPool
go duc connpool <$> remoteCost gc defcst
Just addr -> Remote.P2P.chainGen addr r u c gc Just addr -> Remote.P2P.chainGen addr r u c gc
where where
defcst = if repoCheap r then cheapRemoteCost else expensiveRemoteCost defcst = if repoCheap r then cheapRemoteCost else expensiveRemoteCost
go duc cst = Just new go duc connpool cst = Just new
where where
new = Remote new = Remote
{ uuid = u { uuid = u
@ -163,7 +165,7 @@ gen r u c gc
, storeKey = copyToRemote new duc , storeKey = copyToRemote new duc
, retrieveKeyFile = copyFromRemote new , retrieveKeyFile = copyFromRemote new
, retrieveKeyFileCheap = copyFromRemoteCheap new , retrieveKeyFileCheap = copyFromRemoteCheap new
, removeKey = dropKey new duc , removeKey = dropKey new duc connpool
, lockContent = Just (lockKey new duc) , lockContent = Just (lockKey new duc)
, checkPresent = inAnnex new duc , checkPresent = inAnnex new duc
, checkPresentCheap = repoCheap r , checkPresentCheap = repoCheap r
@ -368,8 +370,8 @@ keyUrls r key = map tourl locs'
remoteconfig = gitconfig r remoteconfig = gitconfig r
cfg = remoteGitConfig remoteconfig cfg = remoteGitConfig remoteconfig
dropKey :: Remote -> DeferredUUIDCheck -> Key -> Annex Bool dropKey :: Remote -> DeferredUUIDCheck -> Ssh.P2PSshConnectionPool -> Key -> Annex Bool
dropKey r duc key dropKey r duc connpool key
| not $ Git.repoIsUrl (repo r) = ifM duc | not $ Git.repoIsUrl (repo r) = ifM duc
( guardUsable (repo r) (return False) $ ( guardUsable (repo r) (return False) $
commitOnCleanup r $ onLocalFast r $ do commitOnCleanup r $ onLocalFast r $ do
@ -383,7 +385,9 @@ dropKey r duc key
, return False , return False
) )
| Git.repoIsHttp (repo r) = giveup "dropping from http remote not supported" | Git.repoIsHttp (repo r) = giveup "dropping from http remote not supported"
| otherwise = commitOnCleanup r $ Ssh.dropKey (repo r) key | otherwise = commitOnCleanup r $ do
let fallback = Ssh.dropKey (repo r) key
P2PHelper.remove (runProto r connpool fallback) key
lockKey :: Remote -> DeferredUUIDCheck -> Key -> (VerifiedCopy -> Annex r) -> Annex r lockKey :: Remote -> DeferredUUIDCheck -> Key -> (VerifiedCopy -> Annex r) -> Annex r
lockKey r duc key callback lockKey r duc key callback
@ -758,12 +762,12 @@ mkDeferredUUIDCheck r u gc
-- Runs a P2P Proto action on a remote when it supports that, -- Runs a P2P Proto action on a remote when it supports that,
-- otherwise the fallback action. -- otherwise the fallback action.
runSsh :: Remote -> Ssh.P2PSshConnectionPool -> Annex a -> P2P.Proto a -> Annex a runProto :: Remote -> Ssh.P2PSshConnectionPool -> Annex a -> P2P.Proto a -> Annex (Maybe a)
runSsh r connpool fallback proto = runProto r connpool fallback proto = Just <$>
Ssh.getP2PSshConnection r connpool >>= maybe fallback go (Ssh.getP2PSshConnection r connpool >>= maybe fallback go)
where where
go c = do go c = do
(c', v) <- runSsh' proto c (c', v) <- runProtoConn proto c
case v of case v of
Just res -> do Just res -> do
liftIO $ Ssh.storeP2PSshConnection connpool c' liftIO $ Ssh.storeP2PSshConnection connpool c'
@ -773,9 +777,9 @@ runSsh r connpool fallback proto =
-- connection, and run the fallback. -- connection, and run the fallback.
Nothing -> fallback Nothing -> fallback
runSsh' :: P2P.Proto a -> Ssh.P2PSshConnection -> Annex (Ssh.P2PSshConnection, Maybe a) runProtoConn :: P2P.Proto a -> Ssh.P2PSshConnection -> Annex (Ssh.P2PSshConnection, Maybe a)
runSsh' _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing) runProtoConn _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing)
runSsh' a conn@(P2P.OpenConnection (c, _pid)) = runProtoConn a conn@(P2P.OpenConnection (c, _pid)) =
P2P.runFullProto P2P.Client c a >>= \case P2P.runFullProto P2P.Client c a >>= \case
Right r -> return (conn, Just r) Right r -> return (conn, Just r)
-- When runFullProto fails, the connection is no longer -- When runFullProto fails, the connection is no longer

View file

@ -26,7 +26,6 @@ import Config
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import qualified P2P.IO as P2P import qualified P2P.IO as P2P
import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
toRepo :: ConsumeStdin -> Git.Repo -> RemoteGitConfig -> SshCommand -> Annex (FilePath, [CommandParam]) toRepo :: ConsumeStdin -> Git.Repo -> RemoteGitConfig -> SshCommand -> Annex (FilePath, [CommandParam])
@ -192,7 +191,7 @@ closeP2PSshConnection :: P2PSshConnection -> IO P2PSshConnection
closeP2PSshConnection P2P.ClosedConnection = return P2P.ClosedConnection closeP2PSshConnection P2P.ClosedConnection = return P2P.ClosedConnection
closeP2PSshConnection (P2P.OpenConnection (conn, pid)) = do closeP2PSshConnection (P2P.OpenConnection (conn, pid)) = do
P2P.closeConnection conn P2P.closeConnection conn
void $ async $ waitForProcess pid void $ waitForProcess pid
return P2P.ClosedConnection return P2P.ClosedConnection
-- Pool of connections over ssh to git-annex-shell p2pstdio. -- Pool of connections over ssh to git-annex-shell p2pstdio.
@ -235,8 +234,10 @@ storeP2PSshConnection connpool conn = atomically $ modifyTVar' connpool $ \case
-- If the remote does not support the P2P protocol, that's remembered in -- If the remote does not support the P2P protocol, that's remembered in
-- the connection pool. -- the connection pool.
openP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection) openP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection)
openP2PSshConnection r connpool = openP2PSshConnection r connpool = do
git_annex_shell ConsumeStdin (repo r) "p2pstdio" [] [] >>= \case u <- getUUID
let ps = [Param (fromUUID u)]
git_annex_shell ConsumeStdin (repo r) "p2pstdio" ps [] >>= \case
Nothing -> do Nothing -> do
liftIO $ rememberunsupported liftIO $ rememberunsupported
return Nothing return Nothing
@ -254,11 +255,11 @@ openP2PSshConnection r connpool =
let conn = P2P.P2PConnection let conn = P2P.P2PConnection
{ P2P.connRepo = repo r { P2P.connRepo = repo r
, P2P.connCheckAuth = const False , P2P.connCheckAuth = const False
, P2P.connIhdl = from , P2P.connIhdl = to
, P2P.connOhdl = to , P2P.connOhdl = from
} }
let c = P2P.OpenConnection (conn, pid) let c = P2P.OpenConnection (conn, pid)
-- When the connection is successful, the peer -- When the connection is successful, the remote
-- will send an AUTH_SUCCESS with its uuid. -- will send an AUTH_SUCCESS with its uuid.
tryNonAsync (P2P.runNetProto conn $ P2P.postAuth) >>= \case tryNonAsync (P2P.runNetProto conn $ P2P.postAuth) >>= \case
Right (Right (Just theiruuid)) | theiruuid == uuid r -> Right (Right (Just theiruuid)) | theiruuid == uuid r ->

View file

@ -90,12 +90,15 @@ first "/~/" or "/~user/" is expanded to the specified home directory.
Sets up a repository as a gcrypt repository. Sets up a repository as a gcrypt repository.
* p2pstdio directory * p2pstdio directory uuid
This causes git-annex-shell to communicate using the git-annex p2p This causes git-annex-shell to communicate using the git-annex p2p
protocol over stdio. When supported by git-annex-shell, this allows protocol over stdio. When supported by git-annex-shell, this allows
multiple actions to be run over a single connection, improving speed. multiple actions to be run over a single connection, improving speed.
The uuid is the one belonging to the repository that will be
communicating with git-annex-shell.
# OPTIONS # OPTIONS
Most options are the same as in git-annex. The ones specific Most options are the same as in git-annex. The ones specific

View file

@ -40,3 +40,7 @@ Implementation todos:
git-annex-shell recvkey has a speed optimisation, when it's told the file git-annex-shell recvkey has a speed optimisation, when it's told the file
being sent is locked, it can avoid an expensive verification. being sent is locked, it can avoid an expensive verification.
* Maybe similar for transfers in the other direction? * Maybe similar for transfers in the other direction?
* What happens when the assistant is running and some connections are open
and it moves between networks?
* If it's unable to ssh to a host to run p2pstdio, it will fall back to the
old method. What if the host is down, does this double the timeout?