LiveUpdate db updates working

I've tested the behavior of the thread that waits for the LiveUpdate to
be finished, and it does get signaled and exit cleanly when the
LiveUpdate is GCed instead.

Made finishedLiveUpdate wait for the thread to finish updating the
database.

There is a case where GC doesn't happen in time and the database is left
with a live update recorded in it. This should not be a problem, since the
same stale data can be left behind when the process is interrupted, and it
will need to be detected when loading the database anyway.
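For reference, the mechanism the first paragraph relies on is GHC's deadlock
detection: a thread blocked on an MVar that nothing else references is sent
BlockedIndefinitelyOnMVar at the next GC. A minimal standalone sketch of that
behavior (hypothetical names, not git-annex code):

```haskell
import Control.Concurrent
import Control.Exception
import System.Mem (performGC)

-- Standalone demo: a worker blocks on an MVar that nothing else references.
-- When a GC runs, GHC sees the thread can never be woken and delivers
-- BlockedIndefinitelyOnMVar, which the worker catches and exits cleanly --
-- the same mechanism the LiveUpdate waiter thread relies on when the
-- LiveUpdate is GCed without ever being started or finished.
demo :: IO String
demo = do
    done <- newEmptyMVar
    _ <- forkIO $ do
        mv <- newEmptyMVar :: IO (MVar ())
        r <- try (takeMVar mv)
        putMVar done $ case r of
            Left BlockedIndefinitelyOnMVar -> "exited via GC"
            Right () -> "signaled"
    threadDelay 100000  -- let the worker block on mv
    performGC           -- mv is now unreachable; the RTS wakes the worker
    takeMVar done

main :: IO ()
main = demo >>= putStrLn
```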

Balanced preferred content expressions now call startLiveUpdate.
Joey Hess 2024-08-24 11:49:58 -04:00
parent 84d1bb746b
commit 2f20b939b7
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 55 additions and 38 deletions

View file

@@ -21,7 +21,10 @@ import qualified Data.Set as S
updateRepoSize :: LiveUpdate -> UUID -> Key -> LogStatus -> Annex ()
updateRepoSize lu u k s = do
-- XXX call finishedLiveUpdate
-- TODO update reposizes db
-- FIXME locking so the liveupdate is removed in the same
-- transaction that updates reposizes and the db too.
liftIO $ finishedLiveUpdate lu u k sc
rsv <- Annex.getRead Annex.reposizes
liftIO (takeMVar rsv) >>= \case
Nothing -> liftIO (putMVar rsv Nothing)
@@ -31,10 +34,10 @@ updateRepoSize lu u k s = do
u sizemap
liftIO $ putMVar rsv (Just sizemap')
where
f = case s of
InfoPresent -> addKeyRepoSize
InfoMissing -> removeKeyRepoSize
InfoDead -> removeKeyRepoSize
(sc, f) = case s of
InfoPresent -> (AddingKey, addKeyRepoSize)
InfoMissing -> (RemovingKey, removeKeyRepoSize)
InfoDead -> (RemovingKey, removeKeyRepoSize)
addKeyRepoSize :: Key -> Maybe RepoSize -> Maybe RepoSize
addKeyRepoSize k mrs = case mrs of
@@ -62,40 +65,48 @@ prepareLiveUpdate mu k sc = do
u <- maybe getUUID pure mu
startv <- liftIO newEmptyMVar
donev <- liftIO newEmptyMVar
void $ liftIO $ forkIO $ waitstart startv donev h u
return (LiveUpdate startv donev)
finishv <- liftIO newEmptyMVar
void $ liftIO $ forkIO $ waitstart startv donev finishv h u
return (LiveUpdate startv donev finishv)
where
{- Wait for startLiveUpdate, or for the LiveUpdate to get garbage
- collected in the case where it is never going to start. -}
waitstart startv donev h u = tryNonAsync (takeMVar startv) >>= \case
waitstart startv donev finishv h u = tryNonAsync (takeMVar startv) >>= \case
Right _ -> do
{- Deferring updating the database until here
- avoids overhead except in cases where preferred
- content expressions need live updates. -}
Db.startingLiveSizeChange h u k sc
waitdone donev h u
waitdone donev finishv h u
Left _ -> noop
{- Wait for finishedLiveUpdate to be called, or for the LiveUpdate to
- get garbage collected in the case where the change didn't
- actually happen. -}
waitdone donev h u = tryNonAsync (takeMVar donev) >>= \case
-- TODO if succeeded == True, need to update RepoSize db
waitdone donev finishv h u = tryNonAsync (takeMVar donev) >>= \case
-- TODO need to update RepoSize db
-- in same transaction as Db.finishedLiveSizeChange
Right (succeeded, u', k', sc')
| u' == u && k' == k && sc' == sc -> done h u
Right (u', k', sc')
| u' == u && k' == k && sc' == sc -> do
done h u
putMVar finishv ()
-- This can happen when eg, storing to a cluster
-- causes fanout and so this is called with
-- other UUIDs.
| otherwise -> waitdone donev h u
| otherwise -> waitdone donev finishv h u
Left _ -> done h u
done h u = Db.finishedLiveSizeChange h u k sc
-- Called when a preferred content check indicates that a live update is
-- needed. Can be called more than once.
startLiveUpdate :: LiveUpdate -> Annex ()
startLiveUpdate (LiveUpdate startv _donev) =
startLiveUpdate (LiveUpdate startv _donev _finishv) =
liftIO $ void $ tryPutMVar startv ()
startLiveUpdate NoLiveUpdate = noop
finishedLiveUpdate :: LiveUpdate -> Bool -> UUID -> Key -> SizeChange -> IO ()
finishedLiveUpdate (LiveUpdate _startv donev) succeeded u k sc =
putMVar donev (succeeded, u, k, sc)
finishedLiveUpdate NoLiveUpdate _ _ _ _ = noop
finishedLiveUpdate :: LiveUpdate -> UUID -> Key -> SizeChange -> IO ()
finishedLiveUpdate (LiveUpdate _startv donev finishv) u k sc = do
tryNonAsync (putMVar donev (u, k, sc)) >>= \case
Right () -> void $ tryNonAsync $ readMVar finishv
Left _ -> noop
finishedLiveUpdate NoLiveUpdate _ _ _ = noop
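The new finishv handshake above can be illustrated with a minimal standalone
sketch (hypothetical names and a stand-in for the database write, not the real
git-annex module): finishedLiveUpdate hands its values over donev and then
blocks on finishv until the waiter thread has committed the update, so it
cannot return before the database write happens.

```haskell
import Control.Concurrent
import Control.Exception (SomeException, try)

data Update = Update { updUUID :: String, updKey :: String }

-- Hand the update to the waiter over donev, then block on finishv until the
-- waiter has committed it. tryNonAsync in the real code is approximated here
-- with try; a dead waiter makes either MVar operation throw, and we give up.
finishedLiveUpdate :: MVar Update -> MVar () -> Update -> IO ()
finishedLiveUpdate donev finishv u = do
    r <- try (putMVar donev u) :: IO (Either SomeException ())
    case r of
        Right () -> do
            _ <- try (readMVar finishv) :: IO (Either SomeException ())
            return ()
        Left _ -> return ()  -- waiter thread gone; nothing to wait for

waiter :: MVar Update -> MVar () -> MVar String -> IO ()
waiter donev finishv out = do
    u <- takeMVar donev
    -- stand-in for the Db.finishedLiveSizeChange / RepoSize db update
    putMVar out ("committed " ++ updUUID u ++ " " ++ updKey u)
    putMVar finishv ()  -- only now may finishedLiveUpdate return

run :: IO String
run = do
    donev <- newEmptyMVar
    finishv <- newEmptyMVar
    out <- newEmptyMVar
    _ <- forkIO (waiter donev finishv out)
    finishedLiveUpdate donev finishv (Update "u1" "k1")
    takeMVar out

main :: IO ()
main = run >>= putStrLn
```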

View file

@@ -31,7 +31,7 @@ import Types.Messages
{- Parses input arguments, finds a matching Command, and runs it. -}
dispatch :: Bool -> Bool -> CmdParams -> [Command] -> [(String, String)] -> IO Git.Repo -> String -> String -> IO ()
dispatch addonok fuzzyok allargs allcmds fields getgitrepo progname progdesc =
dispatch addonok fuzzyok allargs allcmds fields getgitrepo progname progdesc = do
go addonok allcmds $
findAddonCommand subcommandname >>= \case
Just c -> go addonok (c:allcmds) noop

View file

@@ -18,6 +18,7 @@ import Annex.WorkTree
import Annex.UUID
import Annex.Magic
import Annex.RepoSize
import Annex.RepoSize.LiveUpdate
import Logs.MaxSize
import Annex.Link
import Types.Link
@@ -598,7 +599,7 @@ limitFullyBalanced :: Maybe UUID -> Annex GroupMap -> MkLimit Annex
limitFullyBalanced = limitFullyBalanced' "fullybalanced"
limitFullyBalanced' :: String -> Maybe UUID -> Annex GroupMap -> MkLimit Annex
limitFullyBalanced' = limitFullyBalanced'' $ \n key candidates -> do
limitFullyBalanced' = limitFullyBalanced'' $ \lu n key candidates -> do
maxsizes <- getMaxSizes
sizemap <- getRepoSizes False
threshhold <- annexFullyBalancedThreshhold <$> Annex.getGitConfig
@@ -632,7 +633,7 @@ repoHasSpace keysize inrepo (RepoSize reposize) (MaxSize maxsize)
reposize + keysize <= maxsize
limitFullyBalanced''
:: (Int -> Key -> S.Set UUID -> Annex (S.Set UUID))
:: (LiveUpdate -> Int -> Key -> S.Set UUID -> Annex (S.Set UUID))
-> String
-> Maybe UUID
-> Annex GroupMap
@@ -650,7 +651,7 @@ limitFullyBalanced'' filtercandidates termname mu getgroupmap want =
getgroupmap (toGroup s) n want
limitFullyBalanced'''
:: (Int -> Key -> S.Set UUID -> Annex (S.Set UUID))
:: (LiveUpdate -> Int -> Key -> S.Set UUID -> Annex (S.Set UUID))
-> String
-> Maybe UUID
-> Annex GroupMap
@@ -662,13 +663,17 @@ limitFullyBalanced''' filtercandidates termname mu getgroupmap g n want = Right
gm <- getgroupmap
let groupmembers = fromMaybe S.empty $
M.lookup g (uuidsByGroup gm)
candidates <- filtercandidates n key groupmembers
return $ if S.null candidates
-- TODO locking for liveupdate
candidates <- filtercandidates lu n key groupmembers
let wanted = if S.null candidates
then False
else case (mu, M.lookup g (balancedPickerByGroup gm)) of
(Just u, Just picker) ->
u `elem` picker candidates key n
_ -> False
when wanted $
startLiveUpdate lu
return wanted
, matchNeedsFileName = False
, matchNeedsFileContent = False
, matchNeedsKey = True
@@ -685,7 +690,7 @@ limitFullySizeBalanced :: Maybe UUID -> Annex GroupMap -> MkLimit Annex
limitFullySizeBalanced = limitFullySizeBalanced' "fullysizebalanced"
limitFullySizeBalanced' :: String -> Maybe UUID -> Annex GroupMap -> MkLimit Annex
limitFullySizeBalanced' = limitFullyBalanced'' $ \n key candidates -> do
limitFullySizeBalanced' = limitFullyBalanced'' $ \lu n key candidates -> do
maxsizes <- getMaxSizes
sizemap <- getRepoSizes False
filterCandidatesFullySizeBalanced maxsizes sizemap n key candidates

View file

@@ -27,19 +27,11 @@ newtype MaxSize = MaxSize { fromMaxSize :: Integer }
-- Used when an action is in progress that will change the current size of
-- a repository.
--
-- The live update has been recorded as starting, and filling the MVar with
-- the correct UUID, Key, and SizeChange will record the live update
-- as complete. The Bool should be True when the action successfully
-- added/removed the key from the repository.
--
-- If the MVar gets garbage collected before it is filled, the live update
-- will be removed.
--
-- This allows other concurrent changes to the same repository to take
-- the changes to its size into account. If NoLiveUpdate is used, it
-- prevents that.
data LiveUpdate
= LiveUpdate (MVar ()) (MVar (Bool, UUID, Key, SizeChange))
= LiveUpdate (MVar ()) (MVar (UUID, Key, SizeChange)) (MVar ())
| NoLiveUpdate
data SizeChange = AddingKey | RemovingKey

View file

@@ -90,7 +90,8 @@ Planned schedule of work:
In the unlikely event that one thread of a process is storing a key and
another thread is dropping the same key from the same uuid, at the same
time, reconcile somehow. How? Or is this perhaps something that cannot
happen?
happen? Could just record the liveupdate for one, and not for the
other.
Also keep an in-memory cache of the live updates being performed by
the current process. For use in location log update as follows..
@@ -144,8 +145,16 @@ Planned schedule of work:
* Still implementing LiveUpdate. Check for TODO XXX markers
* Check all uses of NoLiveUpdate to see if a live update can be started and
performed there.
* In the case where a copy to a remote fails (due eg to annex.diskreserve),
the LiveUpdate thread cannot get a chance to catch its exception when
the LiveUpdate is gced, before git-annex exits. In this case, the
database is left with some stale entries in the live update table.
This is not a big problem because the same can happen when the process is
interrupted. Still it would be cleaner for this not to happen. Is there
any way to prevent it? Waiting 1 GC tick before exiting would do it,
I'd think, but I tried manually doing a performGC at git-annex shutdown
and it didn't help.
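One possible mitigation for that open question, sketched as a hypothetical
shutdown hook (the function name is made up, and as noted above performGC
alone did not help in practice, so this is untested speculation): after
forcing a GC, also yield so any woken waiter thread has a chance to run its
exception handler before the process exits.

```haskell
import Control.Concurrent (threadDelay, yield)
import System.Mem (performGC)

-- Hypothetical shutdown hook (not what git-annex does): force a major GC so
-- any still-blocked LiveUpdate waiter is sent BlockedIndefinitelyOnMVar,
-- then yield and wait briefly so its handler gets scheduled before exit.
shutdownGrace :: IO ()
shutdownGrace = do
    performGC
    yield
    threadDelay 10000  -- 10ms grace period for waiter threads to clean up

main :: IO ()
main = shutdownGrace
```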
* The assistant is using NoLiveUpdate, but it should be possible to plumb
a LiveUpdate through it from preferred content checking to location log