remove stale live changes from reposize database
Reorganized the reposize database directory, and split up a column. checkStaleSizeChanges needs to run before needLiveUpdate, otherwise the process won't be holding a lock on its pid file, and another process could go in and expire the live update it records. It just so happens that they do get called in the correct order, since checking balanced preferred content calls getLiveRepoSizes before needLiveUpdate. The 1 minute delay between checks is arbitrary, but will avoid excess work. The downside of it is that, if a process is dropping a file and gets interrupted, for 1 minute another process can expect that a repository will soon be smaller than it is. And so a process might send data to a repository when a file is not really going to be dropped from it. But note that can already happen if a drop takes some time in, e.g., locking and then fails. So it seems possible that live updates should only be allowed to increase, rather than decrease, the size of a repository.
This commit is contained in:
parent
278adbb726
commit
f89a1b8216
7 changed files with 199 additions and 83 deletions
|
@ -77,6 +77,7 @@ module Annex.Locations (
|
||||||
gitAnnexImportFeedDbLock,
|
gitAnnexImportFeedDbLock,
|
||||||
gitAnnexRepoSizeDbDir,
|
gitAnnexRepoSizeDbDir,
|
||||||
gitAnnexRepoSizeDbLock,
|
gitAnnexRepoSizeDbLock,
|
||||||
|
gitAnnexRepoSizeLiveDir,
|
||||||
gitAnnexScheduleState,
|
gitAnnexScheduleState,
|
||||||
gitAnnexTransferDir,
|
gitAnnexTransferDir,
|
||||||
gitAnnexCredsDir,
|
gitAnnexCredsDir,
|
||||||
|
@ -157,7 +158,21 @@ objectDir :: RawFilePath
|
||||||
objectDir = P.addTrailingPathSeparator $ annexDir P.</> "objects"
|
objectDir = P.addTrailingPathSeparator $ annexDir P.</> "objects"
|
||||||
|
|
||||||
{- Annexed file's possible locations relative to the .git directory
|
{- Annexed file's possible locations relative to the .git directory
|
||||||
- in a non-bare repository.
|
- in a non-bare repository.
|
||||||
|
|
||||||
|
{- Checks for other git-annex processes that might have been interrupted
|
||||||
|
- and left the database populated with stale live size changes. Those
|
||||||
|
- are removed from the database.
|
||||||
|
-
|
||||||
|
- Also registers the current process so that other calls to this will not
|
||||||
|
- consider it stale while it's running.
|
||||||
|
-
|
||||||
|
- This checks the first time it is called, and again if it's been more
|
||||||
|
- than 1 minute since the last check.
|
||||||
|
-}
|
||||||
|
checkStaleSizeChanges :: Db.RepoSizeHandle -> Annex ()
|
||||||
|
checkStaleSizeChanges h = do
|
||||||
|
undefined
|
||||||
-
|
-
|
||||||
- Normally it is hashDirMixed. However, it's always possible that a
|
- Normally it is hashDirMixed. However, it's always possible that a
|
||||||
- bare repository was converted to non-bare, or that the crippled
|
- bare repository was converted to non-bare, or that the crippled
|
||||||
|
@ -520,11 +535,17 @@ gitAnnexImportFeedDbLock r c = gitAnnexImportFeedDbDir r c <> ".lck"
|
||||||
{- Directory containing reposize database. -}
|
{- Directory containing reposize database. -}
|
||||||
gitAnnexRepoSizeDbDir :: Git.Repo -> GitConfig -> RawFilePath
|
gitAnnexRepoSizeDbDir :: Git.Repo -> GitConfig -> RawFilePath
|
||||||
gitAnnexRepoSizeDbDir r c =
|
gitAnnexRepoSizeDbDir r c =
|
||||||
fromMaybe (gitAnnexDir r) (annexDbDir c) P.</> "reposize"
|
fromMaybe (gitAnnexDir r) (annexDbDir c) P.</> "reposize" P.</> "db"
|
||||||
|
|
||||||
{- Lock file for the reposize database. -}
|
{- Lock file for the reposize database. -}
|
||||||
gitAnnexRepoSizeDbLock :: Git.Repo -> GitConfig -> RawFilePath
|
gitAnnexRepoSizeDbLock :: Git.Repo -> GitConfig -> RawFilePath
|
||||||
gitAnnexRepoSizeDbLock r c = gitAnnexRepoSizeDbDir r c <> ".lck"
|
gitAnnexRepoSizeDbLock r c =
|
||||||
|
fromMaybe (gitAnnexDir r) (annexDbDir c) P.</> "reposize" P.</> "lock"
|
||||||
|
|
||||||
|
{- Directory containing liveness pid files. -}
|
||||||
|
gitAnnexRepoSizeLiveDir :: Git.Repo -> GitConfig -> RawFilePath
|
||||||
|
gitAnnexRepoSizeLiveDir r c =
|
||||||
|
fromMaybe (gitAnnexDir r) (annexDbDir c) P.</> "reposize" P.</> "live"
|
||||||
|
|
||||||
{- .git/annex/schedulestate is used to store information about when
|
{- .git/annex/schedulestate is used to store information about when
|
||||||
- scheduled jobs were last run. -}
|
- scheduled jobs were last run. -}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import qualified Annex
|
||||||
import Annex.Branch (UnmergedBranches(..), getBranch)
|
import Annex.Branch (UnmergedBranches(..), getBranch)
|
||||||
import qualified Database.RepoSize as Db
|
import qualified Database.RepoSize as Db
|
||||||
import Annex.Journal
|
import Annex.Journal
|
||||||
|
import Annex.RepoSize.LiveUpdate
|
||||||
import Logs
|
import Logs
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Logs.UUID
|
import Logs.UUID
|
||||||
|
@ -55,6 +56,7 @@ getLiveRepoSizes quiet = do
|
||||||
where
|
where
|
||||||
go sizemap = do
|
go sizemap = do
|
||||||
h <- Db.getRepoSizeHandle
|
h <- Db.getRepoSizeHandle
|
||||||
|
checkStaleSizeChanges h
|
||||||
liveoffsets <- liftIO $ Db.liveRepoOffsets h
|
liveoffsets <- liftIO $ Db.liveRepoOffsets h
|
||||||
let calc u (RepoSize size, SizeOffset startoffset) =
|
let calc u (RepoSize size, SizeOffset startoffset) =
|
||||||
case M.lookup u liveoffsets of
|
case M.lookup u liveoffsets of
|
||||||
|
@ -86,12 +88,12 @@ calcRepoSizes quiet rsv = go `onException` failed
|
||||||
calculatefromscratch h = do
|
calculatefromscratch h = do
|
||||||
unless quiet $
|
unless quiet $
|
||||||
showSideAction "calculating repository sizes"
|
showSideAction "calculating repository sizes"
|
||||||
(sizemap, branchsha) <- calcBranchRepoSizes
|
use h =<< calcBranchRepoSizes
|
||||||
liftIO $ Db.setRepoSizes h sizemap branchsha
|
|
||||||
calcJournalledRepoSizes h sizemap branchsha
|
|
||||||
|
|
||||||
incrementalupdate h oldsizemap oldbranchsha currbranchsha = do
|
incrementalupdate h oldsizemap oldbranchsha currbranchsha =
|
||||||
(sizemap, branchsha) <- diffBranchRepoSizes quiet oldsizemap oldbranchsha currbranchsha
|
use h =<< diffBranchRepoSizes quiet oldsizemap oldbranchsha currbranchsha
|
||||||
|
|
||||||
|
use h (sizemap, branchsha) = do
|
||||||
liftIO $ Db.setRepoSizes h sizemap branchsha
|
liftIO $ Db.setRepoSizes h sizemap branchsha
|
||||||
calcJournalledRepoSizes h sizemap branchsha
|
calcJournalledRepoSizes h sizemap branchsha
|
||||||
|
|
||||||
|
|
|
@ -11,13 +11,20 @@ module Annex.RepoSize.LiveUpdate where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import Logs.Presence.Pure
|
import Logs.Presence.Pure
|
||||||
import qualified Database.RepoSize as Db
|
import Database.RepoSize.Handle
|
||||||
import Annex.UUID
|
import Annex.UUID
|
||||||
import Types.FileMatcher
|
import Types.FileMatcher
|
||||||
|
import Annex.LockFile
|
||||||
|
import Annex.LockPool
|
||||||
|
import qualified Database.RepoSize as Db
|
||||||
import qualified Utility.Matcher as Matcher
|
import qualified Utility.Matcher as Matcher
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import System.Process
|
import System.Process
|
||||||
|
import Text.Read
|
||||||
|
import Data.Time.Clock.POSIX
|
||||||
|
import qualified Utility.RawFilePath as R
|
||||||
|
import qualified System.FilePath.ByteString as P
|
||||||
|
|
||||||
{- Called when a location log change is journalled, so the LiveUpdate
|
{- Called when a location log change is journalled, so the LiveUpdate
|
||||||
- is done. This is called with the journal still locked, so no concurrent
|
- is done. This is called with the journal still locked, so no concurrent
|
||||||
|
@ -124,3 +131,59 @@ finishedLiveUpdate lu u k sc =
|
||||||
tryNonAsync (putMVar (liveUpdateDone lu) (Just (u, k, sc, finishv))) >>= \case
|
tryNonAsync (putMVar (liveUpdateDone lu) (Just (u, k, sc, finishv))) >>= \case
|
||||||
Right () -> void $ tryNonAsync $ takeMVar finishv
|
Right () -> void $ tryNonAsync $ takeMVar finishv
|
||||||
Left _ -> noop
|
Left _ -> noop
|
||||||
|
|
||||||
|
{- Checks for other git-annex processes that might have been interrupted
|
||||||
|
- and left the database populated with stale live size changes. Those
|
||||||
|
- are removed from the database.
|
||||||
|
-
|
||||||
|
- Also registers the current process so that other calls to this will not
|
||||||
|
- consider it stale while it's running.
|
||||||
|
-
|
||||||
|
- This checks the first time it is called, and again if it's been more
|
||||||
|
- than 1 minute since the last check.
|
||||||
|
-}
|
||||||
|
checkStaleSizeChanges :: RepoSizeHandle -> Annex ()
|
||||||
|
checkStaleSizeChanges h@(RepoSizeHandle (Just _) livev) = do
|
||||||
|
livedir <- calcRepo' gitAnnexRepoSizeLiveDir
|
||||||
|
pid <- liftIO getCurrentPid
|
||||||
|
let pidlockfile = show pid
|
||||||
|
now <- liftIO getPOSIXTime
|
||||||
|
liftIO (takeMVar livev) >>= \case
|
||||||
|
Nothing -> do
|
||||||
|
lck <- takeExclusiveLock $
|
||||||
|
livedir P.</> toRawFilePath pidlockfile
|
||||||
|
go livedir lck pidlockfile now
|
||||||
|
Just v@(lck, lastcheck)
|
||||||
|
| now >= lastcheck + 60 ->
|
||||||
|
go livedir lck pidlockfile now
|
||||||
|
| otherwise ->
|
||||||
|
liftIO $ putMVar livev (Just v)
|
||||||
|
where
|
||||||
|
go livedir lck pidlockfile now = do
|
||||||
|
void $ tryNonAsync $ do
|
||||||
|
lockfiles <- liftIO $ filter (not . dirCruft)
|
||||||
|
<$> getDirectoryContents (fromRawFilePath livedir)
|
||||||
|
stale <- forM lockfiles $ \lockfile ->
|
||||||
|
if (lockfile /= pidlockfile)
|
||||||
|
then case readMaybe lockfile of
|
||||||
|
Nothing -> return Nothing
|
||||||
|
Just pid -> checkstale livedir lockfile pid
|
||||||
|
else return Nothing
|
||||||
|
let stale' = catMaybes stale
|
||||||
|
unless (null stale') $ liftIO $ do
|
||||||
|
Db.removeStaleLiveSizeChanges h (map fst stale')
|
||||||
|
mapM_ snd stale'
|
||||||
|
liftIO $ putMVar livev (Just (lck, now))
|
||||||
|
|
||||||
|
checkstale livedir lockfile pid =
|
||||||
|
let f = livedir P.</> toRawFilePath lockfile
|
||||||
|
in tryLockShared Nothing f >>= \case
|
||||||
|
Nothing -> return Nothing
|
||||||
|
Just lck -> do
|
||||||
|
return $ Just
|
||||||
|
( StaleSizeChanger (SizeChangeProcessId pid)
|
||||||
|
, do
|
||||||
|
dropLock lck
|
||||||
|
removeWhenExistsWith R.removeLink f
|
||||||
|
)
|
||||||
|
checkStaleSizeChanges (RepoSizeHandle Nothing _) = noop
|
||||||
|
|
|
@ -29,6 +29,7 @@ module Database.RepoSize (
|
||||||
startingLiveSizeChange,
|
startingLiveSizeChange,
|
||||||
successfullyFinishedLiveSizeChange,
|
successfullyFinishedLiveSizeChange,
|
||||||
removeStaleLiveSizeChange,
|
removeStaleLiveSizeChange,
|
||||||
|
removeStaleLiveSizeChanges,
|
||||||
recordedRepoOffsets,
|
recordedRepoOffsets,
|
||||||
liveRepoOffsets,
|
liveRepoOffsets,
|
||||||
) where
|
) where
|
||||||
|
@ -50,6 +51,7 @@ import qualified System.FilePath.ByteString as P
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
|
import Control.Concurrent
|
||||||
|
|
||||||
share [mkPersist sqlSettings, mkMigrate "migrateRepoSizes"] [persistLowerCase|
|
share [mkPersist sqlSettings, mkMigrate "migrateRepoSizes"] [persistLowerCase|
|
||||||
-- Corresponds to location log information from the git-annex branch.
|
-- Corresponds to location log information from the git-annex branch.
|
||||||
|
@ -67,9 +69,10 @@ AnnexBranch
|
||||||
LiveSizeChanges
|
LiveSizeChanges
|
||||||
repo UUID
|
repo UUID
|
||||||
key Key
|
key Key
|
||||||
changeid SizeChangeId
|
changeid SizeChangeUniqueId
|
||||||
|
changepid SizeChangeProcessId
|
||||||
change SizeChange
|
change SizeChange
|
||||||
UniqueLiveSizeChange repo key changeid
|
UniqueLiveSizeChange repo key changeid changepid
|
||||||
-- A rolling total of size changes that were removed from LiveSizeChanges
|
-- A rolling total of size changes that were removed from LiveSizeChanges
|
||||||
-- upon successful completion.
|
-- upon successful completion.
|
||||||
SizeChanges
|
SizeChanges
|
||||||
|
@ -110,18 +113,22 @@ openDb = lockDbWhile permerr $ do
|
||||||
initDb db $ void $
|
initDb db $ void $
|
||||||
runMigrationSilent migrateRepoSizes
|
runMigrationSilent migrateRepoSizes
|
||||||
h <- liftIO $ H.openDb db "repo_sizes"
|
h <- liftIO $ H.openDb db "repo_sizes"
|
||||||
return $ RepoSizeHandle (Just h)
|
mkhandle (Just h)
|
||||||
where
|
where
|
||||||
|
mkhandle mh = do
|
||||||
|
livev <- liftIO $ newMVar Nothing
|
||||||
|
return $ RepoSizeHandle mh livev
|
||||||
|
|
||||||
-- If permissions don't allow opening the database,
|
-- If permissions don't allow opening the database,
|
||||||
-- just don't use it. Since this database is just a cache
|
-- just don't use it. Since this database is just a cache
|
||||||
-- of information available in the git-annex branch, the same
|
-- of information available in the git-annex branch, the same
|
||||||
-- information can be queried from the branch, though much less
|
-- information can be queried from the branch, though much less
|
||||||
-- efficiently.
|
-- efficiently.
|
||||||
permerr _e = return (RepoSizeHandle Nothing)
|
permerr _e = mkhandle Nothing
|
||||||
|
|
||||||
closeDb :: RepoSizeHandle -> Annex ()
|
closeDb :: RepoSizeHandle -> Annex ()
|
||||||
closeDb (RepoSizeHandle (Just h)) = liftIO $ H.closeDb h
|
closeDb (RepoSizeHandle (Just h) _) = liftIO $ H.closeDb h
|
||||||
closeDb (RepoSizeHandle Nothing) = noop
|
closeDb (RepoSizeHandle Nothing _) = noop
|
||||||
|
|
||||||
-- This does not prevent another process that has already
|
-- This does not prevent another process that has already
|
||||||
-- opened the db from changing it at the same time.
|
-- opened the db from changing it at the same time.
|
||||||
|
@ -133,11 +140,11 @@ lockDbWhile permerr a = do
|
||||||
{- Gets the sizes of repositories as of a commit to the git-annex
|
{- Gets the sizes of repositories as of a commit to the git-annex
|
||||||
- branch. -}
|
- branch. -}
|
||||||
getRepoSizes :: RepoSizeHandle -> IO (M.Map UUID RepoSize, Maybe Sha)
|
getRepoSizes :: RepoSizeHandle -> IO (M.Map UUID RepoSize, Maybe Sha)
|
||||||
getRepoSizes (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
getRepoSizes (RepoSizeHandle (Just h) _) = H.queryDb h $ do
|
||||||
sizemap <- M.fromList <$> getRepoSizes'
|
sizemap <- M.fromList <$> getRepoSizes'
|
||||||
annexbranchsha <- getAnnexBranchCommit
|
annexbranchsha <- getAnnexBranchCommit
|
||||||
return (sizemap, annexbranchsha)
|
return (sizemap, annexbranchsha)
|
||||||
getRepoSizes (RepoSizeHandle Nothing) = return (mempty, Nothing)
|
getRepoSizes (RepoSizeHandle Nothing _) = return (mempty, Nothing)
|
||||||
|
|
||||||
getRepoSizes' :: SqlPersistM [(UUID, RepoSize)]
|
getRepoSizes' :: SqlPersistM [(UUID, RepoSize)]
|
||||||
getRepoSizes' = map conv <$> selectList [] []
|
getRepoSizes' = map conv <$> selectList [] []
|
||||||
|
@ -164,7 +171,7 @@ getAnnexBranchCommit = do
|
||||||
- happen, but ensures that the database is consistent.
|
- happen, but ensures that the database is consistent.
|
||||||
-}
|
-}
|
||||||
setRepoSizes :: RepoSizeHandle -> M.Map UUID RepoSize -> Sha -> IO ()
|
setRepoSizes :: RepoSizeHandle -> M.Map UUID RepoSize -> Sha -> IO ()
|
||||||
setRepoSizes (RepoSizeHandle (Just h)) sizemap branchcommitsha =
|
setRepoSizes (RepoSizeHandle (Just h) _) sizemap branchcommitsha =
|
||||||
H.commitDb h $ do
|
H.commitDb h $ do
|
||||||
l <- getRepoSizes'
|
l <- getRepoSizes'
|
||||||
forM_ (map fst l) $ \u ->
|
forM_ (map fst l) $ \u ->
|
||||||
|
@ -174,7 +181,7 @@ setRepoSizes (RepoSizeHandle (Just h)) sizemap branchcommitsha =
|
||||||
uncurry setRepoSize
|
uncurry setRepoSize
|
||||||
clearRecentChanges
|
clearRecentChanges
|
||||||
recordAnnexBranchCommit branchcommitsha
|
recordAnnexBranchCommit branchcommitsha
|
||||||
setRepoSizes (RepoSizeHandle Nothing) _ _ = noop
|
setRepoSizes (RepoSizeHandle Nothing _) _ _ = noop
|
||||||
|
|
||||||
setRepoSize :: UUID -> RepoSize -> SqlPersistM ()
|
setRepoSize :: UUID -> RepoSize -> SqlPersistM ()
|
||||||
setRepoSize u (RepoSize sz) =
|
setRepoSize u (RepoSize sz) =
|
||||||
|
@ -192,14 +199,20 @@ recordAnnexBranchCommit branchcommitsha = do
|
||||||
void $ insertUniqueFast $ AnnexBranch $ toSSha branchcommitsha
|
void $ insertUniqueFast $ AnnexBranch $ toSSha branchcommitsha
|
||||||
|
|
||||||
startingLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
startingLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
startingLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
startingLiveSizeChange (RepoSizeHandle (Just h) _) u k sc cid =
|
||||||
H.commitDb h $ void $ upsertBy
|
H.commitDb h $ void $ upsertBy
|
||||||
(UniqueLiveSizeChange u k sid)
|
(UniqueLiveSizeChange u k
|
||||||
(LiveSizeChanges u k sid sc)
|
(sizeChangeUniqueId cid)
|
||||||
|
(sizeChangeProcessId cid))
|
||||||
|
(LiveSizeChanges u k
|
||||||
|
(sizeChangeUniqueId cid)
|
||||||
|
(sizeChangeProcessId cid)
|
||||||
|
sc)
|
||||||
[ LiveSizeChangesChange =. sc
|
[ LiveSizeChangesChange =. sc
|
||||||
, LiveSizeChangesChangeid =. sid
|
, LiveSizeChangesChangeid =. sizeChangeUniqueId cid
|
||||||
|
, LiveSizeChangesChangepid =. sizeChangeProcessId cid
|
||||||
]
|
]
|
||||||
startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
startingLiveSizeChange (RepoSizeHandle Nothing _) _ _ _ _ = noop
|
||||||
|
|
||||||
{- A live size change has successfully finished.
|
{- A live size change has successfully finished.
|
||||||
-
|
-
|
||||||
|
@ -212,7 +225,7 @@ startingLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
||||||
- total.
|
- total.
|
||||||
-}
|
-}
|
||||||
successfullyFinishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
successfullyFinishedLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h) _) u k sc cid =
|
||||||
H.commitDb h $ do
|
H.commitDb h $ do
|
||||||
getRecentChange u k >>= \case
|
getRecentChange u k >>= \case
|
||||||
Just sc' | sc == sc' -> remove
|
Just sc' | sc == sc' -> remove
|
||||||
|
@ -223,8 +236,8 @@ successfullyFinishedLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
||||||
setSizeChangeFor u (updateRollingTotal rollingtotal sc k)
|
setSizeChangeFor u (updateRollingTotal rollingtotal sc k)
|
||||||
addRecentChange u k sc
|
addRecentChange u k sc
|
||||||
remove
|
remove
|
||||||
remove = removeLiveSizeChange u k sc sid
|
remove = removeLiveSizeChange u k sc cid
|
||||||
successfullyFinishedLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
successfullyFinishedLiveSizeChange (RepoSizeHandle Nothing _) _ _ _ _ = noop
|
||||||
|
|
||||||
updateRollingTotal :: FileSize -> SizeChange -> Key -> FileSize
|
updateRollingTotal :: FileSize -> SizeChange -> Key -> FileSize
|
||||||
updateRollingTotal t sc k = case sc of
|
updateRollingTotal t sc k = case sc of
|
||||||
|
@ -234,28 +247,37 @@ updateRollingTotal t sc k = case sc of
|
||||||
ksz = fromMaybe 0 $ fromKey keySize k
|
ksz = fromMaybe 0 $ fromKey keySize k
|
||||||
|
|
||||||
removeStaleLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
removeStaleLiveSizeChange :: RepoSizeHandle -> UUID -> Key -> SizeChange -> SizeChangeId -> IO ()
|
||||||
removeStaleLiveSizeChange (RepoSizeHandle (Just h)) u k sc sid =
|
removeStaleLiveSizeChange (RepoSizeHandle (Just h) _) u k sc cid =
|
||||||
H.commitDb h $ removeLiveSizeChange u k sc sid
|
H.commitDb h $ removeLiveSizeChange u k sc cid
|
||||||
removeStaleLiveSizeChange (RepoSizeHandle Nothing) _ _ _ _ = noop
|
removeStaleLiveSizeChange (RepoSizeHandle Nothing _) _ _ _ _ = noop
|
||||||
|
|
||||||
removeLiveSizeChange :: UUID -> Key -> SizeChange -> SizeChangeId -> SqlPersistM ()
|
removeLiveSizeChange :: UUID -> Key -> SizeChange -> SizeChangeId -> SqlPersistM ()
|
||||||
removeLiveSizeChange u k sc sid =
|
removeLiveSizeChange u k sc cid =
|
||||||
deleteWhere
|
deleteWhere
|
||||||
[ LiveSizeChangesRepo ==. u
|
[ LiveSizeChangesRepo ==. u
|
||||||
, LiveSizeChangesKey ==. k
|
, LiveSizeChangesKey ==. k
|
||||||
, LiveSizeChangesChangeid ==. sid
|
, LiveSizeChangesChangeid ==. sizeChangeUniqueId cid
|
||||||
|
, LiveSizeChangesChangepid ==. sizeChangeProcessId cid
|
||||||
, LiveSizeChangesChange ==. sc
|
, LiveSizeChangesChange ==. sc
|
||||||
]
|
]
|
||||||
|
|
||||||
|
removeStaleLiveSizeChanges :: RepoSizeHandle -> [StaleSizeChanger] -> IO ()
|
||||||
|
removeStaleLiveSizeChanges (RepoSizeHandle (Just h) _) stale = do
|
||||||
|
let stalepids = map staleSizeChangerProcessId stale
|
||||||
|
H.commitDb h $ deleteWhere [ LiveSizeChangesChangepid <-. stalepids ]
|
||||||
|
removeStaleLiveSizeChanges (RepoSizeHandle Nothing _) _ = noop
|
||||||
|
|
||||||
getLiveSizeChangesMap :: SqlPersistM (M.Map UUID [(Key, (SizeChange, SizeChangeId))])
|
getLiveSizeChangesMap :: SqlPersistM (M.Map UUID [(Key, (SizeChange, SizeChangeId))])
|
||||||
getLiveSizeChangesMap = M.fromListWith (++) . map conv <$> getLiveSizeChanges
|
getLiveSizeChangesMap = M.fromListWith (++) . map conv <$> getLiveSizeChanges
|
||||||
where
|
where
|
||||||
conv (LiveSizeChanges u k sid sc) = (u, [(k, (sc, sid))])
|
conv (LiveSizeChanges u k cid pid sc) = (u, [(k, (sc, sid))])
|
||||||
|
where
|
||||||
|
sid = SizeChangeId cid pid
|
||||||
|
|
||||||
getLiveSizeChangesList :: SqlPersistM [(UUID, Key, SizeChange)]
|
getLiveSizeChangesList :: SqlPersistM [(UUID, Key, SizeChange)]
|
||||||
getLiveSizeChangesList = map conv <$> getLiveSizeChanges
|
getLiveSizeChangesList = map conv <$> getLiveSizeChanges
|
||||||
where
|
where
|
||||||
conv (LiveSizeChanges u k _sid sc) = (u, k, sc)
|
conv (LiveSizeChanges u k _cid _pid sc) = (u, k, sc)
|
||||||
|
|
||||||
getLiveSizeChanges :: SqlPersistM [LiveSizeChanges]
|
getLiveSizeChanges :: SqlPersistM [LiveSizeChanges]
|
||||||
getLiveSizeChanges = map entityVal <$> selectList [] []
|
getLiveSizeChanges = map entityVal <$> selectList [] []
|
||||||
|
@ -326,9 +348,9 @@ clearRecentChanges = do
|
||||||
{- Gets the recorded offsets to sizes of Repos, not including live
|
{- Gets the recorded offsets to sizes of Repos, not including live
|
||||||
- changes. -}
|
- changes. -}
|
||||||
recordedRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
recordedRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
||||||
recordedRepoOffsets (RepoSizeHandle (Just h)) =
|
recordedRepoOffsets (RepoSizeHandle (Just h) _) =
|
||||||
M.map SizeOffset <$> H.queryDb h getSizeChanges
|
M.map SizeOffset <$> H.queryDb h getSizeChanges
|
||||||
recordedRepoOffsets (RepoSizeHandle Nothing) = pure mempty
|
recordedRepoOffsets (RepoSizeHandle Nothing _) = pure mempty
|
||||||
|
|
||||||
{- Gets the offsets to sizes of Repos, including all live changes that
|
{- Gets the offsets to sizes of Repos, including all live changes that
|
||||||
- are happening now.
|
- are happening now.
|
||||||
|
@ -351,7 +373,7 @@ recordedRepoOffsets (RepoSizeHandle Nothing) = pure mempty
|
||||||
- the same time.
|
- the same time.
|
||||||
-}
|
-}
|
||||||
liveRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
liveRepoOffsets :: RepoSizeHandle -> IO (M.Map UUID SizeOffset)
|
||||||
liveRepoOffsets (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
liveRepoOffsets (RepoSizeHandle (Just h) _) = H.queryDb h $ do
|
||||||
sizechanges <- getSizeChanges
|
sizechanges <- getSizeChanges
|
||||||
livechanges <- getLiveSizeChangesMap
|
livechanges <- getLiveSizeChangesMap
|
||||||
let us = S.toList $ S.fromList $
|
let us = S.toList $ S.fromList $
|
||||||
|
@ -389,4 +411,4 @@ liveRepoOffsets (RepoSizeHandle (Just h)) = H.queryDb h $ do
|
||||||
filter (\(sc', cid') -> cid /= cid' && sc' == AddingKey)
|
filter (\(sc', cid') -> cid /= cid' && sc' == AddingKey)
|
||||||
(fromMaybe [] $ M.lookup k livechangesbykey)
|
(fromMaybe [] $ M.lookup k livechangesbykey)
|
||||||
competinglivechanges _ _ AddingKey _ = []
|
competinglivechanges _ _ AddingKey _ = []
|
||||||
liveRepoOffsets (RepoSizeHandle Nothing) = pure mempty
|
liveRepoOffsets (RepoSizeHandle Nothing _) = pure mempty
|
||||||
|
|
|
@ -8,7 +8,15 @@
|
||||||
module Database.RepoSize.Handle where
|
module Database.RepoSize.Handle where
|
||||||
|
|
||||||
import qualified Database.Handle as H
|
import qualified Database.Handle as H
|
||||||
|
import Utility.LockPool (LockHandle)
|
||||||
|
|
||||||
-- Contains Nothing if the database was not able to be opened due to
|
import Control.Concurrent
|
||||||
|
import Data.Time.Clock.POSIX
|
||||||
|
|
||||||
|
data RepoSizeHandle = RepoSizeHandle
|
||||||
|
(Maybe H.DbHandle)
|
||||||
|
-- ^ Nothing if the database was not able to be opened due to
|
||||||
-- permissions.
|
-- permissions.
|
||||||
newtype RepoSizeHandle = RepoSizeHandle (Maybe H.DbHandle)
|
(MVar (Maybe (LockHandle, POSIXTime)))
|
||||||
|
-- ^ Live update lock and time of last check for stale live
|
||||||
|
-- updates.
|
||||||
|
|
|
@ -14,11 +14,11 @@ import Types.Key
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Database.Persist.Sql hiding (Key)
|
import Database.Persist.Sql hiding (Key)
|
||||||
import qualified Data.Text as T
|
|
||||||
import Data.Unique
|
import Data.Unique
|
||||||
import Text.Read
|
import Text.Read
|
||||||
import System.Process (Pid)
|
import System.Process (Pid)
|
||||||
import Utility.Split
|
import qualified Data.Text as T
|
||||||
|
import qualified Data.Set as S
|
||||||
|
|
||||||
-- The current size of a repo.
|
-- The current size of a repo.
|
||||||
newtype RepoSize = RepoSize { fromRepoSize :: Integer }
|
newtype RepoSize = RepoSize { fromRepoSize :: Integer }
|
||||||
|
@ -62,36 +62,57 @@ instance PersistFieldSql SizeChange where
|
||||||
sqlType _ = SqlInt32
|
sqlType _ = SqlInt32
|
||||||
|
|
||||||
data SizeChangeId = SizeChangeId
|
data SizeChangeId = SizeChangeId
|
||||||
{ sizeChangeUniqueId :: Int
|
{ sizeChangeUniqueId :: SizeChangeUniqueId
|
||||||
-- ^ unique per process
|
, sizeChangeProcessId :: SizeChangeProcessId
|
||||||
, sizeChangeProcessId :: Integer
|
|
||||||
-- ^ a pid, using Integer for portability
|
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
|
-- A unique value for the current process.
|
||||||
|
newtype SizeChangeUniqueId = SizeChangeUniqueId Int
|
||||||
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
|
-- A pid, using Integer for portability
|
||||||
|
newtype SizeChangeProcessId = SizeChangeProcessId Integer
|
||||||
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
mkSizeChangeId :: Pid -> IO SizeChangeId
|
mkSizeChangeId :: Pid -> IO SizeChangeId
|
||||||
mkSizeChangeId pid = do
|
mkSizeChangeId pid = do
|
||||||
u <- newUnique
|
u <- newUnique
|
||||||
return $ SizeChangeId
|
return $ SizeChangeId
|
||||||
{ sizeChangeProcessId = fromIntegral pid
|
{ sizeChangeUniqueId =
|
||||||
, sizeChangeUniqueId = hashUnique u
|
SizeChangeUniqueId $ hashUnique u
|
||||||
|
, sizeChangeProcessId =
|
||||||
|
SizeChangeProcessId $ fromIntegral pid
|
||||||
}
|
}
|
||||||
|
|
||||||
instance PersistField SizeChangeId where
|
instance PersistField SizeChangeUniqueId where
|
||||||
toPersistValue cid = toPersistValue $
|
toPersistValue (SizeChangeUniqueId i) = toPersistValue (show i)
|
||||||
show (sizeChangeProcessId cid) ++ ":" ++
|
|
||||||
show (sizeChangeUniqueId cid)
|
|
||||||
fromPersistValue b = fromPersistValue b >>= parse
|
fromPersistValue b = fromPersistValue b >>= parse
|
||||||
where
|
where
|
||||||
parse s = maybe
|
parse s = maybe
|
||||||
(Left $ T.pack $ "bad serialized SizeChangeId " ++ show s)
|
(Left $ T.pack $ "bad serialized SizeChangeUniqueId " ++ show s)
|
||||||
Right
|
Right
|
||||||
(parse' s)
|
(SizeChangeUniqueId <$> readMaybe s)
|
||||||
parse' s = case splitc ':' s of
|
|
||||||
(pid:uid:[]) -> SizeChangeId
|
|
||||||
<$> readMaybe pid
|
|
||||||
<*> readMaybe uid
|
|
||||||
_ -> Nothing
|
|
||||||
|
|
||||||
instance PersistFieldSql SizeChangeId where
|
instance PersistFieldSql SizeChangeUniqueId where
|
||||||
sqlType _ = SqlString
|
sqlType _ = SqlString
|
||||||
|
|
||||||
|
instance PersistField SizeChangeProcessId where
|
||||||
|
toPersistValue (SizeChangeProcessId i) = toPersistValue (show i)
|
||||||
|
fromPersistValue b = fromPersistValue b >>= parse
|
||||||
|
where
|
||||||
|
parse s = maybe
|
||||||
|
(Left $ T.pack $ "bad serialized SizeChangeProcessId " ++ show s)
|
||||||
|
Right
|
||||||
|
(SizeChangeProcessId <$> readMaybe s)
|
||||||
|
|
||||||
|
instance PersistFieldSql SizeChangeProcessId where
|
||||||
|
sqlType _ = SqlString
|
||||||
|
|
||||||
|
newtype StaleSizeChanger = StaleSizeChanger
|
||||||
|
{ staleSizeChangerProcessId :: SizeChangeProcessId }
|
||||||
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
|
isStaleSizeChangeId :: S.Set StaleSizeChanger -> SizeChangeId -> Bool
|
||||||
|
isStaleSizeChangeId s cid =
|
||||||
|
StaleSizeChanger (sizeChangeProcessId cid) `S.member` s
|
||||||
|
|
|
@ -37,27 +37,6 @@ Planned schedule of work:
|
||||||
|
|
||||||
* Test that live repo size data is correct and really works.
|
* Test that live repo size data is correct and really works.
|
||||||
|
|
||||||
* When loading the live update table, check if PIDs in it are still
|
|
||||||
running (and are still git-annex), and if not, remove stale entries
|
|
||||||
from it, which can accumulate when processes are interrupted.
|
|
||||||
Note that it will be ok for the wrong git-annex process, running again
|
|
||||||
at a pid to keep a stale item in the live update table, because that
|
|
||||||
is unlikely and exponentially unlikely to happen repeatedly, so stale
|
|
||||||
information will only be used for a short time.
|
|
||||||
|
|
||||||
But then, how to check if a PID is git-annex or not? /proc of course,
|
|
||||||
but what about other OS's? Windows?
|
|
||||||
|
|
||||||
A plan: Have git-annex lock a per-pid file at startup. Then before
|
|
||||||
loading the live updates table, check each other per-pid file, by
|
|
||||||
try to take a shared lock. If able to, that process is no longer running,
|
|
||||||
and its live updates should be considered stale, and can be removed
|
|
||||||
while loading the live updates table.
|
|
||||||
|
|
||||||
Might be better to not lock at startup, but only once live updates are
|
|
||||||
used. annex.pidlock might otherwise prevent running more than one
|
|
||||||
git-annex at a time.
|
|
||||||
|
|
||||||
* The assistant is using NoLiveUpdate, but it should be possible to plumb
|
* The assistant is using NoLiveUpdate, but it should be possible to plumb
|
||||||
a LiveUpdate through it from preferred content checking to location log
|
a LiveUpdate through it from preferred content checking to location log
|
||||||
updating.
|
updating.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue