async proto basically working

Simplified the protocol by removing END-ASYNC.

There's a STM crash when a non-async protocol message is sent, which
needs to be fixed.
This commit is contained in:
Joey Hess 2020-08-13 15:49:43 -04:00
parent c9e8cafb98
commit 7546e686a2
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 210 additions and 146 deletions

View file

@ -504,18 +504,17 @@ handleRequest' st external req mp responsehandler
withurl mk uri = handleRemoteRequest $ mk $ withurl mk uri = handleRemoteRequest $ mk $
setDownloader (show uri) OtherDownloader setDownloader (show uri) OtherDownloader
sendMessage :: Sendable m => ExternalState -> m -> Annex () sendMessage :: (Sendable m, ToAsyncWrapped m) => ExternalState -> m -> Annex ()
sendMessage st m = liftIO $ externalSend st line sendMessage st m = liftIO $ externalSend st m
where
line = unwords $ formatMessage m
sendMessageAddonProcess :: AddonProcess.ExternalAddonProcess -> String -> IO () sendMessageAddonProcess :: Sendable m => AddonProcess.ExternalAddonProcess -> m -> IO ()
sendMessageAddonProcess p line = do sendMessageAddonProcess p m = do
AddonProcess.protocolDebug p True line AddonProcess.protocolDebug p True line
hPutStrLn h line hPutStrLn h line
hFlush h hFlush h
where where
h = AddonProcess.externalSend p h = AddonProcess.externalSend p
line = unwords $ formatMessage m
receiveMessageAddonProcess :: AddonProcess.ExternalAddonProcess -> IO (Maybe String) receiveMessageAddonProcess :: AddonProcess.ExternalAddonProcess -> IO (Maybe String)
receiveMessageAddonProcess p = do receiveMessageAddonProcess p = do
@ -551,7 +550,7 @@ receiveMessage
receiveMessage st external handleresponse handlerequest handleexceptional = receiveMessage st external handleresponse handlerequest handleexceptional =
go =<< liftIO (externalReceive st) go =<< liftIO (externalReceive st)
where where
go Nothing = protocolError False "" go Nothing = protocolError False "<EOF>"
go (Just s) = case parseMessage s :: Maybe Response of go (Just s) = case parseMessage s :: Maybe Response of
Just resp -> case handleresponse resp of Just resp -> case handleresponse resp of
Nothing -> protocolError True s Nothing -> protocolError True s

View file

@ -1,16 +1,19 @@
{- External remote protocol async extension. {- External remote protocol async extension.
- -
- Copyright 2013-2020 Joey Hess <id@joeyh.name> - Copyright 2020 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
module Remote.External.AsyncExtension where module Remote.External.AsyncExtension (runRelayToExternalAsync) where
import Common import Common
import Messages
import Remote.External.Types import Remote.External.Types
import Utility.SimpleProtocol as Proto
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
@ -22,14 +25,29 @@ import qualified Data.Map.Strict as M
-- process. -- process.
runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay
runRelayToExternalAsync external st = do runRelayToExternalAsync external st = do
startcomm <- runRelayToExternalAsync' external st jidmap <- newTVarIO M.empty
mapjid <- newTVarIO M.empty
commcounter <- newTVarIO 0
newconns <- newTVarIO []
sendq <- newSendQueue
void $ async $ sendloop st newconns mapjid jidmap sendq
void $ async $ receiveloop external st newconns jidmap mapjid sendq
return $ ExternalAsyncRelay $ do return $ ExternalAsyncRelay $ do
(sendh, receiveh, shutdown) <- startcomm n <- atomically $ do
n <- readTVar commcounter
let n' = succ n
writeTVar commcounter n'
return n'
receiveq <- newReceiveQueue
return $ ExternalState return $ ExternalState
{ externalSend = atomically . writeTBMChan sendh { externalSend = \msg ->
, externalReceive = fmap join $ atomically $ readTBMChan receiveh atomically $ writeTBMChan sendq
( toAsyncWrapped msg
, (n, receiveq)
)
, externalReceive = atomically (readTBMChan receiveq)
-- This shuts down the whole relay. -- This shuts down the whole relay.
, externalShutdown = shutdown , externalShutdown = shutdown external st sendq
-- These three TVars are shared amoung all -- These three TVars are shared amoung all
-- ExternalStates that use this relay; they're -- ExternalStates that use this relay; they're
-- common state about the external process. -- common state about the external process.
@ -40,71 +58,59 @@ runRelayToExternalAsync external st = do
, externalConfigChanges = externalConfigChanges st , externalConfigChanges = externalConfigChanges st
} }
runRelayToExternalAsync' type ReceiveQueue = TBMChan String
:: External
-> ExternalState type SendQueue = TBMChan (AsyncWrapped, Conn)
-> IO (IO (TBMChan String, TBMChan (Maybe String), Bool -> IO ()))
runRelayToExternalAsync' external st = do type ConnNum = Integer
newreqs <- newTVarIO []
startedcomms <- newTVarIO [] type Conn = (ConnNum, ReceiveQueue)
let startcomm = do
toq <- newTBMChanIO 10 type NewConns = TVar [Conn]
fromq <- newTBMChanIO 10
let c = (toq, fromq, shutdown) type MapJid = TVar (M.Map ConnNum JobId)
atomically $ do
l <- readTVar startedcomms type JidMap = TVar (M.Map JobId Conn)
-- This append is ok because the maximum size
-- is the number of jobs that git-annex is newReceiveQueue :: IO ReceiveQueue
-- configured to use, which is a relatively newReceiveQueue = newTBMChanIO 10
-- small number.
writeTVar startedcomms (l ++ [c]) newSendQueue :: IO SendQueue
return c newSendQueue = newTBMChanIO 10
void $ async $ sendloop newreqs startedcomms
void $ async $ receiveloop newreqs M.empty receiveloop :: External -> ExternalState -> NewConns -> JidMap -> MapJid -> SendQueue -> IO ()
return startcomm receiveloop external st newconns jidmap mapjid sendq = externalReceive st >>= \case
Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (RESULT_ASYNC msg) -> getnext newconns >>= \case
Just (_n, c) -> do
relayto c msg
loop
Nothing -> abort "unexpected RESULT-ASYNC"
Just (START_ASYNC jid) -> getnext newconns >>= \case
Just v@(n, _c) -> do
atomically $ do
modifyTVar' jidmap $ M.insert jid v
modifyTVar' mapjid $ M.insert n jid
loop
Nothing -> abort "unexpected START-ASYNC"
Just (ASYNC jid msg) -> getjid jid >>= \case
Just (_n, c) -> do
relayto c msg
loop
Nothing -> abort "ASYNC with unknown jobid"
_ -> abort "unexpected non-async message"
Nothing -> do
-- Unable to receive anything more from the
-- process, so it's not usable any longer.
m <- readTVarIO jidmap
forM_ (M.elems m) (closerelayto . snd)
shutdown external st sendq True
where where
receiveloop newreqs jidmap = externalReceive st >>= \case loop = receiveloop external st newconns jidmap mapjid sendq
Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (RESULT_ASYNC msg) -> getnext newreqs >>= \case
Just c -> do
relayto c msg
receiveloop newreqs jidmap
Nothing -> protoerr "unexpected RESULT-ASYNC"
Just (START_ASYNC jid msg) -> getnext newreqs >>= \case
Just c -> do
relayto c msg
let !jidmap' = M.insert jid c jidmap
receiveloop newreqs jidmap'
Nothing -> protoerr "unexpected START-ASYNC"
Just (END_ASYNC jid msg) -> case M.lookup jid jidmap of
Just c -> do
relayto c msg
closerelayto c
let !jidmap' = M.delete jid jidmap
receiveloop newreqs jidmap'
Nothing -> protoerr "END-ASYNC with unknown jobid"
Just (ASYNC jid msg) -> case M.lookup jid jidmap of
Just c -> do
relayto c msg
let !jidmap' = M.delete jid jidmap
receiveloop newreqs jidmap'
Nothing -> protoerr "ASYNC with unknown jobid"
_ -> protoerr "unexpected non-async message"
Nothing -> do
-- Unable to receive anything more from the
-- process, so it's not usable any longer.
forM_ (M.elems jidmap) closerelayto
shutdown True
sendloop newreqs startedcomms = do relayto q msg = atomically $ writeTBMChan q msg
error "TODO"
relayto (toq, _fromq) msg = closerelayto q = atomically $ closeTBMChan q
atomically $ writeTBMChan toq msg
closerelayto (toq, fromq) = do
atomically $ closeTBMChan toq
atomically $ closeTBMChan fromq
getnext l = atomically $ readTVar l >>= \case getnext l = atomically $ readTVar l >>= \case
[] -> return Nothing [] -> return Nothing
@ -112,14 +118,56 @@ runRelayToExternalAsync' external st = do
writeTVar l rest writeTVar l rest
return (Just c) return (Just c)
shutdown b = do getjid jid = M.lookup jid <$> readTVarIO jidmap
r <- atomically $ do
r <- tryTakeTMVar (externalAsync external) abort s = do
putTMVar (externalAsync external) warningIO (protoerr s)
UncheckedExternalAsync shutdown external st sendq True
return r
case r of sendloop :: ExternalState -> NewConns -> MapJid -> JidMap -> SendQueue -> IO ()
Just (ExternalAsync _) -> externalShutdown st b sendloop st newconns mapjid jidmap sendq = atomically (readTBMChan sendq) >>= \case
_ -> noop Just (wrappedmsg, c@(n, _)) -> do
let newconn = atomically $ do
protoerr s = giveup ("async special remote protocol error: " ++ s) -- This append is not too expensive,
-- because the list length is limited
-- to the maximum number of jobs.
modifyTVar' newconns (++[c])
M.lookup n <$> readTVar mapjid >>= \case
Nothing -> return ()
Just jid -> do
modifyTVar' jidmap (M.delete jid)
modifyTVar' mapjid (M.delete n)
case wrappedmsg of
AsyncWrappedRequest msg -> do
newconn
externalSend st msg
AsyncWrappedExceptionalMessage msg -> do
newconn
externalSend st msg
AsyncWrappedRemoteResponse msg ->
externalSend st =<< wrapremoteresponse msg n
AsyncWrappedAsyncReply msg ->
externalSend st msg
sendloop st newconns mapjid jidmap sendq
Nothing -> return ()
where
wrapremoteresponse msg n =
M.lookup n <$> readTVarIO mapjid >>= \case
Just jid -> return $ REPLY_ASYNC jid $
unwords $ Proto.formatMessage msg
Nothing -> error "failed to find jobid"
shutdown :: External -> ExternalState -> SendQueue -> Bool -> IO ()
shutdown external st sendq b = do
r <- atomically $ do
r <- tryTakeTMVar (externalAsync external)
putTMVar (externalAsync external)
UncheckedExternalAsync
return r
case r of
Just (ExternalAsync _) -> externalShutdown st b
_ -> noop
atomically $ closeTBMChan sendq
protoerr :: String -> String
protoerr s = "async external special remote protocol error: " ++ s

View file

@ -5,7 +5,7 @@
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
{-# LANGUAGE FlexibleInstances, TypeSynonymInstances #-} {-# LANGUAGE FlexibleInstances, TypeSynonymInstances, RankNTypes #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
@ -32,6 +32,10 @@ module Remote.External.Types (
RemoteResponse(..), RemoteResponse(..),
ExceptionalMessage(..), ExceptionalMessage(..),
AsyncMessage(..), AsyncMessage(..),
AsyncReply(..),
AsyncWrapped(..),
ToAsyncWrapped(..),
JobId,
ErrorMsg, ErrorMsg,
Setting, Setting,
Description, Description,
@ -82,15 +86,14 @@ newExternal externaltype u c gc rs = liftIO $ External
type ExternalType = String type ExternalType = String
data ExternalState data ExternalState = ExternalState
= ExternalState { externalSend :: forall t. (Proto.Sendable t, ToAsyncWrapped t) => t -> IO ()
{ externalSend :: String -> IO () , externalReceive :: IO (Maybe String)
, externalReceive :: IO (Maybe String) , externalShutdown :: Bool -> IO ()
, externalShutdown :: Bool -> IO () , externalPrepared :: TVar PrepareStatus
, externalPrepared :: TVar PrepareStatus , externalConfig :: TVar ParsedRemoteConfig
, externalConfig :: TVar ParsedRemoteConfig , externalConfigChanges :: TVar (RemoteConfig -> RemoteConfig)
, externalConfigChanges :: TVar (RemoteConfig -> RemoteConfig) }
}
type PID = Int type PID = Int
@ -360,22 +363,47 @@ instance Proto.Receivable ExceptionalMessage where
parseCommand "ERROR" = Proto.parse1 ERROR parseCommand "ERROR" = Proto.parse1 ERROR
parseCommand _ = Proto.parseFail parseCommand _ = Proto.parseFail
-- Messages used by the async protocol extension. -- Messages sent by the special remote in the async protocol extension.
data AsyncMessage data AsyncMessage
= START_ASYNC JobId WrappedMsg = START_ASYNC JobId
| END_ASYNC JobId WrappedMsg
| RESULT_ASYNC WrappedMsg
| ASYNC JobId WrappedMsg | ASYNC JobId WrappedMsg
| REPLY_ASYNC JobId WrappedMsg | RESULT_ASYNC WrappedMsg
-- Reply sent in the async protocol extension.
data AsyncReply
= REPLY_ASYNC JobId WrappedMsg
instance Proto.Receivable AsyncMessage where instance Proto.Receivable AsyncMessage where
parseCommand "START-ASYNC" = Proto.parse2 START_ASYNC parseCommand "START-ASYNC" = Proto.parse1 START_ASYNC
parseCommand "END-ASYNC" = Proto.parse2 END_ASYNC
parseCommand "RESULT-ASYNC" = Proto.parse1 RESULT_ASYNC
parseCommand "ASYNC" = Proto.parse2 ASYNC parseCommand "ASYNC" = Proto.parse2 ASYNC
parseCommand "REPLY-ASYNC" = Proto.parse2 REPLY_ASYNC parseCommand "RESULT-ASYNC" = Proto.parse1 RESULT_ASYNC
parseCommand _ = Proto.parseFail parseCommand _ = Proto.parseFail
instance Proto.Sendable AsyncReply where
formatMessage (REPLY_ASYNC jid msg) = ["REPLY-ASYNC", jid, msg]
data AsyncWrapped
= AsyncWrappedRemoteResponse RemoteResponse
| AsyncWrappedRequest Request
| AsyncWrappedExceptionalMessage ExceptionalMessage
| AsyncWrappedAsyncReply AsyncReply
class ToAsyncWrapped t where
toAsyncWrapped :: t -> AsyncWrapped
-- | RemoteResponse is sent wrapped in an async message.
instance ToAsyncWrapped RemoteResponse where
toAsyncWrapped = AsyncWrappedRemoteResponse
instance ToAsyncWrapped Request where
toAsyncWrapped = AsyncWrappedRequest
instance ToAsyncWrapped ExceptionalMessage where
toAsyncWrapped = AsyncWrappedExceptionalMessage
instance ToAsyncWrapped AsyncReply where
toAsyncWrapped = AsyncWrappedAsyncReply
-- Data types used for parameters when communicating with the remote. -- Data types used for parameters when communicating with the remote.
-- All are serializable. -- All are serializable.
type ErrorMsg = String type ErrorMsg = String

View file

@ -61,27 +61,27 @@ transfer too:
TRANSFER RETRIEVE 2 file2 TRANSFER RETRIEVE 2 file2
START-ASYNC 2 START-ASYNC 2
If it needs to query git-annex for some information, the special remote When the special remote sends a message, such as PROGRESS, it has to
can use `ASYNC` to send a message, and wait for git-annex to reply wrap it in ASYNC, to specify the job identifier.
in a `REPLY-ASYNC` message:
ASYNC 1 GETCONFIG url
REPLY-ASYNC 1 VALUE http://example.com/
To indicate progress of transfers, the special remote can send
`ASYNC` messages, wrapping the usual PROGRESS messages:
ASYNC 1 PROGRESS 10 ASYNC 1 PROGRESS 10
ASYNC 2 PROGRESS 500 ASYNC 2 PROGRESS 500
ASYNC 1 PROGRESS 20 ASYNC 1 PROGRESS 20
Once a transfer is done, the special remote indicates this with an This can also be used to query git-annex for some information.
`END-ASYNC` message, wrapping the usual `TRANSFER-SUCCESS` or The reply to a query is eventually sent back wrapped in `REPLY-ASYNC`.
`TRANSFER-FAILURE` message:
END-ASYNC 2 TRANSFER-SUCCESS RETRIEVE Key2 ASYNC 1 GETCONFIG url
TRANSFER RETRIEVE 3 file3
REPLY-ASYNC 1 VALUE http://example.com/
Once a transfer is done, the special remote indicates this by
wrapping the usual `TRANSFER-SUCCESS` or
`TRANSFER-FAILURE` message in `ASYNC`.
ASYNC 2 TRANSFER-SUCCESS RETRIEVE Key2
ASYNC Key1 PROGRESS 100 ASYNC Key1 PROGRESS 100
END-ASYNC 1 TRANSFER-SUCCESS RETRIEVE Key1 ASYNC 1 TRANSFER-SUCCESS RETRIEVE Key1
Not only transfers, but everything the special remote sends to git-annex Not only transfers, but everything the special remote sends to git-annex
has to be wrapped in the async protocol. has to be wrapped in the async protocol.
@ -90,11 +90,11 @@ has to be wrapped in the async protocol.
START-ASYNC 3 START-ASYNC 3
CHECKPRESENT Key4 CHECKPRESENT Key4
START-ASYNC 4 START-ASYNC 4
END-ASYNC 3 CHECKPRESENT-SUCCESS Key3 ASYNC 3 CHECKPRESENT-SUCCESS Key3
REMOVE Key3 REMOVE Key3
END-ASYNC 4 CHECKPRESENT-FAILURE Key4 ASYNC 4 CHECKPRESENT-FAILURE Key4
START_ASYNC 5 START-ASYNC 5
END-ASYNC 5 REMOVE-SUCCESS Key3 ASYNC 5 REMOVE-SUCCESS Key3
## added messages ## added messages
@ -108,27 +108,16 @@ Here's the details about the additions to the protocol.
This does not need to be sent immediately after git-annex sends a request; This does not need to be sent immediately after git-annex sends a request;
other messages can be sent in between. But the next START-ASYNC git-annex sees other messages can be sent in between. But the next START-ASYNC git-annex sees
after sending a request tells it the JobId that will be used for that request. after sending a request tells it the JobId that will be used for that request.
* `END-ASYNC JobId ReplyMsg` * `ASYNC JobId Msg`
Indicates that an async job is complete. The ReplyMsg indicates the result All the usual protocol messages that are sent by the external special
of the job, and is anything that would be sent as a protocol reply in the remote must be wrapped in this, to specify which job the message relates
non-async protocol. to.
After this, the JobId is not in use, an indeed the same value could be * `RESULT-ASYNC ResultMsg`
reused by a new `START-ASYNC` if desired.
* `RESULT-ASYNC ReplyMsg`
This is the same as sending `START-ASYNC` immediately followed by This is the same as sending `START-ASYNC` immediately followed by
`END-ASYNC`. This is often used to respond to `PREPARE`, `LISTCONFIGS`, `ASYNC` with a result message. This is often used to respond to
and other things that are trivial or just don't need to be handled async. `PREPARE` other things that are trivial or just don't need to be handled
* `ASYNC JobId InfoMsg` async.
Used to send any of the [special remote messages](https://git-annex.branchable.com/design/external_special_remote_protocol/#index5h2)
to git-annex.
Often used to send `PROGRESS`, but can also be used for other messages,
including ones that git-annex sends a reply to. When git-annex does send
a reply,
it will be wrapped in `REPLY-ASYNC`.
Can be sent at any time aftwr `START-ASYNC` and before `END-ASYNC` for
the JobId in question.
* `REPLY-ASYNC JobId Reply` * `REPLY-ASYNC JobId Reply`
Sent by git-annex when `ASYNC` has been sent and the message generated Sent by git-annex when an `ASYNC` requested a reply.
a reply. Note that this may not be the next message received from Note that this may not be the next message received from
git-annex immediately after sending an `ASYNC` request. git-annex immediately after sending an `ASYNC` request.