allow multiple concurrent external special remote processes
Multiple external special remote processes for the same remote will be started as needed when using -J. This should not beak any existing external special remotes, because running multiple git-annex commands at the same time could already start multiple processes for the same external special remotes.
This commit is contained in:
parent
b69dea0ac3
commit
5bf4623a1d
4 changed files with 115 additions and 118 deletions
|
@ -9,6 +9,11 @@ git-annex (6.20160924) UNRELEASED; urgency=medium
|
|||
* Add "total-size" field to --json-progress output.
|
||||
* Make --json-progress output be shown even when the size of a object
|
||||
is not known.
|
||||
* Multiple external special remote processes for the same remote will be
|
||||
started as needed when using -J. This should not beak any existing
|
||||
external special remotes, because running multiple git-annex commands
|
||||
at the same time could already start multiple processes for the same
|
||||
external special remotes.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Mon, 26 Sep 2016 16:46:19 -0400
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{- External special remote interface.
|
||||
-
|
||||
- Copyright 2013-2015 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2013-2016 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
@ -126,9 +126,8 @@ externalSetup mu _ c gc = do
|
|||
INITREMOTE_SUCCESS -> Just noop
|
||||
INITREMOTE_FAILURE errmsg -> Just $ error errmsg
|
||||
_ -> Nothing
|
||||
withExternalLock external $ \lck ->
|
||||
fromExternal lck external externalConfig $
|
||||
liftIO . atomically . readTMVar
|
||||
withExternalState external $
|
||||
liftIO . atomically . readTMVar . externalConfig
|
||||
|
||||
gitConfigSpecialRemote u c'' "externaltype" externaltype
|
||||
return (c'', u)
|
||||
|
@ -203,27 +202,28 @@ safely a = go =<< tryNonAsync a
|
|||
- While the external remote is processing the Request, it may send
|
||||
- any number of RemoteRequests, that are handled here.
|
||||
-
|
||||
- Only one request can be made at a time, so locking is used.
|
||||
- An external remote process can only handle one request at a time.
|
||||
- Concurrent requests will start up additional processes.
|
||||
-
|
||||
- May throw exceptions, for example on protocol errors, or
|
||||
- when the repository cannot be used.
|
||||
-}
|
||||
handleRequest :: External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a
|
||||
handleRequest external req mp responsehandler =
|
||||
withExternalLock external $ \lck ->
|
||||
handleRequest' lck external req mp responsehandler
|
||||
withExternalState external $ \st ->
|
||||
handleRequest' st external req mp responsehandler
|
||||
|
||||
handleRequest' :: ExternalLock -> External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a
|
||||
handleRequest' lck external req mp responsehandler
|
||||
handleRequest' :: ExternalState -> External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a
|
||||
handleRequest' st external req mp responsehandler
|
||||
| needsPREPARE req = do
|
||||
checkPrepared lck external
|
||||
checkPrepared st external
|
||||
go
|
||||
| otherwise = go
|
||||
where
|
||||
go = do
|
||||
sendMessage lck external req
|
||||
sendMessage st external req
|
||||
loop
|
||||
loop = receiveMessage lck external responsehandler
|
||||
loop = receiveMessage st external responsehandler
|
||||
(\rreq -> Just $ handleRemoteRequest rreq >> loop)
|
||||
(\msg -> Just $ handleAsyncMessage msg >> loop)
|
||||
|
||||
|
@ -234,26 +234,24 @@ handleRequest' lck external req mp responsehandler
|
|||
handleRemoteRequest (DIRHASH_LOWER k) =
|
||||
send $ VALUE $ hashDirLower def k
|
||||
handleRemoteRequest (SETCONFIG setting value) =
|
||||
fromExternal lck external externalConfig $ \v ->
|
||||
liftIO $ atomically $ do
|
||||
m <- takeTMVar v
|
||||
putTMVar v $ M.insert setting value m
|
||||
liftIO $ atomically $ do
|
||||
let v = externalConfig st
|
||||
m <- takeTMVar v
|
||||
putTMVar v $ M.insert setting value m
|
||||
handleRemoteRequest (GETCONFIG setting) = do
|
||||
value <- fromExternal lck external externalConfig $ \v ->
|
||||
fromMaybe "" . M.lookup setting
|
||||
<$> liftIO (atomically $ readTMVar v)
|
||||
value <- fromMaybe "" . M.lookup setting
|
||||
<$> liftIO (atomically $ readTMVar $ externalConfig st)
|
||||
send $ VALUE value
|
||||
handleRemoteRequest (SETCREDS setting login password) = do
|
||||
fromExternal lck external externalConfig $ \v -> do
|
||||
c <- liftIO $ atomically $ readTMVar v
|
||||
let gc = externalGitConfig external
|
||||
c' <- setRemoteCredPair encryptionAlreadySetup c gc
|
||||
(credstorage setting)
|
||||
(Just (login, password))
|
||||
void $ liftIO $ atomically $ swapTMVar v c'
|
||||
let v = externalConfig st
|
||||
c <- liftIO $ atomically $ readTMVar v
|
||||
let gc = externalGitConfig external
|
||||
c' <- setRemoteCredPair encryptionAlreadySetup c gc
|
||||
(credstorage setting)
|
||||
(Just (login, password))
|
||||
void $ liftIO $ atomically $ swapTMVar v c'
|
||||
handleRemoteRequest (GETCREDS setting) = do
|
||||
c <- fromExternal lck external externalConfig $
|
||||
liftIO . atomically . readTMVar
|
||||
c <- liftIO $ atomically $ readTMVar $ externalConfig st
|
||||
let gc = externalGitConfig external
|
||||
creds <- fromMaybe ("", "") <$>
|
||||
getRemoteCredPair c gc (credstorage setting)
|
||||
|
@ -286,11 +284,11 @@ handleRequest' lck external req mp responsehandler
|
|||
send (VALUE "") -- end of list
|
||||
handleRemoteRequest (DEBUG msg) = liftIO $ debugM "external" msg
|
||||
handleRemoteRequest (VERSION _) =
|
||||
sendMessage lck external $ ERROR "too late to send VERSION"
|
||||
sendMessage st external (ERROR "too late to send VERSION")
|
||||
|
||||
handleAsyncMessage (ERROR err) = error $ "external special remote error: " ++ err
|
||||
|
||||
send = sendMessage lck external
|
||||
send = sendMessage st external
|
||||
|
||||
credstorage setting = CredPairStorage
|
||||
{ credPairFile = base
|
||||
|
@ -303,30 +301,28 @@ handleRequest' lck external req mp responsehandler
|
|||
withurl mk uri = handleRemoteRequest $ mk $
|
||||
setDownloader (show uri) OtherDownloader
|
||||
|
||||
sendMessage :: Sendable m => ExternalLock -> External -> m -> Annex ()
|
||||
sendMessage lck external m =
|
||||
fromExternal lck external externalSend $ \h ->
|
||||
liftIO $ do
|
||||
protocolDebug external True line
|
||||
hPutStrLn h line
|
||||
hFlush h
|
||||
sendMessage :: Sendable m => ExternalState -> External -> m -> Annex ()
|
||||
sendMessage st external m = liftIO $ do
|
||||
protocolDebug external True line
|
||||
hPutStrLn h line
|
||||
hFlush h
|
||||
where
|
||||
line = unwords $ formatMessage m
|
||||
h = externalSend st
|
||||
|
||||
{- Waits for a message from the external remote, and passes it to the
|
||||
- apppropriate handler.
|
||||
-
|
||||
- If the handler returns Nothing, this is a protocol error.-}
|
||||
receiveMessage
|
||||
:: ExternalLock
|
||||
:: ExternalState
|
||||
-> External
|
||||
-> (Response -> Maybe (Annex a))
|
||||
-> (RemoteRequest -> Maybe (Annex a))
|
||||
-> (AsyncMessage -> Maybe (Annex a))
|
||||
-> Annex a
|
||||
receiveMessage lck external handleresponse handlerequest handleasync =
|
||||
go =<< fromExternal lck external externalReceive
|
||||
(liftIO . catchMaybeIO . hGetLine)
|
||||
receiveMessage st external handleresponse handlerequest handleasync =
|
||||
go =<< liftIO (catchMaybeIO $ hGetLine $ externalReceive st)
|
||||
where
|
||||
go Nothing = protocolError False ""
|
||||
go (Just s) = do
|
||||
|
@ -348,39 +344,43 @@ protocolDebug external sendto line = debugM "external" $ unwords
|
|||
, line
|
||||
]
|
||||
|
||||
{- Starts up the external remote if it's not yet running,
|
||||
- and passes a value extracted from its state to an action.
|
||||
-}
|
||||
fromExternal :: ExternalLock -> External -> (ExternalState -> v) -> (v -> Annex a) -> Annex a
|
||||
fromExternal lck external extractor a =
|
||||
go =<< liftIO (atomically (tryReadTMVar v))
|
||||
{- While the action is running, the ExternalState provided to it will not
|
||||
- be available to any other calls.
|
||||
-
|
||||
- Starts up a new process if no ExternalStates are available. -}
|
||||
withExternalState :: External -> (ExternalState -> Annex a) -> Annex a
|
||||
withExternalState external = bracket alloc dealloc
|
||||
where
|
||||
go (Just st) = run st
|
||||
go Nothing = do
|
||||
st <- startExternal external
|
||||
void $ liftIO $ atomically $ do
|
||||
void $ tryReadTMVar v
|
||||
putTMVar v st
|
||||
|
||||
{- Handle initial protocol startup; check the VERSION
|
||||
- the remote sends. -}
|
||||
receiveMessage lck external
|
||||
(const Nothing)
|
||||
(checkVersion lck external)
|
||||
(const Nothing)
|
||||
|
||||
run st
|
||||
|
||||
run st = a $ extractor st
|
||||
v = externalState external
|
||||
|
||||
{- Starts an external remote process running, but does not handle checking
|
||||
- VERSION, etc. -}
|
||||
alloc = do
|
||||
ms <- liftIO $ atomically $ do
|
||||
l <- takeTMVar v
|
||||
case l of
|
||||
[] -> do
|
||||
putTMVar v l
|
||||
return Nothing
|
||||
(st:rest) -> do
|
||||
putTMVar v rest
|
||||
return (Just st)
|
||||
maybe (startExternal external) return ms
|
||||
|
||||
dealloc st = liftIO $ atomically $ do
|
||||
l <- takeTMVar v
|
||||
putTMVar v (st:l)
|
||||
|
||||
{- Starts an external remote process running, and checks VERSION. -}
|
||||
startExternal :: External -> Annex ExternalState
|
||||
startExternal external = do
|
||||
errrelayer <- mkStderrRelayer
|
||||
g <- Annex.gitRepo
|
||||
liftIO $ do
|
||||
st <- start errrelayer =<< Annex.gitRepo
|
||||
receiveMessage st external
|
||||
(const Nothing)
|
||||
(checkVersion st external)
|
||||
(const Nothing)
|
||||
return st
|
||||
where
|
||||
start errrelayer g = liftIO $ do
|
||||
(cmd, ps) <- findShellCommand basecmd
|
||||
let basep = (proc cmd (toCommand ps))
|
||||
{ std_in = CreatePipe
|
||||
|
@ -395,17 +395,18 @@ startExternal external = do
|
|||
fileEncoding herr
|
||||
stderrelay <- async $ errrelayer herr
|
||||
checkearlytermination =<< getProcessExitCode pid
|
||||
cv <- atomically $ newTMVar $ externalDefaultConfig external
|
||||
cv <- newTMVarIO $ externalDefaultConfig external
|
||||
pv <- newTMVarIO Unprepared
|
||||
return $ ExternalState
|
||||
{ externalSend = hin
|
||||
, externalReceive = hout
|
||||
, externalShutdown = do
|
||||
cancel stderrelay
|
||||
void $ waitForProcess pid
|
||||
, externalPrepared = Unprepared
|
||||
, externalPrepared = pv
|
||||
, externalConfig = cv
|
||||
}
|
||||
where
|
||||
|
||||
basecmd = externalRemoteProgram $ externalType external
|
||||
|
||||
propgit g p = do
|
||||
|
@ -422,12 +423,17 @@ startExternal external = do
|
|||
error $ basecmd ++ " is not installed in PATH (" ++ path ++ ")"
|
||||
)
|
||||
|
||||
-- Note: Does not stop any externals that have a withExternalState
|
||||
-- action currently running.
|
||||
stopExternal :: External -> Annex ()
|
||||
stopExternal external = liftIO $ stop =<< atomically (tryReadTMVar v)
|
||||
stopExternal external = liftIO $ do
|
||||
l <- atomically $ do
|
||||
l <- takeTMVar v
|
||||
putTMVar v []
|
||||
return l
|
||||
mapM_ stop l
|
||||
where
|
||||
stop Nothing = noop
|
||||
stop (Just st) = do
|
||||
void $ atomically $ tryTakeTMVar v
|
||||
stop st = do
|
||||
hClose $ externalSend st
|
||||
hClose $ externalReceive st
|
||||
externalShutdown st
|
||||
|
@ -436,37 +442,35 @@ stopExternal external = liftIO $ stop =<< atomically (tryReadTMVar v)
|
|||
externalRemoteProgram :: ExternalType -> String
|
||||
externalRemoteProgram externaltype = "git-annex-remote-" ++ externaltype
|
||||
|
||||
checkVersion :: ExternalLock -> External -> RemoteRequest -> Maybe (Annex ())
|
||||
checkVersion lck external (VERSION v) = Just $
|
||||
checkVersion :: ExternalState -> External -> RemoteRequest -> Maybe (Annex ())
|
||||
checkVersion st external (VERSION v) = Just $
|
||||
if v `elem` supportedProtocolVersions
|
||||
then noop
|
||||
else sendMessage lck external (ERROR "unsupported VERSION")
|
||||
else sendMessage st external (ERROR "unsupported VERSION")
|
||||
checkVersion _ _ _ = Nothing
|
||||
|
||||
{- If repo has not been prepared, sends PREPARE.
|
||||
-
|
||||
- If the repo fails to prepare, or failed before, throws an exception with
|
||||
- the error message. -}
|
||||
checkPrepared :: ExternalLock -> External -> Annex ()
|
||||
checkPrepared lck external =
|
||||
fromExternal lck external externalPrepared $ \prepared ->
|
||||
case prepared of
|
||||
Prepared -> noop
|
||||
FailedPrepare errmsg -> error errmsg
|
||||
Unprepared ->
|
||||
handleRequest' lck external PREPARE Nothing $ \resp ->
|
||||
case resp of
|
||||
PREPARE_SUCCESS -> Just $
|
||||
setprepared Prepared
|
||||
PREPARE_FAILURE errmsg -> Just $ do
|
||||
setprepared $ FailedPrepare errmsg
|
||||
error errmsg
|
||||
_ -> Nothing
|
||||
checkPrepared :: ExternalState -> External -> Annex ()
|
||||
checkPrepared st external = do
|
||||
v <- liftIO $ atomically $ readTMVar $ externalPrepared st
|
||||
case v of
|
||||
Prepared -> noop
|
||||
FailedPrepare errmsg -> error errmsg
|
||||
Unprepared ->
|
||||
handleRequest' st external PREPARE Nothing $ \resp ->
|
||||
case resp of
|
||||
PREPARE_SUCCESS -> Just $
|
||||
setprepared Prepared
|
||||
PREPARE_FAILURE errmsg -> Just $ do
|
||||
setprepared $ FailedPrepare errmsg
|
||||
error errmsg
|
||||
_ -> Nothing
|
||||
where
|
||||
setprepared status = liftIO . atomically $ do
|
||||
let v = externalState external
|
||||
st <- takeTMVar v
|
||||
void $ putTMVar v $ st { externalPrepared = status }
|
||||
setprepared status = liftIO $ atomically $ void $
|
||||
swapTMVar (externalPrepared st) status
|
||||
|
||||
{- Caches the cost in the git config to avoid needing to start up an
|
||||
- external special remote every time time just to ask it what its
|
||||
|
|
28
Remote/External/Types.hs
vendored
28
Remote/External/Types.hs
vendored
|
@ -12,8 +12,6 @@ module Remote.External.Types (
|
|||
External(..),
|
||||
newExternal,
|
||||
ExternalType,
|
||||
ExternalLock,
|
||||
withExternalLock,
|
||||
ExternalState(..),
|
||||
PrepareStatus(..),
|
||||
Proto.parseMessage,
|
||||
|
@ -44,14 +42,12 @@ import qualified Utility.SimpleProtocol as Proto
|
|||
import Control.Concurrent.STM
|
||||
import Network.URI
|
||||
|
||||
-- If the remote is not yet running, the ExternalState TMVar is empty.
|
||||
data External = External
|
||||
{ externalType :: ExternalType
|
||||
, externalUUID :: UUID
|
||||
-- Empty until the remote is running.
|
||||
, externalState :: TMVar ExternalState
|
||||
-- Empty when a remote is in use.
|
||||
, externalLock :: TMVar ExternalLock
|
||||
, externalState :: TMVar [ExternalState]
|
||||
-- ^ TMVar is never left empty; list contains states for external
|
||||
-- special remote processes that are not currently in use.
|
||||
, externalDefaultConfig :: RemoteConfig
|
||||
, externalGitConfig :: RemoteGitConfig
|
||||
}
|
||||
|
@ -60,8 +56,7 @@ newExternal :: ExternalType -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex
|
|||
newExternal externaltype u c gc = liftIO $ External
|
||||
<$> pure externaltype
|
||||
<*> pure u
|
||||
<*> atomically newEmptyTMVar
|
||||
<*> atomically (newTMVar ExternalLock)
|
||||
<*> atomically (newTMVar [])
|
||||
<*> pure c
|
||||
<*> pure gc
|
||||
|
||||
|
@ -71,23 +66,14 @@ data ExternalState = ExternalState
|
|||
{ externalSend :: Handle
|
||||
, externalReceive :: Handle
|
||||
, externalShutdown :: IO ()
|
||||
, externalPrepared :: PrepareStatus
|
||||
-- Never left empty.
|
||||
, externalPrepared :: TMVar PrepareStatus
|
||||
-- ^ Never left empty.
|
||||
, externalConfig :: TMVar RemoteConfig
|
||||
-- ^ Never left empty.
|
||||
}
|
||||
|
||||
data PrepareStatus = Unprepared | Prepared | FailedPrepare ErrorMsg
|
||||
|
||||
-- Constructor is not exported, and only created by newExternal.
|
||||
data ExternalLock = ExternalLock
|
||||
|
||||
withExternalLock :: External -> (ExternalLock -> Annex a) -> Annex a
|
||||
withExternalLock external = bracketIO setup cleanup
|
||||
where
|
||||
setup = atomically $ takeTMVar v
|
||||
cleanup = atomically . putTMVar v
|
||||
v = externalLock external
|
||||
|
||||
-- Messages that can be sent to the external remote to request it do something.
|
||||
data Request
|
||||
= PREPARE
|
||||
|
|
|
@ -7,3 +7,5 @@ This should not be hard to make to use a pool of Externals, starting up a
|
|||
new one if the pool is empty or all in use. --[[Joey]]
|
||||
|
||||
[[users/yoh]] may want this for datalad?
|
||||
|
||||
> [[done]] --[[Joey]]
|
||||
|
|
Loading…
Reference in a new issue