don't try to transfer data to/from XMPP remotes
Partition syncRemotes into ones needing git sync (ie, non-special remotes), and ones needing data sync (ie, non-XMPP remotes).
This commit is contained in:
parent
217eeede43
commit
5e44ab177c
11 changed files with 40 additions and 35 deletions
|
@ -10,11 +10,13 @@ module Assistant.DaemonStatus where
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Utility.TempFile
|
import Utility.TempFile
|
||||||
|
import Assistant.Types.NetMessager
|
||||||
import Utility.NotificationBroadcaster
|
import Utility.NotificationBroadcaster
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Logs.Trust
|
import Logs.Trust
|
||||||
import qualified Remote
|
import qualified Remote
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
|
import qualified Git
|
||||||
import Config
|
import Config
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
@ -23,6 +25,7 @@ import Data.Time.Clock.POSIX
|
||||||
import Data.Time
|
import Data.Time
|
||||||
import System.Locale
|
import System.Locale
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.Text as T
|
||||||
|
|
||||||
getDaemonStatus :: Assistant DaemonStatus
|
getDaemonStatus :: Assistant DaemonStatus
|
||||||
getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle
|
getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle
|
||||||
|
@ -41,20 +44,23 @@ modifyDaemonStatus a = do
|
||||||
sendNotification $ changeNotifier s
|
sendNotification $ changeNotifier s
|
||||||
return b
|
return b
|
||||||
|
|
||||||
{- Syncable remotes ordered by cost. -}
|
{- Returns a function that updates the lists of syncable remotes. -}
|
||||||
calcSyncRemotes :: Annex [Remote]
|
calcSyncRemotes :: Annex (DaemonStatus -> DaemonStatus)
|
||||||
calcSyncRemotes = do
|
calcSyncRemotes = do
|
||||||
rs <- filterM (repoSyncable . Remote.repo) =<<
|
rs <- filterM (repoSyncable . Remote.repo) =<<
|
||||||
concat . Remote.byCost <$> Remote.enabledRemoteList
|
concat . Remote.byCost <$> Remote.enabledRemoteList
|
||||||
alive <- trustExclude DeadTrusted (map Remote.uuid rs)
|
alive <- trustExclude DeadTrusted (map Remote.uuid rs)
|
||||||
let good r = Remote.uuid r `elem` alive
|
let good r = Remote.uuid r `elem` alive
|
||||||
return $ filter good rs
|
let syncable = filter good rs
|
||||||
|
return $ \dstatus -> dstatus
|
||||||
|
{ syncRemotes = syncable
|
||||||
|
, syncGitRemotes = filter (not . Remote.specialRemote) syncable
|
||||||
|
, syncDataRemotes = filter (not . isXMPPRemote) syncable
|
||||||
|
}
|
||||||
|
|
||||||
{- Updates the sycRemotes list from the list of all remotes in Annex state. -}
|
{- Updates the sycRemotes list from the list of all remotes in Annex state. -}
|
||||||
updateSyncRemotes :: Assistant ()
|
updateSyncRemotes :: Assistant ()
|
||||||
updateSyncRemotes = do
|
updateSyncRemotes = modifyDaemonStatus_ =<< liftAnnex calcSyncRemotes
|
||||||
remotes <- liftAnnex calcSyncRemotes
|
|
||||||
modifyDaemonStatus_ $ \s -> s { syncRemotes = remotes }
|
|
||||||
|
|
||||||
{- Load any previous daemon status file, and store it in a MVar for this
|
{- Load any previous daemon status file, and store it in a MVar for this
|
||||||
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
||||||
|
@ -64,12 +70,11 @@ startDaemonStatus = do
|
||||||
status <- liftIO $
|
status <- liftIO $
|
||||||
flip catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
|
flip catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
|
||||||
transfers <- M.fromList <$> getTransfers
|
transfers <- M.fromList <$> getTransfers
|
||||||
remotes <- calcSyncRemotes
|
addsync <- calcSyncRemotes
|
||||||
liftIO $ atomically $ newTMVar status
|
liftIO $ atomically $ newTMVar $ addsync $ status
|
||||||
{ scanComplete = False
|
{ scanComplete = False
|
||||||
, sanityCheckRunning = False
|
, sanityCheckRunning = False
|
||||||
, currentTransfers = transfers
|
, currentTransfers = transfers
|
||||||
, syncRemotes = remotes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{- Don't just dump out the structure, because it will change over time,
|
{- Don't just dump out the structure, because it will change over time,
|
||||||
|
@ -221,3 +226,12 @@ alertDuring :: Alert -> Assistant a -> Assistant a
|
||||||
alertDuring alert a = do
|
alertDuring alert a = do
|
||||||
i <- addAlert $ alert { alertClass = Activity }
|
i <- addAlert $ alert { alertClass = Activity }
|
||||||
removeAlert i `after` a
|
removeAlert i `after` a
|
||||||
|
|
||||||
|
{- Remotes using the XMPP transport have urls like xmpp::user@host -}
|
||||||
|
isXMPPRemote :: Remote -> Bool
|
||||||
|
isXMPPRemote remote = Git.repoIsUrl r && "xmpp::" `isPrefixOf` Git.repoLocation r
|
||||||
|
where
|
||||||
|
r = Remote.repo remote
|
||||||
|
|
||||||
|
getXMPPClientID :: Remote -> ClientID
|
||||||
|
getXMPPClientID r = T.pack $ drop (length "xmpp::") (Git.repoLocation (Remote.repo r))
|
||||||
|
|
|
@ -23,7 +23,7 @@ import Config
|
||||||
handleDrops :: Bool -> Key -> AssociatedFile -> Assistant ()
|
handleDrops :: Bool -> Key -> AssociatedFile -> Assistant ()
|
||||||
handleDrops _ _ Nothing = noop
|
handleDrops _ _ Nothing = noop
|
||||||
handleDrops fromhere key f = do
|
handleDrops fromhere key f = do
|
||||||
syncrs <- syncRemotes <$> getDaemonStatus
|
syncrs <- syncDataRemotes <$> getDaemonStatus
|
||||||
liftAnnex $ do
|
liftAnnex $ do
|
||||||
locs <- loggedLocations key
|
locs <- loggedLocations key
|
||||||
handleDrops' locs syncrs fromhere key f
|
handleDrops' locs syncrs fromhere key f
|
||||||
|
|
|
@ -95,12 +95,3 @@ queueNetPushMessage _ = return False
|
||||||
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
|
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
|
||||||
waitNetPushMessage side = (atomically . readTChan)
|
waitNetPushMessage side = (atomically . readTChan)
|
||||||
<<~ (getSide side . netMessagesPush . netMessager)
|
<<~ (getSide side . netMessagesPush . netMessager)
|
||||||
|
|
||||||
{- Remotes using the XMPP transport have urls like xmpp::user@host -}
|
|
||||||
isXMPPRemote :: Remote -> Bool
|
|
||||||
isXMPPRemote remote = Git.repoIsUrl r && "xmpp::" `isPrefixOf` Git.repoLocation r
|
|
||||||
where
|
|
||||||
r = Remote.repo remote
|
|
||||||
|
|
||||||
getXMPPClientID :: Remote -> ClientID
|
|
||||||
getXMPPClientID r = T.pack $ drop (length "xmpp::") (Git.repoLocation (Remote.repo r))
|
|
||||||
|
|
|
@ -15,7 +15,6 @@ import Assistant.Alert
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.Sync
|
import Assistant.Sync
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
import qualified Remote
|
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
|
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
|
@ -46,7 +45,8 @@ pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do
|
||||||
-- Now see if now's a good time to push.
|
-- Now see if now's a good time to push.
|
||||||
if shouldPush commits
|
if shouldPush commits
|
||||||
then do
|
then do
|
||||||
remotes <- filter pushable . syncRemotes <$> getDaemonStatus
|
remotes <- filter (not . Remote.readonly)
|
||||||
|
. syncGitRemotes <$> getDaemonStatus
|
||||||
unless (null remotes) $
|
unless (null remotes) $
|
||||||
void $ alertWhile (pushAlert remotes) $ do
|
void $ alertWhile (pushAlert remotes) $ do
|
||||||
now <- liftIO $ getCurrentTime
|
now <- liftIO $ getCurrentTime
|
||||||
|
@ -54,11 +54,6 @@ pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do
|
||||||
else do
|
else do
|
||||||
debug ["delaying push of", show (length commits), "commits"]
|
debug ["delaying push of", show (length commits), "commits"]
|
||||||
refillCommits commits
|
refillCommits commits
|
||||||
where
|
|
||||||
pushable r
|
|
||||||
| Remote.specialRemote r = False
|
|
||||||
| Remote.readonly r = False
|
|
||||||
| otherwise = True
|
|
||||||
|
|
||||||
{- Decide if now is a good time to push to remotes.
|
{- Decide if now is a good time to push to remotes.
|
||||||
-
|
-
|
||||||
|
|
|
@ -57,7 +57,7 @@ transferScannerThread = NamedThread "TransferScanner" $ do
|
||||||
- and then the system (or us) crashed, and that info was
|
- and then the system (or us) crashed, and that info was
|
||||||
- lost.
|
- lost.
|
||||||
-}
|
-}
|
||||||
startupScan = addScanRemotes True =<< syncRemotes <$> getDaemonStatus
|
startupScan = addScanRemotes True =<< syncDataRemotes <$> getDaemonStatus
|
||||||
|
|
||||||
{- This is a cheap scan for failed transfers involving a remote. -}
|
{- This is a cheap scan for failed transfers involving a remote. -}
|
||||||
failedTransferScan :: Remote -> Assistant ()
|
failedTransferScan :: Remote -> Assistant ()
|
||||||
|
@ -114,7 +114,7 @@ expensiveScan rs = unless onlyweb $ do
|
||||||
findtransfers f (key, _) = do
|
findtransfers f (key, _) = do
|
||||||
{- The syncable remotes may have changed since this
|
{- The syncable remotes may have changed since this
|
||||||
- scan began. -}
|
- scan began. -}
|
||||||
syncrs <- syncRemotes <$> getDaemonStatus
|
syncrs <- syncDataRemotes <$> getDaemonStatus
|
||||||
liftAnnex $ do
|
liftAnnex $ do
|
||||||
locs <- loggedLocations key
|
locs <- loggedLocations key
|
||||||
present <- inAnnex key
|
present <- inAnnex key
|
||||||
|
|
|
@ -190,7 +190,7 @@ xmppThread a = do
|
||||||
pull :: [UUID] -> Assistant ()
|
pull :: [UUID] -> Assistant ()
|
||||||
pull [] = noop
|
pull [] = noop
|
||||||
pull us = do
|
pull us = do
|
||||||
rs <- filter matching . syncRemotes <$> getDaemonStatus
|
rs <- filter matching . syncGitRemotes <$> getDaemonStatus
|
||||||
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
|
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
|
||||||
pullone rs =<< liftAnnex (inRepo Git.Branch.current)
|
pullone rs =<< liftAnnex (inRepo Git.Branch.current)
|
||||||
where
|
where
|
||||||
|
|
|
@ -57,7 +57,7 @@ queueTransfersMatching matching schedule k f direction
|
||||||
where
|
where
|
||||||
go = do
|
go = do
|
||||||
rs <- liftAnnex . sufficientremotes
|
rs <- liftAnnex . sufficientremotes
|
||||||
=<< syncRemotes <$> getDaemonStatus
|
=<< syncDataRemotes <$> getDaemonStatus
|
||||||
let matchingrs = filter (matching . Remote.uuid) rs
|
let matchingrs = filter (matching . Remote.uuid) rs
|
||||||
if null matchingrs
|
if null matchingrs
|
||||||
then defer
|
then defer
|
||||||
|
@ -94,7 +94,7 @@ queueDeferredDownloads :: Schedule -> Assistant ()
|
||||||
queueDeferredDownloads schedule = do
|
queueDeferredDownloads schedule = do
|
||||||
q <- getAssistant transferQueue
|
q <- getAssistant transferQueue
|
||||||
l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
|
l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
|
||||||
rs <- syncRemotes <$> getDaemonStatus
|
rs <- syncDataRemotes <$> getDaemonStatus
|
||||||
left <- filterM (queue rs) l
|
left <- filterM (queue rs) l
|
||||||
unless (null left) $
|
unless (null left) $
|
||||||
liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
|
liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
|
||||||
|
|
|
@ -33,8 +33,12 @@ data DaemonStatus = DaemonStatus
|
||||||
-- Messages to display to the user.
|
-- Messages to display to the user.
|
||||||
, alertMap :: AlertMap
|
, alertMap :: AlertMap
|
||||||
, lastAlertId :: AlertId
|
, lastAlertId :: AlertId
|
||||||
-- Ordered list of remotes to sync with.
|
-- Ordered list of all remotes that can be synced with
|
||||||
, syncRemotes :: [Remote]
|
, syncRemotes :: [Remote]
|
||||||
|
-- Ordered list of remotes to sync git with
|
||||||
|
, syncGitRemotes :: [Remote]
|
||||||
|
-- Ordered list of remotes to sync data with
|
||||||
|
, syncDataRemotes :: [Remote]
|
||||||
-- Pairing request that is in progress.
|
-- Pairing request that is in progress.
|
||||||
, pairingInProgress :: Maybe PairingInProgress
|
, pairingInProgress :: Maybe PairingInProgress
|
||||||
-- Broadcasts notifications about all changes to the DaemonStatus
|
-- Broadcasts notifications about all changes to the DaemonStatus
|
||||||
|
@ -60,6 +64,8 @@ newDaemonStatus = DaemonStatus
|
||||||
<*> pure M.empty
|
<*> pure M.empty
|
||||||
<*> pure firstAlertId
|
<*> pure firstAlertId
|
||||||
<*> pure []
|
<*> pure []
|
||||||
|
<*> pure []
|
||||||
|
<*> pure []
|
||||||
<*> pure Nothing
|
<*> pure Nothing
|
||||||
<*> newNotificationBroadcaster
|
<*> newNotificationBroadcaster
|
||||||
<*> newNotificationBroadcaster
|
<*> newNotificationBroadcaster
|
||||||
|
|
|
@ -17,7 +17,6 @@ import Assistant.WebApp.SideBar
|
||||||
import Assistant.WebApp.Utility
|
import Assistant.WebApp.Utility
|
||||||
import Assistant.WebApp.Configurators.Local
|
import Assistant.WebApp.Configurators.Local
|
||||||
import Utility.Yesod
|
import Utility.Yesod
|
||||||
import Assistant.NetMessager
|
|
||||||
import qualified Remote
|
import qualified Remote
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Annex.UUID (getUUID)
|
import Annex.UUID (getUUID)
|
||||||
|
|
|
@ -100,7 +100,7 @@ buddyListDisplay = do
|
||||||
autoUpdate ident NotifierBuddyListR (10 :: Int) (10 :: Int)
|
autoUpdate ident NotifierBuddyListR (10 :: Int) (10 :: Int)
|
||||||
#ifdef WITH_XMPP
|
#ifdef WITH_XMPP
|
||||||
buddies <- lift $ liftAssistant $ do
|
buddies <- lift $ liftAssistant $ do
|
||||||
rs <- filter isXMPPRemote . syncRemotes <$> getDaemonStatus
|
rs <- filter isXMPPRemote . syncGitRemotes <$> getDaemonStatus
|
||||||
let pairedwith = catMaybes $ map (parseJID . getXMPPClientID) rs
|
let pairedwith = catMaybes $ map (parseJID . getXMPPClientID) rs
|
||||||
catMaybes . map (buddySummary pairedwith)
|
catMaybes . map (buddySummary pairedwith)
|
||||||
<$> (getBuddyList <<~ buddyList)
|
<$> (getBuddyList <<~ buddyList)
|
||||||
|
|
|
@ -238,7 +238,7 @@ xmppRemotes cid = case baseJID <$> parseJID cid of
|
||||||
Nothing -> return []
|
Nothing -> return []
|
||||||
Just jid -> do
|
Just jid -> do
|
||||||
let loc = gitXMPPLocation jid
|
let loc = gitXMPPLocation jid
|
||||||
filter (matching loc . Remote.repo) . syncRemotes
|
filter (matching loc . Remote.repo) . syncGitRemotes
|
||||||
<$> getDaemonStatus
|
<$> getDaemonStatus
|
||||||
where
|
where
|
||||||
matching loc r = repoIsUrl r && repoLocation r == loc
|
matching loc r = repoIsUrl r && repoLocation r == loc
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue