unfinished (and unbuildable) work toward separate transfer processes

This commit is contained in:
Joey Hess 2012-07-05 18:57:06 -06:00
parent 0c563c39df
commit a92f5589fc
3 changed files with 63 additions and 69 deletions

View file

@ -31,14 +31,15 @@
- them. - them.
- Thread 8: merger - Thread 8: merger
- Waits for pushes to be received from remotes, and merges the - Waits for pushes to be received from remotes, and merges the
- updated branches into the current branch. This uses inotify - updated branches into the current branch.
- on .git/refs/heads, so there are additional inotify threads - (This uses inotify on .git/refs/heads, so there are additional
- associated with it, too. - inotify threads associated with it, too.)
- Thread 9: transfer watcher - Thread 9: transfer watcher
- Watches for transfer information files being created and removed, - Watches for transfer information files being created and removed,
- and maintains the DaemonStatus currentTransfers map. This uses - and maintains the DaemonStatus currentTransfers map and the
- inotify on .git/annex/transfer/, so there are additional inotify - TransferSlots QSemN.
- threads associated with it, too. - (This uses inotify on .git/annex/transfer/, so there are
- additional inotify threads associated with it, too.)
- Thread 10: transferrer - Thread 10: transferrer
- Waits for Transfers to be queued and does them. - Waits for Transfers to be queued and does them.
- Thread 11: status logger - Thread 11: status logger
@ -66,6 +67,12 @@
- retrier blocks until they're available. - retrier blocks until they're available.
- TransferQueue (STM TChan) - TransferQueue (STM TChan)
- Transfers to make are indicated by writing to this channel. - Transfers to make are indicated by writing to this channel.
- TransferSlots (QSemN)
- Count of the number of currently available transfer slots.
- Updated by the transfer watcher, this allows other threads
- to block until a slot is available.
- This MVar should only be manipulated from inside the Annex monad,
- which ensures it's accessed only after the ThreadState MVar.
-} -}
module Assistant where module Assistant where
@ -109,15 +116,16 @@ startDaemon assistant foreground
commitchan <- newCommitChan commitchan <- newCommitChan
pushmap <- newFailedPushMap pushmap <- newFailedPushMap
transferqueue <- newTransferQueue transferqueue <- newTransferQueue
transferslots <- newTransferSlots
mapM_ (void . forkIO) mapM_ (void . forkIO)
[ commitThread st changechan commitchan transferqueue dstatus [ commitThread st changechan commitchan transferqueue dstatus
, pushThread st dstatus commitchan pushmap , pushThread st dstatus commitchan pushmap
, pushRetryThread st pushmap , pushRetryThread st pushmap
, mergeThread st , mergeThread st
, transferWatcherThread st dstatus , transferWatcherThread st dstatus transferslots
, transfererThread st dstatus transferqueue transferslots
, daemonStatusThread st dstatus , daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan , sanityCheckerThread st dstatus transferqueue changechan
, transfererThread st dstatus transferqueue
, watchThread st dstatus transferqueue changechan , watchThread st dstatus transferqueue changechan
] ]
waitForTermination waitForTermination

View file

@ -14,6 +14,7 @@ import Assistant.TransferQueue
import Logs.Transfer import Logs.Transfer
import Annex.Content import Annex.Content
import Annex.BranchState import Annex.BranchState
import Utility.ThreadScheduler
import Command import Command
import qualified Command.Move import qualified Command.Move
@ -22,68 +23,58 @@ import Control.Concurrent
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M import qualified Data.Map as M
{- Dispatches transfers from the queue. {- For now only one transfer is run at a time. -}
- maxTransfers :: Int
- This is currently very simplistic, and runs only one transfer at a time. maxTransfers = 1
-}
{- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO () transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO ()
transfererThread st dstatus transferqueue = do transfererThread st dstatus transferqueue = runEvery (Seconds 1) $ do
mypid <- getProcessID (t, info) <- getNextTransfer transferqueue
mytid <- myThreadId go =<< runThreadState st $ shouldTransfer t
go mypid mytid
where where
go mypid mytid = do go Yes = runTransfer st t
(t, info) <- getNextTransfer transferqueue go No = noop
go TooMany = waitTransfer >> go Yes
now <- getCurrentTime data ShouldTransfer = Yes | Skip | TooMany
let info' = info
{ startedTime = Just now
, transferPid = Just mypid
, transferThread = Just mytid
}
ifM (runThreadState st $ shouldtransfer t info') {- Checks if the requested transfer is already running, or
( runTransfer st t info' - the file to download is already present.
, noop -
) - There also may be too many transfers already running to service this
go mypid mytid - transfer yet. -}
shouldTransfer :: DaemonStatusHandle -> Transfer -> Annex ShouldTransfer
-- Check if the transfer is already running, shouldTransfer dstatus t = go =<< currentTransfers <$> getDaemonStatus dstatus
-- and if not, add it to the TransferMap. where
shouldtransfer t info = do go m
current <- currentTransfers <$> getDaemonStatus dstatus | M.member t m = return Skip
if M.member t current | M.size m > maxTransfers = return TooMany
then return False
else ifM (validtransfer t)
( do
adjustTransfers dstatus $
M.insertWith' const t info
return True
, return False
)
validtransfer t
| transferDirection t == Download = | transferDirection t == Download =
not <$> inAnnex (transferKey t) ifM (inAnnex $ transferKey t) (No, Yes)
| otherwise = return True | otherwise = return Yes
{- A transfer is run in a separate thread, with a *copy* of the Annex {- Waits for any of the transfers in the map to complete. -}
waitTransfer :: IO ()
waitTransfer = error "TODO"
-- getProcessStatus True False pid
-- runThreadState st invalidateCache
{- A transfer is run in a separate process, with a *copy* of the Annex
- state. This is necessary to avoid blocking the rest of the assistant - state. This is necessary to avoid blocking the rest of the assistant
- on the transfer completing, and also to allow multiple transfers to run - on the transfer completing, and also to allow multiple transfers to run
- at once. - at once.
- -
- However, it means that the transfer threads are responsible - However, it means that the transfer processes are responsible
- for doing any necessary shutdown cleanups, and that the parent - for doing any necessary shutdown cleanups, and that the parent
- thread's cache must be invalidated, as changes may have been made to the - thread's cache must be invalidated once a transfer completes, as
- git-annex branch. - changes may have been made to the git-annex branch.
- -
- Currently a minimal shutdown is done; the transfer threads are - Currently a minimal shutdown is done; the transfer processes are
- effectively running in oneshot mode, without committing changes to the - effectively running in oneshot mode, without committing changes to the
- git-annex branch, and transfers should never queue git commands to run. - git-annex branch, and transfers should never queue git commands to run.
-
- Note: It is unsafe to call getDaemonStatus inside the transfer thread.
-} -}
runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO () runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ProcessID
runTransfer st t info runTransfer st t info
| transferDirection t == Download = go Command.Move.fromStart | transferDirection t == Download = go Command.Move.fromStart
| otherwise = go Command.Move.toStart | otherwise = go Command.Move.toStart
@ -91,12 +82,12 @@ runTransfer st t info
go cmd = case (transferRemote info, associatedFile info) of go cmd = case (transferRemote info, associatedFile info) of
(Nothing, _) -> noop (Nothing, _) -> noop
(_, Nothing) -> noop (_, Nothing) -> noop
(Just remote, Just file) -> (Just remote, Just file) -> do
inthread $ void $ doCommand $ now <- getCurrentTime
cmd remote False file (transferKey t) pid <- forkProcess $ unsafeRunThreadState st $
inthread a = do doCommand $ cmd remote False file (transferKey t)
mvar <- newEmptyMVar adjustTransfers dstatus $
void $ forkIO $ M.insertWith' const t info
unsafeRunThreadState st a `E.finally` putMVar mvar () { startedTime = Just now
void $ takeMVar mvar -- wait for transfer thread , transferPid = Just pid
runThreadState st invalidateCache }

View file

@ -14,7 +14,6 @@ import qualified Git
import Types.Remote import Types.Remote
import qualified Fields import qualified Fields
import Control.Concurrent
import System.Posix.Types import System.Posix.Types
import Data.Time.Clock import Data.Time.Clock
@ -36,7 +35,6 @@ data Transfer = Transfer
data TransferInfo = TransferInfo data TransferInfo = TransferInfo
{ startedTime :: Maybe UTCTime { startedTime :: Maybe UTCTime
, transferPid :: Maybe ProcessID , transferPid :: Maybe ProcessID
, transferThread :: Maybe ThreadId
, transferRemote :: Maybe Remote , transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer , bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath , associatedFile :: Maybe FilePath
@ -79,7 +77,6 @@ transfer t file a = do
info <- liftIO $ TransferInfo info <- liftIO $ TransferInfo
<$> (Just <$> getCurrentTime) <$> (Just <$> getCurrentTime)
<*> pure Nothing -- pid not stored in file, so omitted for speed <*> pure Nothing -- pid not stored in file, so omitted for speed
<*> pure Nothing -- threadid not stored in file, so omitted for speed
<*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing -- not 0; transfer may be resuming
<*> pure Nothing <*> pure Nothing
<*> pure file <*> pure file
@ -158,7 +155,6 @@ writeTransferInfo :: TransferInfo -> String
writeTransferInfo info = unlines writeTransferInfo info = unlines
-- transferPid is not included; instead obtained by looking at -- transferPid is not included; instead obtained by looking at
-- the process that locks the file. -- the process that locks the file.
-- transferThread is not included; not relevant for other processes
[ show $ startedTime info [ show $ startedTime info
-- bytesComplete is not included; changes too fast -- bytesComplete is not included; changes too fast
, fromMaybe "" $ associatedFile info -- comes last; arbitrary content , fromMaybe "" $ associatedFile info -- comes last; arbitrary content
@ -172,7 +168,6 @@ readTransferInfo pid s =
<*> pure (Just pid) <*> pure (Just pid)
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
<*> pure Nothing
<*> pure (if null filename then Nothing else Just filename) <*> pure (if null filename then Nothing else Just filename)
_ -> Nothing _ -> Nothing
where where