
239 lines
7.5 KiB
Raw Normal View History

2012-06-19 02:40:21 -04:00
{- git-annex assistant commit thread
 -
 - Copyright 2012 Joey Hess
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}
2012-06-25 16:10:10 -04:00
module Assistant.Threads.Committer where
2012-06-13 12:36:33 -04:00
import Assistant.Common
2012-06-19 02:40:21 -04:00
import Assistant.Changes
2012-06-22 13:39:44 -04:00
import Assistant.Commits
import Assistant.Alert
2012-06-13 12:36:33 -04:00
import Assistant.ThreadedMonad
2012-06-25 16:10:10 -04:00
import Assistant.Threads.Watcher
import Assistant.TransferQueue
import Assistant.DaemonStatus
import Logs.Transfer
2012-06-19 00:23:14 -04:00
import qualified Annex
2012-06-13 12:36:33 -04:00
import qualified Annex.Queue
import qualified Git.Command
2012-06-19 02:40:21 -04:00
import qualified Git.HashObject
import Git.Types
import qualified Command.Add
import Utility.ThreadScheduler
import qualified Utility.Lsof as Lsof
2012-06-19 02:40:21 -04:00
import qualified Utility.DirWatcher as DirWatcher
import Types.KeySource
2012-06-13 12:36:33 -04:00
import Data.Time.Clock
import Data.Tuple.Utils
import qualified Data.Set as S
import Data.Either
2012-06-13 12:36:33 -04:00
{- Name of this thread. It labels the NamedThread built by commitThread
 - and tags every debug message this module emits. -}
thisThread :: ThreadName
thisThread = "Committer"
2012-06-13 12:36:33 -04:00
{- This thread makes git commits at appropriate times.
 -
 - Runs once a second (which doubles as a simple rate limiter). Each pass
 - waits until at least one change is available, resolves pending adds via
 - handleAdds, and either commits and records a Commit on the CommitChan,
 - or pushes the changes back onto the ChangeChan to be retried later. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
commitThread st changechan commitchan transferqueue dstatus = thread $ runEvery (Seconds 1) $ do
    -- We already waited one second as a simple rate limiter.
    -- Next, wait until at least one change is available for
    -- processing.
    changes <- getChanges changechan
    -- Now see if now's a good time to commit.
    time <- getCurrentTime
    if shouldCommit time changes
        then do
            readychanges <- handleAdds st changechan transferqueue dstatus changes
            -- handleAdds may have postponed some adds, so decide again
            -- based on what is actually ready.
            if shouldCommit time readychanges
                then do
                    debug thisThread
                        [ "committing"
                        , show (length readychanges)
                        , "changes"
                        ]
                    void $ alertWhile dstatus commitAlert $ do
                        r <- tryIO (runThreadState st commitStaged)
                        -- A failed commit must not kill the thread,
                        -- but it should not vanish silently either.
                        either
                            (\e -> debug thisThread ["commit failed:", show e])
                            (const noop)
                            r
                        -- The action yields True whether or not the
                        -- commit itself succeeded.
                        return True
                    recordCommit commitchan (Commit time)
                else refill readychanges
        else refill changes
  where
    thread = NamedThread thisThread

    -- Put changes back onto the ChangeChan so a later pass reconsiders
    -- them.
    refill [] = noop
    refill cs = do
        debug thisThread
            [ "delaying commit of"
            , show (length cs)
            , "changes"
            ]
        refillChanges changechan cs
2012-06-13 12:36:33 -04:00
{- Commits whatever is currently staged in the index, using an empty
 - commit message. -}
commitStaged :: Annex ()
commitStaged = do
    -- Run any git commands still waiting in the Annex queue first, so
    -- that whatever they stage is in the index when the commit runs.
    -- NOTE(review): presumes the queue holds stage operations for the
    -- changes being committed; confirm against the Watcher.
    Annex.Queue.flush
    inRepo $ Git.Command.run "commit"
        [ Param "--allow-empty-message"
        , Param "-m", Param ""
        -- Empty commits may be made if tree changes cancel
        -- each other out, etc
        , Param "--allow-empty"
        -- Avoid running the usual git-annex pre-commit hook;
        -- watch does the same symlink fixing, and we don't want
        -- to deal with unlocked files in these commits.
        , Param "--no-verify"
        , Param "--quiet"
        ]
{- Decide if now is a good time to make a commit.
 - Note that the list of change times has an undefined order.
 -
 - Current strategy: If there have been 10 changes within the past second,
 - a batch activity is taking place, so wait for later. An empty list is
 - never committed, and more than 10000 changes are always committed so
 - the queue does not bloat too much.
 -}
shouldCommit :: UTCTime -> [Change] -> Bool
shouldCommit now changes
    | null changes = False
    | len > 10000 = True -- avoid bloating queue too much
    | length (filter thisSecond changes) < 10 = True
    | otherwise = False -- batch activity
  where
    len = length changes
    -- True for changes at most one second older than now.
    thisSecond c = now `diffUTCTime` changeTime c <= 1
{- If there are PendingAddChanges, the files have not yet actually been
 - added to the annex (probably), and that has to be done now, before
 - committing.
 -
 - Deferring the adds to this point causes batches to be bundled together,
 - which allows faster checking with lsof that the files are not still open
 - for write by some other process.
 -
 - When a file is added, Inotify will notice the new symlink. So this waits
 - for additional Changes to arrive, so that the symlink has hopefully been
 - staged before returning, and will be committed immediately.
 -
 - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
 - created and staged.
 -
 - Returns a list of all changes that are ready to be committed.
 - Any pending adds that are not ready yet are put back into the ChangeChan,
 - where they will be retried later.
 -}
handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
    (postponed, toadd) <- partitionEithers <$>
        safeToAdd st pendingadds

    unless (null postponed) $
        refillChanges changechan postponed

    returnWhen (null toadd) $ do
        added <- catMaybes <$> forM toadd add
        if DirWatcher.eventsCoalesce || null added
            then return $ added ++ otherchanges
            else do
                -- Wait for the Changes caused by the new symlinks,
                -- handling any further pending adds among them.
                r <- handleAdds st changechan transferqueue dstatus
                    =<< getChanges changechan
                return $ r ++ added ++ otherchanges
  where
    (pendingadds, otherchanges) = partition isPendingAddChange cs

    -- When c holds, skip the action and return only the non-add changes.
    returnWhen c a
        | c = return otherchanges
        | otherwise = a

    add :: Change -> IO (Maybe Change)
    add change@(PendingAddChange { keySource = ks }) =
        alertWhile' dstatus (addFileAlert $ keyFilename ks) $
            liftM ret $ catchMaybeIO $
                sanitycheck ks $ runThreadState st $ do
                    showStart "add" $ keyFilename ks
                    key <- Command.Add.ingest ks
                    handle (finishedChange change) (keyFilename ks) key
      where
        {- Add errors tend to be transient and will
         - be automatically dealt with, so don't
         - pass to the alert code. -}
        ret (Just j@(Just _)) = (True, j)
        ret _ = (True, Nothing)
    add _ = return Nothing

    -- Finish an ingest. Each branch closes the status line that
    -- showStart opened in add.
    handle _ _ Nothing = do
        showEndFail
        return Nothing
    handle change file (Just key) = do
        link <- Command.Add.link file key True
        -- When events coalesce, no further Change will report the new
        -- symlink, so hash and stage it directly here.
        when DirWatcher.eventsCoalesce $ do
            sha <- inRepo $
                Git.HashObject.hashObject BlobObject link
            stageSymlink file sha
        queueTransfers Next transferqueue dstatus key (Just file) Upload
        showEndOk
        return $ Just change

    {- Check that the keysource's keyFilename still exists,
     - and is still a hard link to its contentLocation,
     - before ingesting it. -}
    sanitycheck keysource a = do
        fs <- getSymbolicLinkStatus $ keyFilename keysource
        ks <- getSymbolicLinkStatus $ contentLocation keysource
        if deviceID ks == deviceID fs && fileID ks == fileID fs
            then a
            else return Nothing
{- PendingAddChanges can Either be Right to be added now,
 - or are unsafe, and must be Left for later.
 -
 - Check by running lsof on the temp directory, which
 - the KeySources are locked down in.
 -
 - When the force flag is set, the lsof check is skipped and every
 - change is returned as Right.
 -}
safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
safeToAdd st changes = runThreadState st $
    ifM (Annex.getState Annex.force)
        ( allRight changes -- force bypasses lsof check
        , do
            tmpdir <- fromRepo gitAnnexTmpDir
            openfiles <- S.fromList . map fst3 . filter openwrite <$>
                liftIO (Lsof.queryDir tmpdir)

            -- TODO this is here for debugging a problem on
            -- OSX, and is pretty expensive, so remove later
            liftIO $ debug thisThread
                [ "checking changes:"
                , show changes
                , "vs open files:"
                , show openfiles
                ]

            let checked = map (check openfiles) changes

            {- If new events are received when files are closed,
             - there's no need to retry any changes that cannot
             - be done now. -}
            if DirWatcher.closingTracked
                then do
                    mapM_ canceladd $ lefts checked
                    allRight $ rights checked
                else return checked
        )
  where
    -- A pending add is Left while its contentLocation is open for
    -- writing; every other change passes through as Right.
    check openfiles change@(PendingAddChange { keySource = ks })
        | S.member (contentLocation ks) openfiles = Left change
    check _ change = Right change

    -- Abandon an add that still has writers: warn, then remove the
    -- hard link, ignoring any IO error from the removal.
    canceladd (PendingAddChange { keySource = ks }) = do
        warning $ keyFilename ks
            ++ " still has writers, not adding"
        -- remove the hard link
        void $ liftIO $ tryIO $
            removeFile $ contentLocation ks
    canceladd _ = noop

    -- Keep only lsof entries whose open mode permits writing.
    openwrite (_file, mode, _pid) =
        mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite

    allRight = return . map Right