wait on child transfer processes, and invalidate cache
There's still a bug; if the child updates its transfer info file, then the data from it will superscede the TransferInfo, losing the info that we should wait on this child.
This commit is contained in:
parent
4a10795144
commit
62876502c5
7 changed files with 43 additions and 19 deletions
|
@ -54,8 +54,11 @@ newDaemonStatus = DaemonStatus
|
|||
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
|
||||
getDaemonStatus = liftIO . readMVar
|
||||
|
||||
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
||||
modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
|
||||
modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
||||
modifyDaemonStatus_ handle a = liftIO $ modifyMVar_ handle (return . a)
|
||||
|
||||
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b
|
||||
modifyDaemonStatus handle a = liftIO $ modifyMVar handle (return . a)
|
||||
|
||||
{- Load any previous daemon status file, and store it in the MVar for this
|
||||
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
||||
|
@ -137,5 +140,15 @@ tenMinutes = 10 * 60
|
|||
|
||||
{- Mutates the transfer map. -}
|
||||
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
|
||||
adjustTransfers dstatus a = modifyDaemonStatus dstatus $
|
||||
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
|
||||
\s -> s { currentTransfers = a (currentTransfers s) }
|
||||
|
||||
{- Removes a transfer from the map, and returns its info. -}
|
||||
removeTransfer :: DaemonStatusHandle -> Transfer -> Annex (Maybe TransferInfo)
|
||||
removeTransfer dstatus t = modifyDaemonStatus dstatus go
|
||||
where
|
||||
go s =
|
||||
let (info, ts) = M.updateLookupWithKey
|
||||
(\_k _v -> Nothing)
|
||||
t (currentTransfers s)
|
||||
in (s { currentTransfers = ts }, info)
|
||||
|
|
|
@ -26,7 +26,7 @@ sanityCheckerThread st status transferqueue changechan = forever $ do
|
|||
waitForNextCheck st status
|
||||
|
||||
runThreadState st $
|
||||
modifyDaemonStatus status $ \s -> s
|
||||
modifyDaemonStatus_ status $ \s -> s
|
||||
{ sanityCheckRunning = True }
|
||||
|
||||
now <- getPOSIXTime -- before check started
|
||||
|
@ -34,7 +34,7 @@ sanityCheckerThread st status transferqueue changechan = forever $ do
|
|||
(runThreadState st . warning . show)
|
||||
|
||||
runThreadState st $ do
|
||||
modifyDaemonStatus status $ \s -> s
|
||||
modifyDaemonStatus_ status $ \s -> s
|
||||
{ sanityCheckRunning = False
|
||||
, lastSanityCheck = Just now
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import Assistant.DaemonStatus
|
|||
import Logs.Transfer
|
||||
import Utility.DirWatcher
|
||||
import Utility.Types.DirWatcher
|
||||
import Annex.BranchState
|
||||
|
||||
import Data.Map as M
|
||||
|
||||
|
@ -51,16 +52,27 @@ onErr _ _ msg _ = error msg
|
|||
onAdd :: Handler
|
||||
onAdd st dstatus file _ = case parseTransferFile file of
|
||||
Nothing -> noop
|
||||
Just t -> do
|
||||
pid <- getProcessID
|
||||
runThreadState st $ go t pid =<< checkTransfer t
|
||||
Just t -> runThreadState st $ go t =<< checkTransfer t
|
||||
where
|
||||
go _ _ Nothing = noop -- transfer already finished
|
||||
go t pid (Just info) = adjustTransfers dstatus $
|
||||
go _ Nothing = noop -- transfer already finished
|
||||
go t (Just info) = adjustTransfers dstatus $
|
||||
M.insertWith' const t info
|
||||
|
||||
{- Called when a transfer information file is removed. -}
|
||||
{- Called when a transfer information file is removed.
|
||||
-
|
||||
- When the transfer process is a child of this process, wait on it
|
||||
- to avoid zombies.
|
||||
-}
|
||||
onDel :: Handler
|
||||
onDel st dstatus file _ = case parseTransferFile file of
|
||||
Nothing -> noop
|
||||
Just t -> runThreadState st $ adjustTransfers dstatus $ M.delete t
|
||||
Just t -> maybe noop waitchild
|
||||
=<< runThreadState st (removeTransfer dstatus t)
|
||||
where
|
||||
waitchild info
|
||||
| shouldWait info = case transferPid info of
|
||||
Nothing -> noop
|
||||
Just pid -> do
|
||||
void $ getProcessStatus True False pid
|
||||
runThreadState st invalidateCache
|
||||
| otherwise = noop
|
||||
|
|
|
@ -44,12 +44,6 @@ shouldTransfer dstatus t = go =<< currentTransfers <$> getDaemonStatus dstatus
|
|||
not <$> inAnnex (transferKey t)
|
||||
| otherwise = return True
|
||||
|
||||
{- Waits for any of the transfers in the map to complete. -}
|
||||
waitTransfer :: IO ()
|
||||
waitTransfer = error "TODO"
|
||||
-- getProcessStatus True False pid
|
||||
-- runThreadState st invalidateCache
|
||||
|
||||
{- A transfer is run in a separate process, with a *copy* of the Annex
|
||||
- state. This is necessary to avoid blocking the rest of the assistant
|
||||
- on the transfer completing, and also to allow multiple transfers to run
|
||||
|
@ -81,4 +75,5 @@ runTransfer st dstatus t info
|
|||
M.insertWith' const t info
|
||||
{ startedTime = Just now
|
||||
, transferPid = Just pid
|
||||
, shouldWait = True
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ statupScan st dstatus scanner = do
|
|||
showAction "scanning"
|
||||
r <- scanner
|
||||
runThreadState st $
|
||||
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
|
||||
modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
|
||||
|
||||
-- Notice any files that were deleted before watching was started.
|
||||
runThreadState st $ do
|
||||
|
|
|
@ -27,6 +27,7 @@ stubInfo f = TransferInfo
|
|||
, transferRemote = Nothing
|
||||
, bytesComplete = Nothing
|
||||
, associatedFile = f
|
||||
, shouldWait = False
|
||||
}
|
||||
|
||||
{- Adds pending transfers to the end of the queue for some of the known
|
||||
|
|
|
@ -38,6 +38,7 @@ data TransferInfo = TransferInfo
|
|||
, transferRemote :: Maybe Remote
|
||||
, bytesComplete :: Maybe Integer
|
||||
, associatedFile :: Maybe FilePath
|
||||
, shouldWait :: Bool
|
||||
}
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
|
@ -80,6 +81,7 @@ transfer t file a = do
|
|||
<*> pure Nothing -- not 0; transfer may be resuming
|
||||
<*> pure Nothing
|
||||
<*> pure file
|
||||
<*> pure False
|
||||
bracketIO (prep tfile mode info) (cleanup tfile) a
|
||||
where
|
||||
prep tfile mode info = do
|
||||
|
@ -169,6 +171,7 @@ readTransferInfo pid s =
|
|||
<*> pure Nothing
|
||||
<*> pure Nothing
|
||||
<*> pure (if null filename then Nothing else Just filename)
|
||||
<*> pure False
|
||||
_ -> Nothing
|
||||
where
|
||||
(bits, filebits) = splitAt 1 $ lines s
|
||||
|
|
Loading…
Add table
Reference in a new issue