From 71b5ad8398c4d86d5e9b993e175b48f2c5f0861d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 5 Jul 2012 14:34:20 -0600 Subject: [PATCH] wrote transfer thread finally! --- Assistant.hs | 8 ++- Assistant/DaemonStatus.hs | 9 ++- Assistant/Threads/TransferWatcher.hs | 20 +++--- Assistant/Threads/Transferrer.hs | 102 +++++++++++++++++++++++++++ Assistant/TransferQueue.hs | 8 ++- Command/Status.hs | 4 +- Logs/Transfer.hs | 5 +- 7 files changed, 136 insertions(+), 20 deletions(-) create mode 100644 Assistant/Threads/Transferrer.hs diff --git a/Assistant.hs b/Assistant.hs index 82ac2037e3..e751b4ae8c 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -39,9 +39,11 @@ - and maintains the DaemonStatus currentTransfers map. This uses - inotify on .git/annex/transfer/, so there are additional inotify - threads associated with it, too. - - Thread 10: status logger + - Thread 10: transferrer + - Waits for Transfers to be queued and does them. + - Thread 11: status logger - Wakes up periodically and records the daemon's status to disk. - - Thread 11: sanity checker + - Thread 12: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) @@ -80,6 +82,7 @@ import Assistant.Threads.Committer import Assistant.Threads.Pusher import Assistant.Threads.Merger import Assistant.Threads.TransferWatcher +import Assistant.Threads.Transferrer import Assistant.Threads.SanityChecker import qualified Utility.Daemon import Utility.LogFile @@ -114,6 +117,7 @@ startDaemon assistant foreground , transferWatcherThread st dstatus , daemonStatusThread st dstatus , sanityCheckerThread st dstatus transferqueue changechan + , transfererThread st dstatus transferqueue , watchThread st dstatus transferqueue changechan ] waitForTermination diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index a3e909904f..40816bb1a7 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -31,12 +31,14 @@ data DaemonStatus = DaemonStatus -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime -- Currently running file content transfers - , currentTransfers :: M.Map Transfer TransferInfo + , currentTransfers :: TransferMap -- Ordered list of remotes to talk to. , knownRemotes :: [Remote] } deriving (Show) +type TransferMap = M.Map Transfer TransferInfo + type DaemonStatusHandle = MVar DaemonStatus newDaemonStatus :: DaemonStatus @@ -132,3 +134,8 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) tenMinutes :: Int tenMinutes = 10 * 60 + +{- Mutates the transfer map. -} +adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex () +adjustTransfers dstatus a = modifyDaemonStatus dstatus $ + \s -> s { currentTransfers = a (currentTransfers s) } diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 811b045a82..f18d4e3f86 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -58,21 +58,17 @@ onAdd :: Handler onAdd st dstatus file _ = case parseTransferFile file of Nothing -> noop Just t -> do - minfo <- runThreadState st $ checkTransfer t pid <- getProcessID - case minfo of - Nothing -> noop -- transfer already finished - Just info - | transferPid info == Just pid -> noop - | otherwise -> adjustTransfers st dstatus - (M.insertWith' const t info) + runThreadState st $ go t pid =<< checkTransfer t + where + go _ _ Nothing = noop -- transfer already finished + go t pid (Just info) + | transferPid info == Just pid = noop + | otherwise = adjustTransfers dstatus $ + M.insertWith' const t info {- Called when a transfer information file is removed. -} onDel :: Handler onDel st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> adjustTransfers st dstatus (M.delete t) - -adjustTransfers :: ThreadState -> DaemonStatusHandle -> (M.Map Transfer TransferInfo -> M.Map Transfer TransferInfo) -> IO () -adjustTransfers st dstatus a = runThreadState st $ modifyDaemonStatus dstatus $ - \s -> s { currentTransfers = a (currentTransfers s) } + Just t -> runThreadState st $ adjustTransfers dstatus $ M.delete t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs new file mode 100644 index 0000000000..0562a607ce --- /dev/null +++ b/Assistant/Threads/Transferrer.hs @@ -0,0 +1,102 @@ +{- git-annex assistant data transferrer thread + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Transferrer where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.TransferQueue +import Logs.Transfer +import Annex.Content +import Annex.BranchState +import Command +import qualified Command.Move + +import Control.Exception as E +import Control.Concurrent +import Data.Time.Clock +import qualified Data.Map as M + +{- Dispatches transfers from the queue. + - + - This is currently very simplistic, and runs only one transfer at a time. + -} +transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO () +transfererThread st dstatus transferqueue = do + mypid <- getProcessID + mytid <- myThreadId + go mypid mytid + where + go mypid mytid = do + (t, info) <- getNextTransfer transferqueue + + now <- getCurrentTime + let info' = info + { startedTime = Just now + , transferPid = Just mypid + , transferThread = Just mytid + } + + ifM (runThreadState st $ shouldtransfer t info') + ( runTransfer st t info' + , noop + ) + go mypid mytid + + -- Check if the transfer is already running, + -- and if not, add it to the TransferMap. + shouldtransfer t info = do + current <- currentTransfers <$> getDaemonStatus dstatus + if M.member t current + then ifM (validtransfer t) + ( do + adjustTransfers dstatus $ + M.insertWith' const t info + return True + , return False + ) + else return False + + validtransfer t + | transferDirection t == Download = + not <$> inAnnex (transferKey t) + | otherwise = return True + +{- A transfer is run in a separate thread, with a *copy* of the Annex + - state. This is necessary to avoid blocking the rest of the assistant + - on the transfer completing, and also to allow multiple transfers to run + - at once. + - + - However, it means that the transfer threads are responsible + - for doing any necessary shutdown cleanups, and that the parent + - thread's cache must be invalidated, as changes may have been made to the + - git-annex branch. + - + - Currently a minimal shutdown is done; the transfer threads are + - effectively running in oneshot mode, without committing changes to the + - git-annex branch, and transfers should never queue git commands to run. + - + - Note: It is unsafe to call getDaemonStatus inside the transfer thread. + -} +runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO () +runTransfer st t info + | transferDirection t == Download = go Command.Move.fromStart + | otherwise = go Command.Move.toStart + where + go cmd = case (transferRemote info, associatedFile info) of + (Nothing, _) -> noop + (_, Nothing) -> noop + (Just remote, Just file) -> + inthread $ void $ doCommand $ + cmd remote False file (transferKey t) + inthread a = do + mvar <- newEmptyMVar + void $ forkIO $ + runThreadState st a `E.finally` putMVar mvar () + void $ takeMVar mvar -- wait for transfer thread + runThreadState st invalidateCache diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index f1f4882bef..a35815ca16 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -25,6 +25,7 @@ stubInfo f = TransferInfo { startedTime = Nothing , transferPid = Nothing , transferThread = Nothing + , transferRemote = Nothing , bytesComplete = Nothing , associatedFile = f } @@ -33,7 +34,7 @@ stubInfo f = TransferInfo - remotes. -} queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers q daemonstatus k f direction = - mapM_ (liftIO . queueTransfer q f . gentransfer) + mapM_ (\r -> queue r $ gentransfer r) =<< sufficientremotes . knownRemotes <$> getDaemonStatus daemonstatus where @@ -53,8 +54,11 @@ queueTransfers q daemonstatus k f direction = gentransfer r = Transfer { transferDirection = direction , transferKey = k - , transferRemote = Remote.uuid r + , transferUUID = Remote.uuid r } + queue r t = liftIO $ void $ atomically $ do + let info = (stubInfo f) { transferRemote = Just r } + writeTChan q (t, info) {- Adds a pending transfer to the end of the queue. -} queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () diff --git a/Command/Status.hs b/Command/Status.hs index eff21bb509..2d63c525c3 100644 --- a/Command/Status.hs +++ b/Command/Status.hs @@ -186,8 +186,8 @@ transfer_list = stat "transfers in progress" $ nojson $ lift $ do [ show (transferDirection t) ++ "ing" , fromMaybe (show $ transferKey t) (associatedFile i) , if transferDirection t == Upload then "to" else "from" - , maybe (fromUUID $ transferRemote t) Remote.name $ - M.lookup (transferRemote t) uuidmap + , maybe (fromUUID $ transferUUID t) Remote.name $ + M.lookup (transferUUID t) uuidmap ] disk_size :: Stat diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs index f808cb6a44..12ab8ff113 100644 --- a/Logs/Transfer.hs +++ b/Logs/Transfer.hs @@ -22,7 +22,7 @@ import Data.Time.Clock - of the transfer information file. -} data Transfer = Transfer { transferDirection :: Direction - , transferRemote :: UUID + , transferUUID :: UUID , transferKey :: Key } deriving (Show, Eq, Ord) @@ -37,6 +37,7 @@ data TransferInfo = TransferInfo { startedTime :: Maybe UTCTime , transferPid :: Maybe ProcessID , transferThread :: Maybe ThreadId + , transferRemote :: Maybe Remote , bytesComplete :: Maybe Integer , associatedFile :: Maybe FilePath } @@ -80,6 +81,7 @@ transfer t file a = do <*> pure Nothing -- pid not stored in file, so omitted for speed <*> pure Nothing -- threadid not stored in file, so omitted for speed <*> pure Nothing -- not 0; transfer may be resuming + <*> pure Nothing <*> pure file bracketIO (prep tfile mode info) (cleanup tfile) a where @@ -170,6 +172,7 @@ readTransferInfo pid s = <*> pure (Just pid) <*> pure Nothing <*> pure Nothing + <*> pure Nothing <*> pure (if null filename then Nothing else Just filename) _ -> Nothing where