git-annex/Assistant/Threads/Committer.hs

481 lines
16 KiB
Haskell
Raw Normal View History

2012-06-19 06:40:21 +00:00
{- git-annex assistant commit thread
2012-06-13 16:36:33 +00:00
-
- Copyright 2012-2021 Joey Hess <id@joeyh.name>
2012-06-23 05:20:40 +00:00
-
- Licensed under the GNU AGPL version 3 or higher.
2012-06-13 16:36:33 +00:00
-}
{-# LANGUAGE CPP #-}
2012-06-25 20:10:10 +00:00
module Assistant.Threads.Committer where
2012-06-13 16:36:33 +00:00
import Assistant.Common
2012-06-19 06:40:21 +00:00
import Assistant.Changes
2012-10-29 23:30:23 +00:00
import Assistant.Types.Changes
2012-06-22 17:39:44 +00:00
import Assistant.Commits
import Assistant.Alert
2012-10-30 18:34:48 +00:00
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.Drop
import Types.Transfer
import Logs.Location
2012-06-13 16:36:33 +00:00
import qualified Annex.Queue
import Utility.ThreadScheduler
import qualified Utility.Lsof as Lsof
2012-06-19 06:40:21 +00:00
import qualified Utility.DirWatcher as DirWatcher
import Types.KeySource
import Types.Command
import Config
import Annex.Content
2015-12-22 17:23:33 +00:00
import Annex.Ingest
fully support core.symlinks=false in all relevant symlink handling code Refactored annex link code into nice clean new library. Audited and dealt with calls to createSymbolicLink. Remaining calls are all safe, because: Annex/Link.hs: ( liftIO $ createSymbolicLink linktarget file only when core.symlinks=true Assistant/WebApp/Configurators/Local.hs: createSymbolicLink link link test if symlinks can be made Command/Fix.hs: liftIO $ createSymbolicLink link file command only works in indirect mode Command/FromKey.hs: liftIO $ createSymbolicLink link file command only works in indirect mode Command/Indirect.hs: liftIO $ createSymbolicLink l f refuses to run if core.symlinks=false Init.hs: createSymbolicLink f f2 test if symlinks can be made Remote/Directory.hs: go [file] = catchBoolIO $ createSymbolicLink file f >> return True fast key linking; catches failure to make symlink and falls back to copy Remote/Git.hs: liftIO $ catchBoolIO $ createSymbolicLink loc file >> return True ditto Upgrade/V1.hs: liftIO $ createSymbolicLink link f v1 repos could not be on a filesystem w/o symlinks Audited and dealt with calls to readSymbolicLink. Remaining calls are all safe, because: Annex/Link.hs: ( liftIO $ catchMaybeIO $ readSymbolicLink file only when core.symlinks=true Assistant/Threads/Watcher.hs: ifM ((==) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file)) code that fixes real symlinks when inotify sees them It's ok to not fix psdueo-symlinks. Assistant/Threads/Watcher.hs: mlink <- liftIO (catchMaybeIO $ readSymbolicLink file) ditto Command/Fix.hs: stopUnless ((/=) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file)) $ do command only works in indirect mode Upgrade/V1.hs: getsymlink = takeFileName <$> readSymbolicLink file v1 repos could not be on a filesystem w/o symlinks Audited and dealt with calls to isSymbolicLink. (Typically used with getSymbolicLinkStatus, but that is just used because getFileStatus is not as robust; it also works on pseudolinks.) 
Remaining calls are all safe, because: Assistant/Threads/SanityChecker.hs: | isSymbolicLink s -> addsymlink file ms only handles staging of symlinks that were somehow not staged (might need to be updated to support pseudolinks, but this is only a belt-and-suspenders check anyway, and I've never seen the code run) Command/Add.hs: if isSymbolicLink s || not (isRegularFile s) avoids adding symlinks to the annex, so not relevant Command/Indirect.hs: | isSymbolicLink s -> void $ flip whenAnnexed f $ only allowed on systems that support symlinks Command/Indirect.hs: whenM (liftIO $ not . isSymbolicLink <$> getSymbolicLinkStatus f) $ do ditto Seek.hs:notSymlink f = liftIO $ not . isSymbolicLink <$> getSymbolicLinkStatus f used to find unlocked files, only relevant in indirect mode Utility/FSEvents.hs: | Files.isSymbolicLink s = runhook addSymlinkHook $ Just s Utility/FSEvents.hs: | Files.isSymbolicLink s -> Utility/INotify.hs: | Files.isSymbolicLink s -> Utility/INotify.hs: checkfiletype Files.isSymbolicLink addSymlinkHook f Utility/Kqueue.hs: | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change all above are lower-level, not relevant Audited and dealt with calls to isSymLink. Remaining calls are all safe, because: Annex/Direct.hs: | isSymLink (getmode item) = This is looking at git diff-tree objects, not files on disk Command/Unused.hs: | isSymLink (LsTree.mode l) = do This is looking at git ls-tree, not file on disk Utility/FileMode.hs:isSymLink :: FileMode -> Bool Utility/FileMode.hs:isSymLink = checkMode symbolicLinkMode low-level Done!!
2013-02-17 19:05:55 +00:00
import Annex.Link
import Annex.Perms
import Annex.CatFile
import Annex.InodeSentinal
import Annex.CurrentBranch
import Annex.FileMatcher
import qualified Annex
import Utility.InodeCache
import qualified Database.Keys
import qualified Command.Sync
import Utility.Tuple
import Utility.Metered
2012-06-13 16:36:33 +00:00
import Data.Time.Clock
import qualified Data.Set as S
import qualified Data.Map as M
import Data.Either
import Control.Concurrent
2012-06-13 16:36:33 +00:00
{- The Committer thread. It stages pending adds and makes git commits
 - at appropriate times. -}
commitThread :: NamedThread
commitThread = namedThread "Committer" $ do
    lsofavailable <- liftIO $ inSearchPath "lsof"
    adddelay <- liftAnnex $
        fmap Seconds . annexDelayAdd <$> Annex.getGitConfig
    matcher <- liftAnnex largeFilesMatcher
    commitmsg <- liftAnnex Command.Sync.commitMsg
    tmpdir <- liftAnnex $ fromRepo gitAnnexTmpWatcherDir
    liftAnnex $ do
        -- A previous process that was shut down uncleanly may have
        -- left things behind in the lockdown directory; start with
        -- a fresh one. Failure to remove it is ignored.
        void $ liftIO $ tryIO $ removeDirectoryRecursive
            (fromRawFilePath tmpdir)
        void $ createAnnexDirectory tmpdir
    waitChangeTime $ \(changes, time) -> do
        ready <- handleAdds (fromRawFilePath tmpdir) lsofavailable matcher adddelay
            (simplifyChanges changes)
        let numready = length ready
        if shouldCommit False time numready ready
            then do
                debug ["committing", show numready, "changes"]
                void $ alertWhile commitAlert $
                    liftAnnex $ commitStaged commitmsg
                recordCommit
                recordExportCommit
                mapM_ checkChangeContent ready
                -- Report the commit size back to waitChangeTime.
                return numready
            else do
                -- Not a good time to commit after all; put the
                -- changes back and report that nothing was committed.
                refill ready
                return 0
{- Hands changes whose commit is being delayed back to refillChanges,
 - logging how many there were. Does nothing when given no changes. -}
refill :: [Change] -> Assistant ()
refill delayed
    | null delayed = noop
    | otherwise = do
        debug ["delaying commit of", show (length delayed), "changes"]
        refillChanges delayed
{- Waits for one or more changes to arrive to be committed, and then
 - runs an action to commit them. If more changes arrive while this is
 - going on, they're handled intelligently, batching up changes into
 - large commits where possible, doing rename detection, and
 - committing immediately otherwise.
 -
 - The action is passed the changes and the current time, and returns
 - the number of changes it committed.
 -}
waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant ()
waitChangeTime a = waitchanges 0
  where
    -- Runs the commit action, and goes back to waiting, remembering
    -- the size of the commit that was just made.
    commit cs now = a (cs, now) >>= waitchanges

    waitchanges lastcommitsize = do
        -- Wait one second as a simple rate limiter.
        liftIO $ threadDelaySeconds (Seconds 1)
        -- Now, wait until at least one change is available for
        -- processing.
        available <- getChanges
        handlechanges available lastcommitsize

    handlechanges changes lastcommitsize = do
        let len = length changes
        -- See if now's a good time to commit.
        now <- liftIO getCurrentTime
        scanning <- not . scanComplete <$> getDaemonStatus
        let lastwasmax = lastcommitsize >= maxCommitSize
        let decide
                | not (shouldCommit scanning now len changes) = do
                    refill changes
                    waitchanges lastcommitsize
                | lastwasmax && len > maxCommitSize = commit changes now
                | lastwasmax = aftermaxcommit changes
                | possiblyrename changes = do
                    morechanges <- getrelatedchanges changes
                    commit (changes ++ morechanges) now
                | otherwise = commit changes now
        decide

    {- Did we perhaps only get one of the AddChange and RmChange pair
     - that make up a file rename? Or some of the pairs that make up
     - a directory rename?
     -}
    possiblyrename cs = all renamepart cs

    renamepart c = case c of
        PendingAddChange {} -> True
        _ -> isRmChange c

    {- Gets changes related to the passed changes, without blocking
     - very long.
     -
     - If there are multiple RmChanges, this is probably a directory
     - rename, in which case it may be necessary to wait longer to get
     - all the Changes involved.
     -}
    getrelatedchanges oldchanges = case filter isRmChange oldchanges of
        (_:_:_) -> concat <$> getbatchchanges []
        _ -> do
            liftIO humanImperceptibleDelay
            getAnyChanges

    -- Keeps collecting batches of changes, a tenth of a second apart,
    -- until a poll comes back empty.
    getbatchchanges batches = do
        liftIO $ threadDelay $ fromIntegral $ oneSecond `div` 10
        batch <- getAnyChanges
        if null batch
            then return batches
            else getbatchchanges (batch:batches)

    {- The last commit was maximum size, so it's very likely there
     - are more changes and we'd like to ensure we make another commit
     - of maximum size if possible.
     -
     - But, it can take a while for the Watcher to wake back up
     - after a commit. It can get blocked by another thread
     - that is using the Annex state, such as a git-annex branch
     - commit. Especially after such a large commit, this can
     - take several seconds. When this happens, it defeats the
     - normal commit batching, which sees some old changes the
     - Watcher found while the commit was being prepared, and sees
     - no recent ones, and wants to commit immediately.
     -
     - All that we need to do, then, is wait for the Watcher to
     - wake up, and queue up one more change.
     -
     - However, it's also possible that we're at the end of changes for
     - now. So to avoid waiting a really long time before committing
     - those changes we have, poll for up to 30 seconds, and then
     - commit them.
     -
     - Also, try to run something in Annex, to ensure we block
     - longer if the Annex state is indeed blocked.
     -}
    aftermaxcommit oldchanges = poll (30 :: Int)
      where
        poll remaining
            | remaining == 0 = resume oldchanges
            | otherwise = do
                liftAnnex noop -- ensure Annex state is free
                liftIO $ threadDelaySeconds (Seconds 1)
                newchanges <- getAnyChanges
                if null newchanges
                    then poll (remaining - 1)
                    else resume (oldchanges ++ newchanges)
        resume cs
            | null cs = waitchanges 0
            | otherwise = handlechanges cs 0
{- True only for a Change whose changeInfo is RmChange. -}
isRmChange :: Change -> Bool
isRmChange c = case c of
    Change { changeInfo = info } -> info == RmChange
    _ -> False
{- An amount of time that is hopefully imperceptibly short for humans,
 - while long enough for a computer to get some work done.
 - Note that 0.001 is a little too short for rename change batching to
 - work.
 -
 - humanImperceptibleDelay scales this value by oneSecond to get the
 - length of its delay. -}
humanImperceptibleInterval :: NominalDiffTime
humanImperceptibleInterval = 0.01
{- Blocks the calling thread for humanImperceptibleInterval. -}
humanImperceptibleDelay :: IO ()
humanImperceptibleDelay = threadDelay delay
  where
    -- The interval scaled by oneSecond, with any fractional part
    -- dropped, as threadDelay wants a whole number.
    delay = truncate (humanImperceptibleInterval * fromIntegral oneSecond)
{- Once this many changes are waiting, shouldCommit always decides to
 - commit them. waitChangeTime also compares the size of the previous
 - commit against this, to tell if that commit was of maximum size. -}
maxCommitSize :: Int
maxCommitSize = 5000
{- Decides if now is a good time to make a commit.
 - Note that the list of changes has a random order.
 -
 - The parameters are: whether the startup scan is still running, the
 - current time, the number of changes, and the changes themselves.
 -
 - Current strategy: While scanning, only commit once maxCommitSize
 - changes have built up. Otherwise, never commit nothing, and always
 - commit maxCommitSize or more changes. In between, if there have been
 - 10 changes within the past second, a batch activity is taking place,
 - so wait for later.
 -}
shouldCommit :: Bool -> UTCTime -> Int -> [Change] -> Bool
shouldCommit scanning now len changes
    | scanning = atmaxsize
    | len == 0 = False
    | atmaxsize = True
    | otherwise = not batchactivity
  where
    atmaxsize = len >= maxCommitSize
    batchactivity = length (filter changedrecently changes) >= 10
    changedrecently c = (now `diffUTCTime` changeTime c) <= 1
2012-06-13 16:36:33 +00:00
{- Flushes the Annex queue and commits whatever is staged, using the
 - provided commit message. When the commit succeeds, the branches are
 - updated too. Returns False if the flush or the commit failed. -}
commitStaged :: String -> Annex Bool
commitStaged msg = either (const $ return False) (const docommit)
    {- This could fail if there's another commit being made by
     - something else. -}
    =<< tryNonAsync Annex.Queue.flush
  where
    docommit = do
        cmode <- annexCommitMode <$> Annex.getGitConfig
        committed <- Command.Sync.commitStaged cmode msg
        when committed $
            Command.Sync.updateBranches =<< getCurrentBranch
        return committed
2012-06-13 16:36:33 +00:00
{- If there are PendingAddChanges, or InProcessAddChanges, the files
 - have not yet actually been added, and that has to be done
 - now, before committing.
 -
 - Deferring the adds to this point causes batches to be bundled together,
 - which allows faster checking with lsof that the files are not still open
 - for write by some other process, and faster checking with git-ls-files
 - that the files are not already checked into git.
 -
 - When a file is added in locked mode, Inotify will notice the new symlink.
 - So this waits for additional Changes to arrive, so that the symlink has
 - hopefully been staged before returning, and will be committed immediately.
 - (OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
 - created and staged.)
 -
 - Returns a list of all changes that are ready to be committed.
 - Any pending adds that are not ready yet are put back into the ChangeChan,
 - where they will be retried later.
 -}
handleAdds :: FilePath -> Bool -> GetFileMatcher -> Maybe Seconds -> [Change] -> Assistant [Change]
handleAdds lockdowndir havelsof largefilematcher delayadd cs = returnWhen (null incomplete) $ do
    let (pending, inprocess) = partition isPendingAddChange incomplete
    (postponed, toadd) <- partitionEithers
        <$> safeToAdd lockdowndir lockdownconfig havelsof delayadd pending inprocess
    unless (null postponed) $
        refillChanges postponed
    returnWhen (null toadd) $ do
        (toaddannexed, toaddsmall) <- partitionEithers
            <$> mapM checksmall toadd
        addsmall toaddsmall
        addedannexed <- addaction toadd $
            catMaybes <$> addannexed toaddannexed
        return $ addedannexed ++ toaddsmall ++ otherchanges
  where
    (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs

    -- The one lock down configuration used both for locking files
    -- down and for ingesting them afterwards.
    lockdownconfig = LockDownConfig
        { lockingFile = False
        , hardlinkFileTmpDir = Just (toRawFilePath lockdowndir)
        }

    -- Short-circuits to only the changes that needed no add when the
    -- condition holds.
    returnWhen c a
        | c = return otherchanges
        | otherwise = a

    -- Left when the file matches the large files matcher, and so
    -- should be annexed; Right when it's to be added to git as-is.
    checksmall change =
        ifM (liftAnnex $ checkFileMatcher largefilematcher (toRawFilePath (changeFile change)))
            ( return (Left change)
            , return (Right change)
            )

    addsmall [] = noop
    addsmall toadd = liftAnnex $ Annex.Queue.addCommand [] "add"
        [ Param "--force", Param "--"] (map changeFile toadd)

    {- Avoid overhead of re-ingesting a renamed unlocked file, by
     - examining the other Changes to see if a removed file has the
     - same InodeCache as the new file. If so, we can just update
     - bookkeeping, and stage the file in git.
     -}
    addannexed :: [Change] -> Assistant [Maybe Change]
    addannexed [] = return []
    addannexed toadd = do
        ct <- liftAnnex compareInodeCachesWith
        m <- liftAnnex $ removedKeysMap ct cs
        delta <- liftAnnex getTSDelta
        if M.null m
            then forM toadd (addannexed' lockdownconfig)
            else forM toadd $ \c -> do
                mcache <- liftIO $ genInodeCache (toRawFilePath (changeFile c)) delta
                let removedkey = mcache >>= \cache ->
                        M.lookup (inodeCacheToKey ct cache) m
                maybe (addannexed' lockdownconfig c) (fastadd c) removedkey

    -- Ingests a locked down file. IO exceptions result in Nothing.
    addannexed' :: LockDownConfig -> Change -> Assistant (Maybe Change)
    addannexed' cfg change@(InProcessAddChange { lockedDown = ld }) =
        catchDefaultIO Nothing <~> doadd
      where
        ks = keySource ld
        doadd = sanitycheck ks $ do
            (mkey, _mcache) <- liftAnnex $ do
                showStart "add" (keyFilename ks) (SeekInput [])
                ingest nullMeterUpdate (Just $ LockedDown cfg ks) Nothing
            maybe (failedingest change) (done change $ fromRawFilePath $ keyFilename ks) mkey
    addannexed' _ _ = return Nothing

    -- The key is already known, so only the bookkeeping is needed.
    fastadd :: Change -> Key -> Assistant (Maybe Change)
    fastadd change key = do
        let source = keySource $ lockedDown change
        liftAnnex $ finishIngestUnlocked key source
        done change (fromRawFilePath $ keyFilename source) key

    -- Maps the inode caches of the keys of removed files to those keys.
    removedKeysMap :: InodeComparisonType -> [Change] -> Annex (M.Map InodeCacheKey Key)
    removedKeysMap ct l = do
        mks <- forM (filter isRmChange l) $ \c ->
            catKeyFile $ toRawFilePath $ changeFile c
        M.fromList . concat <$> mapM mkpairs (catMaybes mks)
      where
        mkpairs k = map (\c -> (inodeCacheToKey ct c, k)) <$>
            Database.Keys.getInodeCaches k

    -- The change is put back to be retried later.
    failedingest change = do
        refill [retryChange change]
        liftAnnex showEndFail
        return Nothing

    -- Logs the key as present, and stages a pointer file for it with
    -- the file's current mode (if that can be determined).
    done change file key = liftAnnex $ do
        logStatus key InfoPresent
        mode <- liftIO $ catchMaybeIO $ fileMode <$> getFileStatus file
        stagePointerFile (toRawFilePath file) mode =<< hashPointerFile key
        showEndOk
        return $ Just $ finishedChange change key

    {- Check that the keysource's keyFilename still exists,
     - and is still a hard link to its contentLocation,
     - before ingesting it.
     -
     - A file that cannot be statted (eg, because it has been deleted
     - in the meantime) is handled the same as one that is no longer
     - a hard link. Letting the stat's exception escape would skip the
     - cleanup below, leaving the hard link behind.
     -}
    sanitycheck keysource a = do
        mfilestatus <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus $
            fromRawFilePath $ keyFilename keysource
        mcontentstatus <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus $
            fromRawFilePath $ contentLocation keysource
        case (mfilestatus, mcontentstatus) of
            (Just fs, Just ks)
                | deviceID ks == deviceID fs && fileID ks == fileID fs -> a
            _ -> do
                -- remove the hard link
                when (contentLocation keysource /= keyFilename keysource) $
                    void $ liftIO $ tryIO $ removeFile $ fromRawFilePath $ contentLocation keysource
                return Nothing

    {- Shows an alert while performing an action to add a file or
     - files. When only a few files are added, their names are shown
     - in the alert. When it's a batch add, the number of files added
     - is shown.
     -
     - Add errors tend to be transient and will be
     - automatically dealt with, so the alert is always told
     - the add succeeded.
     -}
    addaction [] a = a
    addaction toadd a = alertWhile' (addFileAlert $ map changeFile toadd) $
        (,) True <$> a
{- Sorts out which files are safe to add. Each is Either Right, and can
 - be added now, or is unsafe, and is Left for later.
 -
 - Pending changes are locked down first. Then, when lsof is available,
 - it is used to find files that are still open for write.
 -}
safeToAdd :: FilePath -> LockDownConfig -> Bool -> Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change]
safeToAdd lockdowndir lockdownconfig havelsof delayadd pending inprocess
    | null pending && null inprocess = return []
    | otherwise = do
        case delayadd of
            Nothing -> noop
            Just delay -> liftIO $ threadDelaySeconds delay
        liftAnnex $ do
            lockeddown <- forM pending $ lockDown lockdownconfig . changeFile
            let candidates = inprocess ++ catMaybes (zipWith mkinprocess pending lockeddown)
            openfiles <- if havelsof
                then S.fromList . map fst3 . filter openwrite <$>
                    findopenfiles (map (keySource . lockedDown) candidates)
                else pure S.empty
            let checked = map (check openfiles) candidates
            {- If new events are received when files are closed,
             - there's no need to retry any changes that cannot
             - be done now. -}
            if DirWatcher.closingTracked
                then do
                    mapM_ canceladd $ lefts checked
                    return $ map Right $ rights checked
                else return checked
  where
    -- Left when the locked down content is in the set of open files.
    check openfiles change = case change of
        InProcessAddChange { lockedDown = ld }
            | S.member (fromRawFilePath (contentLocation (keySource ld))) openfiles -> Left change
        _ -> Right change

    -- A pending change that was successfully locked down becomes an
    -- in process change; one that was not is dropped.
    mkinprocess c mld = case mld of
        Just ld -> Just InProcessAddChange
            { changeTime = changeTime c
            , lockedDown = ld
            }
        Nothing -> Nothing

    canceladd change = case change of
        InProcessAddChange { lockedDown = ld } -> do
            let ks = keySource ld
            warning $ fromRawFilePath (keyFilename ks)
                ++ " still has writers, not adding"
            -- remove the hard link
            when (contentLocation ks /= keyFilename ks) $
                void $ liftIO $ tryIO $ removeFile $ fromRawFilePath $ contentLocation ks
        _ -> noop

    -- An unknown open mode is treated the same as a writer.
    openwrite (_file, mode, _pid) = mode `elem`
        [ Lsof.OpenWriteOnly
        , Lsof.OpenReadWrite
        , Lsof.OpenUnknown
        ]

    {- Normally the KeySources are locked down inside the lockdowndir,
     - so can just lsof that, which is quite efficient.
     -
     - In crippled filesystem mode, there is no lock down, so must run lsof
     - on each individual file.
     -}
    findopenfiles keysources = ifM crippledFileSystem
        ( liftIO $ do
            let files = map (fromRawFilePath . keyFilename) keysources
            concat <$> forM (segmentXargsUnordered files)
                (\fs -> Lsof.query $ "--" : fs)
        , liftIO $ Lsof.queryDir lockdowndir
        )
{- After a Change is committed, queue any necessary transfers or drops
 - of the content of the key.
 -
 - This is not done during the startup scan, because the expensive
 - transfer scan does the same thing then.
 -
 - Changes that have no key need nothing done.
 -}
checkChangeContent :: Change -> Assistant ()
checkChangeContent change@(Change { changeInfo = info }) =
    maybe noop check (changeInfoKey info)
  where
    af = AssociatedFile (Just (toRawFilePath (changeFile change)))
    check k = whenM (scanComplete <$> getDaemonStatus) $ do
        present <- liftAnnex $ inAnnex k
        -- Content that is present gets uploaded; content that is
        -- not gets downloaded.
        let (reason, direction) = if present
                then ("new file created", Upload)
                else ("new or renamed file wanted", Download)
        void $ queueTransfers reason Next k af direction
        handleDrops "file renamed" present k af []
checkChangeContent _ = noop