split Changes and lifted
This commit is contained in:
parent
39a3adf434
commit
d2294f0dfa
5 changed files with 82 additions and 67 deletions
|
@ -7,73 +7,33 @@
|
|||
|
||||
module Assistant.Changes where
|
||||
|
||||
import Common.Annex
|
||||
import Types.KeySource
|
||||
import Assistant.Common
|
||||
import Assistant.Types.Changes
|
||||
import Utility.TSet
|
||||
|
||||
import Data.Time.Clock
|
||||
|
||||
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
|
||||
deriving (Show, Eq)
|
||||
|
||||
type ChangeChan = TSet Change
|
||||
|
||||
data Change
|
||||
= Change
|
||||
{ changeTime :: UTCTime
|
||||
, changeFile :: FilePath
|
||||
, changeType :: ChangeType
|
||||
}
|
||||
| PendingAddChange
|
||||
{ changeTime ::UTCTime
|
||||
, changeFile :: FilePath
|
||||
}
|
||||
| InProcessAddChange
|
||||
{ changeTime ::UTCTime
|
||||
, keySource :: KeySource
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
newChangeChan :: IO ChangeChan
|
||||
newChangeChan = newTSet
|
||||
|
||||
{- Handlers call this when they made a change that needs to get committed. -}
|
||||
madeChange :: FilePath -> ChangeType -> IO (Maybe Change)
|
||||
madeChange f t = Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
|
||||
madeChange :: FilePath -> ChangeType -> Assistant (Maybe Change)
|
||||
madeChange f t = Just <$> (Change <$> liftIO getCurrentTime <*> pure f <*> pure t)
|
||||
|
||||
noChange :: IO (Maybe Change)
|
||||
noChange :: Assistant (Maybe Change)
|
||||
noChange = return Nothing
|
||||
|
||||
{- Indicates an add needs to be done, but has not started yet. -}
|
||||
pendingAddChange :: FilePath -> IO (Maybe Change)
|
||||
pendingAddChange f = Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
|
||||
|
||||
isPendingAddChange :: Change -> Bool
|
||||
isPendingAddChange (PendingAddChange {}) = True
|
||||
isPendingAddChange _ = False
|
||||
|
||||
isInProcessAddChange :: Change -> Bool
|
||||
isInProcessAddChange (InProcessAddChange {}) = True
|
||||
isInProcessAddChange _ = False
|
||||
|
||||
finishedChange :: Change -> Change
|
||||
finishedChange c@(InProcessAddChange { keySource = ks }) = Change
|
||||
{ changeTime = changeTime c
|
||||
, changeFile = keyFilename ks
|
||||
, changeType = AddChange
|
||||
}
|
||||
finishedChange c = c
|
||||
pendingAddChange :: FilePath -> Assistant (Maybe Change)
|
||||
pendingAddChange f = Just <$> (PendingAddChange <$> liftIO getCurrentTime <*> pure f)
|
||||
|
||||
{- Gets all unhandled changes.
|
||||
- Blocks until at least one change is made. -}
|
||||
getChanges :: ChangeChan -> IO [Change]
|
||||
getChanges = getTSet
|
||||
getChanges :: Assistant [Change]
|
||||
getChanges = getTSet <<~ changeChan
|
||||
|
||||
{- Puts unhandled changes back into the channel.
|
||||
- Note: Original order is not preserved. -}
|
||||
refillChanges :: ChangeChan -> [Change] -> IO ()
|
||||
refillChanges = putTSet
|
||||
refillChanges :: [Change] -> Assistant ()
|
||||
refillChanges cs = flip putTSet cs <<~ changeChan
|
||||
|
||||
{- Records a change in the channel. -}
|
||||
recordChange :: ChangeChan -> Change -> IO ()
|
||||
recordChange = putTSet1
|
||||
recordChange :: Change -> Assistant ()
|
||||
recordChange c = flip putTSet1 c <<~ changeChan
|
||||
|
|
|
@ -34,7 +34,7 @@ import Assistant.TransferSlots
|
|||
import Assistant.Types.Pushes
|
||||
import Assistant.Types.BranchChange
|
||||
import Assistant.Commits
|
||||
import Assistant.Changes
|
||||
import Assistant.Types.Changes
|
||||
|
||||
newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a }
|
||||
deriving (
|
||||
|
|
|
@ -11,6 +11,7 @@ module Assistant.Threads.Committer where
|
|||
|
||||
import Assistant.Common
|
||||
import Assistant.Changes
|
||||
import Assistant.Types.Changes
|
||||
import Assistant.Commits
|
||||
import Assistant.Alert
|
||||
import Assistant.Threads.Watcher
|
||||
|
@ -45,7 +46,7 @@ commitThread = NamedThread "Committer" $ do
|
|||
-- We already waited one second as a simple rate limiter.
|
||||
-- Next, wait until at least one change is available for
|
||||
-- processing.
|
||||
changes <- getChanges <<~ changeChan
|
||||
changes <- getChanges
|
||||
-- Now see if now's a good time to commit.
|
||||
time <- liftIO getCurrentTime
|
||||
if shouldCommit time changes
|
||||
|
@ -67,7 +68,7 @@ commitThread = NamedThread "Committer" $ do
|
|||
refill [] = noop
|
||||
refill cs = do
|
||||
debug ["delaying commit of", show (length cs), "changes"]
|
||||
flip refillChanges cs <<~ changeChan
|
||||
refillChanges cs
|
||||
|
||||
commitStaged :: Annex Bool
|
||||
commitStaged = do
|
||||
|
@ -148,15 +149,14 @@ handleAdds delayadd cs = returnWhen (null incomplete) $ do
|
|||
(postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess
|
||||
|
||||
unless (null postponed) $
|
||||
flip refillChanges postponed <<~ changeChan
|
||||
refillChanges postponed
|
||||
|
||||
returnWhen (null toadd) $ do
|
||||
added <- catMaybes <$> forM toadd add
|
||||
if DirWatcher.eventsCoalesce || null added
|
||||
then return $ added ++ otherchanges
|
||||
else do
|
||||
r <- handleAdds delayadd
|
||||
=<< getChanges <<~ changeChan
|
||||
r <- handleAdds delayadd =<< getChanges
|
||||
return $ r ++ added ++ otherchanges
|
||||
where
|
||||
(incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
|
||||
|
|
|
@ -17,6 +17,7 @@ module Assistant.Threads.Watcher (
|
|||
import Assistant.Common
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Changes
|
||||
import Assistant.Types.Changes
|
||||
import Assistant.TransferQueue
|
||||
import Assistant.Alert
|
||||
import Assistant.Drop
|
||||
|
@ -114,12 +115,12 @@ runHandler handler file filestatus = void $ do
|
|||
-- Just in case the commit thread is not
|
||||
-- flushing the queue fast enough.
|
||||
liftAnnex $ Annex.Queue.flushWhenFull
|
||||
flip recordChange change <<~ changeChan
|
||||
recordChange change
|
||||
|
||||
onAdd :: Handler
|
||||
onAdd file filestatus
|
||||
| maybe False isRegularFile filestatus = liftIO $ pendingAddChange file
|
||||
| otherwise = liftIO $ noChange
|
||||
| maybe False isRegularFile filestatus = pendingAddChange file
|
||||
| otherwise = noChange
|
||||
|
||||
{- A symlink might be an arbitrary symlink, which is just added.
|
||||
- Or, if it is a git-annex symlink, ensure it points to the content
|
||||
|
@ -160,7 +161,7 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
|
|||
| scanComplete daemonstatus = addlink link
|
||||
| otherwise = case filestatus of
|
||||
Just s
|
||||
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> liftIO noChange
|
||||
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
|
||||
_ -> addlink link
|
||||
|
||||
{- For speed, tries to reuse the existing blob for symlink target. -}
|
||||
|
@ -176,7 +177,7 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
|
|||
sha <- inRepo $
|
||||
Git.HashObject.hashObject BlobObject link
|
||||
stageSymlink file sha
|
||||
liftIO $ madeChange file LinkChange
|
||||
madeChange file LinkChange
|
||||
|
||||
{- When a new link appears, or a link is changed, after the startup
|
||||
- scan, handle getting or dropping the key's content. -}
|
||||
|
@ -197,7 +198,7 @@ onDel file _ = do
|
|||
liftAnnex $
|
||||
Annex.Queue.addUpdateIndex =<<
|
||||
inRepo (Git.UpdateIndex.unstageFile file)
|
||||
liftIO $ madeChange file RmChange
|
||||
madeChange file RmChange
|
||||
|
||||
{- A directory has been deleted, or moved, so tell git to remove anything
|
||||
- that was inside it from its cache. Since it could reappear at any time,
|
||||
|
@ -211,7 +212,7 @@ onDelDir dir _ = do
|
|||
debug ["directory deleted", dir]
|
||||
liftAnnex $ Annex.Queue.addCommand "rm"
|
||||
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
||||
liftIO $ madeChange dir RmDirChange
|
||||
madeChange dir RmDirChange
|
||||
|
||||
{- Called when there's an error with inotify or kqueue. -}
|
||||
onErr :: Handler
|
||||
|
@ -219,7 +220,7 @@ onErr msg _ = do
|
|||
liftAnnex $ warning msg
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg
|
||||
liftIO noChange
|
||||
noChange
|
||||
|
||||
{- Adds a symlink to the index, without ever accessing the actual symlink
|
||||
- on disk. This avoids a race if git add is used, where the symlink is
|
||||
|
|
54
Assistant/Types/Changes.hs
Normal file
54
Assistant/Types/Changes.hs
Normal file
|
@ -0,0 +1,54 @@
|
|||
{- git-annex assistant change tracking
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Types.Changes where
|
||||
|
||||
import Types.KeySource
|
||||
import Utility.TSet
|
||||
|
||||
import Data.Time.Clock
|
||||
|
||||
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
|
||||
deriving (Show, Eq)
|
||||
|
||||
type ChangeChan = TSet Change
|
||||
|
||||
data Change
|
||||
= Change
|
||||
{ changeTime :: UTCTime
|
||||
, changeFile :: FilePath
|
||||
, changeType :: ChangeType
|
||||
}
|
||||
| PendingAddChange
|
||||
{ changeTime ::UTCTime
|
||||
, changeFile :: FilePath
|
||||
}
|
||||
| InProcessAddChange
|
||||
{ changeTime ::UTCTime
|
||||
, keySource :: KeySource
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
newChangeChan :: IO ChangeChan
|
||||
newChangeChan = newTSet
|
||||
|
||||
isPendingAddChange :: Change -> Bool
|
||||
isPendingAddChange (PendingAddChange {}) = True
|
||||
isPendingAddChange _ = False
|
||||
|
||||
isInProcessAddChange :: Change -> Bool
|
||||
isInProcessAddChange (InProcessAddChange {}) = True
|
||||
isInProcessAddChange _ = False
|
||||
|
||||
finishedChange :: Change -> Change
|
||||
finishedChange c@(InProcessAddChange { keySource = ks }) = Change
|
||||
{ changeTime = changeTime c
|
||||
, changeFile = keyFilename ks
|
||||
, changeType = AddChange
|
||||
}
|
||||
finishedChange c = c
|
||||
|
Loading…
Reference in a new issue