converted 6 more threads

This commit is contained in:
Joey Hess 2012-10-29 11:40:22 -04:00
parent bad88e404a
commit 76768ad977
8 changed files with 350 additions and 370 deletions

View file

@ -182,26 +182,24 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
dstatus <- getAssistant daemonStatusHandle
changechan <- getAssistant changeChan
commitchan <- getAssistant commitChan
pushmap <- getAssistant failedPushMap
transferqueue <- getAssistant transferQueue
transferslots <- getAssistant transferSlots
scanremotes <- getAssistant scanRemoteMap
branchhandle <- getAssistant branchChangeHandle
pushnotifier <- getAssistant pushNotifier
#ifdef WITH_WEBAPP
urlrenderer <- liftIO newUrlRenderer
#endif
mapM_ (startthread d)
[ watch $ commitThread st changechan commitchan transferqueue dstatus
[ watch $ commitThread
#ifdef WITH_WEBAPP
, assist $ webAppThread d urlrenderer False Nothing webappwaiter
#ifdef WITH_PAIRING
, assist $ pairListenerThread st dstatus scanremotes urlrenderer
#endif
#endif
, assist $ pushThread st dstatus commitchan pushmap pushnotifier
, assist $ pushRetryThread st dstatus pushmap pushnotifier
, assist $ mergeThread st dstatus transferqueue branchhandle
, assist $ pushThread
, assist $ pushRetryThread
, assist $ mergeThread
, assist $ transferWatcherThread st dstatus transferqueue
, assist $ transferPollerThread
, assist $ transfererThread st dstatus transferqueue transferslots commitchan
@ -210,10 +208,10 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ mountWatcherThread st dstatus scanremotes pushnotifier
, assist $ netWatcherThread
, assist $ netWatcherFallbackThread
, assist $ transferScannerThread st dstatus scanremotes transferqueue
, assist $ configMonitorThread st dstatus branchhandle commitchan
, assist $ transferScannerThread
, assist $ configMonitorThread
#ifdef WITH_XMPP
, assist $ pushNotifierThread st dstatus pushnotifier
, assist $ pushNotifierThread
#endif
, watch $ watchThread
]

View file

@ -13,7 +13,6 @@ import Assistant.Common
import Assistant.Changes
import Assistant.Commits
import Assistant.Alert
import Assistant.ThreadedMonad
import Assistant.Threads.Watcher
import Assistant.TransferQueue
import Assistant.DaemonStatus
@ -37,48 +36,40 @@ import Data.Tuple.Utils
import qualified Data.Set as S
import Data.Either
thisThread :: ThreadName
thisThread = "Committer"
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do
delayadd <- runThreadState st $
commitThread :: NamedThread
commitThread = NamedThread "Committer" $ do
delayadd <- liftAnnex $
maybe delayaddDefault (Just . Seconds) . readish
<$> getConfig (annexConfig "delayadd") ""
runEvery (Seconds 1) $ do
runEvery (Seconds 1) <~> do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
changes <- getChanges changechan
changes <- getChanges <<~ changeChan
-- Now see if now's a good time to commit.
time <- getCurrentTime
time <- liftIO getCurrentTime
if shouldCommit time changes
then do
readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes
readychanges <- handleAdds delayadd changes
if shouldCommit time readychanges
then do
brokendebug thisThread
debug
[ "committing"
, show (length readychanges)
, "changes"
]
void $ alertWhile dstatus commitAlert $
runThreadState st commitStaged
recordCommit commitchan
dstatus <- getAssistant daemonStatusHandle
void $ alertWhile dstatus commitAlert <~>
liftAnnex commitStaged
recordCommit <<~ commitChan
else refill readychanges
else refill changes
where
thread = NamedThread thisThread
refill [] = noop
refill cs = do
brokendebug thisThread
[ "delaying commit of"
, show (length cs)
, "changes"
]
refillChanges changechan cs
where
refill [] = noop
refill cs = do
debug ["delaying commit of", show (length cs), "changes"]
flip refillChanges cs <<~ changeChan
commitStaged :: Annex Bool
commitStaged = do
@ -99,12 +90,12 @@ commitStaged = do
- each other out, etc. Git returns nonzero on those,
- so don't propigate out commit failures. -}
return True
where
nomessage ps
| Git.Version.older "1.7.2" = Param "-m"
: Param "autocommit" : ps
| otherwise = Param "--allow-empty-message"
: Param "-m" : Param "" : ps
where
nomessage ps
| Git.Version.older "1.7.2" = Param "-m"
: Param "autocommit" : ps
| otherwise = Param "--allow-empty-message"
: Param "-m" : Param "" : ps
{- Decide if now is a good time to make a commit.
- Note that the list of change times has an undefined order.
@ -118,9 +109,9 @@ shouldCommit now changes
| len > 10000 = True -- avoid bloating queue too much
| length (filter thisSecond changes) < 10 = True
| otherwise = False -- batch activity
where
len = length changes
thisSecond c = now `diffUTCTime` changeTime c <= 1
where
len = length changes
thisSecond c = now `diffUTCTime` changeTime c <= 1
{- OSX needs a short delay after a file is added before locking it down,
- as pasting a file seems to try to set file permissions or otherwise
@ -152,77 +143,82 @@ delayaddDefault = Nothing
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
handleAdds :: Maybe Seconds -> ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null incomplete) $ do
handleAdds :: Maybe Seconds -> [Change] -> Assistant [Change]
handleAdds delayadd cs = returnWhen (null incomplete) $ do
let (pending, inprocess) = partition isPendingAddChange incomplete
pending' <- findnew pending
(postponed, toadd) <- partitionEithers <$> safeToAdd delayadd st pending' inprocess
(postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess
unless (null postponed) $
refillChanges changechan postponed
flip refillChanges postponed <<~ changeChan
returnWhen (null toadd) $ do
added <- catMaybes <$> forM toadd add
if DirWatcher.eventsCoalesce || null added
then return $ added ++ otherchanges
else do
r <- handleAdds delayadd st changechan transferqueue dstatus
=<< getChanges changechan
r <- handleAdds delayadd
=<< getChanges <<~ changeChan
return $ r ++ added ++ otherchanges
where
(incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
where
(incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs
findnew [] = return []
findnew pending@(exemplar:_) = do
(!newfiles, cleanup) <- runThreadState st $
inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
void cleanup
-- note: timestamp info is lost here
let ts = changeTime exemplar
return $ map (PendingAddChange ts) newfiles
findnew [] = return []
findnew pending@(exemplar:_) = do
(!newfiles, cleanup) <- liftAnnex $
inRepo (Git.LsFiles.notInRepo False $ map changeFile pending)
void $ liftIO cleanup
-- note: timestamp info is lost here
let ts = changeTime exemplar
return $ map (PendingAddChange ts) newfiles
returnWhen c a
| c = return otherchanges
| otherwise = a
returnWhen c a
| c = return otherchanges
| otherwise = a
add :: Change -> IO (Maybe Change)
add change@(InProcessAddChange { keySource = ks }) =
alertWhile' dstatus (addFileAlert $ keyFilename ks) $
liftM ret $ catchMaybeIO $
sanitycheck ks $ runThreadState st $ do
showStart "add" $ keyFilename ks
key <- Command.Add.ingest ks
done (finishedChange change) (keyFilename ks) key
where
{- Add errors tend to be transient and will
- be automatically dealt with, so don't
- pass to the alert code. -}
ret (Just j@(Just _)) = (True, j)
ret _ = (True, Nothing)
add _ = return Nothing
add :: Change -> Assistant (Maybe Change)
add change@(InProcessAddChange { keySource = ks }) = do
dstatus <- getAssistant daemonStatusHandle
alertWhile' dstatus (addFileAlert $ keyFilename ks) <~> add' change ks
add _ = return Nothing
done _ _ Nothing = do
showEndFail
return Nothing
done change file (Just key) = do
link <- Command.Add.link file key True
when DirWatcher.eventsCoalesce $ do
add' change ks = liftM ret $ catchMaybeIO <~> do
sanitycheck ks $ do
key <- liftAnnex $ do
showStart "add" $ keyFilename ks
Command.Add.ingest ks
done (finishedChange change) (keyFilename ks) key
where
{- Add errors tend to be transient and will be automatically
- dealt with, so don't pass to the alert code. -}
ret (Just j@(Just _)) = (True, j)
ret _ = (True, Nothing)
done _ _ Nothing = do
liftAnnex showEndFail
return Nothing
done change file (Just key) = do
link <- liftAnnex $ Command.Add.link file key True
when DirWatcher.eventsCoalesce $
liftAnnex $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
queueTransfers Next transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
showEndOk
transferqueue <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftAnnex $ queueTransfers Next transferqueue dstatus key (Just file) Upload
return $ Just change
{- Check that the keysource's keyFilename still exists,
- and is still a hard link to its contentLocation,
- before ingesting it. -}
sanitycheck keysource a = do
fs <- getSymbolicLinkStatus $ keyFilename keysource
ks <- getSymbolicLinkStatus $ contentLocation keysource
if deviceID ks == deviceID fs && fileID ks == fileID fs
then a
else return Nothing
{- Check that the keysource's keyFilename still exists,
- and is still a hard link to its contentLocation,
- before ingesting it. -}
sanitycheck keysource a = do
fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource
ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource
if deviceID ks == deviceID fs && fileID ks == fileID fs
then a
else return Nothing
{- Files can Either be Right to be added now,
- or are unsafe, and must be Left for later.
@ -230,11 +226,11 @@ handleAdds delayadd st changechan transferqueue dstatus cs = returnWhen (null in
- Check by running lsof on the temp directory, which
- the KeySources are locked down in.
-}
safeToAdd :: Maybe Seconds -> ThreadState -> [Change] -> [Change] -> IO [Either Change Change]
safeToAdd _ _ [] [] = return []
safeToAdd delayadd st pending inprocess = do
maybe noop threadDelaySeconds delayadd
runThreadState st $ do
safeToAdd :: Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change]
safeToAdd _ [] [] = return []
safeToAdd delayadd pending inprocess = do
maybe noop (liftIO . threadDelaySeconds) delayadd
liftAnnex $ do
keysources <- mapM Command.Add.lockDown (map changeFile pending)
let inprocess' = map mkinprocess (zip pending keysources)
tmpdir <- fromRepo gitAnnexTmpDir
@ -250,25 +246,24 @@ safeToAdd delayadd st pending inprocess = do
mapM_ canceladd $ lefts checked
allRight $ rights checked
else return checked
where
check openfiles change@(InProcessAddChange { keySource = ks })
| S.member (contentLocation ks) openfiles = Left change
check _ change = Right change
where
check openfiles change@(InProcessAddChange { keySource = ks })
| S.member (contentLocation ks) openfiles = Left change
check _ change = Right change
mkinprocess (c, ks) = InProcessAddChange
{ changeTime = changeTime c
, keySource = ks
}
mkinprocess (c, ks) = InProcessAddChange
{ changeTime = changeTime c
, keySource = ks
}
canceladd (InProcessAddChange { keySource = ks }) = do
warning $ keyFilename ks
++ " still has writers, not adding"
-- remove the hard link
void $ liftIO $ tryIO $
removeFile $ contentLocation ks
canceladd _ = noop
canceladd (InProcessAddChange { keySource = ks }) = do
warning $ keyFilename ks
++ " still has writers, not adding"
-- remove the hard link
void $ liftIO $ tryIO $ removeFile $ contentLocation ks
canceladd _ = noop
openwrite (_file, mode, _pid) =
mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
openwrite (_file, mode, _pid) =
mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
allRight = return . map Right
allRight = return . map Right

View file

@ -9,7 +9,6 @@ module Assistant.Threads.ConfigMonitor where
import Assistant.Common
import Assistant.BranchChange
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Commits
import Utility.ThreadScheduler
@ -19,10 +18,8 @@ import Logs.Remote
import Logs.PreferredContent
import Logs.Group
import Remote.List (remoteListRefresh)
import qualified Git
import qualified Git.LsTree as LsTree
import qualified Annex.Branch
import qualified Annex
import qualified Data.Set as S
@ -37,26 +34,22 @@ thisThread = "ConfigMonitor"
- if the branch has not changed in a while, configuration changes will
- be detected immediately.
-}
configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread
configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do
r <- runThreadState st Annex.gitRepo
go r =<< getConfigs r
where
thread = NamedThread thisThread
go r old = do
threadDelaySeconds (Seconds 60)
waitBranchChange branchhandle
new <- getConfigs r
when (old /= new) $ do
let changedconfigs = new `S.difference` old
brokendebug thisThread $ "reloading config" :
map fst (S.toList changedconfigs)
reloadConfigs st dstatus changedconfigs
{- Record a commit to get this config
- change pushed out to remotes. -}
recordCommit commitchan
go r new
configMonitorThread :: NamedThread
configMonitorThread = NamedThread "ConfigMonitor" $ loop =<< getConfigs
where
loop old = do
liftIO $ threadDelaySeconds (Seconds 60)
waitBranchChange <<~ branchChangeHandle
new <- getConfigs
when (old /= new) $ do
let changedconfigs = new `S.difference` old
debug $ "reloading config" :
map fst (S.toList changedconfigs)
reloadConfigs new
{- Record a commit to get this config
- change pushed out to remotes. -}
recordCommit <<~ commitChan
loop new
{- Config files, and their checksums. -}
type Configs = S.Set (FilePath, String)
@ -73,22 +66,23 @@ configFilesActions =
, (preferredContentLog, noop)
]
reloadConfigs :: ThreadState -> DaemonStatusHandle -> Configs -> IO ()
reloadConfigs st dstatus changedconfigs = runThreadState st $ do
sequence_ as
void preferredContentMapLoad
reloadConfigs :: Configs -> Assistant ()
reloadConfigs changedconfigs = do
liftAnnex $ do
sequence_ as
void preferredContentMapLoad
{- Changes to the remote log, or the trust log, can affect the
- syncRemotes list -}
when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $
updateSyncRemotes dstatus
where
(fs, as) = unzip $ filter (flip S.member changedfiles . fst)
configFilesActions
changedfiles = S.map fst changedconfigs
when (Logs.Remote.remoteLog `elem` fs || Logs.Trust.trustLog `elem` fs) $
liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
where
(fs, as) = unzip $ filter (flip S.member changedfiles . fst)
configFilesActions
changedfiles = S.map fst changedconfigs
getConfigs :: Git.Repo -> IO Configs
getConfigs r = S.fromList . map extract
<$> LsTree.lsTreeFiles Annex.Branch.fullname files r
where
files = map fst configFilesActions
extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem)
getConfigs :: Assistant Configs
getConfigs = S.fromList . map extract
<$> liftAnnex (inRepo $ LsTree.lsTreeFiles Annex.Branch.fullname files)
where
files = map fst configFilesActions
extract treeitem = (LsTree.file treeitem, LsTree.sha treeitem)

View file

@ -8,8 +8,6 @@
module Assistant.Threads.Merger where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Assistant.BranchChange
import Utility.DirWatcher
@ -24,36 +22,34 @@ thisThread = "Merger"
{- This thread watches for changes to .git/refs/, and handles incoming
- pushes. -}
mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread
mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do
g <- runThreadState st gitRepo
mergeThread :: NamedThread
mergeThread = NamedThread "Merger" $ do
g <- liftAnnex gitRepo
let dir = Git.localGitDir g </> "refs"
createDirectoryIfMissing True dir
let hook a = Just $ runHandler st dstatus transferqueue branchchange a
liftIO $ createDirectoryIfMissing True dir
let hook a = Just <$> asIO2 (runHandler a)
addhook <- hook onAdd
errhook <- hook onErr
let hooks = mkWatchHooks
{ addHook = hook onAdd
, errHook = hook onErr
{ addHook = addhook
, errHook = errhook
}
void $ watchDir dir (const False) hooks id
brokendebug thisThread ["watching", dir]
where
thread = NamedThread thisThread
void $ liftIO $ watchDir dir (const False) hooks id
debug ["watching", dir]
type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> FilePath -> Maybe FileStatus -> IO ()
type Handler = FilePath -> Assistant ()
{- Runs an action handler.
-
- Exceptions are ignored, otherwise a whole thread could be crashed.
-}
runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
runHandler st dstatus transferqueue branchchange handler file filestatus = void $
either print (const noop) =<< tryIO go
where
go = handler st dstatus transferqueue branchchange file filestatus
runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
runHandler handler file _filestatus =
either (liftIO . print) (const noop) =<< tryIO <~> handler file
{- Called when there's an error with inotify. -}
onErr :: Handler
onErr _ _ _ _ msg _ = error msg
onErr msg = error msg
{- Called when a new branch ref is written.
-
@ -67,29 +63,29 @@ onErr _ _ _ _ msg _ = error msg
- ran are merged in.
-}
onAdd :: Handler
onAdd st dstatus transferqueue branchchange file _
onAdd file
| ".lock" `isSuffixOf` file = noop
| isAnnexBranch file = do
branchChanged branchchange
runThreadState st $
branchChanged <<~ branchChangeHandle
transferqueue <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftAnnex $
whenM Annex.Branch.forceUpdate $
queueDeferredDownloads Later transferqueue dstatus
| "/synced/" `isInfixOf` file = runThreadState st $ do
mergecurrent =<< inRepo Git.Branch.current
| "/synced/" `isInfixOf` file = do
mergecurrent =<< liftAnnex (inRepo Git.Branch.current)
| otherwise = noop
where
changedbranch = fileToBranch file
mergecurrent (Just current)
| equivBranches changedbranch current = do
liftIO $ brokendebug thisThread
[ "merging"
, show changedbranch
, "into"
, show current
]
void $ inRepo $
Git.Merge.mergeNonInteractive changedbranch
mergecurrent _ = noop
where
changedbranch = fileToBranch file
mergecurrent (Just current)
| equivBranches changedbranch current = do
debug
[ "merging", show changedbranch
, "into", show current
]
void $ liftAnnex $ inRepo $
Git.Merge.mergeNonInteractive changedbranch
mergecurrent _ = noop
equivBranches :: Git.Ref -> Git.Ref -> Bool
equivBranches x y = base x == base y

View file

@ -12,7 +12,6 @@ module Assistant.Threads.PushNotifier where
import Assistant.Common
import Assistant.XMPP
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Pushes
import Assistant.Sync
@ -25,56 +24,56 @@ import qualified Data.Set as S
import qualified Git.Branch
import Data.Time.Clock
thisThread :: ThreadName
thisThread = "PushNotifier"
pushNotifierThread :: NamedThread
pushNotifierThread = NamedThread "PushNotifier" $ do
iodebug <- asIO debug
iopull <- asIO pull
pn <- getAssistant pushNotifier
controllerThread pn <~> xmppClient pn iodebug iopull
controllerThread :: PushNotifier -> IO () -> IO ()
controllerThread pushnotifier a = forever $ do
tid <- forkIO a
controllerThread pushnotifier xmppclient = forever $ do
tid <- forkIO xmppclient
waitRestart pushnotifier
killThread tid
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
controllerThread pushnotifier $ do
v <- runThreadState st $ getXMPPCreds
case v of
Nothing -> noop
Just c -> loop c =<< getCurrentTime
where
loop c starttime = do
void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
liftIO $ brokendebug thisThread ["XMPP connected", show fulljid]
putStanza $ gitAnnexPresence gitAnnexSignature
s <- getSession
_ <- liftIO $ forkIO $ void $ runXMPP s $
receivenotifications
sendnotifications
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
brokendebug thisThread ["XMPP connection lost; reconnecting"]
loop c now
else do
brokendebug thisThread ["XMPP connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
sendnotifications = forever $ do
us <- liftIO $ waitPush pushnotifier
putStanza $ gitAnnexPresence $ encodePushNotification us
receivenotifications = forever $ do
s <- getStanza
liftIO $ brokendebug thisThread ["received XMPP:", show s]
case s of
ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
liftIO $ pull st dstatus $
concat $ catMaybes $
map decodePushNotification $
presencePayloads p
_ -> noop
xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant ()
xmppClient pushnotifier iodebug iopull = do
v <- liftAnnex getXMPPCreds
case v of
Nothing -> noop
Just c -> liftIO $ loop c =<< getCurrentTime
where
loop c starttime = do
void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
liftIO $ iodebug ["XMPP connected", show fulljid]
putStanza $ gitAnnexPresence gitAnnexSignature
s <- getSession
_ <- liftIO $ forkIO $ void $ runXMPP s $
receivenotifications
sendnotifications
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
iodebug ["XMPP connection lost; reconnecting"]
loop c now
else do
iodebug ["XMPP connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
sendnotifications = forever $ do
us <- liftIO $ waitPush pushnotifier
putStanza $ gitAnnexPresence $ encodePushNotification us
receivenotifications = forever $ do
s <- getStanza
liftIO $ iodebug ["received XMPP:", show s]
case s of
ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
liftIO $ iopull $ concat $ catMaybes $
map decodePushNotification $
presencePayloads p
_ -> noop
{- We only pull from one remote out of the set listed in the push
- notification, as an optimisation.
@ -89,18 +88,18 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
- fully up-to-date. If that happens, the pushRetryThread will come along
- and retry the push, and we'll get another notification once it succeeds,
- and pull again. -}
pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO ()
pull _ _ [] = noop
pull st dstatus us = do
rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
brokendebug thisThread $ "push notification for" :
map (fromUUID . Remote.uuid ) rs
pullone rs =<< runThreadState st (inRepo Git.Branch.current)
where
matching r = Remote.uuid r `S.member` s
s = S.fromList us
pull :: [UUID] -> Assistant ()
pull [] = noop
pull us = do
rs <- filter matching . syncRemotes <$> daemonStatus
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
st <- getAssistant threadState
liftIO . pullone st rs =<< liftAnnex (inRepo Git.Branch.current)
where
matching r = Remote.uuid r `S.member` s
s = S.fromList us
pullone [] _ = noop
pullone (r:rs) branch =
unlessM (all id . fst <$> manualPull st branch [r]) $
pullone rs branch
pullone _ [] _ = noop
pullone st (r:rs) branch =
unlessM (all id . fst <$> manualPull st branch [r]) $
pullone st rs branch

View file

@ -11,7 +11,6 @@ import Assistant.Common
import Assistant.Commits
import Assistant.Pushes
import Assistant.Alert
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Sync
import Utility.ThreadScheduler
@ -24,52 +23,49 @@ thisThread :: ThreadName
thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do
pushRetryThread :: NamedThread
pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
pushmap <- getAssistant failedPushMap
topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour)
unless (null topush) $ do
brokendebug thisThread
[ "retrying"
, show (length topush)
, "failed pushes"
]
now <- getCurrentTime
void $ alertWhile dstatus (pushRetryAlert topush) $
debug ["retrying", show (length topush), "failed pushes"]
now <- liftIO $ getCurrentTime
st <- getAssistant threadState
pushnotifier <- getAssistant pushNotifier
dstatus <- getAssistant daemonStatusHandle
void $ liftIO $ alertWhile dstatus (pushRetryAlert topush) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
where
halfhour = 1800
thread = NamedThread thisThread
{- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do
pushThread :: NamedThread
pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
commits <- getCommits <<~ commitChan
-- Now see if now's a good time to push.
if shouldPush commits
then do
remotes <- filter pushable . syncRemotes
<$> getDaemonStatus dstatus
remotes <- filter pushable . syncRemotes <$> daemonStatus
unless (null remotes) $ do
now <- getCurrentTime
void $ alertWhile dstatus (pushAlert remotes) $
now <- liftIO $ getCurrentTime
st <- getAssistant threadState
pushmap <- getAssistant failedPushMap
pushnotifier <- getAssistant pushNotifier
dstatus <- getAssistant daemonStatusHandle
void $ liftIO $ alertWhile dstatus (pushAlert remotes) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
brokendebug thisThread
[ "delaying push of"
, show (length commits)
, "commits"
]
refillCommits commitchan commits
where
thread = NamedThread thisThread
pushable r
| Remote.specialRemote r = False
| Remote.readonly r = False
| otherwise = True
debug ["delaying push of", show (length commits), "commits"]
flip refillCommits commits <<~ commitChan
where
pushable r
| Remote.specialRemote r = False
| Remote.readonly r = False
| otherwise = True
{- Decide if now is a good time to push to remotes.
-

View file

@ -10,7 +10,6 @@ module Assistant.Threads.TransferScanner where
import Assistant.Common
import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Alert
import Assistant.Drop
@ -27,64 +26,64 @@ import Annex.Wanted
import qualified Data.Set as S
thisThread :: ThreadName
thisThread = "TransferScanner"
{- This thread waits until a remote needs to be scanned, to find transfers
- that need to be made, to keep data in sync.
-}
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread
transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do
transferScannerThread :: NamedThread
transferScannerThread = NamedThread "TransferScanner" $ do
startupScan
go S.empty
where
thread = NamedThread thisThread
go scanned = do
threadDelaySeconds (Seconds 2)
(rs, infos) <- unzip <$> getScanRemote scanremotes
if any fullScan infos || any (`S.notMember` scanned) rs
then do
expensiveScan st dstatus transferqueue rs
go $ scanned `S.union` S.fromList rs
else do
mapM_ (failedTransferScan st dstatus transferqueue) rs
go scanned
{- All available remotes are scanned in full on startup,
- for multiple reasons, including:
-
- * This may be the first run, and there may be remotes
- already in place, that need to be synced.
- * We may have run before, and scanned a remote, but
- only been in a subdirectory of the git remote, and so
- not synced it all.
- * We may have run before, and had transfers queued,
- and then the system (or us) crashed, and that info was
- lost.
-}
startupScan = addScanRemotes scanremotes True
=<< syncRemotes <$> getDaemonStatus dstatus
where
go scanned = do
liftIO $ threadDelaySeconds (Seconds 2)
(rs, infos) <- unzip <$> getScanRemote <<~ scanRemoteMap
if any fullScan infos || any (`S.notMember` scanned) rs
then do
expensiveScan rs
go $ scanned `S.union` S.fromList rs
else do
mapM_ failedTransferScan rs
go scanned
{- All available remotes are scanned in full on startup,
- for multiple reasons, including:
-
- * This may be the first run, and there may be remotes
- already in place, that need to be synced.
- * We may have run before, and scanned a remote, but
- only been in a subdirectory of the git remote, and so
- not synced it all.
- * We may have run before, and had transfers queued,
- and then the system (or us) crashed, and that info was
- lost.
-}
startupScan = do
scanremotes <- getAssistant scanRemoteMap
liftIO . addScanRemotes scanremotes True
=<< syncRemotes <$> daemonStatus
{- This is a cheap scan for failed transfers involving a remote. -}
failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
failedTransferScan st dstatus transferqueue r = do
failed <- runThreadState st $ getFailedTransfers (Remote.uuid r)
runThreadState st $ mapM_ removeFailedTransfer $ map fst failed
failedTransferScan :: Remote -> Assistant ()
failedTransferScan r = do
failed <- liftAnnex $ getFailedTransfers (Remote.uuid r)
liftAnnex $ mapM_ removeFailedTransfer $ map fst failed
mapM_ retry failed
where
retry (t, info)
| transferDirection t == Download = do
{- Check if the remote still has the key.
- If not, relies on the expensiveScan to
- get it queued from some other remote. -}
whenM (runThreadState st $ remoteHas r $ transferKey t) $
requeue t info
| otherwise = do
{- The Transferrer checks when uploading
- that the remote doesn't already have the
- key, so it's not redundantly checked
- here. -}
where
retry (t, info)
| transferDirection t == Download = do
{- Check if the remote still has the key.
- If not, relies on the expensiveScan to
- get it queued from some other remote. -}
whenM (liftAnnex $ remoteHas r $ transferKey t) $
requeue t info
requeue t info = queueTransferWhenSmall
| otherwise = do
{- The Transferrer checks when uploading
- that the remote doesn't already have the
- key, so it's not redundantly checked here. -}
requeue t info
requeue t info = do
transferqueue <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftIO $ queueTransferWhenSmall
transferqueue dstatus (associatedFile info) t r
{- This is a expensive scan through the full git work tree, finding
@ -98,42 +97,45 @@ failedTransferScan st dstatus transferqueue r = do
- TODO: It would be better to first drop as much as we can, before
- transferring much, to minimise disk use.
-}
expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
brokendebug thisThread ["starting scan of", show visiblers]
void $ alertWhile dstatus (scanAlert visiblers) $ do
g <- runThreadState st gitRepo
(files, cleanup) <- LsFiles.inRepo [] g
expensiveScan :: [Remote] -> Assistant ()
expensiveScan rs = unless onlyweb $ do
debug ["starting scan of", show visiblers]
dstatus <- getAssistant daemonStatusHandle
void $ alertWhile dstatus (scanAlert visiblers) <~> do
g <- liftAnnex gitRepo
(files, cleanup) <- liftIO $ LsFiles.inRepo [] g
forM_ files $ \f -> do
ts <- runThreadState st $
ifAnnexed f (findtransfers f) (return [])
ts <- liftAnnex $
ifAnnexed f (findtransfers dstatus f) (return [])
mapM_ (enqueue f) ts
void cleanup
void $ liftIO cleanup
return True
brokendebug thisThread ["finished scan of", show visiblers]
where
onlyweb = all (== webUUID) $ map Remote.uuid rs
visiblers = let rs' = filter (not . Remote.readonly) rs
in if null rs' then rs else rs'
enqueue f (r, t) = do
brokendebug thisThread ["queuing", show t]
queueTransferWhenSmall transferqueue dstatus (Just f) t r
findtransfers f (key, _) = do
locs <- loggedLocations key
{- The syncable remotes may have changed since this
- scan began. -}
syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus
present <- inAnnex key
debug ["finished scan of", show visiblers]
where
onlyweb = all (== webUUID) $ map Remote.uuid rs
visiblers = let rs' = filter (not . Remote.readonly) rs
in if null rs' then rs else rs'
enqueue f (r, t) = do
debug ["queuing", show t]
transferqueue <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftIO $ queueTransferWhenSmall transferqueue dstatus (Just f) t r
findtransfers dstatus f (key, _) = do
locs <- loggedLocations key
{- The syncable remotes may have changed since this
- scan began. -}
syncrs <- liftIO $ syncRemotes <$> getDaemonStatus dstatus
present <- inAnnex key
handleDrops' locs syncrs present key (Just f)
handleDrops' locs syncrs present key (Just f)
let slocs = S.fromList locs
let use a = return $ catMaybes $ map (a key slocs) syncrs
if present
then filterM (wantSend (Just f) . Remote.uuid . fst)
=<< use (genTransfer Upload False)
else ifM (wantGet $ Just f)
( use (genTransfer Download True) , return [] )
let slocs = S.fromList locs
let use a = return $ catMaybes $ map (a key slocs) syncrs
if present
then filterM (wantSend (Just f) . Remote.uuid . fst)
=<< use (genTransfer Upload False)
else ifM (wantGet $ Just f)
( use (genTransfer Download True) , return [] )
genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer)
genTransfer direction want key slocs r

View file

@ -159,10 +159,10 @@ onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
ensurestaged link daemonstatus
| scanComplete daemonstatus = addlink link
| otherwise = case filestatus of
Just s | changedrecently s -> liftIO noChange
Just s | not (changedrecently s) -> liftIO noChange
_ -> addlink link
where
changedrecently s = not $
changedrecently s =
afterLastDaemonRun (statusChangeTime s) daemonstatus
{- For speed, tries to reuse the existing blob for symlink target. -}