moved transfer queueing out of watcher and into committer
This cleaned up the code quite a bit; now the committer just looks at the Change to see if it's a change that needs to have a transfer queued for it. If I later want to add dropping keys for files that were removed, or something like that, this should make it straightforward. This also fixes a bug. In direct mode, moving a file out of an archive directory failed to start a transfer to get its content. The problem was that the file had not been committed to git yet, and so the transfer code didn't want to touch it, since fileKey failed to get its key. Only starting transfers after a commit avoids this problem.
This commit is contained in:
parent
69ab9701eb
commit
65a4c7966f
5 changed files with 50 additions and 43 deletions
|
@ -14,7 +14,7 @@ import Utility.TSet
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
|
|
||||||
{- Handlers call this when they made a change that needs to get committed. -}
|
{- Handlers call this when they made a change that needs to get committed. -}
|
||||||
madeChange :: FilePath -> ChangeType -> Assistant (Maybe Change)
|
madeChange :: FilePath -> ChangeInfo -> Assistant (Maybe Change)
|
||||||
madeChange f t = Just <$> (Change <$> liftIO getCurrentTime <*> pure f <*> pure t)
|
madeChange f t = Just <$> (Change <$> liftIO getCurrentTime <*> pure f <*> pure t)
|
||||||
|
|
||||||
noChange :: Assistant (Maybe Change)
|
noChange :: Assistant (Maybe Change)
|
||||||
|
|
|
@ -16,6 +16,7 @@ import Assistant.Commits
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
|
import Assistant.Drop
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import qualified Annex.Queue
|
import qualified Annex.Queue
|
||||||
|
@ -64,6 +65,7 @@ commitThread = namedThread "Committer" $ do
|
||||||
void $ alertWhile commitAlert $
|
void $ alertWhile commitAlert $
|
||||||
liftAnnex commitStaged
|
liftAnnex commitStaged
|
||||||
recordCommit
|
recordCommit
|
||||||
|
mapM_ checkChangeContent readychanges
|
||||||
else refill readychanges
|
else refill readychanges
|
||||||
else refill changes
|
else refill changes
|
||||||
where
|
where
|
||||||
|
@ -196,7 +198,7 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do
|
||||||
key <- liftAnnex $ do
|
key <- liftAnnex $ do
|
||||||
showStart "add" $ keyFilename ks
|
showStart "add" $ keyFilename ks
|
||||||
Command.Add.ingest $ Just ks
|
Command.Add.ingest $ Just ks
|
||||||
done (finishedChange change) (keyFilename ks) key
|
maybe failedingest (done change $ keyFilename ks) key
|
||||||
where
|
where
|
||||||
{- Add errors tend to be transient and will be automatically
|
{- Add errors tend to be transient and will be automatically
|
||||||
- dealt with, so don't pass to the alert code. -}
|
- dealt with, so don't pass to the alert code. -}
|
||||||
|
@ -204,10 +206,11 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do
|
||||||
ret _ = (True, Nothing)
|
ret _ = (True, Nothing)
|
||||||
add _ = return Nothing
|
add _ = return Nothing
|
||||||
|
|
||||||
done _ _ Nothing = do
|
failedingest = do
|
||||||
liftAnnex showEndFail
|
liftAnnex showEndFail
|
||||||
return Nothing
|
return Nothing
|
||||||
done change file (Just key) = do
|
|
||||||
|
done change file key = do
|
||||||
liftAnnex $ do
|
liftAnnex $ do
|
||||||
logStatus key InfoPresent
|
logStatus key InfoPresent
|
||||||
link <- ifM isDirect
|
link <- ifM isDirect
|
||||||
|
@ -217,8 +220,7 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do
|
||||||
whenM (pure DirWatcher.eventsCoalesce <||> isDirect) $ do
|
whenM (pure DirWatcher.eventsCoalesce <||> isDirect) $ do
|
||||||
stageSymlink file =<< hashSymlink link
|
stageSymlink file =<< hashSymlink link
|
||||||
showEndOk
|
showEndOk
|
||||||
queueTransfers "newly added file" Next key (Just file) Upload
|
return $ Just $ finishedChange change key
|
||||||
return $ Just change
|
|
||||||
|
|
||||||
{- Check that the keysource's keyFilename still exists,
|
{- Check that the keysource's keyFilename still exists,
|
||||||
- and is still a hard link to its contentLocation,
|
- and is still a hard link to its contentLocation,
|
||||||
|
@ -299,3 +301,22 @@ safeToAdd delayadd pending inprocess = do
|
||||||
tmpdir <- fromRepo gitAnnexTmpDir
|
tmpdir <- fromRepo gitAnnexTmpDir
|
||||||
liftIO $ Lsof.queryDir tmpdir
|
liftIO $ Lsof.queryDir tmpdir
|
||||||
)
|
)
|
||||||
|
|
||||||
|
{- After a Change is committed, queue any necessary transfers or drops
|
||||||
|
- of the content of the key.
|
||||||
|
-
|
||||||
|
- This is not done during the startup scan, because the expensive
|
||||||
|
- transfer scan does the same thing then.
|
||||||
|
-}
|
||||||
|
checkChangeContent :: Change -> Assistant ()
|
||||||
|
checkChangeContent (Change { changeInfo = i , changeFile = f }) =
|
||||||
|
case changeInfoKey i of
|
||||||
|
Nothing -> noop
|
||||||
|
Just k -> whenM (scanComplete <$> getDaemonStatus) $ do
|
||||||
|
present <- liftAnnex $ inAnnex k
|
||||||
|
if present
|
||||||
|
then queueTransfers "new file created" Next k (Just f) Upload
|
||||||
|
else queueTransfers "new or renamed file wanted" Next k (Just f) Download
|
||||||
|
handleDrops "file renamed" present k (Just f) Nothing
|
||||||
|
checkChangeContent _ = noop
|
||||||
|
|
||||||
|
|
|
@ -20,10 +20,7 @@ import Assistant.Common
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.Changes
|
import Assistant.Changes
|
||||||
import Assistant.Types.Changes
|
import Assistant.Types.Changes
|
||||||
import Assistant.TransferQueue
|
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Assistant.Drop
|
|
||||||
import Logs.Transfer
|
|
||||||
import Utility.DirWatcher
|
import Utility.DirWatcher
|
||||||
import Utility.Types.DirWatcher
|
import Utility.Types.DirWatcher
|
||||||
import Utility.Lsof
|
import Utility.Lsof
|
||||||
|
@ -178,6 +175,7 @@ onAdd file filestatus
|
||||||
- really been modified. -}
|
- really been modified. -}
|
||||||
onAddDirect :: Handler
|
onAddDirect :: Handler
|
||||||
onAddDirect file fs = do
|
onAddDirect file fs = do
|
||||||
|
debug ["add direct", file]
|
||||||
v <- liftAnnex $ catKeyFile file
|
v <- liftAnnex $ catKeyFile file
|
||||||
case (v, fs) of
|
case (v, fs) of
|
||||||
(Just key, Just filestatus) ->
|
(Just key, Just filestatus) ->
|
||||||
|
@ -201,20 +199,16 @@ onAddSymlink isdirect file filestatus = go =<< liftAnnex (Backend.lookupFile fil
|
||||||
liftAnnex $ void $ addAssociatedFile key file
|
liftAnnex $ void $ addAssociatedFile key file
|
||||||
link <- liftAnnex $ calcGitLink file key
|
link <- liftAnnex $ calcGitLink file key
|
||||||
ifM ((==) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file))
|
ifM ((==) (Just link) <$> liftIO (catchMaybeIO $ readSymbolicLink file))
|
||||||
( do
|
( ensurestaged (Just link) (Just key) =<< getDaemonStatus
|
||||||
s <- getDaemonStatus
|
|
||||||
checkcontent key s
|
|
||||||
ensurestaged (Just link) s
|
|
||||||
, do
|
, do
|
||||||
unless isdirect $ do
|
unless isdirect $ do
|
||||||
liftIO $ removeFile file
|
liftIO $ removeFile file
|
||||||
liftAnnex $ Backend.makeAnnexLink link file
|
liftAnnex $ Backend.makeAnnexLink link file
|
||||||
checkcontent key =<< getDaemonStatus
|
addlink link (Just key)
|
||||||
addlink link
|
|
||||||
)
|
)
|
||||||
go Nothing = do -- other symlink
|
go Nothing = do -- other symlink
|
||||||
mlink <- liftIO (catchMaybeIO $ readSymbolicLink file)
|
mlink <- liftIO (catchMaybeIO $ readSymbolicLink file)
|
||||||
ensurestaged mlink =<< getDaemonStatus
|
ensurestaged mlink Nothing =<< getDaemonStatus
|
||||||
|
|
||||||
{- This is often called on symlinks that are already
|
{- This is often called on symlinks that are already
|
||||||
- staged correctly. A symlink may have been deleted
|
- staged correctly. A symlink may have been deleted
|
||||||
|
@ -227,16 +221,16 @@ onAddSymlink isdirect file filestatus = go =<< liftAnnex (Backend.lookupFile fil
|
||||||
- (If the daemon has never ran before, avoid staging
|
- (If the daemon has never ran before, avoid staging
|
||||||
- links too.)
|
- links too.)
|
||||||
-}
|
-}
|
||||||
ensurestaged (Just link) daemonstatus
|
ensurestaged (Just link) mk daemonstatus
|
||||||
| scanComplete daemonstatus = addlink link
|
| scanComplete daemonstatus = addlink link mk
|
||||||
| otherwise = case filestatus of
|
| otherwise = case filestatus of
|
||||||
Just s
|
Just s
|
||||||
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
|
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
|
||||||
_ -> addlink link
|
_ -> addlink link mk
|
||||||
ensurestaged Nothing _ = noChange
|
ensurestaged Nothing _ _ = noChange
|
||||||
|
|
||||||
{- For speed, tries to reuse the existing blob for symlink target. -}
|
{- For speed, tries to reuse the existing blob for symlink target. -}
|
||||||
addlink link = do
|
addlink link mk = do
|
||||||
debug ["add symlink", file]
|
debug ["add symlink", file]
|
||||||
liftAnnex $ do
|
liftAnnex $ do
|
||||||
v <- catObjectDetails $ Ref $ ':':file
|
v <- catObjectDetails $ Ref $ ':':file
|
||||||
|
@ -245,20 +239,7 @@ onAddSymlink isdirect file filestatus = go =<< liftAnnex (Backend.lookupFile fil
|
||||||
| s2w8 link == L.unpack currlink ->
|
| s2w8 link == L.unpack currlink ->
|
||||||
stageSymlink file sha
|
stageSymlink file sha
|
||||||
_ -> stageSymlink file =<< hashSymlink link
|
_ -> stageSymlink file =<< hashSymlink link
|
||||||
madeChange file LinkChange
|
madeChange file $ LinkChange mk
|
||||||
|
|
||||||
{- When a new link appears, or a link is changed, after the startup
|
|
||||||
- scan, handle getting or dropping the key's content.
|
|
||||||
- Also, moving or copying a link may caused it be be transferred
|
|
||||||
- elsewhere, so check that too. -}
|
|
||||||
checkcontent key daemonstatus
|
|
||||||
| scanComplete daemonstatus = do
|
|
||||||
present <- liftAnnex $ inAnnex key
|
|
||||||
if present
|
|
||||||
then queueTransfers "new file created" Next key (Just file) Upload
|
|
||||||
else queueTransfers "new or renamed file wanted" Next key (Just file) Download
|
|
||||||
handleDrops "file renamed" present key (Just file) Nothing
|
|
||||||
| otherwise = noop
|
|
||||||
|
|
||||||
onDel :: Handler
|
onDel :: Handler
|
||||||
onDel file _ = do
|
onDel file _ = do
|
||||||
|
|
|
@ -8,20 +8,26 @@
|
||||||
module Assistant.Types.Changes where
|
module Assistant.Types.Changes where
|
||||||
|
|
||||||
import Types.KeySource
|
import Types.KeySource
|
||||||
|
import Types.Key
|
||||||
import Utility.TSet
|
import Utility.TSet
|
||||||
|
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
|
|
||||||
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
|
data ChangeInfo = AddChange Key | LinkChange (Maybe Key) | RmChange | RmDirChange
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
changeInfoKey :: ChangeInfo -> Maybe Key
|
||||||
|
changeInfoKey (AddChange k) = Just k
|
||||||
|
changeInfoKey (LinkChange (Just k)) = Just k
|
||||||
|
changeInfoKey _ = Nothing
|
||||||
|
|
||||||
type ChangeChan = TSet Change
|
type ChangeChan = TSet Change
|
||||||
|
|
||||||
data Change
|
data Change
|
||||||
= Change
|
= Change
|
||||||
{ changeTime :: UTCTime
|
{ changeTime :: UTCTime
|
||||||
, changeFile :: FilePath
|
, changeFile :: FilePath
|
||||||
, changeType :: ChangeType
|
, changeInfo :: ChangeInfo
|
||||||
}
|
}
|
||||||
| PendingAddChange
|
| PendingAddChange
|
||||||
{ changeTime ::UTCTime
|
{ changeTime ::UTCTime
|
||||||
|
@ -44,11 +50,10 @@ isInProcessAddChange :: Change -> Bool
|
||||||
isInProcessAddChange (InProcessAddChange {}) = True
|
isInProcessAddChange (InProcessAddChange {}) = True
|
||||||
isInProcessAddChange _ = False
|
isInProcessAddChange _ = False
|
||||||
|
|
||||||
finishedChange :: Change -> Change
|
finishedChange :: Change -> Key -> Change
|
||||||
finishedChange c@(InProcessAddChange { keySource = ks }) = Change
|
finishedChange c@(InProcessAddChange { keySource = ks }) k = Change
|
||||||
{ changeTime = changeTime c
|
{ changeTime = changeTime c
|
||||||
, changeFile = keyFilename ks
|
, changeFile = keyFilename ks
|
||||||
, changeType = AddChange
|
, changeInfo = AddChange k
|
||||||
}
|
}
|
||||||
finishedChange c = c
|
finishedChange c _ = c
|
||||||
|
|
||||||
|
|
2
debian/changelog
vendored
2
debian/changelog
vendored
|
@ -8,7 +8,7 @@ git-annex (4.20130228) UNRELEASED; urgency=low
|
||||||
* assistant: Avoid noise in logs from git commit about typechanged
|
* assistant: Avoid noise in logs from git commit about typechanged
|
||||||
files in direct mode repositories.
|
files in direct mode repositories.
|
||||||
* assistant: Fix dropping content when a file is moved to an archive
|
* assistant: Fix dropping content when a file is moved to an archive
|
||||||
directory.
|
directory, and getting contennt when a file is moved back out.
|
||||||
* assistant: Set gc.auto=0 when creating repositories to prevent
|
* assistant: Set gc.auto=0 when creating repositories to prevent
|
||||||
automatic commits from causing git-gc runs.
|
automatic commits from causing git-gc runs.
|
||||||
* assistant: If gc.auto=0, run git-gc once a day, packing loose objects
|
* assistant: If gc.auto=0, run git-gc once a day, packing loose objects
|
||||||
|
|
Loading…
Add table
Reference in a new issue