started work on getLiveRepoSizes
Doesn't quite compile
This commit is contained in:
parent
db89e39df6
commit
21608716bd
3 changed files with 186 additions and 36 deletions
|
@ -94,7 +94,7 @@ prepareLiveUpdate mu k sc = do
|
|||
| otherwise -> waitdone donev finishv h u cid
|
||||
Right Nothing -> abandoned h u cid
|
||||
Left _ -> abandoned h u cid
|
||||
abandoned h u cid = Db.staleLiveSizeChange h u k sc cid
|
||||
abandoned h u cid = Db.removeStaleLiveSizeChange h u k sc cid
|
||||
|
||||
-- Called when a preferred content check indicates that a live update is
|
||||
-- needed. Can be called more than once on the same LiveUpdate.
|
||||
|
|
|
@ -25,11 +25,10 @@ module Database.RepoSize (
|
|||
closeDb,
|
||||
getRepoSizes,
|
||||
setRepoSizes,
|
||||
getLiveSizeChanges,
|
||||
getLiveRepoSizes,
|
||||
startingLiveSizeChange,
|
||||
successfullyFinishedLiveSizeChange,
|
||||
staleLiveSizeChange,
|
||||
getSizeChanges,
|
||||
removeStaleLiveSizeChange,
|
||||
) where
|
||||
|
||||
import Annex.Common
|
||||
|
@ -46,7 +45,8 @@ import qualified Utility.RawFilePath as R
|
|||
import Database.Persist.Sql hiding (Key)
|
||||
import Database.Persist.TH
|
||||
import qualified System.FilePath.ByteString as P
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Data.Set as S
|
||||
|
||||
share [mkPersist sqlSettings, mkMigrate "migrateRepoSizes"] [persistLowerCase|
|
||||
-- Corresponds to location log information from the git-annex branch.
|
||||
|
@ -73,6 +73,13 @@ SizeChanges
|
|||
repo UUID
|
||||
rollingtotal FileSize
|
||||
UniqueRepoRollingTotal repo
|
||||
-- The most recent size changes that were removed from LiveSizeChanges
|
||||
-- upon successful completion.
|
||||
RecentChanges
|
||||
repo UUID
|
||||
key Key
|
||||
change SizeChange
|
||||
UniqueRecentChange repo key
|
||||
|]
|
||||
|
||||
{- Gets a handle to the database. It's cached in Annex state. -}
|
||||
|
@ -115,19 +122,21 @@ closeDb :: RepoSizeHandle -> Annex ()
|
|||
closeDb (RepoSizeHandle (Just h)) = liftIO $ H.closeDb h
|
||||
closeDb (RepoSizeHandle Nothing) = noop
|
||||
|
||||
{- Gets the sizes of repositories as of a commit to the git-annex
|
||||
- branch. -}
|
||||
getRepoSizes :: RepoSizeHandle -> IO (M.Map UUID RepoSize, Maybe Sha)
|
||||
getRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||
sizemap <- M.fromList . map conv <$> getRepoSizes'
|
||||
sizemap <- M.fromList <$> getRepoSizes'
|
||||
annexbranchsha <- getAnnexBranchCommit
|
||||
return (sizemap, annexbranchsha)
|
||||
getRepoSizes (RepoSizeHandle Nothing) = return (mempty, Nothing)
|
||||
|
||||
getRepoSizes' :: SqlPersistM [(UUID, RepoSize)]
|
||||
getRepoSizes' = map conv <$> selectList [] []
|
||||
where
|
||||
conv entity =
|
||||
let RepoSizes u sz = entityVal entity
|
||||
in (u, RepoSize sz)
|
||||
getRepoSizes (RepoSizeHandle Nothing) = return (mempty, Nothing)
|
||||
|
||||
getRepoSizes' :: SqlPersistM [Entity RepoSizes]
|
||||
getRepoSizes' = selectList [] []
|
||||
|
||||
getAnnexBranchCommit :: SqlPersistM (Maybe Sha)
|
||||
getAnnexBranchCommit = do
|
||||
|
@ -149,8 +158,8 @@ getAnnexBranchCommit = do
|
|||
setRepoSizes :: RepoSizeHandle -> M.Map UUID RepoSize -> Sha -> IO ()
|
||||
setRepoSizes (RepoSizeHandle (Just h)) sizemap branchcommitsha =
|
||||
H.commitDb h $ do
|
||||
l <- getRepoSizes'
|
||||
forM_ (map entityVal l) $ \(RepoSizes u _) ->
|
||||
l <- getRepoSizes'
|
||||
forM_ (map fst l) $ \u ->
|
||||
unless (M.member u sizemap) $
|
||||
unsetRepoSize u
|
||||
forM_ (M.toList sizemap) $
|
||||
|
@ -173,8 +182,6 @@ recordAnnexBranchCommit branchcommitsha = do
|
|||
deleteWhere ([] :: [Filter AnnexBranch])
|
||||
void $ insertUniqueFast $ AnnexBranch $ toSSha branchcommitsha
|
||||
|
||||
{- If there is already a size change for the same UUID, Key,
|
||||
- and SizeChangeId, it is overwritten with the new size change. -}
|
||||
startingLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||
startingLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||
H.commitDb h $ void $ upsertBy
|
||||
|
@ -188,10 +195,11 @@ startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
|||
successfullyFinishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||
successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||
H.commitDb h $ do
|
||||
-- Update the rolling total and remove the live change in the
|
||||
-- same transaction.
|
||||
-- Update the rolling total, add as a recent change,
|
||||
-- and remove the live change in the same transaction.
|
||||
rollingtotal <- getSizeChangeFor u
|
||||
setSizeChangeFor u (updaterollingtotal rollingtotal)
|
||||
setSizeChangeFor u (updateRollingTotal rollingtotal sc k)
|
||||
addRecentChange u k sc
|
||||
removeLiveSizeChange u k sc sid
|
||||
where
|
||||
updaterollingtotal t = case sc of
|
||||
|
@ -200,10 +208,17 @@ successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
|||
ksz = fromMaybe 0 $ fromKey keySize k
|
||||
successfullyFinishedLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||
|
||||
staleLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||
staleLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||
updateRollingTotal :: FileSize -> SizeChange -> Key -> FileSize
|
||||
updateRollingTotal t sc k = case sc of
|
||||
AddingKey -> t + ksz
|
||||
RemovingKey -> t - ksz
|
||||
where
|
||||
ksz = fromMaybe 0 $ fromKey keySize k
|
||||
|
||||
removeStaleLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||
removeStaleLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||
H.commitDb h $ removeLiveSizeChange u k sc sid
|
||||
staleLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||
removeStaleLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||
|
||||
removeLiveSizeChange :: UUID -> Key -> SizeChange -> SizeChangeId -> SqlPersistM ()
|
||||
removeLiveSizeChange u k sc sid =
|
||||
|
@ -214,25 +229,15 @@ removeLiveSizeChange u k sc sid =
|
|||
, LiveSizeChangesChange ==. sc
|
||||
]
|
||||
|
||||
getLiveSizeChanges :: RepoSizeHandle -> IO (M.Map UUID (Key, SizeChange, SizeChangeId))
|
||||
getLiveSizeChanges (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||
m <- M.fromList . map conv <$> getLiveSizeChanges'
|
||||
return m
|
||||
getLiveSizeChanges :: SqlPersistM (M.Map UUID [(Key, (SizeChange, SizeChangeId))])
|
||||
getLiveSizeChanges = M.fromListWith (++) . map conv <$> selectList [] []
|
||||
where
|
||||
conv entity =
|
||||
let LiveSizeChanges u k sid sc = entityVal entity
|
||||
in (u, (k, sc, sid))
|
||||
getLiveSizeChanges (RepoSizeHandle Nothing) = return mempty
|
||||
in (u, [(k, (sc, sid))])
|
||||
|
||||
getLiveSizeChanges' :: SqlPersistM [Entity LiveSizeChanges]
|
||||
getLiveSizeChanges' = selectList [] []
|
||||
|
||||
getSizeChanges :: RepoSizeHandle -> IO (M.Map UUID FileSize)
|
||||
getSizeChanges (RepoSizeHandle (Just h)) = H.queryDb h getSizeChanges'
|
||||
getSizeChanges (RepoSizeHandle Nothing) = return mempty
|
||||
|
||||
getSizeChanges' :: SqlPersistM (M.Map UUID FileSize)
|
||||
getSizeChanges' = M.fromList . map conv <$> selectList [] []
|
||||
getSizeChanges :: SqlPersistM (M.Map UUID FileSize)
|
||||
getSizeChanges = M.fromList . map conv <$> selectList [] []
|
||||
where
|
||||
conv entity =
|
||||
let SizeChanges u n = entityVal entity
|
||||
|
@ -251,3 +256,96 @@ setSizeChangeFor u sz =
|
|||
(UniqueRepoRollingTotal u)
|
||||
(SizeChanges u sz)
|
||||
[SizeChangesRollingtotal =. sz]
|
||||
|
||||
addRecentChange :: UUID -> Key -> SizeChange -> SqlPersistM ()
|
||||
addRecentChange u k sc =
|
||||
void $ upsertBy
|
||||
(UniqueRecentChange u k)
|
||||
(RecentChanges u k sc)
|
||||
[RecentChangesChange =. sc]
|
||||
|
||||
getRecentChange :: UUID -> Key -> SqlPersistM (Maybe SizeChange)
|
||||
getRecentChange u k = do
|
||||
l <- selectList
|
||||
[ RecentChangesRepo ==. u
|
||||
, RecentChangesKey ==. k
|
||||
] []
|
||||
return $ case l of
|
||||
(s:_) -> Just $ recentChangesChange $ entityVal s
|
||||
[] -> Nothing
|
||||
|
||||
{- Gets the sizes of Repos as of a commit to the git-annex branch
|
||||
- (which is not necessarily the current commit), adjusted with all
|
||||
- live changes that have happened since then or are happening now.
|
||||
-
|
||||
- This does not necessarily include all changes that have been journalled,
|
||||
- only ones that had startingLiveSizeChange called for them will be
|
||||
- included. Also live changes or recent changes that were to a UUID not in
|
||||
- the RepoSizes map are not included.
|
||||
-
|
||||
- In the unlikely case where two live changes are occurring, one
|
||||
- adding a key and the other removing the same key, the one
|
||||
- adding the key is used, in order to err on the side of a larger
|
||||
- RepoSize.
|
||||
-
|
||||
- Omits live changes that are redundant due to a recent change already
|
||||
- being recorded for the same change.
|
||||
-
|
||||
- This is only expensive when there are a lot of live changes happening at
|
||||
- the same time.
|
||||
-}
|
||||
getLiveRepoSizes :: RepoSizeHandle -> IO (M.Map UUID RepoSize, Maybe Sha)
|
||||
getLiveRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||
sizechanges <- getSizeChanges
|
||||
livechanges <- getLiveSizeChanges
|
||||
reposizes <- getRepoSizes'
|
||||
annexbranchsha <- getAnnexBranchCommit
|
||||
m <- M.fromList <$> forM reposizes (go sizechanges livechanges)
|
||||
return (m, annexbranchsha)
|
||||
where
|
||||
go
|
||||
:: M.Map UUID FileSize
|
||||
-> M.Map UUID [(Key, (SizeChange, SizeChangeId))]
|
||||
-> (UUID, RepoSize)
|
||||
-> SqlPersistM (UUID, RepoSize)
|
||||
go sizechanges livechanges (u, RepoSize startsize) = do
|
||||
let livechangesbykey =
|
||||
M.fromListWith (++) $ maybe [] (\v -> [v]) $
|
||||
M.lookup u livechanges
|
||||
livechanges' <- combinelikelivechanges <$>
|
||||
filterM (nonredundantlivechange livechangesbykey u)
|
||||
(fromMaybe [] $ M.lookup u livechanges)
|
||||
let sizechange = foldl'
|
||||
(\t (k, sc) -> updateRollingTotal t sc k)
|
||||
(fromMaybe 0 (M.lookup u sizechanges))
|
||||
livechanges'
|
||||
return (u, RepoSize (startsize + sizechange))
|
||||
|
||||
combinelikelivechanges =
|
||||
S.elems
|
||||
. S.fromList
|
||||
. map (\(k, (sc, _)) -> (k, sc))
|
||||
|
||||
nonredundantlivechange
|
||||
:: M.Map Key [(SizeChange, SizeChangeId)]
|
||||
-> UUID
|
||||
-> (Key, (SizeChange, SizeChangeId))
|
||||
-> SqlPersistM Bool
|
||||
nonredundantlivechange livechangesbykey u (k, (sc, cid))
|
||||
| null (competinglivechanges livechangesbykey k sc cid) =
|
||||
getRecentChange u k >>= pure . \case
|
||||
Nothing -> True
|
||||
Just sc' -> sc /= sc'
|
||||
| otherwise = pure False
|
||||
|
||||
competinglivechanges
|
||||
:: M.Map Key [(SizeChange, SizeChangeId)]
|
||||
-> Key
|
||||
-> SizeChange
|
||||
-> SizeChangeId
|
||||
-> [(SizeChange, SizeChangeId)]
|
||||
competinglivechanges livechangesbykey k RemovingKey cid =
|
||||
filter (\(sc', cid') -> cid /= cid' && sc' == AddingKey)
|
||||
(fromMaybe [] $ M.lookup k livechangesbykey)
|
||||
competinglivechanges _ _ AddingKey _ = []
|
||||
getLiveRepoSizes (RepoSizeHandle Nothing) = return mempty
|
||||
|
|
|
@ -100,7 +100,7 @@ Planned schedule of work:
|
|||
|
||||
When updating location log for a key, when there is actually a change,
|
||||
update the db, remove the live update (done) and update the sizechanges
|
||||
table in the same transaction.
|
||||
table in the same transaction (done).
|
||||
|
||||
Two concurrent processes might both start the same action, eg dropping
|
||||
a key, and both succeed, and so both update the location log. One needs
|
||||
|
@ -145,6 +145,48 @@ Planned schedule of work:
|
|||
|
||||
* Still implementing LiveUpdate. Check for TODO XXX markers
|
||||
|
||||
* Concurrency issue noted in commit db89e39df606b6ec292e0f1c3a7a60e317ac60f1
|
||||
|
||||
But: There will be a window where the redundant LiveUpdate is still
|
||||
visible in the db, and processes can see it, combine it with the
|
||||
rollingtotal, and arrive at the wrong size. This is a small window, but
|
||||
it still ought to be addressed. Unsure if it would always be safe to
|
||||
remove the redundant LiveUpdate? Consider the case where two drops and a
|
||||
get are all running concurrently somehow, and the order they finish is
|
||||
[drop, get, drop]. The second drop seems redundant to the first, but
|
||||
it would not be safe to remove it. While this seems unlikely, it's hard
|
||||
to rule out that a get and drop at different stages can both be running
|
||||
at the same time.
|
||||
|
||||
It also is possible for a redundant LiveUpdate to get added to the db
|
||||
just after the rollingtotal was updated. In this case, combining the LiveUpdate
|
||||
with the rollingtotal again yields the wrong reposize.
|
||||
|
||||
So is the rollingtotal doomed to not be accurate?
|
||||
|
||||
A separate table could be kept of recent updates. When combining a LiveUpdate
|
||||
with the rollingtotal to get a reposize, first check if the LiveUpdate is
|
||||
redundant given a recent update. When updating the RepoSizes table, clear the
|
||||
recent updates table and the rolling totals table (in the same transaction).
|
||||
This recent updates table could get fairly large, but only needs to be queried
|
||||
for each current LiveUpdate, of which there are not ususally many running.
|
||||
|
||||
When does a recent update mean a LiveUpdate is redundant? In the case of two drops,
|
||||
the second is clearly redundant. But what about two gets and a drop? In this
|
||||
case, after the first get, we don't know what order operations will
|
||||
happen in. So the fact that the first get is in the recent updates table
|
||||
should not make the second get be treated as redundant.
|
||||
|
||||
So, look up each LiveUpdate in the recent updates table. When the same
|
||||
operation is found there, look to see if there is any other LiveUpdate of
|
||||
the same key and uuid, but with a different SizeChange. Only when there is
|
||||
not is the LiveUpdate redundant.
|
||||
|
||||
What if the recent updates table contains a get and a drop of the same
|
||||
key. Now a get is running. Is it redundant? Perhaps the recent updates
|
||||
table needs timestamps. More simply, when adding a drop to the recent
|
||||
updates table, any existing get of the same key should be removed.
|
||||
|
||||
* In the case where a copy to a remote fails (due eg to annex.diskreserve),
|
||||
the LiveUpdate thread can not get a chance to catch its exception when
|
||||
the LiveUpdate is gced, before git-annex exits. In this case, the
|
||||
|
@ -156,6 +198,11 @@ Planned schedule of work:
|
|||
I'd think, but I tried manually doing a performGC at git-annex shutdown
|
||||
and it didn't help.
|
||||
|
||||
getLiveRepoSizes is an unfinished try at implementing the above.
|
||||
|
||||
* Something needs to empty SizeChanges and RecentChanges when
|
||||
setRepoSizes is called. While avoiding races.
|
||||
|
||||
* The assistant is using NoLiveUpdate, but it should be posssible to plumb
|
||||
a LiveUpdate through it from preferred content checking to location log
|
||||
updating.
|
||||
|
@ -165,6 +212,11 @@ Planned schedule of work:
|
|||
overLocationLogs. In the other path it does not, and this should be fixed
|
||||
for consistency and correctness.
|
||||
|
||||
* getLiveRepoSizes has a filterM getRecentChange over the live updates.
|
||||
This could be optimised to a single sql join. There are usually not many
|
||||
live updates, but sometimes there will be a great many recent changes,
|
||||
so it might be worth doing this optimisation.
|
||||
|
||||
## completed items for August's work on balanced preferred content
|
||||
|
||||
* Balanced preferred content basic implementation, including --rebalance
|
||||
|
|
Loading…
Reference in a new issue