lifted Assistant.Sync into Assistant monad
lots of nice cleanups
This commit is contained in:
parent
1948202b32
commit
5d57b28a34
9 changed files with 88 additions and 117 deletions
|
@ -8,9 +8,6 @@
|
||||||
module Assistant.MakeRemote where
|
module Assistant.MakeRemote where
|
||||||
|
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Assistant.ThreadedMonad
|
|
||||||
import Assistant.DaemonStatus
|
|
||||||
import Assistant.ScanRemotes
|
|
||||||
import Assistant.Ssh
|
import Assistant.Ssh
|
||||||
import Assistant.Sync
|
import Assistant.Sync
|
||||||
import qualified Types.Remote as R
|
import qualified Types.Remote as R
|
||||||
|
@ -28,11 +25,11 @@ import qualified Data.Map as M
|
||||||
import Data.Char
|
import Data.Char
|
||||||
|
|
||||||
{- Sets up and begins syncing with a new ssh or rsync remote. -}
|
{- Sets up and begins syncing with a new ssh or rsync remote. -}
|
||||||
makeSshRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Bool -> SshData -> IO Remote
|
makeSshRemote :: Bool -> SshData -> Assistant Remote
|
||||||
makeSshRemote st dstatus scanremotes forcersync sshdata = do
|
makeSshRemote forcersync sshdata = do
|
||||||
r <- runThreadState st $
|
r <- liftAnnex $
|
||||||
addRemote $ maker (sshRepoName sshdata) sshurl
|
addRemote $ maker (sshRepoName sshdata) sshurl
|
||||||
syncNewRemote st dstatus scanremotes r
|
syncNewRemote r
|
||||||
return r
|
return r
|
||||||
where
|
where
|
||||||
rsync = forcersync || rsyncOnly sshdata
|
rsync = forcersync || rsyncOnly sshdata
|
||||||
|
|
|
@ -42,10 +42,7 @@ finishedPairing msg keypair = do
|
||||||
, "git-annex-shell -c configlist " ++ T.unpack (sshDirectory sshdata)
|
, "git-annex-shell -c configlist " ++ T.unpack (sshDirectory sshdata)
|
||||||
]
|
]
|
||||||
""
|
""
|
||||||
st <- getAssistant threadState
|
void $ makeSshRemote False sshdata
|
||||||
dstatus <- getAssistant daemonStatusHandle
|
|
||||||
scanremotes <- getAssistant scanRemoteMap
|
|
||||||
void $ liftIO $ makeSshRemote st dstatus scanremotes False sshdata
|
|
||||||
|
|
||||||
{- Mostly a straightforward conversion. Except:
|
{- Mostly a straightforward conversion. Except:
|
||||||
- * Determine the best hostname to use to contact the host.
|
- * Determine the best hostname to use to contact the host.
|
||||||
|
|
|
@ -36,25 +36,29 @@ import Control.Concurrent
|
||||||
- the remotes have diverged from the local git-annex branch. Otherwise,
|
- the remotes have diverged from the local git-annex branch. Otherwise,
|
||||||
- it's sufficient to requeue failed transfers.
|
- it's sufficient to requeue failed transfers.
|
||||||
-}
|
-}
|
||||||
reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO ()
|
reconnectRemotes :: Bool -> [Remote] -> Assistant ()
|
||||||
reconnectRemotes _ _ _ _ _ [] = noop
|
reconnectRemotes _ [] = noop
|
||||||
reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $
|
reconnectRemotes notifypushes rs = void $ do
|
||||||
alertWhile dstatus (syncAlert rs) $ do
|
dstatus <- getAssistant daemonStatusHandle
|
||||||
|
alertWhile dstatus (syncAlert rs) <~> do
|
||||||
(ok, diverged) <- sync
|
(ok, diverged) <- sync
|
||||||
=<< runThreadState st (inRepo Git.Branch.current)
|
=<< liftAnnex (inRepo Git.Branch.current)
|
||||||
addScanRemotes scanremotes diverged rs
|
scanremotes <- getAssistant scanRemoteMap
|
||||||
|
liftIO $ addScanRemotes scanremotes diverged rs
|
||||||
return ok
|
return ok
|
||||||
where
|
where
|
||||||
(gitremotes, _specialremotes) =
|
(gitremotes, _specialremotes) =
|
||||||
partition (Git.repoIsUrl . Remote.repo) rs
|
partition (Git.repoIsUrl . Remote.repo) rs
|
||||||
sync (Just branch) = do
|
sync (Just branch) = do
|
||||||
diverged <- snd <$> manualPull st (Just branch) gitremotes
|
st <- getAssistant threadState
|
||||||
now <- getCurrentTime
|
diverged <- liftIO $ snd <$> manualPull st (Just branch) gitremotes
|
||||||
ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes
|
now <- liftIO getCurrentTime
|
||||||
|
ok <- pushToRemotes now notifypushes gitremotes
|
||||||
return (ok, diverged)
|
return (ok, diverged)
|
||||||
{- No local branch exists yet, but we can try pulling. -}
|
{- No local branch exists yet, but we can try pulling. -}
|
||||||
sync Nothing = do
|
sync Nothing = do
|
||||||
diverged <- snd <$> manualPull st Nothing gitremotes
|
st <- getAssistant threadState
|
||||||
|
diverged <- liftIO $ snd <$> manualPull st Nothing gitremotes
|
||||||
return (True, diverged)
|
return (True, diverged)
|
||||||
|
|
||||||
{- Updates the local sync branch, then pushes it to all remotes, in
|
{- Updates the local sync branch, then pushes it to all remotes, in
|
||||||
|
@ -81,9 +85,9 @@ reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $
|
||||||
- them. While ugly, those branches are reserved for pushing by us, and
|
- them. While ugly, those branches are reserved for pushing by us, and
|
||||||
- so our pushes will succeed.
|
- so our pushes will succeed.
|
||||||
-}
|
-}
|
||||||
pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool
|
pushToRemotes :: UTCTime -> Bool -> [Remote] -> Assistant Bool
|
||||||
pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
|
pushToRemotes now notifypushes remotes = do
|
||||||
(g, branch, u) <- runThreadState st $ do
|
(g, branch, u) <- liftAnnex $ do
|
||||||
Annex.Branch.commit "update"
|
Annex.Branch.commit "update"
|
||||||
(,,)
|
(,,)
|
||||||
<$> gitRepo
|
<$> gitRepo
|
||||||
|
@ -93,43 +97,39 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
|
||||||
where
|
where
|
||||||
go _ Nothing _ _ _ = return True -- no branch, so nothing to do
|
go _ Nothing _ _ _ = return True -- no branch, so nothing to do
|
||||||
go shouldretry (Just branch) g u rs = do
|
go shouldretry (Just branch) g u rs = do
|
||||||
brokendebug threadname
|
debug ["pushing to", show rs]
|
||||||
[ "pushing to"
|
liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
||||||
, show rs
|
(succeeded, failed) <- liftIO $ inParallel (push g branch) rs
|
||||||
]
|
|
||||||
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
|
||||||
(succeeded, failed) <- inParallel (push g branch) rs
|
|
||||||
updatemap succeeded []
|
updatemap succeeded []
|
||||||
let ok = null failed
|
if null failed
|
||||||
if ok
|
|
||||||
then do
|
then do
|
||||||
maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
|
when notifypushes $
|
||||||
return ok
|
notifyPush (map Remote.uuid succeeded) <<~ pushNotifier
|
||||||
|
return True
|
||||||
else if shouldretry
|
else if shouldretry
|
||||||
then retry branch g u failed
|
then retry branch g u failed
|
||||||
else fallback branch g u failed
|
else fallback branch g u failed
|
||||||
|
|
||||||
updatemap succeeded failed = case mpushmap of
|
updatemap succeeded failed = do
|
||||||
Nothing -> noop
|
pushmap <- getAssistant failedPushMap
|
||||||
Just pushmap -> changeFailedPushMap pushmap $ \m ->
|
liftIO $ changeFailedPushMap pushmap $ \m ->
|
||||||
M.union (makemap failed) $
|
M.union (makemap failed) $
|
||||||
M.difference m (makemap succeeded)
|
M.difference m (makemap succeeded)
|
||||||
makemap l = M.fromList $ zip l (repeat now)
|
makemap l = M.fromList $ zip l (repeat now)
|
||||||
|
|
||||||
retry branch g u rs = do
|
retry branch g u rs = do
|
||||||
brokendebug threadname [ "trying manual pull to resolve failed pushes" ]
|
debug ["trying manual pull to resolve failed pushes"]
|
||||||
void $ manualPull st (Just branch) rs
|
st <- getAssistant threadState
|
||||||
|
void $ liftIO $ manualPull st (Just branch) rs
|
||||||
go False (Just branch) g u rs
|
go False (Just branch) g u rs
|
||||||
|
|
||||||
fallback branch g u rs = do
|
fallback branch g u rs = do
|
||||||
brokendebug threadname
|
debug ["fallback pushing to", show rs]
|
||||||
[ "fallback pushing to"
|
(succeeded, failed) <- liftIO $
|
||||||
, show rs
|
inParallel (pushfallback g u branch) rs
|
||||||
]
|
|
||||||
(succeeded, failed) <- inParallel (pushfallback g u branch) rs
|
|
||||||
updatemap succeeded failed
|
updatemap succeeded failed
|
||||||
unless (null succeeded) $
|
when (notifypushes && (not $ null succeeded)) $
|
||||||
maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
|
notifyPush (map Remote.uuid succeeded) <<~ pushNotifier
|
||||||
return $ null failed
|
return $ null failed
|
||||||
|
|
||||||
push g branch remote = Command.Sync.pushBranch remote branch g
|
push g branch remote = Command.Sync.pushBranch remote branch g
|
||||||
|
@ -160,7 +160,8 @@ manualPull st currentbranch remotes = do
|
||||||
return (results, haddiverged)
|
return (results, haddiverged)
|
||||||
|
|
||||||
{- Start syncing a newly added remote, using a background thread. -}
|
{- Start syncing a newly added remote, using a background thread. -}
|
||||||
syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO ()
|
syncNewRemote :: Remote -> Assistant ()
|
||||||
syncNewRemote st dstatus scanremotes remote = do
|
syncNewRemote remote = do
|
||||||
runThreadState st $ updateSyncRemotes dstatus
|
liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
|
||||||
void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote]
|
thread <- asIO2 reconnectRemotes
|
||||||
|
void $ liftIO $ forkIO $ thread False [remote]
|
||||||
|
|
|
@ -159,11 +159,7 @@ handleMount :: FilePath -> Assistant ()
|
||||||
handleMount dir = do
|
handleMount dir = do
|
||||||
debug ["detected mount of", dir]
|
debug ["detected mount of", dir]
|
||||||
rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir
|
rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir
|
||||||
d <- getAssistant id
|
reconnectRemotes True rs
|
||||||
liftIO $
|
|
||||||
reconnectRemotes (threadName d) (threadState d)
|
|
||||||
(daemonStatusHandle d) (scanRemoteMap d)
|
|
||||||
(Just $ pushNotifier d) rs
|
|
||||||
|
|
||||||
{- Finds remotes located underneath the mount point.
|
{- Finds remotes located underneath the mount point.
|
||||||
-
|
-
|
||||||
|
|
|
@ -123,11 +123,7 @@ listenWicdConnections client callback =
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
handleConnection :: Assistant ()
|
handleConnection :: Assistant ()
|
||||||
handleConnection = do
|
handleConnection = reconnectRemotes True =<< networkRemotes
|
||||||
d <- getAssistant id
|
|
||||||
liftIO . reconnectRemotes (threadName d) (threadState d)
|
|
||||||
(daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d)
|
|
||||||
=<< networkRemotes
|
|
||||||
|
|
||||||
{- Finds network remotes. -}
|
{- Finds network remotes. -}
|
||||||
networkRemotes :: Assistant [Remote]
|
networkRemotes :: Assistant [Remote]
|
||||||
|
|
|
@ -32,11 +32,9 @@ pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
|
||||||
unless (null topush) $ do
|
unless (null topush) $ do
|
||||||
debug ["retrying", show (length topush), "failed pushes"]
|
debug ["retrying", show (length topush), "failed pushes"]
|
||||||
now <- liftIO $ getCurrentTime
|
now <- liftIO $ getCurrentTime
|
||||||
st <- getAssistant threadState
|
|
||||||
pushnotifier <- getAssistant pushNotifier
|
|
||||||
dstatus <- getAssistant daemonStatusHandle
|
dstatus <- getAssistant daemonStatusHandle
|
||||||
void $ liftIO $ alertWhile dstatus (pushRetryAlert topush) $
|
void $ alertWhile dstatus (pushRetryAlert topush) <~>
|
||||||
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
|
pushToRemotes now True topush
|
||||||
where
|
where
|
||||||
halfhour = 1800
|
halfhour = 1800
|
||||||
|
|
||||||
|
@ -52,12 +50,9 @@ pushThread = NamedThread "Pusher" $ runEvery (Seconds 2) <~> do
|
||||||
remotes <- filter pushable . syncRemotes <$> daemonStatus
|
remotes <- filter pushable . syncRemotes <$> daemonStatus
|
||||||
unless (null remotes) $ do
|
unless (null remotes) $ do
|
||||||
now <- liftIO $ getCurrentTime
|
now <- liftIO $ getCurrentTime
|
||||||
st <- getAssistant threadState
|
|
||||||
pushmap <- getAssistant failedPushMap
|
|
||||||
pushnotifier <- getAssistant pushNotifier
|
|
||||||
dstatus <- getAssistant daemonStatusHandle
|
dstatus <- getAssistant daemonStatusHandle
|
||||||
void $ liftIO $ alertWhile dstatus (pushAlert remotes) $
|
void $ alertWhile dstatus (pushAlert remotes) <~>
|
||||||
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
|
pushToRemotes now True remotes
|
||||||
else do
|
else do
|
||||||
debug ["delaying push of", show (length commits), "commits"]
|
debug ["delaying push of", show (length commits), "commits"]
|
||||||
flip refillCommits commits <<~ commitChan
|
flip refillCommits commits <<~ commitChan
|
||||||
|
|
|
@ -124,5 +124,5 @@ makeS3Remote (S3Creds ak sk) name setup config = do
|
||||||
makeSpecialRemote name S3.remote config
|
makeSpecialRemote name S3.remote config
|
||||||
return remotename
|
return remotename
|
||||||
setup r
|
setup r
|
||||||
liftIO $ syncNewRemote st (daemonStatusHandle d) (scanRemoteMap d) r
|
runAssistantY $ syncNewRemote r
|
||||||
redirect $ EditNewCloudRepositoryR $ Remote.uuid r
|
redirect $ EditNewCloudRepositoryR $ Remote.uuid r
|
||||||
|
|
|
@ -283,12 +283,7 @@ makeSsh' rsync setup sshdata keypair =
|
||||||
|
|
||||||
makeSshRepo :: Bool -> (Remote -> Handler ()) -> SshData -> Handler RepHtml
|
makeSshRepo :: Bool -> (Remote -> Handler ()) -> SshData -> Handler RepHtml
|
||||||
makeSshRepo forcersync setup sshdata = do
|
makeSshRepo forcersync setup sshdata = do
|
||||||
d <- getAssistantY id
|
r <- runAssistantY $ makeSshRemote forcersync sshdata
|
||||||
r <- liftIO $ makeSshRemote
|
|
||||||
(threadState d)
|
|
||||||
(daemonStatusHandle d)
|
|
||||||
(scanRemoteMap d)
|
|
||||||
forcersync sshdata
|
|
||||||
setup r
|
setup r
|
||||||
redirect $ EditNewCloudRepositoryR $ Remote.uuid r
|
redirect $ EditNewCloudRepositoryR $ Remote.uuid r
|
||||||
|
|
||||||
|
|
|
@ -61,13 +61,7 @@ changeSyncFlag r enabled = runAnnex undefined $ do
|
||||||
|
|
||||||
{- Start syncing remote, using a background thread. -}
|
{- Start syncing remote, using a background thread. -}
|
||||||
syncRemote :: Remote -> Handler ()
|
syncRemote :: Remote -> Handler ()
|
||||||
syncRemote remote = do
|
syncRemote = runAssistantY . syncNewRemote
|
||||||
d <- getAssistantY id
|
|
||||||
liftIO $ syncNewRemote
|
|
||||||
(threadState d)
|
|
||||||
(daemonStatusHandle d)
|
|
||||||
(scanRemoteMap d)
|
|
||||||
remote
|
|
||||||
|
|
||||||
pauseTransfer :: Transfer -> Handler ()
|
pauseTransfer :: Transfer -> Handler ()
|
||||||
pauseTransfer = cancelTransfer True
|
pauseTransfer = cancelTransfer True
|
||||||
|
|
Loading…
Reference in a new issue