move TransferrerPool from assistant
This old code will now be useful for git-annex beyond the assistant. git-annex won't use the CheckTransferrer part, and won't run transferkeys as a batch process, and will want withTransferrer to not shut down transferkeys processes. Still, the rest of this is a good fit for what I need now. Also removed some dead code, and simplified a little bit. This commit was sponsored by Mark Reidenbach on Patreon.
This commit is contained in:
parent
438d5be1f7
commit
72e5764a87
6 changed files with 103 additions and 76 deletions
|
@ -1,42 +1,46 @@
|
||||||
{- A pool of "git-annex transferkeys" processes
|
{- A pool of "git-annex transferkeys" processes
|
||||||
-
|
-
|
||||||
- Copyright 2013 Joey Hess <id@joeyh.name>
|
- Copyright 2013-2020 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Assistant.TransferrerPool where
|
{-# LANGUAGE RankNTypes #-}
|
||||||
|
|
||||||
import Assistant.Common
|
module Annex.TransferrerPool where
|
||||||
import Assistant.Types.TransferrerPool
|
|
||||||
|
import Annex.Common
|
||||||
|
import Types.TransferrerPool
|
||||||
import Types.Transfer
|
import Types.Transfer
|
||||||
import Utility.Batch
|
import Utility.Batch
|
||||||
import Messages.Serialized
|
import Messages.Serialized
|
||||||
|
|
||||||
import qualified Command.TransferKeys as T
|
import qualified Command.TransferKeys as T
|
||||||
|
|
||||||
import Control.Concurrent.STM hiding (check)
|
import Control.Concurrent.STM hiding (check)
|
||||||
import Control.Exception (throw)
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Monad.IO.Class (MonadIO)
|
||||||
|
|
||||||
{- Runs an action with a Transferrer from the pool.
|
{- Runs an action with a Transferrer from the pool.
|
||||||
-
|
-
|
||||||
- Only one Transferrer is left running in the pool at a time.
|
- When minimizeprocesses is True, only one Transferrer is left running
|
||||||
- So if this needed to start a new Transferrer, it's stopped when done.
|
- in the pool at a time. So if this needed to start a new Transferrer,
|
||||||
|
- it's stopped when done. Otherwise, idle processes are left in the pool
|
||||||
|
- for use later.
|
||||||
-}
|
-}
|
||||||
withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
|
withTransferrer :: Bool -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
|
||||||
withTransferrer program batchmaker pool a = do
|
withTransferrer minimizeprocesses program batchmaker pool a = do
|
||||||
(mi, leftinpool) <- atomically (popTransferrerPool pool)
|
(mi, leftinpool) <- atomically (popTransferrerPool pool)
|
||||||
i@(TransferrerPoolItem (Just t) check) <- case mi of
|
i@(TransferrerPoolItem (Just t) check) <- case mi of
|
||||||
Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker
|
Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker
|
||||||
Just i -> checkTransferrerPoolItem program batchmaker i
|
Just i -> checkTransferrerPoolItem program batchmaker i
|
||||||
v <- tryNonAsync $ a t
|
a t `finally` returntopool leftinpool check t i
|
||||||
if leftinpool == 0
|
where
|
||||||
then atomically $ pushTransferrerPool pool i
|
returntopool leftinpool check t i
|
||||||
else do
|
| not minimizeprocesses || leftinpool == 0 =
|
||||||
|
atomically $ pushTransferrerPool pool i
|
||||||
|
| otherwise = do
|
||||||
void $ forkIO $ stopTransferrer t
|
void $ forkIO $ stopTransferrer t
|
||||||
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
|
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
|
||||||
either throw return v
|
|
||||||
|
|
||||||
{- Check if a Transferrer from the pool is still ok to be used.
|
{- Check if a Transferrer from the pool is still ok to be used.
|
||||||
- If not, stop it and start a new one. -}
|
- If not, stop it and start a new one. -}
|
||||||
|
@ -56,13 +60,21 @@ checkTransferrerPoolItem program batchmaker i = case i of
|
||||||
|
|
||||||
{- Requests that a Transferrer perform a Transfer, and waits for it to
|
{- Requests that a Transferrer perform a Transfer, and waits for it to
|
||||||
- finish. -}
|
- finish. -}
|
||||||
performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool
|
performTransfer
|
||||||
performTransfer transferrer t info = catchBoolIO $ do
|
:: (Monad m, MonadIO m, MonadMask m)
|
||||||
|
=> Transferrer
|
||||||
|
-> Transfer
|
||||||
|
-> TransferInfo
|
||||||
|
-> (forall a. Annex a -> m a)
|
||||||
|
-- ^ Run an annex action in the monad. Will not be used with
|
||||||
|
-- actions that block for a long time.
|
||||||
|
-> m Bool
|
||||||
|
performTransfer transferrer t info runannex = catchBoolIO $ do
|
||||||
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
|
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
|
||||||
relaySerializedOutput
|
relaySerializedOutput
|
||||||
(liftIO $ T.readResponse (transferrerRead transferrer))
|
(liftIO $ T.readResponse (transferrerRead transferrer))
|
||||||
(liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer))
|
(liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer))
|
||||||
liftAnnex
|
runannex
|
||||||
|
|
||||||
{- Starts a new git-annex transferkeys process, setting up handles
|
{- Starts a new git-annex transferkeys process, setting up handles
|
||||||
- that will be used to communicate with it. -}
|
- that will be used to communicate with it. -}
|
||||||
|
@ -84,13 +96,8 @@ mkTransferrer program batchmaker = do
|
||||||
, transferrerHandle = pid
|
, transferrerHandle = pid
|
||||||
}
|
}
|
||||||
|
|
||||||
{- Checks if a Transferrer is still running. If not, makes a new one. -}
|
{- Closing the fds will stop the transferrer, but only when it's in between
|
||||||
checkTransferrer :: FilePath -> BatchCommandMaker -> Transferrer -> IO Transferrer
|
- transfers. -}
|
||||||
checkTransferrer program batchmaker t =
|
|
||||||
maybe (return t) (const $ mkTransferrer program batchmaker)
|
|
||||||
=<< getProcessExitCode (transferrerHandle t)
|
|
||||||
|
|
||||||
{- Closing the fds will stop the transferrer. -}
|
|
||||||
stopTransferrer :: Transferrer -> IO ()
|
stopTransferrer :: Transferrer -> IO ()
|
||||||
stopTransferrer t = do
|
stopTransferrer t = do
|
||||||
hClose $ transferrerRead t
|
hClose $ transferrerRead t
|
|
@ -13,7 +13,7 @@ import Assistant.Common
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
import Assistant.Types.TransferSlots
|
import Assistant.Types.TransferSlots
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.TransferrerPool
|
import Annex.TransferrerPool
|
||||||
import Assistant.Types.TransferrerPool
|
import Assistant.Types.TransferrerPool
|
||||||
import Assistant.Types.TransferQueue
|
import Assistant.Types.TransferQueue
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
|
@ -83,7 +83,7 @@ runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transfe
|
||||||
runTransferThread' program batchmaker d run = go
|
runTransferThread' program batchmaker d run = go
|
||||||
where
|
where
|
||||||
go = catchPauseResume $
|
go = catchPauseResume $
|
||||||
withTransferrer program batchmaker (transferrerPool d)
|
withTransferrer True program batchmaker (transferrerPool d)
|
||||||
run
|
run
|
||||||
pause = catchPauseResume $
|
pause = catchPauseResume $
|
||||||
runEvery (Seconds 86400) noop
|
runEvery (Seconds 86400) noop
|
||||||
|
@ -155,7 +155,7 @@ genTransfer t info = case transferRemote info of
|
||||||
- usual cleanup. However, first check if something else is
|
- usual cleanup. However, first check if something else is
|
||||||
- running the transfer, to avoid removing active transfers.
|
- running the transfer, to avoid removing active transfers.
|
||||||
-}
|
-}
|
||||||
go remote transferrer = ifM (performTransfer transferrer t info)
|
go remote transferrer = ifM (performTransfer transferrer t info liftAnnex)
|
||||||
( do
|
( do
|
||||||
case associatedFile info of
|
case associatedFile info of
|
||||||
AssociatedFile Nothing -> noop
|
AssociatedFile Nothing -> noop
|
||||||
|
|
|
@ -5,57 +5,16 @@
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Assistant.Types.TransferrerPool where
|
module Assistant.Types.TransferrerPool (
|
||||||
|
module Types.TransferrerPool,
|
||||||
|
checkNetworkConnections,
|
||||||
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Types.TransferrerPool
|
||||||
import Utility.NotificationBroadcaster
|
import Utility.NotificationBroadcaster
|
||||||
import Assistant.Types.DaemonStatus
|
import Assistant.Types.DaemonStatus
|
||||||
|
|
||||||
import Control.Concurrent.STM hiding (check)
|
import Control.Concurrent.STM
|
||||||
|
|
||||||
type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem])
|
|
||||||
|
|
||||||
type CheckTransferrer = IO Bool
|
|
||||||
type MkCheckTransferrer = IO (IO Bool)
|
|
||||||
|
|
||||||
{- Each item in the pool may have a transferrer running, and has an
|
|
||||||
- IO action that can be used to check if it's still ok to use the
|
|
||||||
- transferrer. -}
|
|
||||||
data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer
|
|
||||||
|
|
||||||
data Transferrer = Transferrer
|
|
||||||
{ transferrerRead :: Handle
|
|
||||||
, transferrerWrite :: Handle
|
|
||||||
, transferrerHandle :: ProcessHandle
|
|
||||||
}
|
|
||||||
|
|
||||||
newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool
|
|
||||||
newTransferrerPool c = newTVarIO (c, [])
|
|
||||||
|
|
||||||
popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int)
|
|
||||||
popTransferrerPool p = do
|
|
||||||
(c, l) <- readTVar p
|
|
||||||
case l of
|
|
||||||
[] -> return (Nothing, 0)
|
|
||||||
(i:is) -> do
|
|
||||||
writeTVar p (c, is)
|
|
||||||
return $ (Just i, length is)
|
|
||||||
|
|
||||||
pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM ()
|
|
||||||
pushTransferrerPool p i = do
|
|
||||||
(c, l) <- readTVar p
|
|
||||||
let l' = i:l
|
|
||||||
writeTVar p (c, l')
|
|
||||||
|
|
||||||
{- Note that making a CheckTransferrer may allocate resources,
|
|
||||||
- such as a NotificationHandle, so it's important that the returned
|
|
||||||
- TransferrerPoolItem is pushed into the pool, and not left to be
|
|
||||||
- garbage collected. -}
|
|
||||||
mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem
|
|
||||||
mkTransferrerPoolItem p t = do
|
|
||||||
mkcheck <- atomically $ fst <$> readTVar p
|
|
||||||
check <- mkcheck
|
|
||||||
return $ TransferrerPoolItem (Just t) check
|
|
||||||
|
|
||||||
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
|
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
|
||||||
checkNetworkConnections dstatushandle = do
|
checkNetworkConnections dstatushandle = do
|
||||||
|
|
56
Types/TransferrerPool.hs
Normal file
56
Types/TransferrerPool.hs
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
{- A pool of "git-annex transferkeys" processes available for use
|
||||||
|
-
|
||||||
|
- Copyright 2013-2020 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Types.TransferrerPool where
|
||||||
|
|
||||||
|
import Annex.Common
|
||||||
|
|
||||||
|
import Control.Concurrent.STM hiding (check)
|
||||||
|
|
||||||
|
type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem])
|
||||||
|
|
||||||
|
type CheckTransferrer = IO Bool
|
||||||
|
type MkCheckTransferrer = IO (IO Bool)
|
||||||
|
|
||||||
|
{- Each item in the pool may have a transferrer running, and has an
|
||||||
|
- IO action that can be used to check if it's still ok to use the
|
||||||
|
- transferrer. -}
|
||||||
|
data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer
|
||||||
|
|
||||||
|
data Transferrer = Transferrer
|
||||||
|
{ transferrerRead :: Handle
|
||||||
|
, transferrerWrite :: Handle
|
||||||
|
, transferrerHandle :: ProcessHandle
|
||||||
|
}
|
||||||
|
|
||||||
|
newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool
|
||||||
|
newTransferrerPool c = newTVarIO (c, [])
|
||||||
|
|
||||||
|
popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int)
|
||||||
|
popTransferrerPool p = do
|
||||||
|
(c, l) <- readTVar p
|
||||||
|
case l of
|
||||||
|
[] -> return (Nothing, 0)
|
||||||
|
(i:is) -> do
|
||||||
|
writeTVar p (c, is)
|
||||||
|
return $ (Just i, length is)
|
||||||
|
|
||||||
|
pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM ()
|
||||||
|
pushTransferrerPool p i = do
|
||||||
|
(c, l) <- readTVar p
|
||||||
|
let l' = i:l
|
||||||
|
writeTVar p (c, l')
|
||||||
|
|
||||||
|
{- Note that making a CheckTransferrer may allocate resources,
|
||||||
|
- such as a NotificationHandle, so it's important that the returned
|
||||||
|
- TransferrerPoolItem is pushed into the pool, and not left to be
|
||||||
|
- garbage collected. -}
|
||||||
|
mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem
|
||||||
|
mkTransferrerPoolItem p t = do
|
||||||
|
mkcheck <- atomically $ fst <$> readTVar p
|
||||||
|
check <- mkcheck
|
||||||
|
return $ TransferrerPoolItem (Just t) check
|
|
@ -10,6 +10,7 @@
|
||||||
module Utility.Batch (
|
module Utility.Batch (
|
||||||
batch,
|
batch,
|
||||||
BatchCommandMaker,
|
BatchCommandMaker,
|
||||||
|
nonBatchCommandMaker,
|
||||||
getBatchCommandMaker,
|
getBatchCommandMaker,
|
||||||
toBatchCommand,
|
toBatchCommand,
|
||||||
batchCommand,
|
batchCommand,
|
||||||
|
@ -50,6 +51,9 @@ batch a = a
|
||||||
- are available in the path. -}
|
- are available in the path. -}
|
||||||
type BatchCommandMaker = (String, [CommandParam]) -> (String, [CommandParam])
|
type BatchCommandMaker = (String, [CommandParam]) -> (String, [CommandParam])
|
||||||
|
|
||||||
|
nonBatchCommandMaker :: BatchCommandMaker
|
||||||
|
nonBatchCommandMaker = id
|
||||||
|
|
||||||
getBatchCommandMaker :: IO BatchCommandMaker
|
getBatchCommandMaker :: IO BatchCommandMaker
|
||||||
getBatchCommandMaker = do
|
getBatchCommandMaker = do
|
||||||
#ifndef mingw32_HOST_OS
|
#ifndef mingw32_HOST_OS
|
||||||
|
|
|
@ -479,7 +479,6 @@ Executable git-annex
|
||||||
Assistant.Threads.Watcher
|
Assistant.Threads.Watcher
|
||||||
Assistant.TransferQueue
|
Assistant.TransferQueue
|
||||||
Assistant.TransferSlots
|
Assistant.TransferSlots
|
||||||
Assistant.TransferrerPool
|
|
||||||
Assistant.Types.Alert
|
Assistant.Types.Alert
|
||||||
Assistant.Types.BranchChange
|
Assistant.Types.BranchChange
|
||||||
Assistant.Types.Changes
|
Assistant.Types.Changes
|
||||||
|
@ -666,6 +665,7 @@ Executable git-annex
|
||||||
Annex.TaggedPush
|
Annex.TaggedPush
|
||||||
Annex.Tmp
|
Annex.Tmp
|
||||||
Annex.Transfer
|
Annex.Transfer
|
||||||
|
Annex.TransferrerPool
|
||||||
Annex.UntrustedFilePath
|
Annex.UntrustedFilePath
|
||||||
Annex.UpdateInstead
|
Annex.UpdateInstead
|
||||||
Annex.UUID
|
Annex.UUID
|
||||||
|
@ -1027,6 +1027,7 @@ Executable git-annex
|
||||||
Types.StoreRetrieve
|
Types.StoreRetrieve
|
||||||
Types.Test
|
Types.Test
|
||||||
Types.Transfer
|
Types.Transfer
|
||||||
|
Types.TransferrerPool
|
||||||
Types.TrustLevel
|
Types.TrustLevel
|
||||||
Types.UUID
|
Types.UUID
|
||||||
Types.UrlContents
|
Types.UrlContents
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue