deferred downloads
Now when a download is queued and there's no known remote to get it from, it's added to a list of deferred downloads, which will be retried later. The Merger thread tries to queue any deferred downloads when it receives a push to the git-annex branch. Note that the Merger thread now also forces an update of the git-annex branch. The assistant was not updating this branch before, and it saw a (mostly) correct view of state, but now that incoming pushes go to synced/git-annex, that branch needs to be merged into the local git-annex branch.
This commit is contained in:
parent
7a86dc9443
commit
3c22977e44
4 changed files with 76 additions and 37 deletions
|
@ -185,7 +185,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
|
||||||
#endif
|
#endif
|
||||||
, assist $ pushThread st dstatus commitchan pushmap
|
, assist $ pushThread st dstatus commitchan pushmap
|
||||||
, assist $ pushRetryThread st dstatus pushmap
|
, assist $ pushRetryThread st dstatus pushmap
|
||||||
, assist $ mergeThread st
|
, assist $ mergeThread st dstatus transferqueue
|
||||||
, assist $ transferWatcherThread st dstatus
|
, assist $ transferWatcherThread st dstatus
|
||||||
, assist $ transferPollerThread st dstatus
|
, assist $ transferPollerThread st dstatus
|
||||||
, assist $ transfererThread st dstatus transferqueue transferslots
|
, assist $ transfererThread st dstatus transferqueue transferslots
|
||||||
|
|
|
@ -9,6 +9,8 @@ module Assistant.Threads.Merger where
|
||||||
|
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Assistant.ThreadedMonad
|
import Assistant.ThreadedMonad
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Assistant.TransferQueue
|
||||||
import Utility.DirWatcher
|
import Utility.DirWatcher
|
||||||
import Utility.Types.DirWatcher
|
import Utility.Types.DirWatcher
|
||||||
import qualified Annex.Branch
|
import qualified Annex.Branch
|
||||||
|
@ -19,15 +21,14 @@ import qualified Git.Branch
|
||||||
thisThread :: ThreadName
|
thisThread :: ThreadName
|
||||||
thisThread = "Merger"
|
thisThread = "Merger"
|
||||||
|
|
||||||
{- This thread watches for changes to .git/refs/, looking for
|
{- This thread watches for changes to .git/refs/, and handles incoming
|
||||||
- incoming pushes. It merges those pushes into the currently
|
- pushes. -}
|
||||||
- checked out branch. -}
|
mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread
|
||||||
mergeThread :: ThreadState -> NamedThread
|
mergeThread st dstatus transferqueue = thread $ do
|
||||||
mergeThread st = thread $ do
|
|
||||||
g <- runThreadState st $ fromRepo id
|
g <- runThreadState st $ fromRepo id
|
||||||
let dir = Git.localGitDir g </> "refs"
|
let dir = Git.localGitDir g </> "refs"
|
||||||
createDirectoryIfMissing True dir
|
createDirectoryIfMissing True dir
|
||||||
let hook a = Just $ runHandler g a
|
let hook a = Just $ runHandler st dstatus transferqueue g a
|
||||||
let hooks = mkWatchHooks
|
let hooks = mkWatchHooks
|
||||||
{ addHook = hook onAdd
|
{ addHook = hook onAdd
|
||||||
, errHook = hook onErr
|
, errHook = hook onErr
|
||||||
|
@ -37,21 +38,21 @@ mergeThread st = thread $ do
|
||||||
where
|
where
|
||||||
thread = NamedThread thisThread
|
thread = NamedThread thisThread
|
||||||
|
|
||||||
type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
|
type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
|
||||||
|
|
||||||
{- Runs an action handler.
|
{- Runs an action handler.
|
||||||
-
|
-
|
||||||
- Exceptions are ignored, otherwise a whole thread could be crashed.
|
- Exceptions are ignored, otherwise a whole thread could be crashed.
|
||||||
-}
|
-}
|
||||||
runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
||||||
runHandler g handler file filestatus = void $
|
runHandler st dstatus transferqueue g handler file filestatus = void $
|
||||||
either print (const noop) =<< tryIO go
|
either print (const noop) =<< tryIO go
|
||||||
where
|
where
|
||||||
go = handler g file filestatus
|
go = handler st dstatus transferqueue g file filestatus
|
||||||
|
|
||||||
{- Called when there's an error with inotify. -}
|
{- Called when there's an error with inotify. -}
|
||||||
onErr :: Handler
|
onErr :: Handler
|
||||||
onErr _ msg _ = error msg
|
onErr _ _ _ _ msg _ = error msg
|
||||||
|
|
||||||
{- Called when a new branch ref is written.
|
{- Called when a new branch ref is written.
|
||||||
-
|
-
|
||||||
|
@ -65,14 +66,16 @@ onErr _ msg _ = error msg
|
||||||
- ran are merged in.
|
- ran are merged in.
|
||||||
-}
|
-}
|
||||||
onAdd :: Handler
|
onAdd :: Handler
|
||||||
onAdd g file _
|
onAdd st dstatus transferqueue g file _
|
||||||
| ".lock" `isSuffixOf` file = noop
|
| ".lock" `isSuffixOf` file = noop
|
||||||
| isAnnexBranch file = noop
|
| isAnnexBranch file = runThreadState st $
|
||||||
| "/synced/" `isInfixOf` file = go =<< Git.Branch.current g
|
whenM Annex.Branch.forceUpdate $
|
||||||
|
queueDeferredDownloads Later transferqueue dstatus
|
||||||
|
| "/synced/" `isInfixOf` file = mergecurrent =<< Git.Branch.current g
|
||||||
| otherwise = noop
|
| otherwise = noop
|
||||||
where
|
where
|
||||||
changedbranch = fileToBranch file
|
changedbranch = fileToBranch file
|
||||||
go (Just current)
|
mergecurrent (Just current)
|
||||||
| equivBranches changedbranch current = do
|
| equivBranches changedbranch current = do
|
||||||
liftIO $ debug thisThread
|
liftIO $ debug thisThread
|
||||||
[ "merging"
|
[ "merging"
|
||||||
|
@ -81,7 +84,7 @@ onAdd g file _
|
||||||
, show current
|
, show current
|
||||||
]
|
]
|
||||||
void $ Git.Merge.mergeNonInteractive changedbranch g
|
void $ Git.Merge.mergeNonInteractive changedbranch g
|
||||||
go _ = noop
|
mergecurrent _ = noop
|
||||||
|
|
||||||
equivBranches :: Git.Ref -> Git.Ref -> Bool
|
equivBranches :: Git.Ref -> Git.Ref -> Bool
|
||||||
equivBranches x y = base x == base y
|
equivBranches x y = base x == base y
|
||||||
|
|
|
@ -11,6 +11,7 @@ module Assistant.TransferQueue (
|
||||||
newTransferQueue,
|
newTransferQueue,
|
||||||
getTransferQueue,
|
getTransferQueue,
|
||||||
queueTransfers,
|
queueTransfers,
|
||||||
|
queueDeferredDownloads,
|
||||||
queueTransfer,
|
queueTransfer,
|
||||||
queueTransferAt,
|
queueTransferAt,
|
||||||
queueTransferWhenSmall,
|
queueTransferWhenSmall,
|
||||||
|
@ -32,6 +33,7 @@ import qualified Data.Map as M
|
||||||
data TransferQueue = TransferQueue
|
data TransferQueue = TransferQueue
|
||||||
{ queuesize :: TVar Int
|
{ queuesize :: TVar Int
|
||||||
, queuelist :: TVar [(Transfer, TransferInfo)]
|
, queuelist :: TVar [(Transfer, TransferInfo)]
|
||||||
|
, deferreddownloads :: TVar [(Key, AssociatedFile)]
|
||||||
}
|
}
|
||||||
|
|
||||||
data Schedule = Next | Later
|
data Schedule = Next | Later
|
||||||
|
@ -41,48 +43,78 @@ newTransferQueue :: IO TransferQueue
|
||||||
newTransferQueue = atomically $ TransferQueue
|
newTransferQueue = atomically $ TransferQueue
|
||||||
<$> newTVar 0
|
<$> newTVar 0
|
||||||
<*> newTVar []
|
<*> newTVar []
|
||||||
|
<*> newTVar []
|
||||||
|
|
||||||
{- Reads the queue's content without blocking or changing it. -}
|
{- Reads the queue's content without blocking or changing it. -}
|
||||||
getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
|
getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
|
||||||
getTransferQueue q = atomically $ readTVar $ queuelist q
|
getTransferQueue q = atomically $ readTVar $ queuelist q
|
||||||
|
|
||||||
stubInfo :: AssociatedFile -> Remote -> TransferInfo
|
stubInfo :: AssociatedFile -> Remote -> TransferInfo
|
||||||
stubInfo f r = TransferInfo
|
stubInfo f r = stubTransferInfo
|
||||||
{ startedTime = Nothing
|
{ transferRemote = Just r
|
||||||
, transferPid = Nothing
|
|
||||||
, transferTid = Nothing
|
|
||||||
, transferRemote = Just r
|
|
||||||
, bytesComplete = Nothing
|
|
||||||
, associatedFile = f
|
, associatedFile = f
|
||||||
, transferPaused = False
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{- Adds transfers to queue for some of the known remotes. -}
|
{- Adds transfers to queue for some of the known remotes. -}
|
||||||
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
||||||
queueTransfers schedule q dstatus k f direction = do
|
queueTransfers schedule q dstatus k f direction = do
|
||||||
rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
|
rs <- sufficientremotes
|
||||||
mapM_ go =<< sufficientremotes rs
|
=<< knownRemotes <$> liftIO (getDaemonStatus dstatus)
|
||||||
|
if null rs
|
||||||
|
then defer
|
||||||
|
else forM_ rs $ \r -> liftIO $
|
||||||
|
enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
|
||||||
where
|
where
|
||||||
sufficientremotes rs
|
sufficientremotes rs
|
||||||
-- Queue downloads from all remotes that
|
{- Queue downloads from all remotes that
|
||||||
-- have the key, with the cheapest ones first.
|
- have the key, with the cheapest ones first.
|
||||||
-- More expensive ones will only be tried if
|
- More expensive ones will only be tried if
|
||||||
-- downloading from a cheap one fails.
|
- downloading from a cheap one fails. -}
|
||||||
| direction == Download = do
|
| direction == Download = do
|
||||||
uuids <- Remote.keyLocations k
|
uuids <- Remote.keyLocations k
|
||||||
return $ filter (\r -> uuid r `elem` uuids) rs
|
return $ filter (\r -> uuid r `elem` uuids) rs
|
||||||
-- TODO: Determine a smaller set of remotes that
|
{- TODO: Determine a smaller set of remotes that
|
||||||
-- can be uploaded to, in order to ensure all
|
- can be uploaded to, in order to ensure all
|
||||||
-- remotes can access the content. Currently,
|
- remotes can access the content. Currently,
|
||||||
-- send to every remote we can.
|
- send to every remote we can. -}
|
||||||
| otherwise = return $ filter (not . Remote.readonly) rs
|
| otherwise = return $ filter (not . Remote.readonly) rs
|
||||||
gentransfer r = Transfer
|
gentransfer r = Transfer
|
||||||
{ transferDirection = direction
|
{ transferDirection = direction
|
||||||
, transferKey = k
|
, transferKey = k
|
||||||
, transferUUID = Remote.uuid r
|
, transferUUID = Remote.uuid r
|
||||||
}
|
}
|
||||||
go r = liftIO $
|
defer
|
||||||
enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
|
{- Defer this download, as no known remote has the key. -}
|
||||||
|
| direction == Download = void $ liftIO $ atomically $
|
||||||
|
modifyTVar' (deferreddownloads q) $
|
||||||
|
\l -> (k, f):l
|
||||||
|
| otherwise = noop
|
||||||
|
|
||||||
|
{- Queues any deferred downloads that can now be accomplished, leaving
|
||||||
|
- any others in the list to try again later. -}
|
||||||
|
queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex ()
|
||||||
|
queueDeferredDownloads schedule q dstatus = do
|
||||||
|
rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
|
||||||
|
l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
|
||||||
|
left <- filterM (queue rs) l
|
||||||
|
unless (null left) $
|
||||||
|
liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
|
||||||
|
\new -> new ++ left
|
||||||
|
where
|
||||||
|
queue rs (k, f) = do
|
||||||
|
uuids <- Remote.keyLocations k
|
||||||
|
let sources = filter (\r -> uuid r `elem` uuids) rs
|
||||||
|
unless (null sources) $
|
||||||
|
forM_ sources $ \r -> liftIO $
|
||||||
|
enqueue schedule q dstatus
|
||||||
|
(gentransfer r) (stubInfo f r)
|
||||||
|
return $ null sources
|
||||||
|
where
|
||||||
|
gentransfer r = Transfer
|
||||||
|
{ transferDirection = Download
|
||||||
|
, transferKey = k
|
||||||
|
, transferUUID = Remote.uuid r
|
||||||
|
}
|
||||||
|
|
||||||
enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
|
enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
|
||||||
enqueue schedule q dstatus t info
|
enqueue schedule q dstatus t info
|
||||||
|
|
|
@ -39,7 +39,7 @@ all the other git clones, at both the git level and the key/value level.
|
||||||
Possible solution: C could record a download intent. (Similar to a failed
|
Possible solution: C could record a download intent. (Similar to a failed
|
||||||
download, but with an unknown source.) When C next receives a git-annex
|
download, but with an unknown source.) When C next receives a git-annex
|
||||||
branch push, it could try to requeue downloads that it has such intents
|
branch push, it could try to requeue downloads that it has such intents
|
||||||
registered for.
|
registered for. **done**
|
||||||
|
|
||||||
Note that this solution won't cover use cases the other does. For example,
|
Note that this solution won't cover use cases the other does. For example,
|
||||||
connect a USB drive A; B syncs files from it, and then should pass them to C.
|
connect a USB drive A; B syncs files from it, and then should pass them to C.
|
||||||
|
@ -85,6 +85,10 @@ all the other git clones, at both the git level and the key/value level.
|
||||||
need to use the TransferScanner, if we get and check a list of the changed
|
need to use the TransferScanner, if we get and check a list of the changed
|
||||||
files.
|
files.
|
||||||
* [[use multiple transfer slots|todo/Slow_transfer_for_a_lot_of_small_files.]]
|
* [[use multiple transfer slots|todo/Slow_transfer_for_a_lot_of_small_files.]]
|
||||||
|
* The TransferQueue's list of deferred downloads could theoretically
|
||||||
|
grow without bounds in memory. Limit it to a given number of entries,
|
||||||
|
and fall back to some other method -- either storing deferred downloads
|
||||||
|
on disk, or perhaps scheduling a TransferScanner run to get back into sync.
|
||||||
|
|
||||||
## data syncing
|
## data syncing
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue