git-annex/Assistant/Threads/Committer.hs

480 lines
16 KiB
Haskell
Raw Normal View History

2012-06-19 06:40:21 +00:00
{- git-annex assistant commit thread
2012-06-13 16:36:33 +00:00
-
- Copyright 2012 Joey Hess <id@joeyh.name>
2012-06-23 05:20:40 +00:00
-
- Licensed under the GNU GPL version 3 or higher.
2012-06-13 16:36:33 +00:00
-}
{-# LANGUAGE CPP #-}
2012-06-25 20:10:10 +00:00
module Assistant.Threads.Committer where
2012-06-13 16:36:33 +00:00
import Assistant.Common
2012-06-19 06:40:21 +00:00
import Assistant.Changes
2012-10-29 23:30:23 +00:00
import Assistant.Types.Changes
2012-06-22 17:39:44 +00:00
import Assistant.Commits
import Assistant.Alert
2012-10-30 18:34:48 +00:00
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.Drop
import Logs.Transfer
import Logs.Location
2012-06-13 16:36:33 +00:00
import qualified Annex.Queue
import qualified Git.LsFiles
import qualified Command.Add
import Utility.ThreadScheduler
import qualified Utility.Lsof as Lsof
2012-06-19 06:40:21 +00:00
import qualified Utility.DirWatcher as DirWatcher
import Types.KeySource
import Config
import Annex.Content
fully support core.symlinks=false in all relevant symlink handling code Refactored annex link code into nice clean new library. Audited and dealt with calls to createSymbolicLink. Remaining calls are all safe, because: Annex/Link.hs: ( liftIO $ createSymbolicLink linktarget file only when core.symlinks=true Assistant/WebApp/Configurators/Local.hs: createSymbolicLink link link test if symlinks can be made Command/Fix.hs: liftIO $ createSymbolicLink link file command only works in indirect mode Command/FromKey.hs: liftIO $ createSymbolicLink link file command only works in indirect mode Command/Indirect.hs: liftIO $ createSymbolicLink l f refuses to run if core.symlinks=false Init.hs: createSymbolicLink f f2 test if symlinks can be made Remote/Directory.hs: go [file] = catchBoolIO $ createSymbolicLink file f >> return True fast key linking; catches failure to make symlink and falls back to copy Remote/Git.hs: liftIO $ catchBoolIO $ createSymbolicLink loc file >> return True ditto Upgrade/V1.hs: liftIO $ createSymbolicLink link f v1 repos could not be on a filesystem w/o symlinks Audited and dealt with calls to readSymbolicLink. Remaining calls are all safe, because: Annex/Link.hs: ( liftIO $ catchMaybeIO $ readSymbolicLink file only when core.symlinks=true Assistant/Threads/Watcher.hs: ifM ((==) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file)) code that fixes real symlinks when inotify sees them It's ok to not fix psdueo-symlinks. Assistant/Threads/Watcher.hs: mlink <- liftIO (catchMaybeIO $ readSymbolicLink file) ditto Command/Fix.hs: stopUnless ((/=) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file)) $ do command only works in indirect mode Upgrade/V1.hs: getsymlink = takeFileName <$> readSymbolicLink file v1 repos could not be on a filesystem w/o symlinks Audited and dealt with calls to isSymbolicLink. (Typically used with getSymbolicLinkStatus, but that is just used because getFileStatus is not as robust; it also works on pseudolinks.) 
Remaining calls are all safe, because: Assistant/Threads/SanityChecker.hs: | isSymbolicLink s -> addsymlink file ms only handles staging of symlinks that were somehow not staged (might need to be updated to support pseudolinks, but this is only a belt-and-suspenders check anyway, and I've never seen the code run) Command/Add.hs: if isSymbolicLink s || not (isRegularFile s) avoids adding symlinks to the annex, so not relevant Command/Indirect.hs: | isSymbolicLink s -> void $ flip whenAnnexed f $ only allowed on systems that support symlinks Command/Indirect.hs: whenM (liftIO $ not . isSymbolicLink <$> getSymbolicLinkStatus f) $ do ditto Seek.hs:notSymlink f = liftIO $ not . isSymbolicLink <$> getSymbolicLinkStatus f used to find unlocked files, only relevant in indirect mode Utility/FSEvents.hs: | Files.isSymbolicLink s = runhook addSymlinkHook $ Just s Utility/FSEvents.hs: | Files.isSymbolicLink s -> Utility/INotify.hs: | Files.isSymbolicLink s -> Utility/INotify.hs: checkfiletype Files.isSymbolicLink addSymlinkHook f Utility/Kqueue.hs: | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change all above are lower-level, not relevant Audited and dealt with calls to isSymLink. Remaining calls are all safe, because: Annex/Direct.hs: | isSymLink (getmode item) = This is looking at git diff-tree objects, not files on disk Command/Unused.hs: | isSymLink (LsTree.mode l) = do This is looking at git ls-tree, not file on disk Utility/FileMode.hs:isSymLink :: FileMode -> Bool Utility/FileMode.hs:isSymLink = checkMode symbolicLinkMode low-level Done!!
2013-02-17 19:05:55 +00:00
import Annex.Link
import Annex.CatFile
import qualified Annex
import Utility.InodeCache
import Annex.Content.Direct
import qualified Command.Sync
import qualified Git.Branch
2012-06-13 16:36:33 +00:00
import Data.Time.Clock
import Data.Tuple.Utils
import qualified Data.Set as S
import qualified Data.Map as M
import Data.Either
import Control.Concurrent
2012-06-13 16:36:33 +00:00
{- This thread makes git commits at appropriate times.
 -
 - Each time waitChangeTime hands over a batch of changes, any pending
 - adds are first processed by handleAdds. If shouldCommit approves, the
 - staged changes are committed, the commit is recorded, and each
 - committed change is passed to checkChangeContent. Otherwise the
 - changes are put back with refill.
 -
 - The value returned to waitChangeTime is the number of changes that
 - were committed, or 0 when the commit was delayed.
 -}
commitThread :: NamedThread
commitThread = namedThread "Committer" $ do
    havelsof <- liftIO $ inPath "lsof"
    -- A configured annexDelayAdd overrides the platform default.
    delayadd <- liftAnnex $
        maybe delayaddDefault (return . Just . Seconds)
            =<< annexDelayAdd <$> Annex.getGitConfig
    msg <- liftAnnex Command.Sync.commitMsg
    waitChangeTime $ \(changes, time) -> do
        readychanges <- handleAdds havelsof delayadd changes
        -- Computed once; used for the decision, the debug message,
        -- and the return value.
        let numchanges = length readychanges
        if shouldCommit False time numchanges readychanges
            then do
                debug
                    [ "committing"
                    , show numchanges
                    , "changes"
                    ]
                void $ alertWhile commitAlert $
                    liftAnnex $ commitStaged msg
                recordCommit
                mapM_ checkChangeContent readychanges
                return numchanges
            else do
                refill readychanges
                return 0
{- Hands changes that are not being committed now to refillChanges,
 - after emitting a debug message. Does nothing for an empty list. -}
refill :: [Change] -> Assistant ()
refill changes
    | null changes = noop
    | otherwise = do
        debug ["delaying commit of", show (length changes), "changes"]
        refillChanges changes
{- Wait for one or more changes to arrive to be committed, and then
 - runs an action to commit them. If more changes arrive while this is
 - going on, they're handled intelligently, batching up changes into
 - large commits where possible, doing rename detection, and
 - committing immediately otherwise.
 -
 - The action is passed the changes and the current time, and returns
 - the number of changes it committed; that number is remembered as the
 - size of the last commit.
 -}
waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant ()
waitChangeTime a = waitchanges 0
  where
    waitchanges lastcommitsize = do
        -- Wait one second as a simple rate limiter.
        liftIO $ threadDelaySeconds (Seconds 1)
        -- Now, wait until at least one change is available for
        -- processing.
        cs <- getChanges
        handlechanges cs lastcommitsize

    handlechanges changes lastcommitsize = do
        let len = length changes
        -- See if now's a good time to commit.
        now <- liftIO getCurrentTime
        scanning <- not . scanComplete <$> getDaemonStatus
        case (lastcommitsize >= maxCommitSize, shouldCommit scanning now len changes, possiblyrename changes) of
            (True, True, _)
                | len > maxCommitSize ->
                    a (changes, now) >>= waitchanges
                | otherwise -> aftermaxcommit changes
            (_, True, False) ->
                a (changes, now) >>= waitchanges
            (_, True, True) -> do
                morechanges <- getrelatedchanges changes
                a (changes ++ morechanges, now) >>= waitchanges
            _ -> do
                refill changes
                waitchanges lastcommitsize

    {- Did we perhaps only get one of the AddChange and RmChange pair
     - that make up a file rename? Or some of the pairs that make up
     - a directory rename?
     -}
    possiblyrename = all renamepart

    renamepart (PendingAddChange _ _) = True
    renamepart c = isRmChange c

    {- Gets changes related to the passed changes, without blocking
     - very long.
     -
     - If there are multiple RmChanges, this is probably a directory
     - rename, in which case it may be necessary to wait longer to get
     - all the Changes involved.
     -}
    getrelatedchanges oldchanges
        | length (filter isRmChange oldchanges) > 1 =
            concat <$> getbatchchanges []
        | otherwise = do
            liftIO humanImperceptibleDelay
            getAnyChanges

    -- Polls every tenth of a second, accumulating batches until a poll
    -- comes back empty.
    getbatchchanges cs = do
        liftIO $ threadDelay $ fromIntegral $ oneSecond `div` 10
        cs' <- getAnyChanges
        if null cs'
            then return cs
            else getbatchchanges (cs':cs)

    {- The last commit was maximum size, so it's very likely there
     - are more changes and we'd like to ensure we make another commit
     - of maximum size if possible.
     -
     - But, it can take a while for the Watcher to wake back up
     - after a commit. It can get blocked by another thread
     - that is using the Annex state, such as a git-annex branch
     - commit. Especially after such a large commit, this can
     - take several seconds. When this happens, it defeats the
     - normal commit batching, which sees some old changes the
     - Watcher found while the commit was being prepared, and sees
     - no recent ones, and wants to commit immediately.
     -
     - All that we need to do, then, is wait for the Watcher to
     - wake up, and queue up one more change.
     -
     - However, it's also possible that we're at the end of changes for
     - now. So to avoid waiting a really long time before committing
     - those changes we have, poll for up to 30 seconds, and then
     - commit them.
     -
     - Also, try to run something in Annex, to ensure we block
     - longer if the Annex state is indeed blocked.
     -}
    aftermaxcommit oldchanges = loop (30 :: Int)
      where
        loop 0 = continue oldchanges
        loop n = do
            liftAnnex noop -- ensure Annex state is free
            liftIO $ threadDelaySeconds (Seconds 1)
            changes <- getAnyChanges
            if null changes
                then loop (n - 1)
                else continue (oldchanges ++ changes)
        continue cs
            | null cs = waitchanges 0
            | otherwise = handlechanges cs 0
{- True only for a Change whose changeInfo is RmChange. -}
isRmChange :: Change -> Bool
isRmChange c = case c of
    Change { changeInfo = info } -> info == RmChange
    _ -> False
{- A span of time meant to be too short for a human to notice, yet
 - long enough for a computer to get some work done.
 -
 - Note that 0.001 is a little too short for rename change batching to
 - work. -}
humanImperceptibleInterval :: NominalDiffTime
humanImperceptibleInterval = 0.01
{- Sleeps for humanImperceptibleInterval, scaled by oneSecond and
 - truncated to a whole number for threadDelay. -}
humanImperceptibleDelay :: IO ()
humanImperceptibleDelay = threadDelay microseconds
  where
    microseconds = truncate (humanImperceptibleInterval * fromIntegral oneSecond)
{- Number of changes at which a batch counts as a maximum size commit.
 - shouldCommit always approves a batch this large, and waitChangeTime
 - compares the size of the last commit against it. -}
maxCommitSize :: Int
maxCommitSize = 5000
{- Decide if now is a good time to make a commit.
 - Note that the list of changes has an undefined order.
 -
 - Parameters: whether the startup scan is still running, the current
 - time, the number of changes (callers in this file pass the length of
 - the list), and the changes themselves.
 -
 - While scanning, only commit once maxCommitSize changes are available.
 -
 - Otherwise, current strategy: never commit zero changes; always commit
 - once maxCommitSize is reached; and if there have been 10 changes
 - within the past second, a batch activity is taking place, so wait for
 - later.
 -}
shouldCommit :: Bool -> UTCTime -> Int -> [Change] -> Bool
shouldCommit scanning now len changes
    | scanning = len >= maxCommitSize
    | len == 0 = False
    | len >= maxCommitSize = True
    | length recentchanges < 10 = True
    | otherwise = False -- batch activity
  where
    thissecond c = timeDelta c <= 1
    recentchanges = filter thissecond changes
    timeDelta c = now `diffUTCTime` changeTime c
2012-06-13 16:36:33 +00:00
{- Flushes the Annex queue and then commits what is staged, using the
 - given commit message. When the commit succeeds, the sync branch is
 - updated for the current branch.
 -
 - Returns False if flushing the queue throws, or if the commit does
 - not succeed.
 -}
commitStaged :: String -> Annex Bool
commitStaged msg = either (const $ return False) (const docommit)
    {- This could fail if there's another commit being made by
     - something else. -}
    =<< tryNonAsync Annex.Queue.flush
  where
    docommit = do
        committed <- Command.Sync.commitStaged Git.Branch.AutomaticCommit msg
        when committed $
            Command.Sync.updateSyncBranch =<< inRepo Git.Branch.current
        return committed
2012-06-13 16:36:33 +00:00
{- OSX needs a short delay after a file is added before locking it down,
 - when using a non-direct mode repository, as pasting a file seems to
 - try to set file permissions or otherwise access the file after closing
 - it.
 -
 - So on OSX the default is a one second delay unless the repository is
 - in direct mode; everywhere else there is no delay. -}
delayaddDefault :: Annex (Maybe Seconds)
#ifdef darwin_HOST_OS
delayaddDefault = do
    direct <- isDirect
    return $ if direct
        then Nothing
        else Just (Seconds 1)
#else
delayaddDefault = return Nothing
#endif
{- If there are PendingAddChanges, or InProcessAddChanges, the files
 - have not yet actually been added to the annex, and that has to be done
 - now, before committing.
 -
 - Deferring the adds to this point causes batches to be bundled together,
 - which allows faster checking with lsof that the files are not still open
 - for write by some other process, and faster checking with git-ls-files
 - that the files are not already checked into git.
 -
 - When a file is added, Inotify will notice the new symlink. So this waits
 - for additional Changes to arrive, so that the symlink has hopefully been
 - staged before returning, and will be committed immediately.
 -
 - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
 - created and staged.
 -
 - Returns a list of all changes that are ready to be committed.
 - Any pending adds that are not ready yet are put back into the ChangeChan,
 - where they will be retried later.
 -}
handleAdds :: Bool -> Maybe Seconds -> [Change] -> Assistant [Change]
handleAdds havelsof delayadd cs = returnWhen (null incomplete) $ do
    let (pending, inprocess) = partition isPendingAddChange incomplete
    direct <- liftAnnex isDirect
    -- Outside direct mode, the pending adds are first filtered through
    -- Git.LsFiles.notInRepo.
    (pending', cleanup) <- if direct
        then return (pending, noop)
        else findnew pending
    (postponed, toadd) <- partitionEithers <$> safeToAdd havelsof delayadd pending' inprocess
    cleanup

    unless (null postponed) $
        refillChanges postponed

    returnWhen (null toadd) $ do
        added <- addaction toadd $
            catMaybes <$> if direct
                then adddirect toadd
                else forM toadd add
        if DirWatcher.eventsCoalesce || null added || direct
            then return $ added ++ otherchanges
            else do
                -- Pick up the Changes generated by the adds just made.
                r <- handleAdds havelsof delayadd =<< getChanges
                return $ r ++ added ++ otherchanges
  where
    (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs

    findnew [] = return ([], noop)
    findnew pending@(exemplar:_) = do
        (newfiles, cleanup) <- liftAnnex $
            inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
        -- note: timestamp info is lost here
        let ts = changeTime exemplar
        return (map (PendingAddChange ts) newfiles, void $ liftIO cleanup)

    -- Short-circuits to just the changes that needed no add handling.
    returnWhen c a
        | c = return otherchanges
        | otherwise = a

    add :: Change -> Assistant (Maybe Change)
    add change@(InProcessAddChange { keySource = ks }) =
        catchDefaultIO Nothing <~> doadd
      where
        doadd = sanitycheck ks $ do
            (mkey, mcache) <- liftAnnex $ do
                showStart "add" $ keyFilename ks
                Command.Add.ingest $ Just ks
            maybe (failedingest change) (done change mcache $ keyFilename ks) mkey
    add _ = return Nothing

    {- In direct mode, avoid overhead of re-ingesting a renamed
     - file, by examining the other Changes to see if a removed
     - file has the same InodeCache as the new file. If so,
     - we can just update bookkeeping, and stage the file in git.
     -}
    adddirect :: [Change] -> Assistant [Maybe Change]
    adddirect toadd = do
        ct <- liftAnnex compareInodeCachesWith
        m <- liftAnnex $ removedKeysMap ct cs
        delta <- liftAnnex getTSDelta
        if M.null m
            then forM toadd add
            else forM toadd $ \c -> do
                mcache <- liftIO $ genInodeCache (changeFile c) delta
                case mcache of
                    Nothing -> add c
                    Just cache ->
                        case M.lookup (inodeCacheToKey ct cache) m of
                            Nothing -> add c
                            Just k -> fastadd c k

    fastadd :: Change -> Key -> Assistant (Maybe Change)
    fastadd change key = do
        let source = keySource change
        liftAnnex $ Command.Add.finishIngestDirect key source
        done change Nothing (keyFilename source) key

    -- Maps the recorded inode caches of the keys of removed files to
    -- those keys.
    removedKeysMap :: InodeComparisonType -> [Change] -> Annex (M.Map InodeCacheKey Key)
    removedKeysMap ct l = do
        mks <- forM (filter isRmChange l) $ \c ->
            catKeyFile $ changeFile c
        M.fromList . concat <$> mapM mkpairs (catMaybes mks)
      where
        mkpairs k = map (\c -> (inodeCacheToKey ct c, k)) <$>
            recordedInodeCache k

    -- Queues the change to be retried, and reports the failure.
    failedingest change = do
        refill [retryChange change]
        liftAnnex showEndFail
        return Nothing

    done change mcache file key = liftAnnex $ do
        logStatus key InfoPresent
        link <- ifM isDirect
            ( calcRepo $ gitAnnexLink file key
            , Command.Add.link file key mcache
            )
        whenM (pure DirWatcher.eventsCoalesce <||> isDirect) $
            stageSymlink file =<< hashSymlink link
        showEndOk
        return $ Just $ finishedChange change key

    {- Check that the keysource's keyFilename still exists,
     - and is still a hard link to its contentLocation,
     - before ingesting it. -}
    sanitycheck keysource a = do
        fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource
        ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource
        if deviceID ks == deviceID fs && fileID ks == fileID fs
            then a
            else do
                -- remove the hard link
                when (contentLocation keysource /= keyFilename keysource) $
                    void $ liftIO $ tryIO $ removeFile $ contentLocation keysource
                return Nothing

    {- Shows an alert while performing an action to add a file or
     - files. When only a few files are added, their names are shown
     - in the alert. When it's a batch add, the number of files added
     - is shown.
     -
     - Add errors tend to be transient and will be
     - automatically dealt with, so the alert is always told
     - the add succeeded.
     -}
    addaction [] a = a
    addaction toadd a = alertWhile' (addFileAlert $ map changeFile toadd) $
        (,)
            <$> pure True
            <*> a
{- Files can Either be Right to be added now,
 - or are unsafe, and must be Left for later.
 -
 - Check by running lsof on the repository.
 -
 - Pending changes are first locked down with Command.Add.lockDown and
 - turned into InProcessAddChanges; those for which lockDown yields
 - Nothing are left out of the result. When lsof is not available,
 - no file is considered open.
 -}
safeToAdd :: Bool -> Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change]
safeToAdd _ _ [] [] = return []
safeToAdd havelsof delayadd pending inprocess = do
    maybe noop (liftIO . threadDelaySeconds) delayadd
    liftAnnex $ do
        keysources <- forM pending $ Command.Add.lockDown . changeFile
        let inprocess' = inprocess ++ mapMaybe mkinprocess (zip pending keysources)
        openfiles <- if havelsof
            then S.fromList . map fst3 . filter openwrite <$>
                findopenfiles (map keySource inprocess')
            else pure S.empty
        let checked = map (check openfiles) inprocess'

        {- If new events are received when files are closed,
         - there's no need to retry any changes that cannot
         - be done now. -}
        if DirWatcher.closingTracked
            then do
                mapM_ canceladd $ lefts checked
                allRight $ rights checked
            else return checked
  where
    check openfiles change@(InProcessAddChange { keySource = ks })
        | S.member (contentLocation ks) openfiles = Left change
    check _ change = Right change

    mkinprocess (c, Just ks) = Just InProcessAddChange
        { changeTime = changeTime c
        , keySource = ks
        }
    mkinprocess (_, Nothing) = Nothing

    canceladd (InProcessAddChange { keySource = ks }) = do
        warning $ keyFilename ks
            ++ " still has writers, not adding"
        -- remove the hard link
        when (contentLocation ks /= keyFilename ks) $
            void $ liftIO $ tryIO $ removeFile $ contentLocation ks
    canceladd _ = noop

    -- An unknown open mode is treated the same as an open for write.
    openwrite (_file, mode, _pid) = mode `elem`
        [ Lsof.OpenWriteOnly
        , Lsof.OpenReadWrite
        , Lsof.OpenUnknown
        ]

    allRight = return . map Right

    {- Normally the KeySources are locked down inside the temp directory,
     - so can just lsof that, which is quite efficient.
     -
     - In crippled filesystem mode, there is no lock down, so must run lsof
     - on each individual file.
     -}
    findopenfiles keysources = ifM crippledFileSystem
        ( liftIO $ do
            let segments = segmentXargs $ map keyFilename keysources
            concat <$> forM segments (\fs -> Lsof.query $ "--" : fs)
        , do
            tmpdir <- fromRepo gitAnnexTmpMiscDir
            liftIO $ Lsof.queryDir tmpdir
        )
{- Once a Change has been committed, queue whatever transfers or drops
 - the content of its key needs: an upload when the content is present
 - in the annex, a download when it is not, followed by a drop check.
 -
 - This is not done during the startup scan, because the expensive
 - transfer scan does the same thing then.
 -
 - Changes that carry no key are ignored.
 -}
checkChangeContent :: Change -> Assistant ()
checkChangeContent change@(Change { changeInfo = info }) =
    maybe noop handlekey (changeInfoKey info)
  where
    file = Just (changeFile change)
    handlekey key = whenM (scanComplete <$> getDaemonStatus) $ do
        present <- liftAnnex $ inAnnex key
        void $ if present
            then queueTransfers "new file created" Next key file Upload
            else queueTransfers "new or renamed file wanted" Next key file Download
        handleDrops "file renamed" present key file Nothing
checkChangeContent _ = noop