partially fix concurrency issue in updating the rollingtotal
It's possible for two processes or threads to both be doing the same
operation at the same time. Eg, both dropping the same key. If one
finishes and updates the rollingtotal, then the other one needs to be
prevented from later updating the rollingtotal as well. And they could
finish at the same time, or with some time in between.

Addressed this by making updateRepoSize be called with the journal
locked, and only once it's been determined that there is an actual
location change to record in the log. updateRepoSize waits for the
database to be updated. When there is a redundant operation,
updateRepoSize won't be called, and the redundant LiveUpdate will be
removed from the database on garbage collection.

But: There will be a window where the redundant LiveUpdate is still
visible in the db, and processes can see it, combine it with the
rollingtotal, and arrive at the wrong size. This is a small window, but
it still ought to be addressed.

Unsure if it would always be safe to remove the redundant LiveUpdate?
Consider the case where two drops and a get are all running concurrently
somehow, and the order they finish is [drop, get, drop]. The second drop
seems redundant to the first, but it would not be safe to remove it.
While this seems unlikely, it's hard to rule out that a get and drop at
different stages can both be running at the same time.
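
For illustration, a minimal, self-contained sketch (hypothetical names,
not the actual git-annex API) of the locking pattern used here: the
onchange action that updates the rollingtotal runs only when the
journalled content actually changes, and it runs before the lock is
released, so a concurrent redundant operation never sees the new content
without the rollingtotal having been updated.

    import Control.Concurrent.MVar

    -- Stand-in for the journalled content; holding the MVar plays the
    -- role of holding the journal lock.
    type Journal = MVar String

    -- Apply a change to the journalled content. Only when the content
    -- actually changes is the onchange action run, and it runs while
    -- the lock is still held.
    maybeChangeSketch :: Journal -> (String -> Maybe String) -> IO () -> IO Bool
    maybeChangeSketch journal f onchange = modifyMVar journal $ \old ->
        case f old of
            Just new | new /= old -> do
                onchange                 -- eg, update the rollingtotal
                return (new, True)
            _ -> return (old, False)     -- redundant; no size update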
parent 03c7f99957
commit db89e39df6

6 changed files with 42 additions and 72 deletions

@@ -412,9 +412,11 @@ change ru file f = lockJournal $ \jl -> f <$> getToChange ru file >>= set jl ru
 {- Applies a function which can modify the content of a file, or not.
  -
- - Returns True when the file was modified. -}
-maybeChange :: Journalable content => RegardingUUID -> RawFilePath -> (L.ByteString -> Maybe content) -> Annex Bool
-maybeChange ru file f = lockJournal $ \jl -> do
+ - When the file was modified, runs the onchange action, and returns
+ - True. The action is run while the journal is still locked,
+ - so another concurrent call to this cannot happen while it is running. -}
+maybeChange :: Journalable content => RegardingUUID -> RawFilePath -> (L.ByteString -> Maybe content) -> Annex () -> Annex Bool
+maybeChange ru file f onchange = lockJournal $ \jl -> do
 	v <- getToChange ru file
 	case f v of
 		Just jv ->
@@ -422,6 +424,7 @@ maybeChange ru file f = lockJournal $ \jl -> do
 			in if v /= b
 				then do
 					set jl ru file b
+					onchange
 					return True
 				else return False
 		_ -> return False

@@ -10,7 +10,6 @@
 module Annex.RepoSize.LiveUpdate where
 
 import Annex.Common
-import qualified Annex
 import Logs.Presence.Pure
 import qualified Database.RepoSize as Db
 import Annex.UUID
@@ -20,22 +19,17 @@ import qualified Data.Map.Strict as M
 import qualified Data.Set as S
 import System.Process
 
+{- Called when a location log change is journalled, so the LiveUpdate
+ - is done. This is called with the journal still locked, so no concurrent
+ - changes can happen while it's running. Waits for the database
+ - to be updated. -}
 updateRepoSize :: LiveUpdate -> UUID -> Key -> LogStatus -> Annex ()
-updateRepoSize lu u k s = do
-	liftIO $ finishedLiveUpdate lu u k sc
-	rsv <- Annex.getRead Annex.reposizes
-	liftIO (takeMVar rsv) >>= \case
-		Nothing -> liftIO (putMVar rsv Nothing)
-		Just sizemap -> do
-			let !sizemap' = M.adjust
-				(fromMaybe (RepoSize 0) . f k . Just)
-				u sizemap
-			liftIO $ putMVar rsv (Just sizemap')
+updateRepoSize lu u k s = liftIO $ finishedLiveUpdate lu u k sc
   where
-	(sc, f) = case s of
-		InfoPresent -> (AddingKey, addKeyRepoSize)
-		InfoMissing -> (RemovingKey, removeKeyRepoSize)
-		InfoDead -> (RemovingKey, removeKeyRepoSize)
+	sc = case s of
+		InfoPresent -> AddingKey
+		InfoMissing -> RemovingKey
+		InfoDead -> RemovingKey
 
 addKeyRepoSize :: Key -> Maybe RepoSize -> Maybe RepoSize
 addKeyRepoSize k mrs = case mrs of
@@ -88,11 +82,8 @@ prepareLiveUpdate mu k sc = do
 
 {- Wait for finishedLiveUpdate to be called, or for the LiveUpdate
  - to get garbage collected in the case where the change didn't
- - actually happen. -}
+ - actually happen. Updates the database. -}
 waitdone donev finishv h u cid = tryNonAsync (takeMVar donev) >>= \case
-	-- TODO need to update local state too, and it must be done
-	-- with locking around the state update and this database
-	-- update.
 	Right (Just (u', k', sc'))
 		| u' == u && k' == k && sc' == sc -> do
 			Db.successfullyFinishedLiveSizeChange h u k sc cid

@@ -36,6 +36,7 @@ recordContentIdentifier (RemoteStateHandle u) cid k = do
 		(Annex.Branch.RegardingUUID [u])
 		(remoteContentIdentifierLogFile config k)
 		(addcid c . parseLog)
+		noop
   where
 	addcid c v
 		| cid `elem` l = Nothing -- no change needed

@@ -84,13 +84,12 @@ logChange lu key u@(UUID _) s
 	| isClusterUUID u = noop
 	| otherwise = do
 		config <- Annex.getGitConfig
-		changed <- maybeAddLog
+		void $ maybeAddLog
 			(Annex.Branch.RegardingUUID [u])
 			(locationLogFile config key)
 			s
 			(LogInfo (fromUUID u))
-		when changed $
-			updateRepoSize lu u key s
+			(updateRepoSize lu u key s)
 logChange _ _ NoUUID _ = noop
 
 {- Returns a list of repository UUIDs that, according to the log, have

@@ -50,17 +50,19 @@ addLog' ru file logstatus loginfo c =
  - older timestamp, that LogLine is preserved, rather than updating the log
  - with a newer timestamp.
  -
- - Returns True when the log was changed.
+ - When the log was changed, the onchange action is run (with the journal
+ - still locked to prevent any concurrent changes) and True is returned.
  -}
-maybeAddLog :: Annex.Branch.RegardingUUID -> RawFilePath -> LogStatus -> LogInfo -> Annex Bool
-maybeAddLog ru file logstatus loginfo = do
+maybeAddLog :: Annex.Branch.RegardingUUID -> RawFilePath -> LogStatus -> LogInfo -> Annex () -> Annex Bool
+maybeAddLog ru file logstatus loginfo onchange = do
 	c <- currentVectorClock
-	Annex.Branch.maybeChange ru file $ \b ->
+	let f = \b ->
 		let old = parseLog b
 		    line = genLine logstatus loginfo c old
 		in do
 			m <- insertNewStatus line $ logMap old
 			return $ buildLog $ mapLog m
+	Annex.Branch.maybeChange ru file f onchange
 
 genLine :: LogStatus -> LogInfo -> CandidateVectorClock -> [LogLine] -> LogLine
 genLine logstatus loginfo c old = LogLine c' logstatus loginfo

@@ -77,10 +77,16 @@ Planned schedule of work:
   Listing process ID, thread ID, UUID, key, addition or removal
   (done)
 
+  Add to reposizes db a table for sizechanges. This has for each UUID
+  a rolling total which is the total size changes that have accumulated
+  since the last update of the reposizes table.
+  So adding the reposizes table to sizechanges gives the current
+  size.
+
   Make checking the balanced preferred content limit record a
   live update in the table (done)
 
-  ... and use other live updates in making its decision
+  ... and use other live updates and sizechanges in making its decision
 
   Note: This will only work when preferred content is being checked.
   If a git-annex copy without --auto is run, for example, it won't
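
To make the rolling total arithmetic in the hunk above concrete: the
current size of a repository is the reposizes value recorded at its last
full update plus whatever has accumulated in sizechanges since then. A
minimal sketch, with stand-in types rather than the actual
Database.RepoSize schema:

    import qualified Data.Map.Strict as M

    type UUID = String

    -- current size = size recorded in reposizes at its last update
    --              + rolling total accumulated in sizechanges since then
    currentRepoSize :: M.Map UUID Integer -> M.Map UUID Integer -> UUID -> Integer
    currentRepoSize reposizes sizechanges u =
        M.findWithDefault 0 u reposizes + M.findWithDefault 0 u sizechanges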

@@ -92,33 +98,19 @@ Planned schedule of work:
   same time, so each thread always sees a consistent picture of what is
   happening. Use locking as necessary.
 
-  In the unlikely event that one thread of a process is storing a key and
-  another thread is dropping the same key from the same uuid, at the same
-  time, reconcile somehow. How? Or is this perhaps something that cannot
-  happen? Could just record the liveupdate for one, and not for the
-  other.
-
-  Also keep an in-memory cache of the live updates being performed by
-  the current process. For use in location log update as follows..
-
-  Make updating location log for a key that is in the in-memory cache
-  of the live update table update the db, removing it from that table,
-  and updating the in-memory reposizes. (done)
-
-  Make updading location log have locking to make sure redundant
-  information is never visible:
-  Take lock, journal update, remove from live update table.
+  When updating location log for a key, when there is actually a change,
+  update the db, remove the live update (done) and update the sizechanges
+  table in the same transaction.
+
+  Two concurrent processes might both start the same action, eg dropping
+  a key, and both succeed, and so both update the location log. One needs
+  to update the log and the sizechanges table. The other needs to see
+  that it has no actual change to report, and so avoid updating the
+  location log (already the case) and avoid updating the sizechanges
+  table. (done)
 
   Detect when an upload (or drop) fails, and remove from the live
-  update table and in-memory cache. (done)
-
-  Have a counter in the reposizes table that is updated on write. This
-  can be used to quickly determine if it has changed. On every check of
-  balanced preferred content, check the counter, and if it's been changed
-  by another process, re-run calcRepoSizes. This would be expensive, but
-  it would only happen when another process is running at the same time.
-  The counter could also be a per-UUID counter, so two processes
-  operating on different remotes would not have overhead.
+  update table. (done)
 
   When loading the live update table, check if PIDs in it are still
   running (and are still git-annex), and if not, remove stale entries
@@ -153,24 +145,6 @@ Planned schedule of work:
 
 * Still implementing LiveUpdate. Check for TODO XXX markers
 
-* Could two processes both doing the same operation end up both
-  calling successfullyFinishedLiveSizeChange with the same repo uuid and
-  key? If so, the rolling total would get out of wack.
-
-  Logs.Location.logChange only calls updateRepoSize when the presence
-  actually changed. So if one process does something and then the other
-  process also does the same thing (eg both drop), the second process
-  will see what the first process recorded, and won't update the size
-  redundantly.
-
-  But: What if they're running at the same time? It seems
-  likely that Annex.Branch.maybeChange does not handle that in a way
-  that will guarantee this doesn't happen. Does anything else guarantee
-  it?
-
-  Can additional locking be added to avoid it? Probably, but it
-  will add overhead and so should be avoided in the NoLiveUpdate case.
-
 * In the case where a copy to a remote fails (due eg to annex.diskreserve),
   the LiveUpdate thread can not get a chance to catch its exception when
   the LiveUpdate is gced, before git-annex exits. In this case, the