3290a09a70
Converted warning and similar to use StringContainingQuotedPath. Most warnings are static strings, some do refer to filepaths that need to be quoted, and others don't need quoting. Note that, since quote filters out control characters of even UnquotedString, this makes all warnings safe, even when an attacker sneaks in a control character in some other way. When json is being output, no quoting is done, since json gets its own quoting. This does, as a side effect, make warning messages in json output not be indented. The indentation is only needed to offset warning messages underneath the display of the file they apply to, so that's ok. Sponsored-by: Brett Eisenberg on Patreon
132 lines
4.5 KiB
Haskell
132 lines
4.5 KiB
Haskell
{- External remote protocol async extension.
|
|
-
|
|
- Copyright 2020 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE BangPatterns #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
|
|
module Remote.External.AsyncExtension (runRelayToExternalAsync) where
|
|
|
|
import Common
|
|
import Annex
|
|
import Messages
|
|
import Remote.External.Types
|
|
import qualified Utility.SimpleProtocol as Proto
|
|
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.STM
|
|
import Control.Concurrent.STM.TBMChan
|
|
import qualified Data.Map.Strict as M
|
|
|
|
-- | Starts a thread that will handle all communication with the external
|
|
-- process. The input ExternalState communicates directly with the external
|
|
-- process.
|
|
runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay
|
|
runRelayToExternalAsync external st annexrunner = do
|
|
jidmap <- newTVarIO M.empty
|
|
sendq <- newSendQueue
|
|
nextjid <- newTVarIO (JobId 1)
|
|
sender <- async $ sendloop st sendq
|
|
receiver <- async $ receiveloop external st jidmap sendq sender annexrunner
|
|
return $ ExternalAsyncRelay $ do
|
|
receiveq <- newReceiveQueue
|
|
jid <- atomically $ do
|
|
jid@(JobId n) <- readTVar nextjid
|
|
let !jid' = JobId (succ n)
|
|
writeTVar nextjid jid'
|
|
modifyTVar' jidmap $ M.insert jid receiveq
|
|
return jid
|
|
return $ ExternalState
|
|
{ externalSend = \msg ->
|
|
atomically $ writeTBMChan sendq
|
|
(toAsyncWrapped msg, jid)
|
|
, externalReceive = atomically (readTBMChan receiveq)
|
|
-- This shuts down the whole relay.
|
|
, externalShutdown = shutdown external st sendq sender receiver
|
|
-- These three TMVars are shared among all
|
|
-- ExternalStates that use this relay; they're
|
|
-- common state about the external process.
|
|
, externalPrepared = externalPrepared st
|
|
, externalConfig = externalConfig st
|
|
, externalConfigChanges = externalConfigChanges st
|
|
}
|
|
|
|
type ReceiveQueue = TBMChan String
|
|
|
|
type SendQueue = TBMChan (AsyncWrapped, JobId)
|
|
|
|
type JidMap = TVar (M.Map JobId ReceiveQueue)
|
|
|
|
newReceiveQueue :: IO ReceiveQueue
|
|
newReceiveQueue = newTBMChanIO 10
|
|
|
|
newSendQueue :: IO SendQueue
|
|
newSendQueue = newTBMChanIO 10
|
|
|
|
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO ()
|
|
receiveloop external st jidmap sendq sendthread annexrunner = externalReceive st >>= \case
|
|
Just l -> case parseMessage l :: Maybe AsyncMessage of
|
|
Just (AsyncMessage jid msg) ->
|
|
M.lookup jid <$> readTVarIO jidmap >>= \case
|
|
Just c -> do
|
|
atomically $ writeTBMChan c msg
|
|
receiveloop external st jidmap sendq sendthread annexrunner
|
|
Nothing -> protoerr "unknown job number"
|
|
Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
|
|
Just _ -> do
|
|
-- ERROR is relayed to all listeners
|
|
m <- readTVarIO jidmap
|
|
forM_ (M.elems m) $ \c ->
|
|
atomically $ writeTBMChan c l
|
|
receiveloop external st jidmap sendq sendthread annexrunner
|
|
Nothing -> protoerr "unexpected non-async message"
|
|
Nothing -> closeandshutdown
|
|
where
|
|
protoerr s = do
|
|
annexrunner $ warning $ "async external special remote protocol error: " <> s
|
|
closeandshutdown
|
|
|
|
closeandshutdown = do
|
|
dummy <- async noop
|
|
shutdown external st sendq sendthread dummy True
|
|
m <- atomically $ readTVar jidmap
|
|
forM_ (M.elems m) (atomically . closeTBMChan)
|
|
|
|
sendloop :: ExternalState -> SendQueue -> IO ()
|
|
sendloop st sendq = atomically (readTBMChan sendq) >>= \case
|
|
Just (wrappedmsg, jid) -> do
|
|
case wrappedmsg of
|
|
AsyncWrappedRemoteResponse msg ->
|
|
externalSend st $ wrapjid msg jid
|
|
AsyncWrappedRequest msg ->
|
|
externalSend st $ wrapjid msg jid
|
|
AsyncWrappedExceptionalMessage msg ->
|
|
externalSend st msg
|
|
AsyncWrappedAsyncMessage msg ->
|
|
externalSend st msg
|
|
sendloop st sendq
|
|
Nothing -> return ()
|
|
where
|
|
wrapjid msg jid = AsyncMessage jid $ unwords $ Proto.formatMessage msg
|
|
|
|
shutdown :: External -> ExternalState -> SendQueue -> Async () -> Async () -> Bool -> IO ()
|
|
shutdown external st sendq sendthread receivethread b = do
|
|
-- Receive thread is normally blocked reading from a handle.
|
|
-- That can block closing the handle, so it needs to be canceled.
|
|
cancel receivethread
|
|
-- Cleanly shutdown the send thread as well, allowing it to finish
|
|
-- writing anything that was buffered.
|
|
atomically $ closeTBMChan sendq
|
|
wait sendthread
|
|
r <- atomically $ do
|
|
r <- tryTakeTMVar (externalAsync external)
|
|
putTMVar (externalAsync external)
|
|
UncheckedExternalAsync
|
|
return r
|
|
case r of
|
|
Just (ExternalAsync _) -> externalShutdown st b
|
|
_ -> noop
|