run file transfers in threads, not processes

This should fix OSX/BSD issues with not noticing transfer information
files with kqueue. Now that threads are used, the thread can manage the
transfer slot allocation and deallocation by itself; much cleaner.
This commit is contained in:
Joey Hess 2012-07-18 19:13:56 -04:00
parent eea0a3616c
commit cf47bb3f50
7 changed files with 29 additions and 53 deletions

View file

@ -123,7 +123,7 @@ startDaemon assistant foreground
, pushThread st dstatus commitchan pushmap , pushThread st dstatus commitchan pushmap
, pushRetryThread st pushmap , pushRetryThread st pushmap
, mergeThread st , mergeThread st
, transferWatcherThread st dstatus transferslots , transferWatcherThread st dstatus
, transfererThread st dstatus transferqueue transferslots , transfererThread st dstatus transferqueue transferslots
, daemonStatusThread st dstatus , daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan , sanityCheckerThread st dstatus transferqueue changechan

View file

@ -37,14 +37,13 @@ withThreadState a = do
runThreadState :: ThreadState -> Annex a -> IO a runThreadState :: ThreadState -> Annex a -> IO a
runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
{- Runs an Annex action in a separate thread, using a copy of the state {- Runs an Annex action, using a copy of the state from the MVar.
- from the MVar.
- -
- It's up to the action to perform any necessary shutdown tasks in order - It's up to the action to perform any necessary shutdown tasks in order
- for state to not be lost. And it's up to the caller to resynchronise - for state to not be lost. And it's up to the caller to resynchronise
- with any changes the action makes to eg, the git-annex branch. - with any changes the action makes to eg, the git-annex branch.
-} -}
unsafeForkIOThreadState :: ThreadState -> Annex a -> IO ThreadId unsafeRunThreadState :: ThreadState -> Annex a -> IO ()
unsafeForkIOThreadState mvar a = do unsafeRunThreadState mvar a = do
state <- readMVar mvar state <- readMVar mvar
forkIO $ void $ Annex.eval state a void $ Annex.eval state a

View file

@ -10,23 +10,20 @@ module Assistant.Threads.TransferWatcher where
import Common.Annex import Common.Annex
import Assistant.ThreadedMonad import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.TransferSlots
import Logs.Transfer import Logs.Transfer
import Utility.DirWatcher import Utility.DirWatcher
import Utility.Types.DirWatcher import Utility.Types.DirWatcher
import Annex.BranchState
import Data.Map as M import Data.Map as M
import System.Posix.Process
{- This thread watches for changes to the gitAnnexTransferDir, {- This thread watches for changes to the gitAnnexTransferDir,
- and updates the DaemonStatus's map of ongoing transfers. -} - and updates the DaemonStatus's map of ongoing transfers. -}
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferSlots -> IO () transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
transferWatcherThread st dstatus transferslots = do transferWatcherThread st dstatus = do
g <- runThreadState st $ fromRepo id g <- runThreadState st $ fromRepo id
let dir = gitAnnexTransferDir g let dir = gitAnnexTransferDir g
createDirectoryIfMissing True dir createDirectoryIfMissing True dir
let hook a = Just $ runHandler st dstatus transferslots a let hook a = Just $ runHandler st dstatus a
let hooks = mkWatchHooks let hooks = mkWatchHooks
{ addHook = hook onAdd { addHook = hook onAdd
, delHook = hook onDel , delHook = hook onDel
@ -34,51 +31,36 @@ transferWatcherThread st dstatus transferslots = do
} }
void $ watchDir dir (const False) hooks id void $ watchDir dir (const False) hooks id
type Handler = ThreadState -> DaemonStatusHandle -> TransferSlots -> FilePath -> Maybe FileStatus -> IO () type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO ()
{- Runs an action handler. {- Runs an action handler.
- -
- Exceptions are ignored, otherwise a whole thread could be crashed. - Exceptions are ignored, otherwise a whole thread could be crashed.
-} -}
runHandler :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Handler -> FilePath -> Maybe FileStatus -> IO () runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
runHandler st dstatus transferslots handler file filestatus = void $ do runHandler st dstatus handler file filestatus = void $ do
either print (const noop) =<< tryIO go either print (const noop) =<< tryIO go
where where
go = handler st dstatus transferslots file filestatus go = handler st dstatus file filestatus
{- Called when there's an error with inotify. -} {- Called when there's an error with inotify. -}
onErr :: Handler onErr :: Handler
onErr _ _ _ msg _ = error msg onErr _ _ msg _ = error msg
{- Called when a new transfer information file is written. -} {- Called when a new transfer information file is written. -}
onAdd :: Handler onAdd :: Handler
onAdd st dstatus _ file _ = case parseTransferFile file of onAdd st dstatus file _ = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> runThreadState st $ go t =<< checkTransfer t Just t -> runThreadState st $ go t =<< checkTransfer t
where where
go _ Nothing = noop -- transfer already finished go _ Nothing = noop -- transfer already finished
go t (Just info) = adjustTransfers dstatus $ go t (Just info) = adjustTransfers dstatus $
M.insertWith' merge t info M.insertWith' merge t info
-- preseve shouldWait flag, which is not written to disk -- preseve transferTid, which is not written to disk
merge new old = new { shouldWait = shouldWait old } merge new old = new { transferTid = transferTid old }
{- Called when a transfer information file is removed. {- Called when a transfer information file is removed. -}
-
- When the transfer process is a child of this process, wait on it
- to avoid zombies.
-}
onDel :: Handler onDel :: Handler
onDel st dstatus transferslots file _ = case parseTransferFile file of onDel st dstatus file _ = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> maybe noop waitchild Just t -> void $ runThreadState st $ removeTransfer dstatus t
=<< runThreadState st (removeTransfer dstatus t)
where
waitchild info
| shouldWait info = case transferPid info of
Nothing -> noop
Just pid -> do
void $ tryIO $
getProcessStatus True False pid
runThreadState st invalidateCache
transferComplete transferslots
| otherwise = noop

View file

@ -74,9 +74,8 @@ runTransfer st dstatus slots t info = case (transferRemote info, associatedFile
(Nothing, _) -> noop (Nothing, _) -> noop
(_, Nothing) -> noop (_, Nothing) -> noop
(Just remote, Just file) -> do (Just remote, Just file) -> do
tid <- inTransferSlot slots $ tid <- inTransferSlot slots st $
unsafeForkIOThreadState st $ transferprocess remote file
transferprocess remote file
now <- getCurrentTime now <- getCurrentTime
runThreadState st $ adjustTransfers dstatus $ runThreadState st $ adjustTransfers dstatus $
M.insertWith' const t info M.insertWith' const t info

View file

@ -28,7 +28,6 @@ stubInfo f = TransferInfo
, transferRemote = Nothing , transferRemote = Nothing
, bytesComplete = Nothing , bytesComplete = Nothing
, associatedFile = f , associatedFile = f
, shouldWait = False
} }
{- Adds pending transfers to the end of the queue for some of the known {- Adds pending transfers to the end of the queue for some of the known

View file

@ -10,6 +10,9 @@ module Assistant.TransferSlots where
import Control.Exception import Control.Exception
import Control.Concurrent import Control.Concurrent
import Common.Annex
import Assistant.ThreadedMonad
type TransferSlots = QSemN type TransferSlots = QSemN
{- Number of concurrent transfers allowed to be run from the assistant. {- Number of concurrent transfers allowed to be run from the assistant.
@ -24,16 +27,13 @@ newTransferSlots :: IO TransferSlots
newTransferSlots = newQSemN numSlots newTransferSlots = newQSemN numSlots
{- Waits until a transfer slot becomes available, and runs a transfer {- Waits until a transfer slot becomes available, and runs a transfer
- action in the slot. If the action throws an exception, its slot is - action in the slot, in its own thread. -}
- freed here, otherwise it should be freed by the TransferWatcher when inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId
- the transfer is complete. inTransferSlot s st a = forkIO $ bracket_ start done run
-}
inTransferSlot :: TransferSlots -> IO a -> IO a
inTransferSlot s a = bracketOnError start abort run
where where
start = waitQSemN s 1 start = waitQSemN s 1
abort = const $ transferComplete s done = transferComplete s
run = const a run = unsafeRunThreadState st a
{- Call when a transfer is complete. -} {- Call when a transfer is complete. -}
transferComplete :: TransferSlots -> IO () transferComplete :: TransferSlots -> IO ()

View file

@ -43,7 +43,6 @@ data TransferInfo = TransferInfo
, transferRemote :: Maybe Remote , transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer , bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath , associatedFile :: Maybe FilePath
, shouldWait :: Bool
} }
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
@ -87,7 +86,6 @@ transfer t file a = do
<*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing -- not 0; transfer may be resuming
<*> pure Nothing <*> pure Nothing
<*> pure file <*> pure file
<*> pure False
bracketIO (prep tfile mode info) (cleanup tfile) a bracketIO (prep tfile mode info) (cleanup tfile) a
where where
prep tfile mode info = do prep tfile mode info = do
@ -180,7 +178,6 @@ readTransferInfo pid s =
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
<*> pure (if null filename then Nothing else Just filename) <*> pure (if null filename then Nothing else Just filename)
<*> pure False
_ -> Nothing _ -> Nothing
where where
(bits, filebits) = splitAt 1 $ lines s (bits, filebits) = splitAt 1 $ lines s