move TransferrerPool from Assistant state to Annex state
This commit was sponsored by Graham Spencer on Patreon.
This commit is contained in:
parent
72e5764a87
commit
47016fc656
7 changed files with 38 additions and 48 deletions
4
Annex.hs
4
Annex.hs
|
@ -70,6 +70,7 @@ import Types.WorkerPool
|
||||||
import Types.IndexFiles
|
import Types.IndexFiles
|
||||||
import Types.CatFileHandles
|
import Types.CatFileHandles
|
||||||
import Types.RemoteConfig
|
import Types.RemoteConfig
|
||||||
|
import Types.TransferrerPool
|
||||||
import qualified Database.Keys.Handle as Keys
|
import qualified Database.Keys.Handle as Keys
|
||||||
import Utility.InodeCache
|
import Utility.InodeCache
|
||||||
import Utility.Url
|
import Utility.Url
|
||||||
|
@ -156,6 +157,7 @@ data AnnexState = AnnexState
|
||||||
, cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)])
|
, cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)])
|
||||||
, urloptions :: Maybe UrlOptions
|
, urloptions :: Maybe UrlOptions
|
||||||
, insmudgecleanfilter :: Bool
|
, insmudgecleanfilter :: Bool
|
||||||
|
, transferrerpool :: TransferrerPool
|
||||||
}
|
}
|
||||||
|
|
||||||
newState :: GitConfig -> Git.Repo -> IO AnnexState
|
newState :: GitConfig -> Git.Repo -> IO AnnexState
|
||||||
|
@ -165,6 +167,7 @@ newState c r = do
|
||||||
o <- newMessageState
|
o <- newMessageState
|
||||||
sc <- newTMVarIO False
|
sc <- newTMVarIO False
|
||||||
kh <- Keys.newDbHandle
|
kh <- Keys.newDbHandle
|
||||||
|
tp <- newTransferrerPool
|
||||||
return $ AnnexState
|
return $ AnnexState
|
||||||
{ repo = r
|
{ repo = r
|
||||||
, repoadjustment = return
|
, repoadjustment = return
|
||||||
|
@ -217,6 +220,7 @@ newState c r = do
|
||||||
, cachedgitenv = Nothing
|
, cachedgitenv = Nothing
|
||||||
, urloptions = Nothing
|
, urloptions = Nothing
|
||||||
, insmudgecleanfilter = False
|
, insmudgecleanfilter = False
|
||||||
|
, transferrerpool = tp
|
||||||
}
|
}
|
||||||
|
|
||||||
{- Makes an Annex state object for the specified git repo.
|
{- Makes an Annex state object for the specified git repo.
|
||||||
|
|
|
@ -27,11 +27,11 @@ import Control.Monad.IO.Class (MonadIO)
|
||||||
- it's stopped when done. Otherwise, idle processes are left in the pool
|
- it's stopped when done. Otherwise, idle processes are left in the pool
|
||||||
- for use later.
|
- for use later.
|
||||||
-}
|
-}
|
||||||
withTransferrer :: Bool -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
|
withTransferrer :: Bool -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
|
||||||
withTransferrer minimizeprocesses program batchmaker pool a = do
|
withTransferrer minimizeprocesses mkcheck program batchmaker pool a = do
|
||||||
(mi, leftinpool) <- atomically (popTransferrerPool pool)
|
(mi, leftinpool) <- atomically (popTransferrerPool pool)
|
||||||
i@(TransferrerPoolItem (Just t) check) <- case mi of
|
i@(TransferrerPoolItem (Just t) check) <- case mi of
|
||||||
Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker
|
Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker
|
||||||
Just i -> checkTransferrerPoolItem program batchmaker i
|
Just i -> checkTransferrerPoolItem program batchmaker i
|
||||||
a t `finally` returntopool leftinpool check t i
|
a t `finally` returntopool leftinpool check t i
|
||||||
where
|
where
|
||||||
|
|
|
@ -35,7 +35,6 @@ import Assistant.Types.DaemonStatus
|
||||||
import Assistant.Types.ScanRemotes
|
import Assistant.Types.ScanRemotes
|
||||||
import Assistant.Types.TransferQueue
|
import Assistant.Types.TransferQueue
|
||||||
import Assistant.Types.TransferSlots
|
import Assistant.Types.TransferSlots
|
||||||
import Assistant.Types.TransferrerPool
|
|
||||||
import Assistant.Types.Pushes
|
import Assistant.Types.Pushes
|
||||||
import Assistant.Types.BranchChange
|
import Assistant.Types.BranchChange
|
||||||
import Assistant.Types.Commits
|
import Assistant.Types.Commits
|
||||||
|
@ -65,7 +64,6 @@ data AssistantData = AssistantData
|
||||||
, scanRemoteMap :: ScanRemoteMap
|
, scanRemoteMap :: ScanRemoteMap
|
||||||
, transferQueue :: TransferQueue
|
, transferQueue :: TransferQueue
|
||||||
, transferSlots :: TransferSlots
|
, transferSlots :: TransferSlots
|
||||||
, transferrerPool :: TransferrerPool
|
|
||||||
, failedPushMap :: FailedPushMap
|
, failedPushMap :: FailedPushMap
|
||||||
, failedExportMap :: FailedPushMap
|
, failedExportMap :: FailedPushMap
|
||||||
, commitChan :: CommitChan
|
, commitChan :: CommitChan
|
||||||
|
@ -85,7 +83,6 @@ newAssistantData st dstatus = AssistantData
|
||||||
<*> newScanRemoteMap
|
<*> newScanRemoteMap
|
||||||
<*> newTransferQueue
|
<*> newTransferQueue
|
||||||
<*> newTransferSlots
|
<*> newTransferSlots
|
||||||
<*> newTransferrerPool (checkNetworkConnections dstatus)
|
|
||||||
<*> newFailedPushMap
|
<*> newFailedPushMap
|
||||||
<*> newFailedPushMap
|
<*> newFailedPushMap
|
||||||
<*> newCommitChan
|
<*> newCommitChan
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- git-annex assistant transfer slots
|
{- git-annex assistant transfer slots
|
||||||
-
|
-
|
||||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
- Copyright 2012-2020 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -9,12 +9,15 @@
|
||||||
|
|
||||||
module Assistant.TransferSlots where
|
module Assistant.TransferSlots where
|
||||||
|
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
|
import Utility.NotificationBroadcaster
|
||||||
import Assistant.Types.TransferSlots
|
import Assistant.Types.TransferSlots
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Annex.TransferrerPool
|
import Annex.TransferrerPool
|
||||||
import Assistant.Types.TransferrerPool
|
import Types.TransferrerPool
|
||||||
import Assistant.Types.TransferQueue
|
import Assistant.Types.TransferQueue
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
|
@ -25,6 +28,7 @@ import Types.Transfer
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import qualified Git
|
import qualified Git
|
||||||
|
import qualified Annex
|
||||||
import qualified Remote
|
import qualified Remote
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
@ -75,15 +79,19 @@ runTransferThread :: FilePath -> BatchCommandMaker -> Maybe (Transfer, TransferI
|
||||||
runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots
|
runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots
|
||||||
runTransferThread program batchmaker (Just (t, info, a)) = do
|
runTransferThread program batchmaker (Just (t, info, a)) = do
|
||||||
d <- getAssistant id
|
d <- getAssistant id
|
||||||
|
mkcheck <- checkNetworkConnections
|
||||||
|
<$> getAssistant daemonStatusHandle
|
||||||
aio <- asIO1 a
|
aio <- asIO1 a
|
||||||
tid <- liftIO $ forkIO $ runTransferThread' program batchmaker d aio
|
tid <- liftIO $ forkIO $ runTransferThread' mkcheck program batchmaker d aio
|
||||||
updateTransferInfo t $ info { transferTid = Just tid }
|
updateTransferInfo t $ info { transferTid = Just tid }
|
||||||
|
|
||||||
runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
|
runTransferThread' :: MkCheckTransferrer -> FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
|
||||||
runTransferThread' program batchmaker d run = go
|
runTransferThread' mkcheck program batchmaker d run = go
|
||||||
where
|
where
|
||||||
go = catchPauseResume $
|
go = catchPauseResume $ do
|
||||||
withTransferrer True program batchmaker (transferrerPool d)
|
p <- runAssistant d $ liftAnnex $
|
||||||
|
Annex.getState Annex.transferrerpool
|
||||||
|
withTransferrer True mkcheck program batchmaker p
|
||||||
run
|
run
|
||||||
pause = catchPauseResume $
|
pause = catchPauseResume $
|
||||||
runEvery (Seconds 86400) noop
|
runEvery (Seconds 86400) noop
|
||||||
|
@ -298,3 +306,9 @@ startTransfer t = do
|
||||||
|
|
||||||
getCurrentTransfers :: Assistant TransferMap
|
getCurrentTransfers :: Assistant TransferMap
|
||||||
getCurrentTransfers = currentTransfers <$> getDaemonStatus
|
getCurrentTransfers = currentTransfers <$> getDaemonStatus
|
||||||
|
|
||||||
|
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
|
||||||
|
checkNetworkConnections dstatushandle = do
|
||||||
|
dstatus <- atomically $ readTVar dstatushandle
|
||||||
|
h <- newNotificationHandle False (networkConnectedNotifier dstatus)
|
||||||
|
return $ not <$> checkNotification h
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
{- A pool of "git-annex transferkeys" processes available for use
|
|
||||||
-
|
|
||||||
- Copyright 2013 Joey Hess <id@joeyh.name>
|
|
||||||
-
|
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
|
||||||
-}
|
|
||||||
|
|
||||||
module Assistant.Types.TransferrerPool (
|
|
||||||
module Types.TransferrerPool,
|
|
||||||
checkNetworkConnections,
|
|
||||||
) where
|
|
||||||
|
|
||||||
import Types.TransferrerPool
|
|
||||||
import Utility.NotificationBroadcaster
|
|
||||||
import Assistant.Types.DaemonStatus
|
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
|
||||||
|
|
||||||
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
|
|
||||||
checkNetworkConnections dstatushandle = do
|
|
||||||
dstatus <- atomically $ readTVar dstatushandle
|
|
||||||
h <- newNotificationHandle False (networkConnectedNotifier dstatus)
|
|
||||||
return $ not <$> checkNotification h
|
|
|
@ -7,11 +7,11 @@
|
||||||
|
|
||||||
module Types.TransferrerPool where
|
module Types.TransferrerPool where
|
||||||
|
|
||||||
import Annex.Common
|
import Common
|
||||||
|
|
||||||
import Control.Concurrent.STM hiding (check)
|
import Control.Concurrent.STM hiding (check)
|
||||||
|
|
||||||
type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem])
|
type TransferrerPool = TVar [TransferrerPoolItem]
|
||||||
|
|
||||||
type CheckTransferrer = IO Bool
|
type CheckTransferrer = IO Bool
|
||||||
type MkCheckTransferrer = IO (IO Bool)
|
type MkCheckTransferrer = IO (IO Bool)
|
||||||
|
@ -27,30 +27,29 @@ data Transferrer = Transferrer
|
||||||
, transferrerHandle :: ProcessHandle
|
, transferrerHandle :: ProcessHandle
|
||||||
}
|
}
|
||||||
|
|
||||||
newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool
|
newTransferrerPool :: IO TransferrerPool
|
||||||
newTransferrerPool c = newTVarIO (c, [])
|
newTransferrerPool = newTVarIO []
|
||||||
|
|
||||||
popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int)
|
popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int)
|
||||||
popTransferrerPool p = do
|
popTransferrerPool p = do
|
||||||
(c, l) <- readTVar p
|
l <- readTVar p
|
||||||
case l of
|
case l of
|
||||||
[] -> return (Nothing, 0)
|
[] -> return (Nothing, 0)
|
||||||
(i:is) -> do
|
(i:is) -> do
|
||||||
writeTVar p (c, is)
|
writeTVar p is
|
||||||
return $ (Just i, length is)
|
return $ (Just i, length is)
|
||||||
|
|
||||||
pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM ()
|
pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM ()
|
||||||
pushTransferrerPool p i = do
|
pushTransferrerPool p i = do
|
||||||
(c, l) <- readTVar p
|
l <- readTVar p
|
||||||
let l' = i:l
|
let l' = i:l
|
||||||
writeTVar p (c, l')
|
writeTVar p l'
|
||||||
|
|
||||||
{- Note that making a CheckTransferrer may allocate resources,
|
{- Note that making a CheckTransferrer may allocate resources,
|
||||||
- such as a NotificationHandle, so it's important that the returned
|
- such as a NotificationHandle, so it's important that the returned
|
||||||
- TransferrerPoolItem is pushed into the pool, and not left to be
|
- TransferrerPoolItem is pushed into the pool, and not left to be
|
||||||
- garbage collected. -}
|
- garbage collected. -}
|
||||||
mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem
|
mkTransferrerPoolItem :: MkCheckTransferrer -> Transferrer -> IO TransferrerPoolItem
|
||||||
mkTransferrerPoolItem p t = do
|
mkTransferrerPoolItem mkcheck t = do
|
||||||
mkcheck <- atomically $ fst <$> readTVar p
|
|
||||||
check <- mkcheck
|
check <- mkcheck
|
||||||
return $ TransferrerPoolItem (Just t) check
|
return $ TransferrerPoolItem (Just t) check
|
||||||
|
|
|
@ -494,7 +494,6 @@ Executable git-annex
|
||||||
Assistant.Types.ThreadedMonad
|
Assistant.Types.ThreadedMonad
|
||||||
Assistant.Types.TransferQueue
|
Assistant.Types.TransferQueue
|
||||||
Assistant.Types.TransferSlots
|
Assistant.Types.TransferSlots
|
||||||
Assistant.Types.TransferrerPool
|
|
||||||
Assistant.Types.UrlRenderer
|
Assistant.Types.UrlRenderer
|
||||||
Assistant.Unused
|
Assistant.Unused
|
||||||
Assistant.Upgrade
|
Assistant.Upgrade
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue