From 83c66ccaf88a10e8f4b16fc2162cbed2656b95e0 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 5 Jul 2012 10:21:22 -0600 Subject: [PATCH] queue Uploads of newly added files to remotes Added knownRemotes to DaemonStatus. This list is not entirely trivial to calculate, and having it here should make it easier to add/remove remotes on the fly later on. It did require plumbing the daemonstatus through to some more threads. --- Assistant.hs | 9 ++++++--- Assistant/DaemonStatus.hs | 8 +++++++- Assistant/Threads/Committer.hs | 28 ++++++++++++++++------------ Assistant/Threads/Pusher.hs | 11 +++++++---- Assistant/TransferQueue.hs | 29 ++++++++++++++++++++++------- 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/Assistant.hs b/Assistant.hs index 40f53d55ee..548850e92d 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -21,7 +21,8 @@ - until this is complete. - Thread 5: committer - Waits for changes to occur, and runs the git queue to update its - - index, then commits. + - index, then commits. Also queues Transfer events to send added + - files to other remotes. - Thread 6: pusher - Waits for commits to be made, and pushes updated branches to remotes, - in parallel. (Forks a process for each git push.) @@ -73,6 +74,7 @@ import Assistant.DaemonStatus import Assistant.Changes import Assistant.Commits import Assistant.Pushes +import Assistant.TransferQueue import Assistant.Threads.Watcher import Assistant.Threads.Committer import Assistant.Threads.Pusher @@ -103,9 +105,10 @@ startDaemon assistant foreground changechan <- newChangeChan commitchan <- newCommitChan pushmap <- newFailedPushMap + transferqueue <- newTransferQueue mapM_ (void . forkIO) - [ commitThread st changechan commitchan - , pushThread st commitchan pushmap + [ commitThread st changechan commitchan transferqueue dstatus + , pushThread st dstatus commitchan pushmap , pushRetryThread st pushmap , mergeThread st , transferWatcherThread st dstatus diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 10161a96cb..a3e909904f 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -11,13 +11,14 @@ import Common.Annex import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.TempFile +import Logs.Transfer +import qualified Command.Sync import Control.Concurrent import System.Posix.Types import Data.Time.Clock.POSIX import Data.Time import System.Locale -import Logs.Transfer import qualified Data.Map as M data DaemonStatus = DaemonStatus @@ -31,6 +32,8 @@ data DaemonStatus = DaemonStatus , lastSanityCheck :: Maybe POSIXTime -- Currently running file content transfers , currentTransfers :: M.Map Transfer TransferInfo + -- Ordered list of remotes to talk to. + , knownRemotes :: [Remote] } deriving (Show) @@ -43,6 +46,7 @@ newDaemonStatus = DaemonStatus , sanityCheckRunning = False , lastSanityCheck = Nothing , currentTransfers = M.empty + , knownRemotes = [] } getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus @@ -59,10 +63,12 @@ startDaemonStatus = do status <- liftIO $ catchDefaultIO (readDaemonStatusFile file) newDaemonStatus transfers <- M.fromList <$> getTransfers + remotes <- Command.Sync.syncRemotes [] liftIO $ newMVar status { scanComplete = False , sanityCheckRunning = False , currentTransfers = transfers + , knownRemotes = remotes } {- This thread wakes up periodically and writes the daemon status to disk. -} diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs index 488056fa2b..ff5cc9eabc 100644 --- a/Assistant/Threads/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -12,6 +12,9 @@ import Assistant.Changes import Assistant.Commits import Assistant.ThreadedMonad import Assistant.Threads.Watcher +import Assistant.TransferQueue +import Assistant.DaemonStatus +import Logs.Transfer import qualified Annex import qualified Annex.Queue import qualified Git.Command @@ -29,8 +32,8 @@ import qualified Data.Set as S import Data.Either {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO () -commitThread st changechan commitchan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO () +commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. @@ -39,7 +42,7 @@ commitThread st changechan commitchan = runEvery (Seconds 1) $ do time <- getCurrentTime if shouldCommit time changes then do - readychanges <- handleAdds st changechan changes + readychanges <- handleAdds st changechan transferqueue dstatus changes if shouldCommit time readychanges then do void $ tryIO $ runThreadState st commitStaged @@ -97,8 +100,8 @@ shouldCommit now changes - Any pending adds that are not ready yet are put back into the ChangeChan, - where they will be retried later. -} -handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change] -handleAdds st changechan cs = returnWhen (null pendingadds) $ do +handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change] +handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do (postponed, toadd) <- partitionEithers <$> safeToAdd st pendingadds @@ -110,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do if (DirWatcher.eventsCoalesce || null added) then return $ added ++ otherchanges else do - r <- handleAdds st changechan + r <- handleAdds st changechan transferqueue dstatus =<< getChanges changechan return $ r ++ added ++ otherchanges where @@ -121,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do | otherwise = a add :: Change -> IO (Maybe Change) - add change@(PendingAddChange { keySource = ks }) = do - r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do - showStart "add" $ keyFilename ks - handle (finishedChange change) (keyFilename ks) - =<< Command.Add.ingest ks - return $ maybeMaybe r + add change@(PendingAddChange { keySource = ks }) = + liftM maybeMaybe $ catchMaybeIO $ + sanitycheck ks $ runThreadState st $ do + showStart "add" $ keyFilename ks + key <- Command.Add.ingest ks + handle (finishedChange change) (keyFilename ks) key add _ = return Nothing maybeMaybe (Just j@(Just _)) = j @@ -141,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha + queueTransfers transferqueue dstatus key (Just file) Upload showEndOk return $ Just change diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 04d3435287..6d6836120e 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -10,6 +10,7 @@ module Assistant.Threads.Pusher where import Common.Annex import Assistant.Commits import Assistant.Pushes +import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Threads.Merger import qualified Command.Sync @@ -32,9 +33,8 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do halfhour = 1800 {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO () -pushThread st commitchan pushmap = do - remotes <- runThreadState st $ Command.Sync.syncRemotes [] +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO () +pushThread st daemonstatus commitchan pushmap = do runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made @@ -42,7 +42,10 @@ pushThread st commitchan pushmap = do -- Now see if now's a good time to push. now <- getCurrentTime if shouldPush now commits - then pushToRemotes now st pushmap remotes + then do + remotes <- runThreadState st $ + knownRemotes <$> getDaemonStatus daemonstatus + pushToRemotes now st pushmap remotes else refillCommits commitchan commits {- Decide if now is a good time to push to remotes. diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 979cbb80f5..fc25b057d3 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -8,9 +8,10 @@ module Assistant.TransferQueue where import Common.Annex -import Utility.TSet +import Assistant.DaemonStatus import Logs.Transfer import Types.Remote +import qualified Remote import Control.Concurrent.STM @@ -28,15 +29,29 @@ stubInfo f = TransferInfo , associatedFile = f } +{- Adds pending transfers to the end of the queue for some of the known + - remotes. (TBD: a smaller set of remotes that are sufficient to transfer to, + - rather than transferring to all.) -} +queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () +queueTransfers q daemonstatus k f direction = + mapM_ (liftIO . queueTransfer q f . gentransfer) + =<< knownRemotes <$> getDaemonStatus daemonstatus + where + gentransfer r = Transfer + { transferDirection = direction + , transferKey = k + , transferRemote = Remote.uuid r + } + {- Adds a pending transfer to the end of the queue. -} -queueTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO () -queueTransfer q transfer f = void $ atomically $ - writeTChan q (transfer, stubInfo f) +queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () +queueTransfer q f t = void $ atomically $ + writeTChan q (t, stubInfo f) {- Adds a pending transfer to the start of the queue, to be processed next. -} -queueNextTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO () -queueNextTransfer q transfer f = void $ atomically $ - unGetTChan q (transfer, stubInfo f) +queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () +queueNextTransfer q f t = void $ atomically $ + unGetTChan q (t, stubInfo f) {- Blocks until a pending transfer is available in the queue. -} getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)