reorg threads
This commit is contained in:
parent
19eee6a1df
commit
0b146f9ecc
6 changed files with 14 additions and 14 deletions
199
Assistant/Threads/Committer.hs
Normal file
199
Assistant/Threads/Committer.hs
Normal file
|
@ -0,0 +1,199 @@
|
|||
{- git-annex assistant commit thread
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.Committer where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.Changes
|
||||
import Assistant.Commits
|
||||
import Assistant.ThreadedMonad
|
||||
import Assistant.Threads.Watcher
|
||||
import qualified Annex
|
||||
import qualified Annex.Queue
|
||||
import qualified Git.Command
|
||||
import qualified Git.HashObject
|
||||
import Git.Types
|
||||
import qualified Command.Add
|
||||
import Utility.ThreadScheduler
|
||||
import qualified Utility.Lsof as Lsof
|
||||
import qualified Utility.DirWatcher as DirWatcher
|
||||
import Types.KeySource
|
||||
|
||||
import Data.Time.Clock
|
||||
import Data.Tuple.Utils
|
||||
import qualified Data.Set as S
|
||||
import Data.Either
|
||||
|
||||
{- This thread makes git commits at appropriate times. -}
|
||||
commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
|
||||
commitThread st changechan commitchan = runEvery (Seconds 1) $ do
|
||||
-- We already waited one second as a simple rate limiter.
|
||||
-- Next, wait until at least one change is available for
|
||||
-- processing.
|
||||
changes <- getChanges changechan
|
||||
-- Now see if now's a good time to commit.
|
||||
time <- getCurrentTime
|
||||
if shouldCommit time changes
|
||||
then do
|
||||
readychanges <- handleAdds st changechan changes
|
||||
if shouldCommit time readychanges
|
||||
then do
|
||||
void $ tryIO $ runThreadState st commitStaged
|
||||
recordCommit commitchan (Commit time)
|
||||
else refillChanges changechan readychanges
|
||||
else refillChanges changechan changes
|
||||
|
||||
commitStaged :: Annex ()
|
||||
commitStaged = do
|
||||
Annex.Queue.flush
|
||||
inRepo $ Git.Command.run "commit"
|
||||
[ Param "--allow-empty-message"
|
||||
, Param "-m", Param ""
|
||||
-- Empty commits may be made if tree changes cancel
|
||||
-- each other out, etc
|
||||
, Param "--allow-empty"
|
||||
-- Avoid running the usual git-annex pre-commit hook;
|
||||
-- watch does the same symlink fixing, and we don't want
|
||||
-- to deal with unlocked files in these commits.
|
||||
, Param "--quiet"
|
||||
]
|
||||
|
||||
{- Decide if now is a good time to make a commit.
|
||||
- Note that the list of change times has an undefined order.
|
||||
-
|
||||
- Current strategy: If there have been 10 changes within the past second,
|
||||
- a batch activity is taking place, so wait for later.
|
||||
-}
|
||||
shouldCommit :: UTCTime -> [Change] -> Bool
|
||||
shouldCommit now changes
|
||||
| len == 0 = False
|
||||
| len > 10000 = True -- avoid bloating queue too much
|
||||
| length (filter thisSecond changes) < 10 = True
|
||||
| otherwise = False -- batch activity
|
||||
where
|
||||
len = length changes
|
||||
thisSecond c = now `diffUTCTime` changeTime c <= 1
|
||||
|
||||
{- If there are PendingAddChanges, the files have not yet actually been
|
||||
- added to the annex (probably), and that has to be done now, before
|
||||
- committing.
|
||||
-
|
||||
- Deferring the adds to this point causes batches to be bundled together,
|
||||
- which allows faster checking with lsof that the files are not still open
|
||||
- for write by some other process.
|
||||
-
|
||||
- When a file is added, Inotify will notice the new symlink. So this waits
|
||||
- for additional Changes to arrive, so that the symlink has hopefully been
|
||||
- staged before returning, and will be committed immediately.
|
||||
-
|
||||
- OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
|
||||
- created and staged.
|
||||
-
|
||||
- Returns a list of all changes that are ready to be committed.
|
||||
- Any pending adds that are not ready yet are put back into the ChangeChan,
|
||||
- where they will be retried later.
|
||||
-}
|
||||
handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
|
||||
handleAdds st changechan cs = returnWhen (null pendingadds) $ do
|
||||
(postponed, toadd) <- partitionEithers <$>
|
||||
safeToAdd st pendingadds
|
||||
|
||||
unless (null postponed) $
|
||||
refillChanges changechan postponed
|
||||
|
||||
returnWhen (null toadd) $ do
|
||||
added <- catMaybes <$> forM toadd add
|
||||
if (DirWatcher.eventsCoalesce || null added)
|
||||
then return $ added ++ otherchanges
|
||||
else do
|
||||
r <- handleAdds st changechan
|
||||
=<< getChanges changechan
|
||||
return $ r ++ added ++ otherchanges
|
||||
where
|
||||
(pendingadds, otherchanges) = partition isPendingAddChange cs
|
||||
|
||||
returnWhen c a
|
||||
| c = return otherchanges
|
||||
| otherwise = a
|
||||
|
||||
add :: Change -> IO (Maybe Change)
|
||||
add change@(PendingAddChange { keySource = ks }) = do
|
||||
r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
|
||||
showStart "add" $ keyFilename ks
|
||||
handle (finishedChange change) (keyFilename ks)
|
||||
=<< Command.Add.ingest ks
|
||||
return $ maybeMaybe r
|
||||
add _ = return Nothing
|
||||
|
||||
maybeMaybe (Just j@(Just _)) = j
|
||||
maybeMaybe _ = Nothing
|
||||
|
||||
handle _ _ Nothing = do
|
||||
showEndFail
|
||||
return Nothing
|
||||
handle change file (Just key) = do
|
||||
link <- Command.Add.link file key True
|
||||
when DirWatcher.eventsCoalesce $ do
|
||||
sha <- inRepo $
|
||||
Git.HashObject.hashObject BlobObject link
|
||||
stageSymlink file sha
|
||||
showEndOk
|
||||
return $ Just change
|
||||
|
||||
{- Check that the keysource's keyFilename still exists,
|
||||
- and is still a hard link to its contentLocation,
|
||||
- before ingesting it. -}
|
||||
sanitycheck keysource a = do
|
||||
fs <- getSymbolicLinkStatus $ keyFilename keysource
|
||||
ks <- getSymbolicLinkStatus $ contentLocation keysource
|
||||
if deviceID ks == deviceID fs && fileID ks == fileID fs
|
||||
then a
|
||||
else return Nothing
|
||||
|
||||
{- PendingAddChanges can Either be Right to be added now,
|
||||
- or are unsafe, and must be Left for later.
|
||||
-
|
||||
- Check by running lsof on the temp directory, which
|
||||
- the KeySources are locked down in.
|
||||
-}
|
||||
safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
|
||||
safeToAdd st changes = runThreadState st $
|
||||
ifM (Annex.getState Annex.force)
|
||||
( allRight changes -- force bypasses lsof check
|
||||
, do
|
||||
tmpdir <- fromRepo gitAnnexTmpDir
|
||||
openfiles <- S.fromList . map fst3 . filter openwrite <$>
|
||||
liftIO (Lsof.queryDir tmpdir)
|
||||
|
||||
let checked = map (check openfiles) changes
|
||||
|
||||
{- If new events are received when files are closed,
|
||||
- there's no need to retry any changes that cannot
|
||||
- be done now. -}
|
||||
if DirWatcher.closingTracked
|
||||
then do
|
||||
mapM_ canceladd $ lefts checked
|
||||
allRight $ rights checked
|
||||
else return checked
|
||||
)
|
||||
where
|
||||
check openfiles change@(PendingAddChange { keySource = ks })
|
||||
| S.member (contentLocation ks) openfiles = Left change
|
||||
check _ change = Right change
|
||||
|
||||
canceladd (PendingAddChange { keySource = ks }) = do
|
||||
warning $ keyFilename ks
|
||||
++ " still has writers, not adding"
|
||||
-- remove the hard link
|
||||
void $ liftIO $ tryIO $
|
||||
removeFile $ contentLocation ks
|
||||
canceladd _ = noop
|
||||
|
||||
openwrite (_file, mode, _pid) =
|
||||
mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
|
||||
|
||||
allRight = return . map Right
|
72
Assistant/Threads/Merger.hs
Normal file
72
Assistant/Threads/Merger.hs
Normal file
|
@ -0,0 +1,72 @@
|
|||
{- git-annex assistant git merge thread
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.Merger where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.ThreadedMonad
|
||||
import Utility.DirWatcher
|
||||
import Utility.Types.DirWatcher
|
||||
import qualified Git
|
||||
import qualified Git.Merge
|
||||
import qualified Git.Branch
|
||||
import qualified Command.Sync
|
||||
|
||||
{- This thread watches for changes to .git/refs/heads/synced/*,
|
||||
- which indicate incoming pushes. It merges those pushes into the
|
||||
- currently checked out branch. -}
|
||||
mergeThread :: ThreadState -> IO ()
|
||||
mergeThread st = do
|
||||
g <- runThreadState st $ fromRepo id
|
||||
let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
|
||||
createDirectoryIfMissing True dir
|
||||
let hook a = Just $ runHandler g a
|
||||
let hooks = mkWatchHooks
|
||||
{ addHook = hook onAdd
|
||||
, errHook = hook onErr
|
||||
}
|
||||
watchDir dir (const False) hooks id
|
||||
where
|
||||
|
||||
type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
|
||||
|
||||
{- Runs an action handler.
|
||||
-
|
||||
- Exceptions are ignored, otherwise a whole thread could be crashed.
|
||||
-}
|
||||
runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
||||
runHandler g handler file filestatus = void $ do
|
||||
either print (const noop) =<< tryIO go
|
||||
where
|
||||
go = handler g file filestatus
|
||||
|
||||
{- Called when there's an error with inotify. -}
|
||||
onErr :: Handler
|
||||
onErr _ msg _ = error msg
|
||||
|
||||
{- Called when a new branch ref is written.
|
||||
-
|
||||
- This relies on git's atomic method of updating branch ref files,
|
||||
- which is to first write the new file to .lock, and then rename it
|
||||
- over the old file. So, ignore .lock files, and the rename ensures
|
||||
- the watcher sees a new file being added on each update.
|
||||
-
|
||||
- At startup, synthetic add events fire, causing this to run, but that's
|
||||
- ok; it ensures that any changes pushed since the last time the assistant
|
||||
- ran are merged in.
|
||||
-}
|
||||
onAdd :: Handler
|
||||
onAdd g file _
|
||||
| ".lock" `isSuffixOf` file = noop
|
||||
| otherwise = do
|
||||
let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file
|
||||
current <- Git.Branch.current g
|
||||
when (Just branch == current) $
|
||||
void $ mergeBranch branch g
|
||||
|
||||
mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
|
||||
mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
|
69
Assistant/Threads/Pusher.hs
Normal file
69
Assistant/Threads/Pusher.hs
Normal file
|
@ -0,0 +1,69 @@
|
|||
{- git-annex assistant git pushing thread
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.Pusher where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.Commits
|
||||
import Assistant.ThreadedMonad
|
||||
import qualified Command.Sync
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.Parallel
|
||||
|
||||
import Data.Time.Clock
|
||||
|
||||
data FailedPush = FailedPush
|
||||
{ failedRemote :: Remote
|
||||
, failedTimeStamp :: UTCTime
|
||||
}
|
||||
|
||||
{- This thread pushes git commits out to remotes. -}
|
||||
pushThread :: ThreadState -> CommitChan -> IO ()
|
||||
pushThread st commitchan = do
|
||||
remotes <- runThreadState st $ Command.Sync.syncRemotes []
|
||||
runEveryWith (Seconds 2) [] $ \failedpushes -> do
|
||||
-- We already waited two seconds as a simple rate limiter.
|
||||
-- Next, wait until at least one commit has been made
|
||||
commits <- getCommits commitchan
|
||||
-- Now see if now's a good time to push.
|
||||
time <- getCurrentTime
|
||||
if shouldPush time commits failedpushes
|
||||
then pushToRemotes time st remotes
|
||||
else do
|
||||
refillCommits commitchan commits
|
||||
return failedpushes
|
||||
|
||||
{- Decide if now is a good time to push to remotes.
|
||||
-
|
||||
- Current strategy: Immediately push all commits. The commit machinery
|
||||
- already determines batches of changes, so we can't easily determine
|
||||
- batches better.
|
||||
-
|
||||
- TODO: FailedPushs are only retried the next time there's a commit.
|
||||
- Should retry them periodically, or when a remote that was not available
|
||||
- becomes available.
|
||||
-}
|
||||
shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool
|
||||
shouldPush _now commits _failedremotes
|
||||
| not (null commits) = True
|
||||
| otherwise = False
|
||||
|
||||
{- Updates the local sync branch, then pushes it to all remotes, in
|
||||
- parallel.
|
||||
-
|
||||
- Avoids running possibly long-duration commands in the Annex monad, so
|
||||
- as not to block other threads. -}
|
||||
pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush]
|
||||
pushToRemotes now st remotes = do
|
||||
(g, branch) <- runThreadState st $
|
||||
(,) <$> fromRepo id <*> Command.Sync.currentBranch
|
||||
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
||||
map (`FailedPush` now) <$> inParallel (push g branch) remotes
|
||||
where
|
||||
push g branch remote =
|
||||
ifM (Command.Sync.pushBranch remote branch g)
|
||||
( exitSuccess, exitFailure)
|
83
Assistant/Threads/SanityChecker.hs
Normal file
83
Assistant/Threads/SanityChecker.hs
Normal file
|
@ -0,0 +1,83 @@
|
|||
{- git-annex assistant sanity checker
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.SanityChecker (
|
||||
sanityCheckerThread
|
||||
) where
|
||||
|
||||
import Common.Annex
|
||||
import qualified Git.LsFiles
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.ThreadedMonad
|
||||
import Assistant.Changes
|
||||
import Utility.ThreadScheduler
|
||||
import qualified Assistant.Threads.Watcher as Watcher
|
||||
|
||||
import Data.Time.Clock.POSIX
|
||||
|
||||
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
|
||||
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
|
||||
sanityCheckerThread st status changechan = forever $ do
|
||||
waitForNextCheck st status
|
||||
|
||||
runThreadState st $
|
||||
modifyDaemonStatus status $ \s -> s
|
||||
{ sanityCheckRunning = True }
|
||||
|
||||
now <- getPOSIXTime -- before check started
|
||||
catchIO (check st status changechan)
|
||||
(runThreadState st . warning . show)
|
||||
|
||||
runThreadState st $ do
|
||||
modifyDaemonStatus status $ \s -> s
|
||||
{ sanityCheckRunning = False
|
||||
, lastSanityCheck = Just now
|
||||
}
|
||||
|
||||
{- Only run one check per day, from the time of the last check. -}
|
||||
waitForNextCheck :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||
waitForNextCheck st status = do
|
||||
v <- runThreadState st $
|
||||
lastSanityCheck <$> getDaemonStatus status
|
||||
now <- getPOSIXTime
|
||||
threadDelaySeconds $ Seconds $ calcdelay now v
|
||||
where
|
||||
calcdelay _ Nothing = oneDay
|
||||
calcdelay now (Just lastcheck)
|
||||
| lastcheck < now = max oneDay $
|
||||
oneDay - truncate (now - lastcheck)
|
||||
| otherwise = oneDay
|
||||
|
||||
oneDay :: Int
|
||||
oneDay = 24 * 60 * 60
|
||||
|
||||
{- It's important to stay out of the Annex monad as much as possible while
|
||||
- running potentially expensive parts of this check, since remaining in it
|
||||
- will block the watcher. -}
|
||||
check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
|
||||
check st status changechan = do
|
||||
g <- runThreadState st $ do
|
||||
showSideAction "Running daily check"
|
||||
fromRepo id
|
||||
-- Find old unstaged symlinks, and add them to git.
|
||||
unstaged <- Git.LsFiles.notInRepo False ["."] g
|
||||
now <- getPOSIXTime
|
||||
forM_ unstaged $ \file -> do
|
||||
ms <- catchMaybeIO $ getSymbolicLinkStatus file
|
||||
case ms of
|
||||
Just s | toonew (statusChangeTime s) now -> noop
|
||||
| isSymbolicLink s ->
|
||||
addsymlink file ms
|
||||
_ -> noop
|
||||
where
|
||||
toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
|
||||
slop = fromIntegral tenMinutes
|
||||
insanity m = runThreadState st $ warning m
|
||||
addsymlink file s = do
|
||||
insanity $ "found unstaged symlink: " ++ file
|
||||
Watcher.runHandler st status changechan
|
||||
Watcher.onAddSymlink file s
|
218
Assistant/Threads/Watcher.hs
Normal file
218
Assistant/Threads/Watcher.hs
Normal file
|
@ -0,0 +1,218 @@
|
|||
{- git-annex assistant tree watcher
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Assistant.Threads.Watcher where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.ThreadedMonad
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Changes
|
||||
import Utility.DirWatcher
|
||||
import Utility.Types.DirWatcher
|
||||
import qualified Annex
|
||||
import qualified Annex.Queue
|
||||
import qualified Git.Command
|
||||
import qualified Git.UpdateIndex
|
||||
import qualified Git.HashObject
|
||||
import qualified Git.LsFiles
|
||||
import qualified Backend
|
||||
import qualified Command.Add
|
||||
import Annex.Content
|
||||
import Annex.CatFile
|
||||
import Git.Types
|
||||
|
||||
import Data.Bits.Utils
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
||||
checkCanWatch :: Annex ()
|
||||
checkCanWatch
|
||||
| canWatch =
|
||||
unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $
|
||||
needLsof
|
||||
| otherwise = error "watch mode is not available on this system"
|
||||
|
||||
needLsof :: Annex ()
|
||||
needLsof = error $ unlines
|
||||
[ "The lsof command is needed for watch mode to be safe, and is not in PATH."
|
||||
, "To override lsof checks to ensure that files are not open for writing"
|
||||
, "when added to the annex, you can use --force"
|
||||
, "Be warned: This can corrupt data in the annex, and make fsck complain."
|
||||
]
|
||||
|
||||
watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
|
||||
watchThread st dstatus changechan = watchDir "." ignored hooks startup
|
||||
where
|
||||
startup = statupScan st dstatus
|
||||
hook a = Just $ runHandler st dstatus changechan a
|
||||
hooks = WatchHooks
|
||||
{ addHook = hook onAdd
|
||||
, delHook = hook onDel
|
||||
, addSymlinkHook = hook onAddSymlink
|
||||
, delDirHook = hook onDelDir
|
||||
, errHook = hook onErr
|
||||
}
|
||||
|
||||
{- Initial scartup scan. The action should return once the scan is complete. -}
|
||||
statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
|
||||
statupScan st dstatus scanner = do
|
||||
runThreadState st $
|
||||
showAction "scanning"
|
||||
r <- scanner
|
||||
runThreadState st $
|
||||
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
|
||||
|
||||
-- Notice any files that were deleted before watching was started.
|
||||
runThreadState st $ do
|
||||
inRepo $ Git.Command.run "add" [Param "--update"]
|
||||
showAction "started"
|
||||
|
||||
return r
|
||||
|
||||
ignored :: FilePath -> Bool
|
||||
ignored = ig . takeFileName
|
||||
where
|
||||
ig ".git" = True
|
||||
ig ".gitignore" = True
|
||||
ig ".gitattributes" = True
|
||||
ig _ = False
|
||||
|
||||
type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
|
||||
|
||||
{- Runs an action handler, inside the Annex monad, and if there was a
|
||||
- change, adds it to the ChangeChan.
|
||||
-
|
||||
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
||||
-}
|
||||
runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
||||
runHandler st dstatus changechan handler file filestatus = void $ do
|
||||
r <- tryIO go
|
||||
case r of
|
||||
Left e -> print e
|
||||
Right Nothing -> noop
|
||||
Right (Just change) -> recordChange changechan change
|
||||
where
|
||||
go = runThreadState st $ handler file filestatus dstatus
|
||||
|
||||
{- During initial directory scan, this will be run for any regular files
|
||||
- that are already checked into git. We don't want to turn those into
|
||||
- symlinks, so do a check. This is rather expensive, but only happens
|
||||
- during startup.
|
||||
-
|
||||
- It's possible for the file to still be open for write by some process.
|
||||
- This can happen in a few ways; one is if two processes had the file open
|
||||
- and only one has just closed it. We want to avoid adding a file to the
|
||||
- annex that is open for write, to avoid anything being able to change it.
|
||||
-
|
||||
- We could run lsof on the file here to check for other writers.
|
||||
- But, that's slow, and even if there is currently a writer, we will want
|
||||
- to add the file *eventually*. Instead, the file is locked down as a hard
|
||||
- link in a temp directory, with its write bits disabled, for later
|
||||
- checking with lsof, and a Change is returned containing a KeySource
|
||||
- using that hard link. The committer handles running lsof and finishing
|
||||
- the add.
|
||||
-}
|
||||
onAdd :: Handler
|
||||
onAdd file filestatus dstatus
|
||||
| maybe False isRegularFile filestatus = do
|
||||
ifM (scanComplete <$> getDaemonStatus dstatus)
|
||||
( go
|
||||
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
|
||||
( noChange
|
||||
, go
|
||||
)
|
||||
)
|
||||
| otherwise = noChange
|
||||
where
|
||||
go = pendingAddChange =<< Command.Add.lockDown file
|
||||
|
||||
{- A symlink might be an arbitrary symlink, which is just added.
|
||||
- Or, if it is a git-annex symlink, ensure it points to the content
|
||||
- before adding it.
|
||||
-}
|
||||
onAddSymlink :: Handler
|
||||
onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
|
||||
where
|
||||
go (Just (key, _)) = do
|
||||
link <- calcGitLink file key
|
||||
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
||||
( ensurestaged link =<< getDaemonStatus dstatus
|
||||
, do
|
||||
liftIO $ removeFile file
|
||||
liftIO $ createSymbolicLink link file
|
||||
addlink link
|
||||
)
|
||||
go Nothing = do -- other symlink
|
||||
link <- liftIO (readSymbolicLink file)
|
||||
ensurestaged link =<< getDaemonStatus dstatus
|
||||
|
||||
{- This is often called on symlinks that are already
|
||||
- staged correctly. A symlink may have been deleted
|
||||
- and being re-added, or added when the watcher was
|
||||
- not running. So they're normally restaged to make sure.
|
||||
-
|
||||
- As an optimisation, during the status scan, avoid
|
||||
- restaging everything. Only links that were created since
|
||||
- the last time the daemon was running are staged.
|
||||
- (If the daemon has never ran before, avoid staging
|
||||
- links too.)
|
||||
-}
|
||||
ensurestaged link daemonstatus
|
||||
| scanComplete daemonstatus = addlink link
|
||||
| otherwise = case filestatus of
|
||||
Just s
|
||||
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
|
||||
_ -> addlink link
|
||||
|
||||
{- For speed, tries to reuse the existing blob for
|
||||
- the symlink target. -}
|
||||
addlink link = do
|
||||
v <- catObjectDetails $ Ref $ ':':file
|
||||
case v of
|
||||
Just (currlink, sha)
|
||||
| s2w8 link == L.unpack currlink ->
|
||||
stageSymlink file sha
|
||||
_ -> do
|
||||
sha <- inRepo $
|
||||
Git.HashObject.hashObject BlobObject link
|
||||
stageSymlink file sha
|
||||
madeChange file LinkChange
|
||||
|
||||
onDel :: Handler
|
||||
onDel file _ _dstatus = do
|
||||
Annex.Queue.addUpdateIndex =<<
|
||||
inRepo (Git.UpdateIndex.unstageFile file)
|
||||
madeChange file RmChange
|
||||
|
||||
{- A directory has been deleted, or moved, so tell git to remove anything
|
||||
- that was inside it from its cache. Since it could reappear at any time,
|
||||
- use --cached to only delete it from the index.
|
||||
-
|
||||
- Note: This could use unstageFile, but would need to run another git
|
||||
- command to get the recursive list of files in the directory, so rm is
|
||||
- just as good. -}
|
||||
onDelDir :: Handler
|
||||
onDelDir dir _ _dstatus = do
|
||||
Annex.Queue.addCommand "rm"
|
||||
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
||||
madeChange dir RmDirChange
|
||||
|
||||
{- Called when there's an error with inotify. -}
|
||||
onErr :: Handler
|
||||
onErr msg _ _dstatus = do
|
||||
warning msg
|
||||
return Nothing
|
||||
|
||||
{- Adds a symlink to the index, without ever accessing the actual symlink
|
||||
- on disk. This avoids a race if git add is used, where the symlink is
|
||||
- changed to something else immediately after creation.
|
||||
-}
|
||||
stageSymlink :: FilePath -> Sha -> Annex ()
|
||||
stageSymlink file sha =
|
||||
Annex.Queue.addUpdateIndex =<<
|
||||
inRepo (Git.UpdateIndex.stageSymlink file sha)
|
Loading…
Add table
Add a link
Reference in a new issue