refactor
This commit is contained in:
parent
5a68acb521
commit
68659f4998
5 changed files with 108 additions and 105 deletions
98
Assistant/Sync.hs
Normal file
98
Assistant/Sync.hs
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
{- git-annex assistant repo syncing
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Assistant.Sync where
|
||||||
|
|
||||||
|
import Assistant.Common
|
||||||
|
import Assistant.Pushes
|
||||||
|
import Assistant.Alert
|
||||||
|
import Assistant.ThreadedMonad
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Assistant.ScanRemotes
|
||||||
|
import qualified Command.Sync
|
||||||
|
import Utility.Parallel
|
||||||
|
import qualified Git
|
||||||
|
import qualified Git.Branch
|
||||||
|
import qualified Git.Command
|
||||||
|
import qualified Remote
|
||||||
|
import qualified Annex.Branch
|
||||||
|
|
||||||
|
import Data.Time.Clock
|
||||||
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
{- Syncs with remotes that may have been disconnected for a while.
|
||||||
|
-
|
||||||
|
- After getting git in sync, queues a scan for file transfers.
|
||||||
|
-}
|
||||||
|
syncRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO ()
|
||||||
|
syncRemotes _ _ _ _ [] = noop
|
||||||
|
syncRemotes threadname st dstatus scanremotes rs = do
|
||||||
|
void $ alertWhile dstatus (syncAlert rs) $ do
|
||||||
|
sync =<< runThreadState st (inRepo Git.Branch.current)
|
||||||
|
addScanRemotes scanremotes rs
|
||||||
|
where
|
||||||
|
sync (Just branch) = do
|
||||||
|
runThreadState st $ manualPull (Just branch) rs
|
||||||
|
now <- getCurrentTime
|
||||||
|
pushToRemotes threadname now st Nothing rs
|
||||||
|
{- No local branch exists yet, but we can try pulling. -}
|
||||||
|
sync Nothing = do
|
||||||
|
runThreadState st $ manualPull Nothing rs
|
||||||
|
return True
|
||||||
|
|
||||||
|
{- Updates the local sync branch, then pushes it to all remotes, in
|
||||||
|
- parallel.
|
||||||
|
-
|
||||||
|
- Avoids running possibly long-duration commands in the Annex monad, so
|
||||||
|
- as not to block other threads. -}
|
||||||
|
pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO Bool
|
||||||
|
pushToRemotes threadname now st mpushmap remotes = do
|
||||||
|
(g, branch) <- runThreadState st $
|
||||||
|
(,) <$> fromRepo id <*> inRepo Git.Branch.current
|
||||||
|
go True branch g remotes
|
||||||
|
where
|
||||||
|
go _ Nothing _ _ = return True -- no branch, so nothing to do
|
||||||
|
go shouldretry (Just branch) g rs = do
|
||||||
|
debug threadname
|
||||||
|
[ "pushing to"
|
||||||
|
, show rs
|
||||||
|
]
|
||||||
|
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
||||||
|
(succeeded, failed) <- inParallel (push g branch) rs
|
||||||
|
let ok = null failed
|
||||||
|
case mpushmap of
|
||||||
|
Nothing -> noop
|
||||||
|
Just pushmap ->
|
||||||
|
changeFailedPushMap pushmap $ \m ->
|
||||||
|
M.union (makemap failed) $
|
||||||
|
M.difference m (makemap succeeded)
|
||||||
|
unless (ok) $
|
||||||
|
debug threadname
|
||||||
|
[ "failed to push to"
|
||||||
|
, show failed
|
||||||
|
]
|
||||||
|
if (ok || not shouldretry)
|
||||||
|
then return ok
|
||||||
|
else retry branch g failed
|
||||||
|
|
||||||
|
makemap l = M.fromList $ zip l (repeat now)
|
||||||
|
|
||||||
|
push g branch remote = Command.Sync.pushBranch remote branch g
|
||||||
|
|
||||||
|
retry branch g rs = do
|
||||||
|
debug threadname [ "trying manual pull to resolve failed pushes" ]
|
||||||
|
runThreadState st $ manualPull (Just branch) rs
|
||||||
|
go False (Just branch) g rs
|
||||||
|
|
||||||
|
{- Manually pull from remotes and merge their branches. -}
|
||||||
|
manualPull :: (Maybe Git.Ref) -> [Remote] -> Annex ()
|
||||||
|
manualPull currentbranch remotes = do
|
||||||
|
forM_ remotes $ \r ->
|
||||||
|
inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r]
|
||||||
|
Annex.Branch.forceUpdate
|
||||||
|
forM_ remotes $ \r ->
|
||||||
|
Command.Sync.mergeRemote r currentbranch
|
|
@ -12,15 +12,13 @@ module Assistant.Threads.Merger (
|
||||||
|
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Assistant.ThreadedMonad
|
import Assistant.ThreadedMonad
|
||||||
|
import Assistant.Sync
|
||||||
import Utility.DirWatcher
|
import Utility.DirWatcher
|
||||||
import Utility.Types.DirWatcher
|
import Utility.Types.DirWatcher
|
||||||
import qualified Annex.Branch
|
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import qualified Git.Command
|
|
||||||
import qualified Git.Merge
|
import qualified Git.Merge
|
||||||
import qualified Git.Branch
|
import qualified Git.Branch
|
||||||
import qualified Command.Sync
|
import qualified Command.Sync
|
||||||
import qualified Remote
|
|
||||||
|
|
||||||
thisThread :: ThreadName
|
thisThread :: ThreadName
|
||||||
thisThread = "Merger"
|
thisThread = "Merger"
|
||||||
|
@ -84,15 +82,3 @@ onAdd g file _
|
||||||
|
|
||||||
mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
|
mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
|
||||||
mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
|
mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
|
||||||
|
|
||||||
{- Manually pull from remotes and merge their branches. Called by the pusher
|
|
||||||
- when a push fails, which can happen due to a remote not having pushed
|
|
||||||
- changes to us. That could be because it doesn't have us as a remote, or
|
|
||||||
- because the assistant is not running there, or other reasons. -}
|
|
||||||
manualPull :: (Maybe Git.Ref) -> [Remote] -> Annex ()
|
|
||||||
manualPull currentbranch remotes = do
|
|
||||||
forM_ remotes $ \r ->
|
|
||||||
inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r]
|
|
||||||
Annex.Branch.forceUpdate
|
|
||||||
forM_ remotes $ \r ->
|
|
||||||
Command.Sync.mergeRemote r currentbranch
|
|
||||||
|
|
|
@ -14,21 +14,17 @@ import Assistant.Common
|
||||||
import Assistant.ThreadedMonad
|
import Assistant.ThreadedMonad
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.ScanRemotes
|
import Assistant.ScanRemotes
|
||||||
import Assistant.Threads.Pusher (pushToRemotes)
|
import Assistant.Sync
|
||||||
import Assistant.Alert
|
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
import Utility.Mounts
|
import Utility.Mounts
|
||||||
import Remote.List
|
import Remote.List
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Assistant.Threads.Merger
|
|
||||||
import qualified Git.Branch
|
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import qualified Control.Exception as E
|
import qualified Control.Exception as E
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import Data.Time.Clock
|
|
||||||
|
|
||||||
#if WITH_DBUS
|
#if WITH_DBUS
|
||||||
import Utility.DBus
|
import Utility.DBus
|
||||||
|
@ -146,23 +142,9 @@ handleMounts st dstatus scanremotes wasmounted nowmounted =
|
||||||
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
|
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
|
||||||
handleMount st dstatus scanremotes dir = do
|
handleMount st dstatus scanremotes dir = do
|
||||||
debug thisThread ["detected mount of", dir]
|
debug thisThread ["detected mount of", dir]
|
||||||
rs <- remotesUnder st dstatus dir
|
syncRemotes thisThread st dstatus scanremotes
|
||||||
unless (null rs) $ do
|
=<< filter (Git.repoIsLocal . Remote.repo)
|
||||||
let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs
|
<$> remotesUnder st dstatus dir
|
||||||
unless (null nonspecial) $ do
|
|
||||||
void $ alertWhile dstatus (syncAlert nonspecial) $ do
|
|
||||||
debug thisThread ["syncing with", show nonspecial]
|
|
||||||
sync nonspecial =<< runThreadState st (inRepo Git.Branch.current)
|
|
||||||
addScanRemotes scanremotes nonspecial
|
|
||||||
where
|
|
||||||
sync rs (Just branch) = do
|
|
||||||
runThreadState st $ manualPull (Just branch) rs
|
|
||||||
now <- getCurrentTime
|
|
||||||
pushToRemotes thisThread now st Nothing rs
|
|
||||||
{- No local branch exists yet, but we can try pulling. -}
|
|
||||||
sync rs Nothing = do
|
|
||||||
runThreadState st $ manualPull Nothing rs
|
|
||||||
return True
|
|
||||||
|
|
||||||
{- Finds remotes located underneath the mount point.
|
{- Finds remotes located underneath the mount point.
|
||||||
-
|
-
|
||||||
|
|
|
@ -14,17 +14,13 @@ import Assistant.Common
|
||||||
import Assistant.ThreadedMonad
|
import Assistant.ThreadedMonad
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.ScanRemotes
|
import Assistant.ScanRemotes
|
||||||
import Assistant.Threads.Pusher (pushToRemotes)
|
import Assistant.Sync
|
||||||
import Assistant.Alert
|
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
import Remote.List
|
import Remote.List
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Assistant.Threads.Merger
|
|
||||||
import qualified Git.Branch
|
|
||||||
|
|
||||||
import qualified Control.Exception as E
|
import qualified Control.Exception as E
|
||||||
import Data.Time.Clock
|
|
||||||
|
|
||||||
#if WITH_DBUS
|
#if WITH_DBUS
|
||||||
import Utility.DBus
|
import Utility.DBus
|
||||||
|
@ -128,20 +124,9 @@ pollingThread st dstatus scanremotes = runEvery (Seconds 3600) $
|
||||||
|
|
||||||
handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
|
handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
|
||||||
handleConnection st dstatus scanremotes = do
|
handleConnection st dstatus scanremotes = do
|
||||||
rs <- networkRemotes st
|
syncRemotes thisThread st dstatus scanremotes =<<
|
||||||
unless (null rs) $ do
|
filter (Git.repoIsUrl . Remote.repo)
|
||||||
let nonspecial = filter (Git.repoIsUrl . Remote.repo) rs
|
<$> networkRemotes st
|
||||||
unless (null nonspecial) $ do
|
|
||||||
void $ alertWhile dstatus (syncAlert nonspecial) $ do
|
|
||||||
debug thisThread ["syncing with", show nonspecial]
|
|
||||||
sync nonspecial =<< runThreadState st (inRepo Git.Branch.current)
|
|
||||||
addScanRemotes scanremotes nonspecial
|
|
||||||
where
|
|
||||||
sync rs (Just branch) = do
|
|
||||||
runThreadState st $ manualPull (Just branch) rs
|
|
||||||
now <- getCurrentTime
|
|
||||||
pushToRemotes thisThread now st Nothing rs
|
|
||||||
sync _ _ = return True
|
|
||||||
|
|
||||||
{- Finds network remotes. -}
|
{- Finds network remotes. -}
|
||||||
networkRemotes :: ThreadState -> IO [Remote]
|
networkRemotes :: ThreadState -> IO [Remote]
|
||||||
|
|
|
@ -12,15 +12,11 @@ import Assistant.Commits
|
||||||
import Assistant.Pushes
|
import Assistant.Pushes
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Assistant.ThreadedMonad
|
import Assistant.ThreadedMonad
|
||||||
import Assistant.Threads.Merger
|
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import qualified Command.Sync
|
import Assistant.Sync
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
import Utility.Parallel
|
|
||||||
import qualified Git.Branch
|
|
||||||
|
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
import qualified Data.Map as M
|
|
||||||
|
|
||||||
thisThread :: ThreadName
|
thisThread :: ThreadName
|
||||||
thisThread = "Pusher"
|
thisThread = "Pusher"
|
||||||
|
@ -76,47 +72,3 @@ shouldPush :: UTCTime -> [Commit] -> Bool
|
||||||
shouldPush _now commits
|
shouldPush _now commits
|
||||||
| not (null commits) = True
|
| not (null commits) = True
|
||||||
| otherwise = False
|
| otherwise = False
|
||||||
|
|
||||||
{- Updates the local sync branch, then pushes it to all remotes, in
|
|
||||||
- parallel.
|
|
||||||
-
|
|
||||||
- Avoids running possibly long-duration commands in the Annex monad, so
|
|
||||||
- as not to block other threads. -}
|
|
||||||
pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO Bool
|
|
||||||
pushToRemotes threadname now st mpushmap remotes = do
|
|
||||||
(g, branch) <- runThreadState st $
|
|
||||||
(,) <$> fromRepo id <*> inRepo Git.Branch.current
|
|
||||||
go True branch g remotes
|
|
||||||
where
|
|
||||||
go _ Nothing _ _ = return True -- no branch, so nothing to do
|
|
||||||
go shouldretry (Just branch) g rs = do
|
|
||||||
debug threadname
|
|
||||||
[ "pushing to"
|
|
||||||
, show rs
|
|
||||||
]
|
|
||||||
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
|
||||||
(succeeded, failed) <- inParallel (push g branch) rs
|
|
||||||
let ok = null failed
|
|
||||||
case mpushmap of
|
|
||||||
Nothing -> noop
|
|
||||||
Just pushmap ->
|
|
||||||
changeFailedPushMap pushmap $ \m ->
|
|
||||||
M.union (makemap failed) $
|
|
||||||
M.difference m (makemap succeeded)
|
|
||||||
unless (ok) $
|
|
||||||
debug threadname
|
|
||||||
[ "failed to push to"
|
|
||||||
, show failed
|
|
||||||
]
|
|
||||||
if (ok || not shouldretry)
|
|
||||||
then return ok
|
|
||||||
else retry branch g failed
|
|
||||||
|
|
||||||
makemap l = M.fromList $ zip l (repeat now)
|
|
||||||
|
|
||||||
push g branch remote = Command.Sync.pushBranch remote branch g
|
|
||||||
|
|
||||||
retry branch g rs = do
|
|
||||||
debug threadname [ "trying manual pull to resolve failed pushes" ]
|
|
||||||
runThreadState st $ manualPull (Just branch) rs
|
|
||||||
go False (Just branch) g rs
|
|
||||||
|
|
Loading…
Reference in a new issue