split and lift Assistant.Pushes

This commit is contained in:
Joey Hess 2012-10-29 17:52:43 -04:00
parent d5a5c05a79
commit 87ba4f8677
8 changed files with 82 additions and 64 deletions

View file

@ -31,7 +31,7 @@ import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots import Assistant.TransferSlots
import Assistant.Pushes import Assistant.Types.Pushes
import Assistant.Commits import Assistant.Commits
import Assistant.Changes import Assistant.Changes
import Assistant.BranchChange import Assistant.BranchChange

View file

@ -7,7 +7,8 @@
module Assistant.Pushes where module Assistant.Pushes where
import Common.Annex import Assistant.Common
import Assistant.Types.Pushes
import Utility.TSet import Utility.TSet
import Control.Concurrent.STM import Control.Concurrent.STM
@ -15,29 +16,13 @@ import Control.Concurrent.MSampleVar
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M import qualified Data.Map as M
{- Track the most recent push failure for each remote. -}
type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap
{- The TSet is recent, successful pushes that other remotes should be
- notified about.
-
- The MSampleVar is written to when the PushNotifier thread should be
- restarted for some reason.
-}
data PushNotifier = PushNotifier (TSet UUID) (MSampleVar ())
{- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes.
-}
newFailedPushMap :: IO FailedPushMap
newFailedPushMap = atomically newEmptyTMVar
{- Blocks until there are failed pushes. {- Blocks until there are failed pushes.
- Returns Remotes whose pushes failed a given time duration or more ago. - Returns Remotes whose pushes failed a given time duration or more ago.
- (This may be an empty list.) -} - (This may be an empty list.) -}
getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote] getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote]
getFailedPushesBefore v duration = do getFailedPushesBefore duration = do
v <- getAssistant failedPushMap
liftIO $ do
m <- atomically $ readTMVar v m <- atomically $ readTMVar v
now <- getCurrentTime now <- getCurrentTime
return $ M.keys $ M.filter (not . toorecent now) m return $ M.keys $ M.filter (not . toorecent now) m
@ -45,29 +30,25 @@ getFailedPushesBefore v duration = do
toorecent now time = now `diffUTCTime` time < duration toorecent now time = now `diffUTCTime` time < duration
{- Modifies the map. -} {- Modifies the map. -}
changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO () changeFailedPushMap :: (PushMap -> PushMap) -> Assistant ()
changeFailedPushMap v a = atomically $ changeFailedPushMap a = do
store . a . fromMaybe M.empty =<< tryTakeTMVar v v <- getAssistant failedPushMap
liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v
where where
{- tryTakeTMVar empties the TMVar; refill it only if {- tryTakeTMVar empties the TMVar; refill it only if
- the modified map is not itself empty -} - the modified map is not itself empty -}
store m store v m
| m == M.empty = noop | m == M.empty = noop
| otherwise = putTMVar v $! m | otherwise = putTMVar v $! m
newPushNotifier :: IO PushNotifier notifyPush :: [UUID] -> Assistant ()
newPushNotifier = PushNotifier notifyPush us = flip putTSet us <<~ (pushNotifierSuccesses . pushNotifier)
<$> newTSet
<*> newEmptySV
notifyPush :: [UUID] -> PushNotifier -> IO () waitPush :: Assistant [UUID]
notifyPush us (PushNotifier s _) = putTSet s us waitPush = getTSet <<~ (pushNotifierSuccesses . pushNotifier)
waitPush :: PushNotifier -> IO [UUID] notifyRestart :: Assistant ()
waitPush (PushNotifier s _) = getTSet s notifyRestart = flip writeSV () <<~ (pushNotifierWaiter . pushNotifier)
notifyRestart :: PushNotifier -> IO () waitRestart :: Assistant ()
notifyRestart (PushNotifier _ sv) = writeSV sv () waitRestart = readSV <<~ (pushNotifierWaiter . pushNotifier)
waitRestart :: PushNotifier -> IO ()
waitRestart (PushNotifier _ sv) = readSV sv

View file

@ -100,15 +100,13 @@ pushToRemotes now notifypushes remotes = do
if null failed if null failed
then do then do
when notifypushes $ when notifypushes $
notifyPush (map Remote.uuid succeeded) <<~ pushNotifier notifyPush (map Remote.uuid succeeded)
return True return True
else if shouldretry else if shouldretry
then retry branch g u failed then retry branch g u failed
else fallback branch g u failed else fallback branch g u failed
updatemap succeeded failed = do updatemap succeeded failed = changeFailedPushMap $ \m ->
pushmap <- getAssistant failedPushMap
liftIO $ changeFailedPushMap pushmap $ \m ->
M.union (makemap failed) $ M.union (makemap failed) $
M.difference m (makemap succeeded) M.difference m (makemap succeeded)
makemap l = M.fromList $ zip l (repeat now) makemap l = M.fromList $ zip l (repeat now)
@ -124,7 +122,7 @@ pushToRemotes now notifypushes remotes = do
inParallel (pushfallback g u branch) rs inParallel (pushfallback g u branch) rs
updatemap succeeded failed updatemap succeeded failed
when (notifypushes && (not $ null succeeded)) $ when (notifypushes && (not $ null succeeded)) $
notifyPush (map Remote.uuid succeeded) <<~ pushNotifier notifyPush (map Remote.uuid succeeded)
return $ null failed return $ null failed
push g branch remote = Command.Sync.pushBranch remote branch g push g branch remote = Command.Sync.pushBranch remote branch g

View file

@ -62,7 +62,7 @@ dbusThread = do
) )
handleconn = do handleconn = do
debug ["detected network connection"] debug ["detected network connection"]
notifyRestart <<~ pushNotifier notifyRestart
handleConnection handleConnection
onerr e _ = do onerr e _ = do
liftAnnex $ liftAnnex $

View file

@ -28,17 +28,15 @@ pushNotifierThread :: NamedThread
pushNotifierThread = NamedThread "PushNotifier" $ do pushNotifierThread = NamedThread "PushNotifier" $ do
iodebug <- asIO debug iodebug <- asIO debug
iopull <- asIO pull iopull <- asIO pull
pn <- getAssistant pushNotifier iowaitpush <- asIO $ const waitPush
controllerThread pn <~> xmppClient pn iodebug iopull ioclient <- asIO2 $ xmppClient $ iowaitpush ()
forever $ do
tid <- liftIO $ forkIO $ ioclient iodebug iopull
waitRestart
liftIO $ killThread tid
controllerThread :: PushNotifier -> IO () -> IO () xmppClient :: (IO [UUID]) -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant ()
controllerThread pushnotifier xmppclient = forever $ do xmppClient iowaitpush iodebug iopull = do
tid <- forkIO xmppclient
waitRestart pushnotifier
killThread tid
xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant ()
xmppClient pushnotifier iodebug iopull = do
v <- liftAnnex getXMPPCreds v <- liftAnnex getXMPPCreds
case v of case v of
Nothing -> noop Nothing -> noop
@ -63,7 +61,7 @@ xmppClient pushnotifier iodebug iopull = do
threadDelaySeconds (Seconds 300) threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime loop c =<< getCurrentTime
sendnotifications = forever $ do sendnotifications = forever $ do
us <- liftIO $ waitPush pushnotifier us <- liftIO iowaitpush
putStanza $ gitAnnexPresence $ encodePushNotification us putStanza $ gitAnnexPresence $ encodePushNotification us
receivenotifications = forever $ do receivenotifications = forever $ do
s <- getStanza s <- getStanza

View file

@ -27,8 +27,7 @@ pushRetryThread :: NamedThread
pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
-- We already waited half an hour, now wait until there are failed -- We already waited half an hour, now wait until there are failed
-- pushes to retry. -- pushes to retry.
pushmap <- getAssistant failedPushMap topush <- getFailedPushesBefore (fromIntegral halfhour)
topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour)
unless (null topush) $ do unless (null topush) $ do
debug ["retrying", show (length topush), "failed pushes"] debug ["retrying", show (length topush), "failed pushes"]
void $ alertWhile (pushRetryAlert topush) $ do void $ alertWhile (pushRetryAlert topush) $ do

42
Assistant/Types/Pushes.hs Normal file
View file

@ -0,0 +1,42 @@
{- git-annex assistant push tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Types.Pushes where
import Common.Annex
import Utility.TSet
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.Time.Clock
import qualified Data.Map as M
{- Track the most recent push failure for each remote. -}
type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap
{- The TSet is recent, successful pushes that other remotes should be
- notified about.
-
- The MSampleVar is written to when the PushNotifier thread should be
- restarted for some reason.
-}
data PushNotifier = PushNotifier
{ pushNotifierSuccesses :: TSet UUID
, pushNotifierWaiter :: MSampleVar ()
}
{- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes.
-}
newFailedPushMap :: IO FailedPushMap
newFailedPushMap = atomically newEmptyTMVar
newPushNotifier :: IO PushNotifier
newPushNotifier = PushNotifier
<$> newTSet
<*> newEmptySV

View file

@ -58,7 +58,7 @@ getXMPPR = xmppPage $ do
where where
storecreds creds = do storecreds creds = do
void $ runAnnex undefined $ setXMPPCreds creds void $ runAnnex undefined $ setXMPPCreds creds
liftIO . notifyRestart =<< getAssistantY pushNotifier runAssistantY notifyRestart
redirect ConfigR redirect ConfigR
#else #else
getXMPPR = xmppPage $ getXMPPR = xmppPage $