one jobid per thread

And, relay ERROR on to all listening threads.
This commit is contained in:
Joey Hess 2020-08-14 14:24:46 -04:00
parent 72561563d9
commit 7da2d4dd2d
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
2 changed files with 51 additions and 50 deletions

View file

@ -28,15 +28,20 @@ runRelayToExternalAsync external st = do
jidmap <- newTVarIO M.empty jidmap <- newTVarIO M.empty
sendq <- newSendQueue sendq <- newSendQueue
nextjid <- newTVarIO (JobId 1) nextjid <- newTVarIO (JobId 1)
void $ async $ sendloop st nextjid jidmap sendq void $ async $ sendloop st sendq
void $ async $ receiveloop external st jidmap sendq void $ async $ receiveloop external st jidmap sendq
return $ ExternalAsyncRelay $ do return $ ExternalAsyncRelay $ do
jidv <- newTVarIO Nothing
receiveq <- newReceiveQueue receiveq <- newReceiveQueue
jid <- atomically $ do
jid@(JobId n) <- readTVar nextjid
let !jid' = JobId (succ n)
writeTVar nextjid jid'
modifyTVar' jidmap $ M.insert jid receiveq
return jid
return $ ExternalState return $ ExternalState
{ externalSend = \msg -> { externalSend = \msg ->
atomically $ writeTBMChan sendq atomically $ writeTBMChan sendq
(toAsyncWrapped msg, (jidv, receiveq)) (toAsyncWrapped msg, jid)
, externalReceive = atomically (readTBMChan receiveq) , externalReceive = atomically (readTBMChan receiveq)
-- This shuts down the whole relay. -- This shuts down the whole relay.
, externalShutdown = shutdown external st sendq , externalShutdown = shutdown external st sendq
@ -52,13 +57,9 @@ runRelayToExternalAsync external st = do
type ReceiveQueue = TBMChan String type ReceiveQueue = TBMChan String
type SendQueue = TBMChan (AsyncWrapped, Conn) type SendQueue = TBMChan (AsyncWrapped, JobId)
type Conn = (TVar (Maybe JobId), ReceiveQueue) type JidMap = TVar (M.Map JobId ReceiveQueue)
type JidMap = TVar (M.Map JobId Conn)
type NextJid = TVar JobId
newReceiveQueue :: IO ReceiveQueue newReceiveQueue :: IO ReceiveQueue
newReceiveQueue = newTBMChanIO 10 newReceiveQueue = newTBMChanIO 10
@ -71,11 +72,18 @@ receiveloop external st jidmap sendq = externalReceive st >>= \case
Just l -> case parseMessage l :: Maybe AsyncMessage of Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (AsyncMessage jid msg) -> Just (AsyncMessage jid msg) ->
M.lookup jid <$> readTVarIO jidmap >>= \case M.lookup jid <$> readTVarIO jidmap >>= \case
Just (_jidv, c) -> do Just c -> do
atomically $ writeTBMChan c msg atomically $ writeTBMChan c msg
receiveloop external st jidmap sendq receiveloop external st jidmap sendq
Nothing -> protoerr "unknown job number" Nothing -> protoerr "unknown job number"
_ -> protoerr "unexpected non-async message" Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
Just msg -> do
-- ERROR is relayed to all listeners
m <- readTVarIO jidmap
forM (M.elems m) $ \c ->
atomically $ writeTBMChan c l
receiveloop external st jidmap sendq
Nothing -> protoerr "unexpected non-async message"
Nothing -> closeandshutdown Nothing -> closeandshutdown
where where
protoerr s = do protoerr s = do
@ -85,30 +93,21 @@ receiveloop external st jidmap sendq = externalReceive st >>= \case
closeandshutdown = do closeandshutdown = do
shutdown external st sendq True shutdown external st sendq True
m <- atomically $ readTVar jidmap m <- atomically $ readTVar jidmap
forM_ (M.elems m) (atomically . closeTBMChan . snd) forM_ (M.elems m) (atomically . closeTBMChan)
sendloop :: ExternalState -> NextJid -> JidMap -> SendQueue -> IO () sendloop :: ExternalState -> SendQueue -> IO ()
sendloop st nextjid jidmap sendq = atomically (readTBMChan sendq) >>= \case sendloop st sendq = atomically (readTBMChan sendq) >>= \case
Just (wrappedmsg, conn@(jidv, _)) -> do Just (wrappedmsg, jid) -> do
case wrappedmsg of case wrappedmsg of
AsyncWrappedRequest msg -> do
jid <- atomically $ do
jid@(JobId n) <- readTVar nextjid
let !jid' = JobId (succ n)
writeTVar nextjid jid'
writeTVar jidv (Just jid)
modifyTVar' jidmap $ M.insert jid conn
return jid
externalSend st $ wrapjid msg jid
AsyncWrappedRemoteResponse msg -> AsyncWrappedRemoteResponse msg ->
readTVarIO jidv >>= \case externalSend st $ wrapjid msg jid
Just jid -> externalSend st $ wrapjid msg jid AsyncWrappedRequest msg ->
Nothing -> error "failed to find jid" externalSend st $ wrapjid msg jid
AsyncWrappedExceptionalMessage msg -> AsyncWrappedExceptionalMessage msg ->
externalSend st msg externalSend st msg
AsyncWrappedAsyncMessage msg -> AsyncWrappedAsyncMessage msg ->
externalSend st msg externalSend st msg
sendloop st nextjid jidmap sendq sendloop st sendq
Nothing -> return () Nothing -> return ()
where where
wrapjid msg jid = AsyncMessage jid $ unwords $ Proto.formatMessage msg wrapjid msg jid = AsyncMessage jid $ unwords $ Proto.formatMessage msg

View file

@ -29,7 +29,7 @@ that includes `ASYNC`, and the external special remote responding in kind.
EXTENSIONS ASYNC EXTENSIONS ASYNC
From this point forward, every message in the protocol is tagged with a job From this point forward, every message in the protocol is tagged with a job
number, by prefixing it with "J n". number, by prefixing it with "J n".
As usual, the first message git-annex sends is generally PREPARE: As usual, the first message git-annex sends is generally PREPARE:
@ -43,31 +43,31 @@ included in the reply:
Suppose git-annex wants to make some transfers. It can request several Suppose git-annex wants to make some transfers. It can request several
at the same time, using different job numbers: at the same time, using different job numbers:
J 2 TRANSFER RETRIEVE Key1 file1 J 1 TRANSFER RETRIEVE Key1 file1
J 3 TRANSFER RETRIEVE Key2 file2 J 2 TRANSFER RETRIEVE Key2 file2
The special remote can now perform both transfers at the same time. The special remote can now perform both transfers at the same time.
If it sends PROGRESS messages for these transfers, they have to be tagged If it sends PROGRESS messages for these transfers, they have to be tagged
with the job number: with the job number:
J 2 PROGRESS 10 J 1 PROGRESS 10
J 3 PROGRESS 500 J 2 PROGRESS 500
J 2 PROGRESS 20 J 1 PROGRESS 20
The special remote can also send messages that query git-annex for some The special remote can also send messages that query git-annex for some
information. These messages and the reply will also be tagged with a job information. These messages and the reply will also be tagged with a job
number. number.
J 2 GETCONFIG url J 1 GETCONFIG url
J 4 RETRIEVE Key3 file3 J 3 RETRIEVE Key3 file3
J 2 VALUE http://example.com/ J 1 VALUE http://example.com/
One transfers are done, the special remote sends `TRANSFER-SUCCESS` tagged One transfers are done, the special remote sends `TRANSFER-SUCCESS` tagged
with the job number. with the job number.
J 3 TRANSFER-SUCCESS RETRIEVE Key2 J 2 TRANSFER-SUCCESS RETRIEVE Key2
J 2 PROGRESS 100 J 1 PROGRESS 100
J 2 TRANSFER-SUCCESS RETRIEVE Key1 J 1 TRANSFER-SUCCESS RETRIEVE Key1
Lots of different jobs can be requested at the same time. Lots of different jobs can be requested at the same time.
@ -78,9 +78,6 @@ Lots of different jobs can be requested at the same time.
J 6 REMOVE-SUCCESS Key3 J 6 REMOVE-SUCCESS Key3
J 5 CHECKPRESENT-FAILURE Key4 J 5 CHECKPRESENT-FAILURE Key4
A job always starts with a request by git-annex, and once the special
remote sends a reply -- or replies -- to that request, the job is done.
An example of sending multiple replies to a request is `LISTCONFIGS`, eg: An example of sending multiple replies to a request is `LISTCONFIGS`, eg:
J 7 LISTCONFIGS J 7 LISTCONFIGS
@ -88,10 +85,15 @@ An example of sending multiple replies to a request is `LISTCONFIGS`, eg:
J 7 CONFIG bar other config J 7 CONFIG bar other config
J 7 CONFIGEND J 7 CONFIGEND
Job numbers are not reused within a given run of a special remote, ## notes
but once git-annex has seen the last message it expects for a job,
sending other messages tagged with that job number will be rejected
as a protocol error.
To avoid overflow, job numbers should be treated as at least 64 bit There will generally be one job for each thread that git-annex runs
values (or as strings) by the special remote. concurrently, so around the same number as the -J value, although in some
cases git-annex does more concurrent operations than the -J value.
`PREPARE` is sent only once per run of a special remote
program, and despite being tagged with a job number, it should prepare the
special remote to run that and any other jobs.
`ERROR` should not be tagged with a job number if either git-annex
or the special remote needs to send it.