queue Uploads of newly added files to remotes

Added knownRemotes to DaemonStatus. This list is not entirely trivial to
calculate, and having it here should make it easier to add/remove remotes
on the fly later on. It did require plumbing the daemonstatus through to
some more threads.
This commit is contained in:
Joey Hess 2012-07-05 10:21:22 -06:00
parent b0894f00c0
commit 83c66ccaf8
5 changed files with 58 additions and 27 deletions

View file

@ -21,7 +21,8 @@
- until this is complete.
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- index, then commits. Also queues Transfer events to send added
- files to other remotes.
- Thread 6: pusher
- Waits for commits to be made, and pushes updated branches to remotes,
- in parallel. (Forks a process for each git push.)
@ -73,6 +74,7 @@ import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
import Assistant.Pushes
import Assistant.TransferQueue
import Assistant.Threads.Watcher
import Assistant.Threads.Committer
import Assistant.Threads.Pusher
@ -103,9 +105,10 @@ startDaemon assistant foreground
changechan <- newChangeChan
commitchan <- newCommitChan
pushmap <- newFailedPushMap
transferqueue <- newTransferQueue
mapM_ (void . forkIO)
[ commitThread st changechan commitchan
, pushThread st commitchan pushmap
[ commitThread st changechan commitchan transferqueue dstatus
, pushThread st dstatus commitchan pushmap
, pushRetryThread st pushmap
, mergeThread st
, transferWatcherThread st dstatus

View file

@ -11,13 +11,14 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
import Logs.Transfer
import qualified Command.Sync
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
import Logs.Transfer
import qualified Data.Map as M
data DaemonStatus = DaemonStatus
@ -31,6 +32,8 @@ data DaemonStatus = DaemonStatus
, lastSanityCheck :: Maybe POSIXTime
-- Currently running file content transfers
, currentTransfers :: M.Map Transfer TransferInfo
-- Ordered list of remotes to talk to.
, knownRemotes :: [Remote]
}
deriving (Show)
@ -43,6 +46,7 @@ newDaemonStatus = DaemonStatus
, sanityCheckRunning = False
, lastSanityCheck = Nothing
, currentTransfers = M.empty
, knownRemotes = []
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
@ -59,10 +63,12 @@ startDaemonStatus = do
status <- liftIO $
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
transfers <- M.fromList <$> getTransfers
remotes <- Command.Sync.syncRemotes []
liftIO $ newMVar status
{ scanComplete = False
, sanityCheckRunning = False
, currentTransfers = transfers
, knownRemotes = remotes
}
{- This thread wakes up periodically and writes the daemon status to disk. -}

View file

@ -12,6 +12,9 @@ import Assistant.Changes
import Assistant.Commits
import Assistant.ThreadedMonad
import Assistant.Threads.Watcher
import Assistant.TransferQueue
import Assistant.DaemonStatus
import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@ -29,8 +32,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
commitThread st changechan commitchan = runEvery (Seconds 1) $ do
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@ -39,7 +42,7 @@ commitThread st changechan commitchan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
readychanges <- handleAdds st changechan changes
readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
@ -97,8 +100,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
handleAdds st changechan cs = returnWhen (null pendingadds) $ do
handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@ -110,7 +113,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
r <- handleAdds st changechan
r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@ -121,12 +124,12 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
add change@(PendingAddChange { keySource = ks }) = do
r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
showStart "add" $ keyFilename ks
handle (finishedChange change) (keyFilename ks)
=<< Command.Add.ingest ks
return $ maybeMaybe r
add change@(PendingAddChange { keySource = ks }) =
liftM maybeMaybe $ catchMaybeIO $
sanitycheck ks $ runThreadState st $ do
showStart "add" $ keyFilename ks
key <- Command.Add.ingest ks
handle (finishedChange change) (keyFilename ks) key
add _ = return Nothing
maybeMaybe (Just j@(Just _)) = j
@ -141,6 +144,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
queueTransfers transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change

View file

@ -10,6 +10,7 @@ module Assistant.Threads.Pusher where
import Common.Annex
import Assistant.Commits
import Assistant.Pushes
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Threads.Merger
import qualified Command.Sync
@ -32,9 +33,8 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
halfhour = 1800
{- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
pushThread st commitchan pushmap = do
remotes <- runThreadState st $ Command.Sync.syncRemotes []
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
pushThread st daemonstatus commitchan pushmap = do
runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
@ -42,7 +42,10 @@ pushThread st commitchan pushmap = do
-- Now see if now's a good time to push.
now <- getCurrentTime
if shouldPush now commits
then pushToRemotes now st pushmap remotes
then do
remotes <- runThreadState st $
knownRemotes <$> getDaemonStatus daemonstatus
pushToRemotes now st pushmap remotes
else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes.

View file

@ -8,9 +8,10 @@
module Assistant.TransferQueue where
import Common.Annex
import Utility.TSet
import Assistant.DaemonStatus
import Logs.Transfer
import Types.Remote
import qualified Remote
import Control.Concurrent.STM
@ -28,15 +29,29 @@ stubInfo f = TransferInfo
, associatedFile = f
}
{- Adds pending transfers to the end of the queue for some of the known
- remotes. (TBD: a smaller set of remotes that are sufficient to transfer to,
- rather than transferring to all.) -}
queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers q daemonstatus k f direction =
mapM_ (liftIO . queueTransfer q f . gentransfer)
=<< knownRemotes <$> getDaemonStatus daemonstatus
where
gentransfer r = Transfer
{ transferDirection = direction
, transferKey = k
, transferRemote = Remote.uuid r
}
{- Adds a pending transfer to the end of the queue. -}
queueTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
queueTransfer q transfer f = void $ atomically $
writeTChan q (transfer, stubInfo f)
queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueTransfer q f t = void $ atomically $
writeTChan q (t, stubInfo f)
{- Adds a pending transfer to the start of the queue, to be processed next. -}
queueNextTransfer :: TransferQueue -> Transfer -> AssociatedFile -> IO ()
queueNextTransfer q transfer f = void $ atomically $
unGetTChan q (transfer, stubInfo f)
queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueNextTransfer q f t = void $ atomically $
unGetTChan q (t, stubInfo f)
{- Blocks until a pending transfer is available in the queue. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)