fix Annex.repoSize sharing between threads
This commit is contained in:
parent
e361b9ea3c
commit
61d95627f3
4 changed files with 56 additions and 46 deletions
5
Annex.hs
5
Annex.hs
|
@ -132,6 +132,7 @@ data AnnexRead = AnnexRead
|
||||||
, forcenumcopies :: Maybe NumCopies
|
, forcenumcopies :: Maybe NumCopies
|
||||||
, forcemincopies :: Maybe MinCopies
|
, forcemincopies :: Maybe MinCopies
|
||||||
, forcebackend :: Maybe String
|
, forcebackend :: Maybe String
|
||||||
|
, reposizes :: MVar (Maybe (M.Map UUID RepoSize))
|
||||||
, rebalance :: Bool
|
, rebalance :: Bool
|
||||||
, useragent :: Maybe String
|
, useragent :: Maybe String
|
||||||
, desktopnotify :: DesktopNotify
|
, desktopnotify :: DesktopNotify
|
||||||
|
@ -149,6 +150,7 @@ newAnnexRead c = do
|
||||||
tp <- newTransferrerPool
|
tp <- newTransferrerPool
|
||||||
cm <- newTMVarIO M.empty
|
cm <- newTMVarIO M.empty
|
||||||
cc <- newTMVarIO (CredentialCache M.empty)
|
cc <- newTMVarIO (CredentialCache M.empty)
|
||||||
|
rs <- newMVar Nothing
|
||||||
return $ AnnexRead
|
return $ AnnexRead
|
||||||
{ branchstate = bs
|
{ branchstate = bs
|
||||||
, activekeys = emptyactivekeys
|
, activekeys = emptyactivekeys
|
||||||
|
@ -166,6 +168,7 @@ newAnnexRead c = do
|
||||||
, forcebackend = Nothing
|
, forcebackend = Nothing
|
||||||
, forcenumcopies = Nothing
|
, forcenumcopies = Nothing
|
||||||
, forcemincopies = Nothing
|
, forcemincopies = Nothing
|
||||||
|
, reposizes = rs
|
||||||
, rebalance = False
|
, rebalance = False
|
||||||
, useragent = Nothing
|
, useragent = Nothing
|
||||||
, desktopnotify = mempty
|
, desktopnotify = mempty
|
||||||
|
@ -202,7 +205,6 @@ data AnnexState = AnnexState
|
||||||
, remoteconfigmap :: Maybe (M.Map UUID RemoteConfig)
|
, remoteconfigmap :: Maybe (M.Map UUID RemoteConfig)
|
||||||
, clusters :: Maybe (Annex Clusters)
|
, clusters :: Maybe (Annex Clusters)
|
||||||
, maxsizes :: Maybe (M.Map UUID MaxSize)
|
, maxsizes :: Maybe (M.Map UUID MaxSize)
|
||||||
, reposizes :: Maybe (M.Map UUID RepoSize)
|
|
||||||
, forcetrust :: TrustMap
|
, forcetrust :: TrustMap
|
||||||
, trustmap :: Maybe TrustMap
|
, trustmap :: Maybe TrustMap
|
||||||
, groupmap :: Maybe GroupMap
|
, groupmap :: Maybe GroupMap
|
||||||
|
@ -258,7 +260,6 @@ newAnnexState c r = do
|
||||||
, remoteconfigmap = Nothing
|
, remoteconfigmap = Nothing
|
||||||
, clusters = Nothing
|
, clusters = Nothing
|
||||||
, maxsizes = Nothing
|
, maxsizes = Nothing
|
||||||
, reposizes = Nothing
|
|
||||||
, forcetrust = M.empty
|
, forcetrust = M.empty
|
||||||
, trustmap = Nothing
|
, trustmap = Nothing
|
||||||
, groupmap = Nothing
|
, groupmap = Nothing
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE OverloadedStrings, BangPatterns #-}
|
||||||
|
|
||||||
module Annex.RepoSize (
|
module Annex.RepoSize (
|
||||||
getRepoSizes,
|
getRepoSizes,
|
||||||
|
@ -15,28 +15,35 @@ import Annex.Common
|
||||||
import Annex.RepoSize.LiveUpdate
|
import Annex.RepoSize.LiveUpdate
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Annex.Branch (UnmergedBranches(..), getBranch)
|
import Annex.Branch (UnmergedBranches(..), getBranch)
|
||||||
import Annex.Journal (lockJournal)
|
|
||||||
import Types.RepoSize
|
import Types.RepoSize
|
||||||
import qualified Database.RepoSize as Db
|
import qualified Database.RepoSize as Db
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Logs.UUID
|
import Logs.UUID
|
||||||
import Git.Types (Sha)
|
import Git.Types (Sha)
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
|
||||||
{- Gets the repo size map. Cached for speed. -}
|
{- Gets the repo size map. Cached for speed. -}
|
||||||
getRepoSizes :: Annex (M.Map UUID RepoSize)
|
getRepoSizes :: Annex (M.Map UUID RepoSize)
|
||||||
getRepoSizes = maybe calcRepoSizes return =<< Annex.getState Annex.reposizes
|
getRepoSizes = do
|
||||||
|
rsv <- Annex.getRead Annex.reposizes
|
||||||
|
liftIO (takeMVar rsv) >>= \case
|
||||||
|
Just sizemap -> do
|
||||||
|
liftIO $ putMVar rsv (Just sizemap)
|
||||||
|
return sizemap
|
||||||
|
Nothing -> calcRepoSizes rsv
|
||||||
|
|
||||||
{- Sets Annex.reposizes with current information from the git-annex
|
{- Fills an empty Annex.reposizes MVar with current information
|
||||||
- branch, supplimented with journalled but not yet committed information.
|
- from the git-annex branch, supplimented with journalled but
|
||||||
-
|
- not yet committed information.
|
||||||
- This should only be called when Annex.reposizes = Nothing.
|
|
||||||
-}
|
-}
|
||||||
calcRepoSizes :: Annex (M.Map UUID RepoSize)
|
calcRepoSizes :: MVar (Maybe (M.Map UUID RepoSize)) -> Annex (M.Map UUID RepoSize)
|
||||||
calcRepoSizes = bracket Db.openDb Db.closeDb $ \h -> do
|
calcRepoSizes rsv = bracket setup cleanup $ \h -> go h `onException` failed
|
||||||
|
where
|
||||||
|
go h = do
|
||||||
(oldsizemap, moldbranchsha) <- liftIO $ Db.getRepoSizes h
|
(oldsizemap, moldbranchsha) <- liftIO $ Db.getRepoSizes h
|
||||||
case moldbranchsha of
|
!sizemap <- case moldbranchsha of
|
||||||
Nothing -> calculatefromscratch h
|
Nothing -> calculatefromscratch h
|
||||||
Just oldbranchsha -> do
|
Just oldbranchsha -> do
|
||||||
currbranchsha <- getBranch
|
currbranchsha <- getBranch
|
||||||
|
@ -46,13 +53,23 @@ calcRepoSizes = bracket Db.openDb Db.closeDb $ \h -> do
|
||||||
-- XXX todo incremental update by diffing
|
-- XXX todo incremental update by diffing
|
||||||
-- from old to new branch.
|
-- from old to new branch.
|
||||||
calculatefromscratch h
|
calculatefromscratch h
|
||||||
where
|
liftIO $ putMVar rsv (Just sizemap)
|
||||||
|
return sizemap
|
||||||
|
|
||||||
calculatefromscratch h = do
|
calculatefromscratch h = do
|
||||||
showSideAction "calculating repository sizes"
|
showSideAction "calculating repository sizes"
|
||||||
(sizemap, branchsha) <- calcBranchRepoSizes
|
(sizemap, branchsha) <- calcBranchRepoSizes
|
||||||
liftIO $ Db.setRepoSizes h sizemap branchsha
|
liftIO $ Db.setRepoSizes h sizemap branchsha
|
||||||
calcJournalledRepoSizes sizemap branchsha
|
calcJournalledRepoSizes sizemap branchsha
|
||||||
|
|
||||||
|
setup = Db.openDb
|
||||||
|
|
||||||
|
cleanup = Db.closeDb
|
||||||
|
|
||||||
|
failed = do
|
||||||
|
liftIO $ putMVar rsv (Just M.empty)
|
||||||
|
return M.empty
|
||||||
|
|
||||||
{- Sum up the sizes of all keys in all repositories, from the information
|
{- Sum up the sizes of all keys in all repositories, from the information
|
||||||
- in the git-annex branch, but not the journal. Retuns the sha of the
|
- in the git-annex branch, but not the journal. Retuns the sha of the
|
||||||
- branch commit that was used.
|
- branch commit that was used.
|
||||||
|
@ -77,19 +94,13 @@ calcBranchRepoSizes = do
|
||||||
|
|
||||||
{- Given the RepoSizes calculated from the git-annex branch, updates it with
|
{- Given the RepoSizes calculated from the git-annex branch, updates it with
|
||||||
- data from journalled location logs.
|
- data from journalled location logs.
|
||||||
-
|
|
||||||
- This should only be called when Annex.reposizes = Nothing.
|
|
||||||
-}
|
-}
|
||||||
calcJournalledRepoSizes :: M.Map UUID RepoSize -> Sha -> Annex (M.Map UUID RepoSize)
|
calcJournalledRepoSizes
|
||||||
calcJournalledRepoSizes startmap branchsha = lockJournal $ \_jl -> do
|
:: M.Map UUID RepoSize
|
||||||
sizemap <- overLocationLogsJournal startmap branchsha accumsizes
|
-> Sha
|
||||||
-- Set while the journal is still locked. Since Annex.reposizes
|
-> Annex (M.Map UUID RepoSize)
|
||||||
-- was Nothing until this point, any other thread that might be
|
calcJournalledRepoSizes startmap branchsha =
|
||||||
-- journalling a location log change at the same time will
|
overLocationLogsJournal startmap branchsha accumsizes
|
||||||
-- be blocked from running updateRepoSize concurrently with this.
|
|
||||||
Annex.changeState $ \st -> st
|
|
||||||
{ Annex.reposizes = Just sizemap }
|
|
||||||
return sizemap
|
|
||||||
where
|
where
|
||||||
accumsizes k (newlocs, removedlocs) m = return $
|
accumsizes k (newlocs, removedlocs) m = return $
|
||||||
let m' = foldl' (flip $ M.alter $ addKeyRepoSize k) m newlocs
|
let m' = foldl' (flip $ M.alter $ addKeyRepoSize k) m newlocs
|
||||||
|
|
|
@ -14,17 +14,19 @@ import qualified Annex
|
||||||
import Types.RepoSize
|
import Types.RepoSize
|
||||||
import Logs.Presence.Pure
|
import Logs.Presence.Pure
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
|
||||||
updateRepoSize :: UUID -> Key -> LogStatus -> Annex ()
|
updateRepoSize :: UUID -> Key -> LogStatus -> Annex ()
|
||||||
updateRepoSize u k s = Annex.getState Annex.reposizes >>= \case
|
updateRepoSize u k s = do
|
||||||
Nothing -> noop
|
rsv <- Annex.getRead Annex.reposizes
|
||||||
|
liftIO (takeMVar rsv) >>= \case
|
||||||
|
Nothing -> liftIO (putMVar rsv Nothing)
|
||||||
Just sizemap -> do
|
Just sizemap -> do
|
||||||
let !sizemap' = M.adjust
|
let !sizemap' = M.adjust
|
||||||
(fromMaybe (RepoSize 0) . f k . Just)
|
(fromMaybe (RepoSize 0) . f k . Just)
|
||||||
u sizemap
|
u sizemap
|
||||||
Annex.changeState $ \st -> st
|
liftIO $ putMVar rsv (Just sizemap')
|
||||||
{ Annex.reposizes = Just sizemap' }
|
|
||||||
where
|
where
|
||||||
f = case s of
|
f = case s of
|
||||||
InfoPresent -> addKeyRepoSize
|
InfoPresent -> addKeyRepoSize
|
||||||
|
|
|
@ -39,10 +39,6 @@ Planned schedule of work:
|
||||||
Note ideas in above todo about doing this at git-annex branch merge
|
Note ideas in above todo about doing this at git-annex branch merge
|
||||||
time to reuse the git diff done there.
|
time to reuse the git diff done there.
|
||||||
|
|
||||||
* Annex.reposizes is not shared amoung threads, so duplicate work
|
|
||||||
to populate it, and threads won't learn about changes made by other
|
|
||||||
threads.
|
|
||||||
|
|
||||||
* What if 2 concurrent threads are considering sending two different
|
* What if 2 concurrent threads are considering sending two different
|
||||||
keys to a repo at the same time. It can hold either but not both.
|
keys to a repo at the same time. It can hold either but not both.
|
||||||
It should avoid sending both in this situation. (Also discussed in
|
It should avoid sending both in this situation. (Also discussed in
|
||||||
|
|
Loading…
Reference in a new issue