rolling total of size changes in RepoSize database
When a live size change completes successfully, the same transaction that removes it from the database updates the rolling total for its repository. The idea is that when RepoSizes is read, SizeChanges will be as well, and cached locally. Any time a change is made, the local cache will be updated. So by comparing the local cache with the current SizeChanges, it can learn about size changes that were made by other processes. Then read the LiveSizeChanges, and add that in to get a live picture of the current sizes. Also added a SizeChangeId. This allows 2 different threads, or processes, to both record a live size change for the same repo and key, and update their own information without stepping on one-another's toes.
This commit is contained in:
parent
9188825a4d
commit
18f8d61f55
3 changed files with 128 additions and 30 deletions
|
@ -18,12 +18,10 @@ import Annex.UUID
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
|
import System.Process
|
||||||
|
|
||||||
updateRepoSize :: LiveUpdate -> UUID -> Key -> LogStatus -> Annex ()
|
updateRepoSize :: LiveUpdate -> UUID -> Key -> LogStatus -> Annex ()
|
||||||
updateRepoSize lu u k s = do
|
updateRepoSize lu u k s = do
|
||||||
-- TODO update reposizes db
|
|
||||||
-- FIXME locking so the liveupdate is remove in the same
|
|
||||||
-- transaction that updates reposizes and the db too.
|
|
||||||
liftIO $ finishedLiveUpdate lu u k sc
|
liftIO $ finishedLiveUpdate lu u k sc
|
||||||
rsv <- Annex.getRead Annex.reposizes
|
rsv <- Annex.getRead Annex.reposizes
|
||||||
liftIO (takeMVar rsv) >>= \case
|
liftIO (takeMVar rsv) >>= \case
|
||||||
|
@ -77,32 +75,35 @@ prepareLiveUpdate mu k sc = do
|
||||||
waitstart startv readyv donev finishv h u =
|
waitstart startv readyv donev finishv h u =
|
||||||
tryNonAsync (takeMVar startv) >>= \case
|
tryNonAsync (takeMVar startv) >>= \case
|
||||||
Right () -> do
|
Right () -> do
|
||||||
|
pid <- getCurrentPid
|
||||||
|
cid <- mkSizeChangeId pid
|
||||||
{- Deferring updating the database until
|
{- Deferring updating the database until
|
||||||
- here avoids overhead except in cases
|
- here avoids overhead except in cases
|
||||||
- where preferred content expressions
|
- where preferred content expressions
|
||||||
- need live updates. -}
|
- need live updates. -}
|
||||||
Db.startingLiveSizeChange h u k sc
|
Db.startingLiveSizeChange h u k sc cid
|
||||||
putMVar readyv ()
|
putMVar readyv ()
|
||||||
waitdone donev finishv h u
|
waitdone donev finishv h u cid
|
||||||
Left _ -> noop
|
Left _ -> noop
|
||||||
|
|
||||||
{- Wait for finishedLiveUpdate to be called, or for the LiveUpdate
|
{- Wait for finishedLiveUpdate to be called, or for the LiveUpdate
|
||||||
- to get garbage collected in the case where the change didn't
|
- to get garbage collected in the case where the change didn't
|
||||||
- actually happen. -}
|
- actually happen. -}
|
||||||
waitdone donev finishv h u = tryNonAsync (takeMVar donev) >>= \case
|
waitdone donev finishv h u cid = tryNonAsync (takeMVar donev) >>= \case
|
||||||
-- TODO need to update RepoSize db
|
-- TODO need to update local state too, and it must be done
|
||||||
-- in same transaction as Db.finishedLiveSizeChange
|
-- with locking around the state update and this database
|
||||||
|
-- update.
|
||||||
Right (Just (u', k', sc'))
|
Right (Just (u', k', sc'))
|
||||||
| u' == u && k' == k && sc' == sc -> do
|
| u' == u && k' == k && sc' == sc -> do
|
||||||
done h u
|
Db.successfullyFinishedLiveSizeChange h u k sc cid
|
||||||
putMVar finishv ()
|
putMVar finishv ()
|
||||||
-- This can happen when eg, storing to a cluster
|
-- This can happen when eg, storing to a cluster
|
||||||
-- causes fanout and so this is called with
|
-- causes fanout and so this is called with
|
||||||
-- other UUIDs.
|
-- other UUIDs.
|
||||||
| otherwise -> waitdone donev finishv h u
|
| otherwise -> waitdone donev finishv h u cid
|
||||||
Right Nothing -> done h u
|
Right Nothing -> abandoned h u cid
|
||||||
Left _ -> done h u
|
Left _ -> abandoned h u cid
|
||||||
done h u = Db.finishedLiveSizeChange h u k sc
|
abandoned h u cid = Db.staleLiveSizeChange h u k sc cid
|
||||||
|
|
||||||
-- Called when a preferred content check indicates that a live update is
|
-- Called when a preferred content check indicates that a live update is
|
||||||
-- needed. Can be called more than once on the same LiveUpdate.
|
-- needed. Can be called more than once on the same LiveUpdate.
|
||||||
|
|
|
@ -27,7 +27,9 @@ module Database.RepoSize (
|
||||||
setRepoSizes,
|
setRepoSizes,
|
||||||
getLiveSizeChanges,
|
getLiveSizeChanges,
|
||||||
startingLiveSizeChange,
|
startingLiveSizeChange,
|
||||||
finishedLiveSizeChange,
|
successfullyFinishedLiveSizeChange,
|
||||||
|
staleLiveSizeChange,
|
||||||
|
getSizeChanges,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
@ -57,11 +59,20 @@ AnnexBranch
|
||||||
commit SSha
|
commit SSha
|
||||||
UniqueCommit commit
|
UniqueCommit commit
|
||||||
-- Changes that are currently being made that affect repo sizes.
|
-- Changes that are currently being made that affect repo sizes.
|
||||||
|
-- (Only updated when preferred content expressions are in use that need
|
||||||
|
-- live size changes.)
|
||||||
LiveSizeChanges
|
LiveSizeChanges
|
||||||
repo UUID
|
repo UUID
|
||||||
key Key
|
key Key
|
||||||
|
changeid SizeChangeId
|
||||||
change SizeChange
|
change SizeChange
|
||||||
UniqueLiveSizeChange repo key
|
UniqueLiveSizeChange repo key changeid
|
||||||
|
-- A rolling total of size changes that were removed from LiveSizeChanges
|
||||||
|
-- upon successful completion.
|
||||||
|
SizeChanges
|
||||||
|
repo UUID
|
||||||
|
rollingtotal FileSize
|
||||||
|
UniqueRepoRollingTotal repo
|
||||||
|]
|
|]
|
||||||
|
|
||||||
{- Gets a handle to the database. It's cached in Annex state. -}
|
{- Gets a handle to the database. It's cached in Annex state. -}
|
||||||
|
@ -162,34 +173,81 @@ recordAnnexBranchCommit branchcommitsha = do
|
||||||
deleteWhere ([] :: [Filter AnnexBranch])
|
deleteWhere ([] :: [Filter AnnexBranch])
|
||||||
void $ insertUniqueFast $ AnnexBranch $ toSSha branchcommitsha
|
void $ insertUniqueFast $ AnnexBranch $ toSSha branchcommitsha
|
||||||
|
|
||||||
{- If there is already a size change for the same UUID and Key, it is
|
{- If there is already a size change for the same UUID, Key,
|
||||||
- overwritten with the new size change. -}
|
- and SizeChangeId, it is overwritten with the new size change. -}
|
||||||
startingLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> IO ()
|
startingLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
startingLiveSizeChange (RepoSizeHandle (Just h)) u k sc =
|
startingLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||||
H.commitDb h $ void $ upsertBy
|
H.commitDb h $ void $ upsertBy
|
||||||
(UniqueLiveSizeChange u k)
|
(UniqueLiveSizeChange u k sid)
|
||||||
(LiveSizeChanges u k sc)
|
(LiveSizeChanges u k sid sc)
|
||||||
[LiveSizeChangesChange =. sc]
|
[ LiveSizeChangesChange =. sc
|
||||||
startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ = noop
|
, LiveSizeChangesChangeid =. sid
|
||||||
|
]
|
||||||
|
startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||||
|
|
||||||
finishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> IO ()
|
successfullyFinishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
finishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc =
|
successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||||
H.commitDb h $ deleteWhere
|
H.commitDb h $ do
|
||||||
|
-- Update the rolling total and remove the live change in the
|
||||||
|
-- same transaction.
|
||||||
|
rollingtotal <- getSizeChangeFor u
|
||||||
|
setSizeChangeFor u (updaterollingtotal rollingtotal)
|
||||||
|
removeLiveSizeChange u k sc sid
|
||||||
|
where
|
||||||
|
updaterollingtotal t = case sc of
|
||||||
|
AddingKey -> t + ksz
|
||||||
|
RemovingKey -> t - ksz
|
||||||
|
ksz = fromMaybe 0 $ fromKey keySize k
|
||||||
|
successfullyFinishedLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||||
|
|
||||||
|
staleLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
|
staleLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||||
|
H.commitDb h $ removeLiveSizeChange u k sc sid
|
||||||
|
staleLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||||
|
|
||||||
|
removeLiveSizeChange :: UUID -> Key -> SizeChange -> SizeChangeId -> SqlPersistM ()
|
||||||
|
removeLiveSizeChange u k sc sid =
|
||||||
|
deleteWhere
|
||||||
[ LiveSizeChangesRepo ==. u
|
[ LiveSizeChangesRepo ==. u
|
||||||
, LiveSizeChangesKey ==. k
|
, LiveSizeChangesKey ==. k
|
||||||
|
, LiveSizeChangesChangeid ==. sid
|
||||||
, LiveSizeChangesChange ==. sc
|
, LiveSizeChangesChange ==. sc
|
||||||
]
|
]
|
||||||
finishedLiveSizeChange (RepoSizeHandle Nothing) _ _ _ = noop
|
|
||||||
|
|
||||||
getLiveSizeChanges :: RepoSizeHandle -> IO (M.Map UUID (Key, SizeChange))
|
getLiveSizeChanges :: RepoSizeHandle -> IO (M.Map UUID (Key, SizeChange, SizeChangeId))
|
||||||
getLiveSizeChanges (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
getLiveSizeChanges (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||||
m <- M.fromList . map conv <$> getLiveSizeChanges'
|
m <- M.fromList . map conv <$> getLiveSizeChanges'
|
||||||
return m
|
return m
|
||||||
where
|
where
|
||||||
conv entity =
|
conv entity =
|
||||||
let LiveSizeChanges u k sc = entityVal entity
|
let LiveSizeChanges u k sid sc = entityVal entity
|
||||||
in (u, (k, sc))
|
in (u, (k, sc, sid))
|
||||||
getLiveSizeChanges (RepoSizeHandle Nothing) = return mempty
|
getLiveSizeChanges (RepoSizeHandle Nothing) = return mempty
|
||||||
|
|
||||||
getLiveSizeChanges' :: SqlPersistM [Entity LiveSizeChanges]
|
getLiveSizeChanges' :: SqlPersistM [Entity LiveSizeChanges]
|
||||||
getLiveSizeChanges' = selectList [] []
|
getLiveSizeChanges' = selectList [] []
|
||||||
|
|
||||||
|
getSizeChanges :: RepoSizeHandle -> IO (M.Map UUID FileSize)
|
||||||
|
getSizeChanges (RepoSizeHandle (Just h)) = H.queryDb h getSizeChanges'
|
||||||
|
getSizeChanges (RepoSizeHandle Nothing) = return mempty
|
||||||
|
|
||||||
|
getSizeChanges' :: SqlPersistM (M.Map UUID FileSize)
|
||||||
|
getSizeChanges' = M.fromList . map conv <$> selectList [] []
|
||||||
|
where
|
||||||
|
conv entity =
|
||||||
|
let SizeChanges u n = entityVal entity
|
||||||
|
in (u, n)
|
||||||
|
|
||||||
|
getSizeChangeFor :: UUID -> SqlPersistM FileSize
|
||||||
|
getSizeChangeFor u = do
|
||||||
|
l <- selectList [SizeChangesRepo ==. u] []
|
||||||
|
return $ case l of
|
||||||
|
(s:_) -> sizeChangesRollingtotal $ entityVal s
|
||||||
|
[] -> 0
|
||||||
|
|
||||||
|
setSizeChangeFor :: UUID -> FileSize -> SqlPersistM ()
|
||||||
|
setSizeChangeFor u sz =
|
||||||
|
void $ upsertBy
|
||||||
|
(UniqueRepoRollingTotal u)
|
||||||
|
(SizeChanges u sz)
|
||||||
|
[SizeChangesRollingtotal =. sz]
|
||||||
|
|
|
@ -15,6 +15,10 @@ import Types.Key
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Database.Persist.Sql hiding (Key)
|
import Database.Persist.Sql hiding (Key)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
|
import Data.Unique
|
||||||
|
import Text.Read
|
||||||
|
import System.Process (Pid)
|
||||||
|
import Utility.Split
|
||||||
|
|
||||||
-- The current size of a repo.
|
-- The current size of a repo.
|
||||||
newtype RepoSize = RepoSize { fromRepoSize :: Integer }
|
newtype RepoSize = RepoSize { fromRepoSize :: Integer }
|
||||||
|
@ -53,3 +57,38 @@ instance PersistField SizeChange where
|
||||||
|
|
||||||
instance PersistFieldSql SizeChange where
|
instance PersistFieldSql SizeChange where
|
||||||
sqlType _ = SqlInt32
|
sqlType _ = SqlInt32
|
||||||
|
|
||||||
|
data SizeChangeId = SizeChangeId
|
||||||
|
{ sizeChangeUniqueId :: Int
|
||||||
|
-- ^ unique per process
|
||||||
|
, sizeChangeProcessId :: Integer
|
||||||
|
-- ^ a pid, using Integer for portability
|
||||||
|
}
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
mkSizeChangeId :: Pid -> IO SizeChangeId
|
||||||
|
mkSizeChangeId pid = do
|
||||||
|
u <- newUnique
|
||||||
|
return $ SizeChangeId
|
||||||
|
{ sizeChangeProcessId = fromIntegral pid
|
||||||
|
, sizeChangeUniqueId = hashUnique u
|
||||||
|
}
|
||||||
|
|
||||||
|
instance PersistField SizeChangeId where
|
||||||
|
toPersistValue cid = toPersistValue $
|
||||||
|
show (sizeChangeProcessId cid) ++ ":" ++
|
||||||
|
show (sizeChangeUniqueId cid)
|
||||||
|
fromPersistValue b = fromPersistValue b >>= parse
|
||||||
|
where
|
||||||
|
parse s = maybe
|
||||||
|
(Left $ T.pack $ "bad serialized SizeChangeId " ++ show s)
|
||||||
|
Right
|
||||||
|
(parse' s)
|
||||||
|
parse' s = case splitc ':' s of
|
||||||
|
(pid:uid:[]) -> SizeChangeId
|
||||||
|
<$> readMaybe pid
|
||||||
|
<*> readMaybe uid
|
||||||
|
_ -> Nothing
|
||||||
|
|
||||||
|
instance PersistFieldSql SizeChangeId where
|
||||||
|
sqlType _ = SqlString
|
||||||
|
|
Loading…
Reference in a new issue