closing in on finishing live reposizes
Fixed successfullyFinishedLiveSizeChange to not update the rolling total when a redundant change is in RecentChanges. Made setRepoSizes clear RecentChanges that are no longer needed. It might be possible to clear those earlier, this is only a convenient point to do it. The reason it's safe to clear RecentChanges here is that, in order for a live update to call successfullyFinishedLiveSizeChange, a change must be made to a location log. If a RecentChange gets cleared, and just after that a new live update is started, making the same change, the location log has already been changed (since the RecentChange exists), and so when the live update succeeds, it won't call successfullyFinishedLiveSizeChange. The reason it doesn't clear RecentChanges when there is a reduntant live update is because I didn't want to think through whether or not all races are avoided in that case. The rolling total in SizeChanges is never cleared. Instead, calcJournalledRepoSizes gets the initial value of it, and then getLiveRepoSizes subtracts that initial value from the current value. Since the rolling total can only be updated by updateRepoSize, which is called with the journal locked, locking the journal in calcJournalledRepoSizes ensures that the database does not change while reading the journal.
This commit is contained in:
parent
23d44aa4aa
commit
4d2f95853d
6 changed files with 165 additions and 248 deletions
|
@ -25,10 +25,11 @@ module Database.RepoSize (
|
|||
closeDb,
|
||||
getRepoSizes,
|
||||
setRepoSizes,
|
||||
estimateLiveRepoSizes,
|
||||
startingLiveSizeChange,
|
||||
successfullyFinishedLiveSizeChange,
|
||||
removeStaleLiveSizeChange,
|
||||
recordedRepoOffsets,
|
||||
liveRepoOffsets,
|
||||
) where
|
||||
|
||||
import Annex.Common
|
||||
|
@ -164,6 +165,7 @@ setRepoSizes (RepoSizeHandle (Just h)) sizemap branchcommitsha =
|
|||
unsetRepoSize u
|
||||
forM_ (M.toList sizemap) $
|
||||
uncurry setRepoSize
|
||||
clearRecentChanges
|
||||
recordAnnexBranchCommit branchcommitsha
|
||||
setRepoSizes (RepoSizeHandle Nothing) _ _ = noop
|
||||
|
||||
|
@ -192,15 +194,29 @@ startingLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
|||
]
|
||||
startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||
|
||||
{- A live size change has successfully finished.
|
||||
-
|
||||
- Update the rolling total, add as a recent change,
|
||||
- and remove the live change in the same transaction.
|
||||
-
|
||||
- But, it's possible that the same change has been done by two
|
||||
- different processes or threads. If there is a matching recent change,
|
||||
- then this one is redundant, so remove it without updating the rolling
|
||||
- total.
|
||||
-}
|
||||
successfullyFinishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||
successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||
H.commitDb h $ do
|
||||
-- Update the rolling total, add as a recent change,
|
||||
-- and remove the live change in the same transaction.
|
||||
getRecentChange u k >>= \case
|
||||
Just sc' | sc == sc' -> remove
|
||||
_ -> go
|
||||
where
|
||||
go = do
|
||||
rollingtotal <- getSizeChangeFor u
|
||||
setSizeChangeFor u (updateRollingTotal rollingtotal sc k)
|
||||
addRecentChange u k sc
|
||||
removeLiveSizeChange u k sc sid
|
||||
remove
|
||||
remove = removeLiveSizeChange u k sc sid
|
||||
successfullyFinishedLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||
|
||||
updateRollingTotal :: FileSize -> SizeChange -> Key -> FileSize
|
||||
|
@ -231,6 +247,13 @@ getLiveSizeChanges = M.fromListWith (++) . map conv <$> selectList [] []
|
|||
let LiveSizeChanges u k sid sc = entityVal entity
|
||||
in (u, [(k, (sc, sid))])
|
||||
|
||||
getLiveSizeChanges' :: SqlPersistM [(UUID, Key, SizeChange)]
|
||||
getLiveSizeChanges' = map conv <$> selectList [] []
|
||||
where
|
||||
conv entity =
|
||||
let LiveSizeChanges u k _sid sc = entityVal entity
|
||||
in (u, k, sc)
|
||||
|
||||
getSizeChanges :: SqlPersistM (M.Map UUID FileSize)
|
||||
getSizeChanges = M.fromList . map conv <$> selectList [] []
|
||||
where
|
||||
|
@ -251,7 +274,7 @@ setSizeChangeFor u sz =
|
|||
(UniqueRepoRollingTotal u)
|
||||
(SizeChanges u sz)
|
||||
[SizeChangesRollingtotal =. sz]
|
||||
|
||||
|
||||
addRecentChange :: UUID -> Key -> SizeChange -> SqlPersistM ()
|
||||
addRecentChange u k sc =
|
||||
void $ upsertBy
|
||||
|
@ -269,19 +292,49 @@ getRecentChange u k = do
|
|||
(s:_) -> Just $ recentChangesChange $ entityVal s
|
||||
[] -> Nothing
|
||||
|
||||
{- Gets the sizes of Repos as of a commit to the git-annex branch
|
||||
- (which is not necessarily the current commit), adjusted with all
|
||||
- live changes that have happened since then or are happening now.
|
||||
getRecentChanges :: SqlPersistM [(UUID, Key, SizeChange)]
|
||||
getRecentChanges = map conv <$> selectList [] []
|
||||
where
|
||||
conv entity =
|
||||
let RecentChanges u k sc = entityVal entity
|
||||
in (u, k, sc)
|
||||
|
||||
{- Clears recent changes, except when there is a live change that is
|
||||
- redundant with a recent change. -}
|
||||
clearRecentChanges :: SqlPersistM ()
|
||||
clearRecentChanges = do
|
||||
live <- getLiveSizeChanges'
|
||||
if null live
|
||||
then deleteWhere ([] :: [Filter RecentChanges])
|
||||
else do
|
||||
let liveset = S.fromList live
|
||||
rcs <- getRecentChanges
|
||||
forM_ rcs $ \rc@(u, k, sc) ->
|
||||
when (S.notMember rc liveset) $
|
||||
deleteWhere
|
||||
[ RecentChangesRepo ==. u
|
||||
, RecentChangesKey ==. k
|
||||
, RecentChangesChange ==. sc
|
||||
]
|
||||
|
||||
{- Gets the recorded offsets to sizes of Repos, not including live
|
||||
- changes. -}
|
||||
recordedRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
||||
recordedRepoOffsets (RepoSizeHandle (Just h)) =
|
||||
M.map SizeOffset <$> H.queryDb h getSizeChanges
|
||||
recordedRepoOffsets (RepoSizeHandle Nothing) = pure mempty
|
||||
|
||||
{- Gets the offsets to sizes of Repos, including all live changes that
|
||||
- are happening now.
|
||||
-
|
||||
- This does not necessarily include all changes that have been journalled,
|
||||
- This does not necessarily include all changes that have been made,
|
||||
- only ones that had startingLiveSizeChange called for them will be
|
||||
- included. Also live changes or recent changes that were to a UUID not in
|
||||
- the RepoSizes map are not included.
|
||||
- included.
|
||||
-
|
||||
- In the unlikely case where two live changes are occurring, one
|
||||
- adding a key and the other removing the same key, the one
|
||||
- adding the key is used, in order to err on the side of a larger
|
||||
- RepoSize.
|
||||
- repository size.
|
||||
-
|
||||
- In the case where the same live change is recorded by two different
|
||||
- processes or threads, the first to complete will record it as a recent
|
||||
|
@ -291,24 +344,14 @@ getRecentChange u k = do
|
|||
- This is only expensive when there are a lot of live changes happening at
|
||||
- the same time.
|
||||
-}
|
||||
estimateLiveRepoSizes :: RepoSizeHandle -> IO (Maybe (M.Map UUID RepoSize, Sha))
|
||||
estimateLiveRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||
getAnnexBranchCommit >>= \case
|
||||
Just annexbranchsha -> do
|
||||
sizechanges <- getSizeChanges
|
||||
livechanges <- getLiveSizeChanges
|
||||
reposizes <- getRepoSizes'
|
||||
m <- M.fromList <$> forM reposizes
|
||||
(go sizechanges livechanges)
|
||||
return (Just (m, annexbranchsha))
|
||||
Nothing -> return Nothing
|
||||
liveRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
||||
liveRepoOffsets (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||
sizechanges <- getSizeChanges
|
||||
livechanges <- getLiveSizeChanges
|
||||
let us = nub (M.keys sizechanges ++ M.keys livechanges)
|
||||
M.fromList <$> forM us (go sizechanges livechanges)
|
||||
where
|
||||
go
|
||||
:: M.Map UUID FileSize
|
||||
-> M.Map UUID [(Key, (SizeChange, SizeChangeId))]
|
||||
-> (UUID, RepoSize)
|
||||
-> SqlPersistM (UUID, RepoSize)
|
||||
go sizechanges livechanges (u, RepoSize startsize) = do
|
||||
go sizechanges livechanges u = do
|
||||
let livechangesbykey =
|
||||
M.fromListWith (++) $
|
||||
map (\(k, v) -> (k, [v])) $
|
||||
|
@ -321,18 +364,13 @@ estimateLiveRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
|||
(\t (k, sc) -> updateRollingTotal t sc k)
|
||||
(fromMaybe 0 (M.lookup u sizechanges))
|
||||
livechanges'
|
||||
return (u, RepoSize (startsize + sizechange))
|
||||
return (u, SizeOffset sizechange)
|
||||
|
||||
combinelikelivechanges =
|
||||
S.elems
|
||||
. S.fromList
|
||||
. map (\(k, (sc, _)) -> (k, sc))
|
||||
|
||||
nonredundantlivechange
|
||||
:: M.Map Key [(SizeChange, SizeChangeId)]
|
||||
-> UUID
|
||||
-> (Key, (SizeChange, SizeChangeId))
|
||||
-> SqlPersistM Bool
|
||||
nonredundantlivechange livechangesbykey u (k, (sc, cid))
|
||||
| null (competinglivechanges livechangesbykey k sc cid) =
|
||||
getRecentChange u k >>= pure . \case
|
||||
|
@ -340,14 +378,8 @@ estimateLiveRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
|||
Just sc' -> sc /= sc'
|
||||
| otherwise = pure False
|
||||
|
||||
competinglivechanges
|
||||
:: M.Map Key [(SizeChange, SizeChangeId)]
|
||||
-> Key
|
||||
-> SizeChange
|
||||
-> SizeChangeId
|
||||
-> [(SizeChange, SizeChangeId)]
|
||||
competinglivechanges livechangesbykey k RemovingKey cid =
|
||||
filter (\(sc', cid') -> cid /= cid' && sc' == AddingKey)
|
||||
(fromMaybe [] $ M.lookup k livechangesbykey)
|
||||
competinglivechanges _ _ AddingKey _ = []
|
||||
estimateLiveRepoSizes (RepoSizeHandle Nothing) = return Nothing
|
||||
liveRepoOffsets (RepoSizeHandle Nothing) = pure mempty
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue