switch to TMVars for thread safety when using the async extension
TVars were not updated atomically, which was ok when each thread got its own External that was the only thing using these TVars. But, with the async extension, several External instances can share the same var, so it needs to be a TMVar to avoid read/write conflicts. In particular, this makes PREPARE only be sent once.
This commit is contained in:
parent
7da2d4dd2d
commit
198b709561
4 changed files with 37 additions and 35 deletions
|
@ -6,6 +6,7 @@
|
||||||
-}
|
-}
|
||||||
|
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE OverloadedStrings #-}
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Remote.External (remote) where
|
module Remote.External (remote) where
|
||||||
|
|
||||||
|
@ -195,7 +196,7 @@ externalSetup _ mu _ c gc = do
|
||||||
-- responding to INITREMOTE need to be applied to
|
-- responding to INITREMOTE need to be applied to
|
||||||
-- the RemoteConfig.
|
-- the RemoteConfig.
|
||||||
changes <- withExternalState external $
|
changes <- withExternalState external $
|
||||||
liftIO . atomically . readTVar . externalConfigChanges
|
liftIO . atomically . readTMVar . externalConfigChanges
|
||||||
return (changes c')
|
return (changes c')
|
||||||
|
|
||||||
gitConfigSpecialRemote u c'' [("externaltype", externaltype)]
|
gitConfigSpecialRemote u c'' [("externaltype", externaltype)]
|
||||||
|
@ -407,28 +408,28 @@ handleRequest' st external req mp responsehandler
|
||||||
send $ VALUE $ fromRawFilePath $ hashDirLower def k
|
send $ VALUE $ fromRawFilePath $ hashDirLower def k
|
||||||
handleRemoteRequest (SETCONFIG setting value) =
|
handleRemoteRequest (SETCONFIG setting value) =
|
||||||
liftIO $ atomically $ do
|
liftIO $ atomically $ do
|
||||||
modifyTVar' (externalConfig st) $ \(ParsedRemoteConfig m c) ->
|
ParsedRemoteConfig m c <- takeTMVar (externalConfig st)
|
||||||
let m' = M.insert
|
let !m' = M.insert
|
||||||
(Accepted setting)
|
(Accepted setting)
|
||||||
(RemoteConfigValue (PassedThrough value))
|
(RemoteConfigValue (PassedThrough value))
|
||||||
m
|
m
|
||||||
c' = M.insert
|
let !c' = M.insert
|
||||||
(Accepted setting)
|
(Accepted setting)
|
||||||
(Accepted value)
|
(Accepted value)
|
||||||
c
|
c
|
||||||
in ParsedRemoteConfig m' c'
|
putTMVar (externalConfig st) (ParsedRemoteConfig m' c')
|
||||||
modifyTVar' (externalConfigChanges st) $ \f ->
|
f <- takeTMVar (externalConfigChanges st)
|
||||||
M.insert (Accepted setting) (Accepted value) . f
|
let !f' = M.insert (Accepted setting) (Accepted value) . f
|
||||||
|
putTMVar (externalConfigChanges st) f'
|
||||||
handleRemoteRequest (GETCONFIG setting) = do
|
handleRemoteRequest (GETCONFIG setting) = do
|
||||||
value <- maybe "" fromProposedAccepted
|
value <- maybe "" fromProposedAccepted
|
||||||
. (M.lookup (Accepted setting))
|
. (M.lookup (Accepted setting))
|
||||||
. unparsedRemoteConfig
|
. unparsedRemoteConfig
|
||||||
<$> liftIO (atomically $ readTVar $ externalConfig st)
|
<$> liftIO (atomically $ readTMVar $ externalConfig st)
|
||||||
send $ VALUE value
|
send $ VALUE value
|
||||||
handleRemoteRequest (SETCREDS setting login password) = case (externalUUID external, externalGitConfig external) of
|
handleRemoteRequest (SETCREDS setting login password) = case (externalUUID external, externalGitConfig external) of
|
||||||
(Just u, Just gc) -> do
|
(Just u, Just gc) -> do
|
||||||
let v = externalConfig st
|
pc <- liftIO $ atomically $ takeTMVar (externalConfig st)
|
||||||
pc <- liftIO $ atomically $ readTVar v
|
|
||||||
pc' <- setRemoteCredPair' pc encryptionAlreadySetup gc
|
pc' <- setRemoteCredPair' pc encryptionAlreadySetup gc
|
||||||
(credstorage setting u)
|
(credstorage setting u)
|
||||||
(Just (login, password))
|
(Just (login, password))
|
||||||
|
@ -437,13 +438,14 @@ handleRequest' st external req mp responsehandler
|
||||||
(unparsedRemoteConfig pc')
|
(unparsedRemoteConfig pc')
|
||||||
(unparsedRemoteConfig pc)
|
(unparsedRemoteConfig pc)
|
||||||
void $ liftIO $ atomically $ do
|
void $ liftIO $ atomically $ do
|
||||||
_ <- swapTVar v pc'
|
putTMVar (externalConfig st) pc'
|
||||||
modifyTVar' (externalConfigChanges st) $ \f ->
|
f <- takeTMVar (externalConfigChanges st)
|
||||||
M.union configchanges . f
|
let !f' = M.union configchanges . f
|
||||||
|
putTMVar (externalConfigChanges st) f'
|
||||||
_ -> senderror "cannot send SETCREDS here"
|
_ -> senderror "cannot send SETCREDS here"
|
||||||
handleRemoteRequest (GETCREDS setting) = case (externalUUID external, externalGitConfig external) of
|
handleRemoteRequest (GETCREDS setting) = case (externalUUID external, externalGitConfig external) of
|
||||||
(Just u, Just gc) -> do
|
(Just u, Just gc) -> do
|
||||||
c <- liftIO $ atomically $ readTVar $ externalConfig st
|
c <- liftIO $ atomically $ readTMVar $ externalConfig st
|
||||||
creds <- fromMaybe ("", "") <$>
|
creds <- fromMaybe ("", "") <$>
|
||||||
getRemoteCredPair c gc (credstorage setting u)
|
getRemoteCredPair c gc (credstorage setting u)
|
||||||
send $ CREDS (fst creds) (snd creds)
|
send $ CREDS (fst creds) (snd creds)
|
||||||
|
@ -649,9 +651,9 @@ startExternal' external = do
|
||||||
]
|
]
|
||||||
_ -> giveup err
|
_ -> giveup err
|
||||||
Right p -> do
|
Right p -> do
|
||||||
cv <- liftIO $ newTVarIO $ externalDefaultConfig external
|
cv <- liftIO $ newTMVarIO $ externalDefaultConfig external
|
||||||
ccv <- liftIO $ newTVarIO id
|
ccv <- liftIO $ newTMVarIO id
|
||||||
pv <- liftIO $ newTVarIO Unprepared
|
pv <- liftIO $ newTMVarIO Unprepared
|
||||||
let st = ExternalState
|
let st = ExternalState
|
||||||
{ externalSend = sendMessageAddonProcess p
|
{ externalSend = sendMessageAddonProcess p
|
||||||
, externalReceive = receiveMessageAddonProcess p
|
, externalReceive = receiveMessageAddonProcess p
|
||||||
|
@ -706,10 +708,12 @@ checkVersion _ _ = Nothing
|
||||||
- the error message. -}
|
- the error message. -}
|
||||||
checkPrepared :: ExternalState -> External -> Annex ()
|
checkPrepared :: ExternalState -> External -> Annex ()
|
||||||
checkPrepared st external = do
|
checkPrepared st external = do
|
||||||
v <- liftIO $ atomically $ readTVar $ externalPrepared st
|
v <- liftIO $ atomically $ takeTMVar $ externalPrepared st
|
||||||
case v of
|
case v of
|
||||||
Prepared -> noop
|
Prepared -> setprepared Prepared
|
||||||
FailedPrepare errmsg -> giveup errmsg
|
FailedPrepare errmsg -> do
|
||||||
|
setprepared (FailedPrepare errmsg)
|
||||||
|
giveup errmsg
|
||||||
Unprepared ->
|
Unprepared ->
|
||||||
handleRequest' st external PREPARE Nothing $ \resp ->
|
handleRequest' st external PREPARE Nothing $ \resp ->
|
||||||
case resp of
|
case resp of
|
||||||
|
@ -722,8 +726,8 @@ checkPrepared st external = do
|
||||||
giveup errmsg'
|
giveup errmsg'
|
||||||
_ -> Nothing
|
_ -> Nothing
|
||||||
where
|
where
|
||||||
setprepared status = liftIO $ atomically $ void $
|
setprepared status = liftIO $ atomically $
|
||||||
swapTVar (externalPrepared st) status
|
putTMVar (externalPrepared st) status
|
||||||
|
|
||||||
respErrorMessage :: String -> String -> String
|
respErrorMessage :: String -> String -> String
|
||||||
respErrorMessage req err
|
respErrorMessage req err
|
||||||
|
|
4
Remote/External/AsyncExtension.hs
vendored
4
Remote/External/AsyncExtension.hs
vendored
|
@ -45,11 +45,9 @@ runRelayToExternalAsync external st = do
|
||||||
, externalReceive = atomically (readTBMChan receiveq)
|
, externalReceive = atomically (readTBMChan receiveq)
|
||||||
-- This shuts down the whole relay.
|
-- This shuts down the whole relay.
|
||||||
, externalShutdown = shutdown external st sendq
|
, externalShutdown = shutdown external st sendq
|
||||||
-- These three TVars are shared amoung all
|
-- These three TMVars are shared amoung all
|
||||||
-- ExternalStates that use this relay; they're
|
-- ExternalStates that use this relay; they're
|
||||||
-- common state about the external process.
|
-- common state about the external process.
|
||||||
-- TODO: ALL code using these in Remote.External
|
|
||||||
-- has to be made async-safe.
|
|
||||||
, externalPrepared = externalPrepared st
|
, externalPrepared = externalPrepared st
|
||||||
, externalConfig = externalConfig st
|
, externalConfig = externalConfig st
|
||||||
, externalConfigChanges = externalConfigChanges st
|
, externalConfigChanges = externalConfigChanges st
|
||||||
|
|
6
Remote/External/Types.hs
vendored
6
Remote/External/Types.hs
vendored
|
@ -90,9 +90,9 @@ data ExternalState = ExternalState
|
||||||
{ externalSend :: forall t. (Proto.Sendable t, ToAsyncWrapped t) => t -> IO ()
|
{ externalSend :: forall t. (Proto.Sendable t, ToAsyncWrapped t) => t -> IO ()
|
||||||
, externalReceive :: IO (Maybe String)
|
, externalReceive :: IO (Maybe String)
|
||||||
, externalShutdown :: Bool -> IO ()
|
, externalShutdown :: Bool -> IO ()
|
||||||
, externalPrepared :: TVar PrepareStatus
|
, externalPrepared :: TMVar PrepareStatus
|
||||||
, externalConfig :: TVar ParsedRemoteConfig
|
, externalConfig :: TMVar ParsedRemoteConfig
|
||||||
, externalConfigChanges :: TVar (RemoteConfig -> RemoteConfig)
|
, externalConfigChanges :: TMVar (RemoteConfig -> RemoteConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
type PID = Int
|
type PID = Int
|
||||||
|
|
|
@ -87,7 +87,7 @@ An example of sending multiple replies to a request is `LISTCONFIGS`, eg:
|
||||||
|
|
||||||
## notes
|
## notes
|
||||||
|
|
||||||
There will generally be one job for each thread that git-annex runs
|
There will be one job number for each thread that git-annex runs
|
||||||
concurrently, so around the same number as the -J value, although in some
|
concurrently, so around the same number as the -J value, although in some
|
||||||
cases git-annex does more concurrent operations than the -J value.
|
cases git-annex does more concurrent operations than the -J value.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue