ensure that git-annex branch is pushed after a successful transfer
I now have this topology working: assistant ---> {bare repo, special remote} <--- assistant And, I think, also this one: +----------- bare repo --------+ v v assistant ---> special remote <--- assistant While before with assistant <---> assistant connections, both sides got location info updated after a transfer, in this topology, the bare repo *might* get its location info updated, but the other assistant has no way to know that it did. And a special remote doesn't record location info, so transfers to it won't propigate out location log changes at all. So, for these to work, after a transfer succeeds, the git-annex branch needs to be pushed. This is done by recording a synthetic commit has occurred, which lets the pusher handle pushing out the change (which will include actually committing any still journalled changes to the git-annex branch). Of course, this means rather a lot more syncing action than happened before. At least the pusher bundles together very close together pushes, somewhat. Currently it just waits 2 seconds between each push.
This commit is contained in:
parent
5406416234
commit
4ac2fd0a22
10 changed files with 41 additions and 28 deletions
|
@ -197,7 +197,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
|
||||||
mapM_ (startthread dstatus)
|
mapM_ (startthread dstatus)
|
||||||
[ watch $ commitThread st changechan commitchan transferqueue dstatus
|
[ watch $ commitThread st changechan commitchan transferqueue dstatus
|
||||||
#ifdef WITH_WEBAPP
|
#ifdef WITH_WEBAPP
|
||||||
, assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots pushnotifier urlrenderer Nothing webappwaiter
|
, assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots pushnotifier commitchan urlrenderer Nothing webappwaiter
|
||||||
#ifdef WITH_PAIRING
|
#ifdef WITH_PAIRING
|
||||||
, assist $ pairListenerThread st dstatus scanremotes urlrenderer
|
, assist $ pairListenerThread st dstatus scanremotes urlrenderer
|
||||||
#endif
|
#endif
|
||||||
|
@ -207,7 +207,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
|
||||||
, assist $ mergeThread st dstatus transferqueue branchhandle
|
, assist $ mergeThread st dstatus transferqueue branchhandle
|
||||||
, assist $ transferWatcherThread st dstatus transferqueue
|
, assist $ transferWatcherThread st dstatus transferqueue
|
||||||
, assist $ transferPollerThread st dstatus
|
, assist $ transferPollerThread st dstatus
|
||||||
, assist $ transfererThread st dstatus transferqueue transferslots
|
, assist $ transfererThread st dstatus transferqueue transferslots commitchan
|
||||||
, assist $ daemonStatusThread st dstatus
|
, assist $ daemonStatusThread st dstatus
|
||||||
, assist $ sanityCheckerThread st dstatus transferqueue changechan
|
, assist $ sanityCheckerThread st dstatus transferqueue changechan
|
||||||
, assist $ mountWatcherThread st dstatus scanremotes pushnotifier
|
, assist $ mountWatcherThread st dstatus scanremotes pushnotifier
|
||||||
|
|
|
@ -9,12 +9,9 @@ module Assistant.Commits where
|
||||||
|
|
||||||
import Utility.TSet
|
import Utility.TSet
|
||||||
|
|
||||||
import Data.Time.Clock
|
|
||||||
|
|
||||||
type CommitChan = TSet Commit
|
type CommitChan = TSet Commit
|
||||||
|
|
||||||
data Commit = Commit UTCTime
|
data Commit = Commit
|
||||||
deriving (Show)
|
|
||||||
|
|
||||||
newCommitChan :: IO CommitChan
|
newCommitChan :: IO CommitChan
|
||||||
newCommitChan = newTSet
|
newCommitChan = newTSet
|
||||||
|
@ -30,5 +27,5 @@ refillCommits :: CommitChan -> [Commit] -> IO ()
|
||||||
refillCommits = putTSet
|
refillCommits = putTSet
|
||||||
|
|
||||||
{- Records a commit in the channel. -}
|
{- Records a commit in the channel. -}
|
||||||
recordCommit :: CommitChan -> Commit -> IO ()
|
recordCommit :: CommitChan -> IO ()
|
||||||
recordCommit = putTSet1
|
recordCommit = flip putTSet1 Commit
|
||||||
|
|
|
@ -65,7 +65,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do
|
||||||
]
|
]
|
||||||
void $ alertWhile dstatus commitAlert $
|
void $ alertWhile dstatus commitAlert $
|
||||||
runThreadState st commitStaged
|
runThreadState st commitStaged
|
||||||
recordCommit commitchan (Commit time)
|
recordCommit commitchan
|
||||||
else refill readychanges
|
else refill readychanges
|
||||||
else refill changes
|
else refill changes
|
||||||
where
|
where
|
||||||
|
|
|
@ -24,7 +24,6 @@ import qualified Git.LsTree as LsTree
|
||||||
import qualified Annex.Branch
|
import qualified Annex.Branch
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
|
|
||||||
import Data.Time.Clock
|
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
|
|
||||||
thisThread :: ThreadName
|
thisThread :: ThreadName
|
||||||
|
@ -56,8 +55,7 @@ configMonitorThread st dstatus branchhandle commitchan = thread $ do
|
||||||
reloadConfigs st dstatus changedconfigs
|
reloadConfigs st dstatus changedconfigs
|
||||||
{- Record a commit to get this config
|
{- Record a commit to get this config
|
||||||
- change pushed out to remotes. -}
|
- change pushed out to remotes. -}
|
||||||
time <- getCurrentTime
|
recordCommit commitchan
|
||||||
recordCommit commitchan (Commit time)
|
|
||||||
go r new
|
go r new
|
||||||
|
|
||||||
{- Config files, and their checksums. -}
|
{- Config files, and their checksums. -}
|
||||||
|
|
|
@ -49,12 +49,12 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon
|
||||||
-- Next, wait until at least one commit has been made
|
-- Next, wait until at least one commit has been made
|
||||||
commits <- getCommits commitchan
|
commits <- getCommits commitchan
|
||||||
-- Now see if now's a good time to push.
|
-- Now see if now's a good time to push.
|
||||||
now <- getCurrentTime
|
if shouldPush commits
|
||||||
if shouldPush now commits
|
|
||||||
then do
|
then do
|
||||||
remotes <- filter pushable . syncRemotes
|
remotes <- filter pushable . syncRemotes
|
||||||
<$> getDaemonStatus dstatus
|
<$> getDaemonStatus dstatus
|
||||||
unless (null remotes) $
|
unless (null remotes) $ do
|
||||||
|
now <- getCurrentTime
|
||||||
void $ alertWhile dstatus (pushAlert remotes) $
|
void $ alertWhile dstatus (pushAlert remotes) $
|
||||||
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
|
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
|
||||||
else do
|
else do
|
||||||
|
@ -77,7 +77,7 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon
|
||||||
- already determines batches of changes, so we can't easily determine
|
- already determines batches of changes, so we can't easily determine
|
||||||
- batches better.
|
- batches better.
|
||||||
-}
|
-}
|
||||||
shouldPush :: UTCTime -> [Commit] -> Bool
|
shouldPush :: [Commit] -> Bool
|
||||||
shouldPush _now commits
|
shouldPush commits
|
||||||
| not (null commits) = True
|
| not (null commits) = True
|
||||||
| otherwise = False
|
| otherwise = False
|
||||||
|
|
|
@ -13,6 +13,7 @@ import Assistant.DaemonStatus
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
|
import Assistant.Commits
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
@ -30,20 +31,20 @@ maxTransfers :: Int
|
||||||
maxTransfers = 1
|
maxTransfers = 1
|
||||||
|
|
||||||
{- Dispatches transfers from the queue. -}
|
{- Dispatches transfers from the queue. -}
|
||||||
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> NamedThread
|
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread
|
||||||
transfererThread st dstatus transferqueue slots = thread $ go =<< readProgramFile
|
transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile
|
||||||
where
|
where
|
||||||
thread = NamedThread thisThread
|
thread = NamedThread thisThread
|
||||||
go program = forever $ inTransferSlot dstatus slots $
|
go program = forever $ inTransferSlot dstatus slots $
|
||||||
maybe (return Nothing) (uncurry $ startTransfer st dstatus program)
|
maybe (return Nothing) (uncurry $ startTransfer st dstatus commitchan program)
|
||||||
=<< getNextTransfer transferqueue dstatus notrunning
|
=<< getNextTransfer transferqueue dstatus notrunning
|
||||||
{- Skip transfers that are already running. -}
|
{- Skip transfers that are already running. -}
|
||||||
notrunning = isNothing . startedTime
|
notrunning = isNothing . startedTime
|
||||||
|
|
||||||
{- By the time this is called, the daemonstatus's transfer map should
|
{- By the time this is called, the daemonstatus's transfer map should
|
||||||
- already have been updated to include the transfer. -}
|
- already have been updated to include the transfer. -}
|
||||||
startTransfer :: ThreadState -> DaemonStatusHandle -> FilePath -> Transfer -> TransferInfo -> TransferGenerator
|
startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath -> Transfer -> TransferInfo -> TransferGenerator
|
||||||
startTransfer st dstatus program t info = case (transferRemote info, associatedFile info) of
|
startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of
|
||||||
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info)
|
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info)
|
||||||
( do
|
( do
|
||||||
debug thisThread [ "Transferring:" , show t ]
|
debug thisThread [ "Transferring:" , show t ]
|
||||||
|
@ -66,11 +67,19 @@ startTransfer st dstatus program t info = case (transferRemote info, associatedF
|
||||||
{- Alerts are only shown for successful transfers.
|
{- Alerts are only shown for successful transfers.
|
||||||
- Transfers can temporarily fail for many reasons,
|
- Transfers can temporarily fail for many reasons,
|
||||||
- so there's no point in bothering the user about
|
- so there's no point in bothering the user about
|
||||||
- those. The assistant should recover. -}
|
- those. The assistant should recover.
|
||||||
whenM ((==) ExitSuccess <$> waitForProcess pid) $ void $
|
-
|
||||||
addAlert dstatus $
|
- Also, after a successful transfer, the location
|
||||||
|
- log has changed. Indicate that a commit has been
|
||||||
|
- made, in order to queue a push of the git-annex
|
||||||
|
- branch out to remotes that did not participate
|
||||||
|
- in the transfer.
|
||||||
|
-}
|
||||||
|
whenM ((==) ExitSuccess <$> waitForProcess pid) $ do
|
||||||
|
void $ addAlert dstatus $
|
||||||
makeAlertFiller True $
|
makeAlertFiller True $
|
||||||
transferFileAlert direction True file
|
transferFileAlert direction True file
|
||||||
|
recordCommit commitchan
|
||||||
where
|
where
|
||||||
params =
|
params =
|
||||||
[ Param "transferkey"
|
[ Param "transferkey"
|
||||||
|
|
|
@ -33,6 +33,7 @@ import Assistant.ScanRemotes
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.Pushes
|
import Assistant.Pushes
|
||||||
|
import Assistant.Commits
|
||||||
import Utility.WebApp
|
import Utility.WebApp
|
||||||
import Utility.FileMode
|
import Utility.FileMode
|
||||||
import Utility.TempFile
|
import Utility.TempFile
|
||||||
|
@ -57,11 +58,12 @@ webAppThread
|
||||||
-> TransferQueue
|
-> TransferQueue
|
||||||
-> TransferSlots
|
-> TransferSlots
|
||||||
-> PushNotifier
|
-> PushNotifier
|
||||||
|
-> CommitChan
|
||||||
-> UrlRenderer
|
-> UrlRenderer
|
||||||
-> Maybe (IO String)
|
-> Maybe (IO String)
|
||||||
-> Maybe (Url -> FilePath -> IO ())
|
-> Maybe (Url -> FilePath -> IO ())
|
||||||
-> NamedThread
|
-> NamedThread
|
||||||
webAppThread mst dstatus scanremotes transferqueue transferslots pushnotifier urlrenderer postfirstrun onstartup = thread $ do
|
webAppThread mst dstatus scanremotes transferqueue transferslots pushnotifier commitchan urlrenderer postfirstrun onstartup = thread $ do
|
||||||
webapp <- WebApp
|
webapp <- WebApp
|
||||||
<$> pure mst
|
<$> pure mst
|
||||||
<*> pure dstatus
|
<*> pure dstatus
|
||||||
|
@ -69,6 +71,7 @@ webAppThread mst dstatus scanremotes transferqueue transferslots pushnotifier ur
|
||||||
<*> pure transferqueue
|
<*> pure transferqueue
|
||||||
<*> pure transferslots
|
<*> pure transferslots
|
||||||
<*> pure pushnotifier
|
<*> pure pushnotifier
|
||||||
|
<*> pure commitchan
|
||||||
<*> (pack <$> genRandomToken)
|
<*> (pack <$> genRandomToken)
|
||||||
<*> getreldir mst
|
<*> getreldir mst
|
||||||
<*> pure $(embed "static")
|
<*> pure $(embed "static")
|
||||||
|
|
|
@ -18,6 +18,7 @@ import Assistant.ScanRemotes
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.Pushes
|
import Assistant.Pushes
|
||||||
|
import Assistant.Commits
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Assistant.Pairing
|
import Assistant.Pairing
|
||||||
import Utility.NotificationBroadcaster
|
import Utility.NotificationBroadcaster
|
||||||
|
@ -40,6 +41,7 @@ data WebApp = WebApp
|
||||||
, transferQueue :: TransferQueue
|
, transferQueue :: TransferQueue
|
||||||
, transferSlots :: TransferSlots
|
, transferSlots :: TransferSlots
|
||||||
, pushNotifier :: PushNotifier
|
, pushNotifier :: PushNotifier
|
||||||
|
, commitChan :: CommitChan
|
||||||
, secretToken :: Text
|
, secretToken :: Text
|
||||||
, relDir :: Maybe FilePath
|
, relDir :: Maybe FilePath
|
||||||
, getStatic :: Static
|
, getStatic :: Static
|
||||||
|
|
|
@ -135,9 +135,10 @@ startTransfer t = do
|
||||||
let st = fromJust $ threadState webapp
|
let st = fromJust $ threadState webapp
|
||||||
let dstatus = daemonStatus webapp
|
let dstatus = daemonStatus webapp
|
||||||
let slots = transferSlots webapp
|
let slots = transferSlots webapp
|
||||||
|
let commitchan = commitChan webapp
|
||||||
liftIO $ inImmediateTransferSlot dstatus slots $ do
|
liftIO $ inImmediateTransferSlot dstatus slots $ do
|
||||||
program <- readProgramFile
|
program <- readProgramFile
|
||||||
Transferrer.startTransfer st dstatus program t info
|
Transferrer.startTransfer st dstatus commitchan program t info
|
||||||
|
|
||||||
getCurrentTransfers :: Handler TransferMap
|
getCurrentTransfers :: Handler TransferMap
|
||||||
getCurrentTransfers = currentTransfers
|
getCurrentTransfers = currentTransfers
|
||||||
|
|
|
@ -16,6 +16,7 @@ import Assistant.ScanRemotes
|
||||||
import Assistant.TransferQueue
|
import Assistant.TransferQueue
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.Pushes
|
import Assistant.Pushes
|
||||||
|
import Assistant.Commits
|
||||||
import Assistant.Threads.WebApp
|
import Assistant.Threads.WebApp
|
||||||
import Assistant.WebApp
|
import Assistant.WebApp
|
||||||
import Assistant.Install
|
import Assistant.Install
|
||||||
|
@ -106,11 +107,13 @@ firstRun = do
|
||||||
transferslots <- newTransferSlots
|
transferslots <- newTransferSlots
|
||||||
urlrenderer <- newUrlRenderer
|
urlrenderer <- newUrlRenderer
|
||||||
pushnotifier <- newPushNotifier
|
pushnotifier <- newPushNotifier
|
||||||
|
commitchan <- newCommitChan
|
||||||
v <- newEmptyMVar
|
v <- newEmptyMVar
|
||||||
let callback a = Just $ a v
|
let callback a = Just $ a v
|
||||||
void $ runNamedThread dstatus $
|
void $ runNamedThread dstatus $
|
||||||
webAppThread Nothing dstatus scanremotes
|
webAppThread Nothing dstatus scanremotes
|
||||||
transferqueue transferslots pushnotifier urlrenderer
|
transferqueue transferslots pushnotifier commitchan
|
||||||
|
urlrenderer
|
||||||
(callback signaler) (callback mainthread)
|
(callback signaler) (callback mainthread)
|
||||||
where
|
where
|
||||||
signaler v = do
|
signaler v = do
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue