add exporter thread to assistant
This is similar to the pusher thread, but a separate thread because git pushes can be done in parallel with exports, and updating a big export should not prevent other git pushes going out in the meantime. The exportThread only runs at most every 30 seconds, since updating an export is more expensive than pushing. This may need to be tuned. Added a separate channel for export commits; the committer records a commit in that channel. Also, reconnectRemotes records a dummy commit, to make the exporter thread wake up and make sure all exports are up-to-date. So, connecting a drive with a directory special remote export will immediately update it, and getting online will automatically update S3 and WebDAV exports. The transfer queue is not involved in exports. Instead, failed exports are retried much like failed pushes. This commit was sponsored by Ewen McNeill.
This commit is contained in:
parent
46d19648ee
commit
d71c65ca0a
13 changed files with 124 additions and 22 deletions
|
@ -18,6 +18,7 @@ import Assistant.Threads.DaemonStatus
|
||||||
import Assistant.Threads.Watcher
|
import Assistant.Threads.Watcher
|
||||||
import Assistant.Threads.Committer
|
import Assistant.Threads.Committer
|
||||||
import Assistant.Threads.Pusher
|
import Assistant.Threads.Pusher
|
||||||
|
import Assistant.Threads.Exporter
|
||||||
import Assistant.Threads.Merger
|
import Assistant.Threads.Merger
|
||||||
import Assistant.Threads.TransferWatcher
|
import Assistant.Threads.TransferWatcher
|
||||||
import Assistant.Threads.Transferrer
|
import Assistant.Threads.Transferrer
|
||||||
|
@ -152,6 +153,8 @@ startDaemon assistant foreground startdelay cannotrun listenhost startbrowser =
|
||||||
#endif
|
#endif
|
||||||
, assist pushThread
|
, assist pushThread
|
||||||
, assist pushRetryThread
|
, assist pushRetryThread
|
||||||
|
, assist exportThread
|
||||||
|
, assist exportRetryThread
|
||||||
, assist mergeThread
|
, assist mergeThread
|
||||||
, assist transferWatcherThread
|
, assist transferWatcherThread
|
||||||
, assist transferPollerThread
|
, assist transferPollerThread
|
||||||
|
|
|
@ -21,3 +21,12 @@ getCommits = (atomically . getTList) <<~ commitChan
|
||||||
{- Records a commit in the channel. -}
|
{- Records a commit in the channel. -}
|
||||||
recordCommit :: Assistant ()
|
recordCommit :: Assistant ()
|
||||||
recordCommit = (atomically . flip consTList Commit) <<~ commitChan
|
recordCommit = (atomically . flip consTList Commit) <<~ commitChan
|
||||||
|
|
||||||
|
{- Gets all unhandled export commits.
|
||||||
|
- Blocks until at least one export commit is made. -}
|
||||||
|
getExportCommits :: Assistant [Commit]
|
||||||
|
getExportCommits = (atomically . getTList) <<~ exportCommitChan
|
||||||
|
|
||||||
|
{- Records an export commit in the channel. -}
|
||||||
|
recordExportCommit :: Assistant ()
|
||||||
|
recordExportCommit = (atomically . flip consTList Commit) <<~ exportCommitChan
|
||||||
|
|
|
@ -62,7 +62,9 @@ data AssistantData = AssistantData
|
||||||
, transferSlots :: TransferSlots
|
, transferSlots :: TransferSlots
|
||||||
, transferrerPool :: TransferrerPool
|
, transferrerPool :: TransferrerPool
|
||||||
, failedPushMap :: FailedPushMap
|
, failedPushMap :: FailedPushMap
|
||||||
|
, failedExportMap :: FailedPushMap
|
||||||
, commitChan :: CommitChan
|
, commitChan :: CommitChan
|
||||||
|
, exportCommitChan :: CommitChan
|
||||||
, changePool :: ChangePool
|
, changePool :: ChangePool
|
||||||
, repoProblemChan :: RepoProblemChan
|
, repoProblemChan :: RepoProblemChan
|
||||||
, branchChangeHandle :: BranchChangeHandle
|
, branchChangeHandle :: BranchChangeHandle
|
||||||
|
@ -80,6 +82,8 @@ newAssistantData st dstatus = AssistantData
|
||||||
<*> newTransferSlots
|
<*> newTransferSlots
|
||||||
<*> newTransferrerPool (checkNetworkConnections dstatus)
|
<*> newTransferrerPool (checkNetworkConnections dstatus)
|
||||||
<*> newFailedPushMap
|
<*> newFailedPushMap
|
||||||
|
<*> newFailedPushMap
|
||||||
|
<*> newCommitChan
|
||||||
<*> newCommitChan
|
<*> newCommitChan
|
||||||
<*> newChangePool
|
<*> newChangePool
|
||||||
<*> newRepoProblemChan
|
<*> newRepoProblemChan
|
||||||
|
|
|
@ -17,24 +17,21 @@ import qualified Data.Map as M
|
||||||
{- Blocks until there are failed pushes.
|
{- Blocks until there are failed pushes.
|
||||||
- Returns Remotes whose pushes failed a given time duration or more ago.
|
- Returns Remotes whose pushes failed a given time duration or more ago.
|
||||||
- (This may be an empty list.) -}
|
- (This may be an empty list.) -}
|
||||||
getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote]
|
getFailedPushesBefore :: NominalDiffTime -> FailedPushMap -> Assistant [Remote]
|
||||||
getFailedPushesBefore duration = do
|
getFailedPushesBefore duration v = liftIO $ do
|
||||||
v <- getAssistant failedPushMap
|
m <- atomically $ readTMVar v
|
||||||
liftIO $ do
|
now <- getCurrentTime
|
||||||
m <- atomically $ readTMVar v
|
return $ M.keys $ M.filter (not . toorecent now) m
|
||||||
now <- getCurrentTime
|
|
||||||
return $ M.keys $ M.filter (not . toorecent now) m
|
|
||||||
where
|
where
|
||||||
toorecent now time = now `diffUTCTime` time < duration
|
toorecent now time = now `diffUTCTime` time < duration
|
||||||
|
|
||||||
{- Modifies the map. -}
|
{- Modifies the map. -}
|
||||||
changeFailedPushMap :: (PushMap -> PushMap) -> Assistant ()
|
changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> Assistant ()
|
||||||
changeFailedPushMap a = do
|
changeFailedPushMap v f = liftIO $ atomically $
|
||||||
v <- getAssistant failedPushMap
|
store . f . fromMaybe M.empty =<< tryTakeTMVar v
|
||||||
liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v
|
|
||||||
where
|
where
|
||||||
{- tryTakeTMVar empties the TMVar; refill it only if
|
{- tryTakeTMVar empties the TMVar; refill it only if
|
||||||
- the modified map is not itself empty -}
|
- the modified map is not itself empty -}
|
||||||
store v m
|
store m
|
||||||
| m == M.empty = noop
|
| m == M.empty = noop
|
||||||
| otherwise = putTMVar v $! m
|
| otherwise = putTMVar v $! m
|
||||||
|
|
|
@ -33,6 +33,7 @@ import Assistant.Threads.Watcher (watchThread, WatcherControl(..))
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.RepoProblem
|
import Assistant.RepoProblem
|
||||||
|
import Assistant.Commits
|
||||||
import Types.Transfer
|
import Types.Transfer
|
||||||
|
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
|
@ -48,10 +49,10 @@ import Control.Concurrent
|
||||||
- it's sufficient to requeue failed transfers.
|
- it's sufficient to requeue failed transfers.
|
||||||
-
|
-
|
||||||
- Also handles signaling any connectRemoteNotifiers, after the syncing is
|
- Also handles signaling any connectRemoteNotifiers, after the syncing is
|
||||||
- done.
|
- done, and records an export commit to make any exports be updated.
|
||||||
-}
|
-}
|
||||||
reconnectRemotes :: [Remote] -> Assistant ()
|
reconnectRemotes :: [Remote] -> Assistant ()
|
||||||
reconnectRemotes [] = noop
|
reconnectRemotes [] = recordExportCommit
|
||||||
reconnectRemotes rs = void $ do
|
reconnectRemotes rs = void $ do
|
||||||
rs' <- liftIO $ filterM (Remote.checkAvailable True) rs
|
rs' <- liftIO $ filterM (Remote.checkAvailable True) rs
|
||||||
unless (null rs') $ do
|
unless (null rs') $ do
|
||||||
|
@ -60,6 +61,7 @@ reconnectRemotes rs = void $ do
|
||||||
whenM (liftIO $ Remote.checkAvailable False r) $
|
whenM (liftIO $ Remote.checkAvailable False r) $
|
||||||
repoHasProblem (Remote.uuid r) (syncRemote r)
|
repoHasProblem (Remote.uuid r) (syncRemote r)
|
||||||
mapM_ signal $ filter (`notElem` failedrs) rs'
|
mapM_ signal $ filter (`notElem` failedrs) rs'
|
||||||
|
recordExportCommit
|
||||||
where
|
where
|
||||||
gitremotes = filter (notspecialremote . Remote.repo) rs
|
gitremotes = filter (notspecialremote . Remote.repo) rs
|
||||||
(_xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
|
(_xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
|
||||||
|
@ -143,9 +145,11 @@ pushToRemotes' now remotes = do
|
||||||
then retry currbranch g u failed
|
then retry currbranch g u failed
|
||||||
else fallback branch g u failed
|
else fallback branch g u failed
|
||||||
|
|
||||||
updatemap succeeded failed = changeFailedPushMap $ \m ->
|
updatemap succeeded failed = do
|
||||||
M.union (makemap failed) $
|
v <- getAssistant failedPushMap
|
||||||
M.difference m (makemap succeeded)
|
changeFailedPushMap v $ \m ->
|
||||||
|
M.union (makemap failed) $
|
||||||
|
M.difference m (makemap succeeded)
|
||||||
makemap l = M.fromList $ zip l (repeat now)
|
makemap l = M.fromList $ zip l (repeat now)
|
||||||
|
|
||||||
retry currbranch g u rs = do
|
retry currbranch g u rs = do
|
||||||
|
|
|
@ -67,6 +67,7 @@ commitThread = namedThread "Committer" $ do
|
||||||
void $ alertWhile commitAlert $
|
void $ alertWhile commitAlert $
|
||||||
liftAnnex $ commitStaged msg
|
liftAnnex $ commitStaged msg
|
||||||
recordCommit
|
recordCommit
|
||||||
|
recordExportCommit
|
||||||
let numchanges = length readychanges
|
let numchanges = length readychanges
|
||||||
mapM_ checkChangeContent readychanges
|
mapM_ checkChangeContent readychanges
|
||||||
return numchanges
|
return numchanges
|
||||||
|
|
78
Assistant/Threads/Exporter.hs
Normal file
78
Assistant/Threads/Exporter.hs
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
{- git-annex assistant export updating thread
|
||||||
|
-
|
||||||
|
- Copyright 2017 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Assistant.Threads.Exporter where
|
||||||
|
|
||||||
|
import Assistant.Common
|
||||||
|
import Assistant.Commits
|
||||||
|
import Assistant.Pushes
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Annex.Concurrent
|
||||||
|
import Utility.ThreadScheduler
|
||||||
|
import qualified Annex
|
||||||
|
import qualified Remote
|
||||||
|
import qualified Types.Remote as Remote
|
||||||
|
import qualified Command.Sync
|
||||||
|
|
||||||
|
import Control.Concurrent.Async
|
||||||
|
import Data.Time.Clock
|
||||||
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
{- This thread retries exports that failed before. -}
|
||||||
|
exportRetryThread :: NamedThread
|
||||||
|
exportRetryThread = namedThread "ExportRetrier" $ runEvery (Seconds halfhour) <~> do
|
||||||
|
-- We already waited half an hour, now wait until there are failed
|
||||||
|
-- exports to retry.
|
||||||
|
toexport <- getFailedPushesBefore (fromIntegral halfhour)
|
||||||
|
=<< getAssistant failedExportMap
|
||||||
|
unless (null toexport) $ do
|
||||||
|
debug ["retrying", show (length toexport), "failed exports"]
|
||||||
|
void $ exportToRemotes toexport
|
||||||
|
where
|
||||||
|
halfhour = 1800
|
||||||
|
|
||||||
|
{- This thread updates exports soon after git commits are made. -}
|
||||||
|
exportThread :: NamedThread
|
||||||
|
exportThread = namedThread "Exporter" $ runEvery (Seconds 30) <~> do
|
||||||
|
-- We already waited two seconds as a simple rate limiter.
|
||||||
|
-- Next, wait until at least one commit has been made
|
||||||
|
void getExportCommits
|
||||||
|
-- Now see if now's a good time to push.
|
||||||
|
void $ exportToRemotes =<< exportTargets
|
||||||
|
|
||||||
|
{- We want to avoid exporting to remotes that are marked readonly.
|
||||||
|
-
|
||||||
|
- Also, avoid exporting to local remotes we can easily tell are not available,
|
||||||
|
- to avoid ugly messages when a removable drive is not attached.
|
||||||
|
-}
|
||||||
|
exportTargets :: Assistant [Remote]
|
||||||
|
exportTargets = liftIO . filterM (Remote.checkAvailable True)
|
||||||
|
=<< candidates <$> getDaemonStatus
|
||||||
|
where
|
||||||
|
candidates = filter (not . Remote.readonly) . exportRemotes
|
||||||
|
|
||||||
|
exportToRemotes :: [Remote] -> Assistant ()
|
||||||
|
exportToRemotes rs = do
|
||||||
|
-- This is a long-duration action which runs in the Annex monad,
|
||||||
|
-- so don't just liftAnnex to run it; fork the Annex state.
|
||||||
|
runner <- liftAnnex $ forkState $
|
||||||
|
forM rs $ \r -> do
|
||||||
|
Annex.changeState $ \st -> st { Annex.errcounter = 0 }
|
||||||
|
start <- liftIO getCurrentTime
|
||||||
|
void $ Command.Sync.seekExportContent rs
|
||||||
|
-- Look at command error counter to see if the export
|
||||||
|
-- didn't work.
|
||||||
|
failed <- (> 0) <$> Annex.getState Annex.errcounter
|
||||||
|
Annex.changeState $ \st -> st { Annex.errcounter = 0 }
|
||||||
|
return $ if failed
|
||||||
|
then Just (r, start)
|
||||||
|
else Nothing
|
||||||
|
failed <- catMaybes
|
||||||
|
<$> (liftAnnex =<< liftIO . wait =<< liftIO (async runner))
|
||||||
|
unless (null failed) $ do
|
||||||
|
v <- getAssistant failedExportMap
|
||||||
|
changeFailedPushMap v $ M.union $ M.fromList failed
|
|
@ -22,6 +22,7 @@ pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
|
||||||
-- We already waited half an hour, now wait until there are failed
|
-- We already waited half an hour, now wait until there are failed
|
||||||
-- pushes to retry.
|
-- pushes to retry.
|
||||||
topush <- getFailedPushesBefore (fromIntegral halfhour)
|
topush <- getFailedPushesBefore (fromIntegral halfhour)
|
||||||
|
=<< getAssistant failedPushMap
|
||||||
unless (null topush) $ do
|
unless (null topush) $ do
|
||||||
debug ["retrying", show (length topush), "failed pushes"]
|
debug ["retrying", show (length topush), "failed pushes"]
|
||||||
void $ pushToRemotes topush
|
void $ pushToRemotes topush
|
||||||
|
|
|
@ -59,8 +59,9 @@ transferScannerThread urlrenderer = namedThread "TransferScanner" $ do
|
||||||
(s { transferScanRunning = b }, s)
|
(s { transferScanRunning = b }, s)
|
||||||
liftIO $ sendNotification $ transferNotifier ds
|
liftIO $ sendNotification $ transferNotifier ds
|
||||||
|
|
||||||
{- All git remotes are synced, and all available remotes
|
{- All git remotes are synced, all exports are updated,
|
||||||
- are scanned in full on startup, for multiple reasons, including:
|
- and all available remotes are scanned in full on startup,
|
||||||
|
- for multiple reasons, including:
|
||||||
-
|
-
|
||||||
- * This may be the first run, and there may be remotes
|
- * This may be the first run, and there may be remotes
|
||||||
- already in place, that need to be synced.
|
- already in place, that need to be synced.
|
||||||
|
|
|
@ -6,6 +6,7 @@ git-annex (6.20170819) UNRELEASED; urgency=medium
|
||||||
for use by git-annex export.
|
for use by git-annex export.
|
||||||
* Implemented export to directory, S3, and webdav special remotes.
|
* Implemented export to directory, S3, and webdav special remotes.
|
||||||
* External special remote protocol extended to support export.
|
* External special remote protocol extended to support export.
|
||||||
|
* sync, assistant: Update exports.
|
||||||
* Support building with feed-1.0, while still supporting older versions.
|
* Support building with feed-1.0, while still supporting older versions.
|
||||||
* init: Display an additional message when it detects a filesystem that
|
* init: Display an additional message when it detects a filesystem that
|
||||||
allows writing to files whose write bit is not set.
|
allows writing to files whose write bit is not set.
|
||||||
|
|
|
@ -21,6 +21,7 @@ module Command.Sync (
|
||||||
updateBranch,
|
updateBranch,
|
||||||
syncBranch,
|
syncBranch,
|
||||||
updateSyncBranch,
|
updateSyncBranch,
|
||||||
|
seekExportContent,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Command
|
import Command
|
||||||
|
@ -652,7 +653,10 @@ syncFile ebloom rs af k = do
|
||||||
|
|
||||||
{- When a remote has an export-tracking branch, change the export to
|
{- When a remote has an export-tracking branch, change the export to
|
||||||
- follow the current content of the branch. Otherwise, transfer any files
|
- follow the current content of the branch. Otherwise, transfer any files
|
||||||
- that were part of an export but are not in the remote yet. -}
|
- that were part of an export but are not in the remote yet.
|
||||||
|
-
|
||||||
|
- Returns True if any file transfers were made.
|
||||||
|
-}
|
||||||
seekExportContent :: [Remote] -> Annex Bool
|
seekExportContent :: [Remote] -> Annex Bool
|
||||||
seekExportContent rs = or <$> forM rs go
|
seekExportContent rs = or <$> forM rs go
|
||||||
where
|
where
|
||||||
|
|
|
@ -17,8 +17,6 @@ there need to be a new interface in supported remotes?
|
||||||
|
|
||||||
Work is in progress. Todo list:
|
Work is in progress. Todo list:
|
||||||
|
|
||||||
* Make assistant update tracking exports.
|
|
||||||
|
|
||||||
* Support configuring export in the assistant
|
* Support configuring export in the assistant
|
||||||
(when eg setting up a S3 special remote).
|
(when eg setting up a S3 special remote).
|
||||||
|
|
||||||
|
|
|
@ -580,6 +580,7 @@ Executable git-annex
|
||||||
Assistant.Threads.ConfigMonitor
|
Assistant.Threads.ConfigMonitor
|
||||||
Assistant.Threads.Cronner
|
Assistant.Threads.Cronner
|
||||||
Assistant.Threads.DaemonStatus
|
Assistant.Threads.DaemonStatus
|
||||||
|
Assistant.Threads.Exporter
|
||||||
Assistant.Threads.Glacier
|
Assistant.Threads.Glacier
|
||||||
Assistant.Threads.Merger
|
Assistant.Threads.Merger
|
||||||
Assistant.Threads.MountWatcher
|
Assistant.Threads.MountWatcher
|
||||||
|
|
Loading…
Reference in a new issue