wrote transfer thread

finally!
This commit is contained in:
Joey Hess 2012-07-05 14:34:20 -06:00
parent e8df726d07
commit 71b5ad8398
7 changed files with 136 additions and 20 deletions

View file

@ -39,9 +39,11 @@
- and maintains the DaemonStatus currentTransfers map. This uses - and maintains the DaemonStatus currentTransfers map. This uses
- inotify on .git/annex/transfer/, so there are additional inotify - inotify on .git/annex/transfer/, so there are additional inotify
- threads associated with it, too. - threads associated with it, too.
- Thread 10: status logger - Thread 10: transferrer
- Waits for Transfers to be queued and does them.
- Thread 11: status logger
- Wakes up periodically and records the daemon's status to disk. - Wakes up periodically and records the daemon's status to disk.
- Thread 11: sanity checker - Thread 12: sanity checker
- Wakes up periodically (rarely) and does sanity checks. - Wakes up periodically (rarely) and does sanity checks.
- -
- ThreadState: (MVar) - ThreadState: (MVar)
@ -80,6 +82,7 @@ import Assistant.Threads.Committer
import Assistant.Threads.Pusher import Assistant.Threads.Pusher
import Assistant.Threads.Merger import Assistant.Threads.Merger
import Assistant.Threads.TransferWatcher import Assistant.Threads.TransferWatcher
import Assistant.Threads.Transferrer
import Assistant.Threads.SanityChecker import Assistant.Threads.SanityChecker
import qualified Utility.Daemon import qualified Utility.Daemon
import Utility.LogFile import Utility.LogFile
@ -114,6 +117,7 @@ startDaemon assistant foreground
, transferWatcherThread st dstatus , transferWatcherThread st dstatus
, daemonStatusThread st dstatus , daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan , sanityCheckerThread st dstatus transferqueue changechan
, transfererThread st dstatus transferqueue
, watchThread st dstatus transferqueue changechan , watchThread st dstatus transferqueue changechan
] ]
waitForTermination waitForTermination

View file

@ -31,12 +31,14 @@ data DaemonStatus = DaemonStatus
-- Last time the sanity checker ran -- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime , lastSanityCheck :: Maybe POSIXTime
-- Currently running file content transfers -- Currently running file content transfers
, currentTransfers :: M.Map Transfer TransferInfo , currentTransfers :: TransferMap
-- Ordered list of remotes to talk to. -- Ordered list of remotes to talk to.
, knownRemotes :: [Remote] , knownRemotes :: [Remote]
} }
deriving (Show) deriving (Show)
type TransferMap = M.Map Transfer TransferInfo
type DaemonStatusHandle = MVar DaemonStatus type DaemonStatusHandle = MVar DaemonStatus
newDaemonStatus :: DaemonStatus newDaemonStatus :: DaemonStatus
@ -132,3 +134,8 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int tenMinutes :: Int
tenMinutes = 10 * 60 tenMinutes = 10 * 60
{- Mutates the transfer map. -}
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex ()
adjustTransfers dstatus a = modifyDaemonStatus dstatus $
\s -> s { currentTransfers = a (currentTransfers s) }

View file

@ -58,21 +58,17 @@ onAdd :: Handler
onAdd st dstatus file _ = case parseTransferFile file of onAdd st dstatus file _ = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> do Just t -> do
minfo <- runThreadState st $ checkTransfer t
pid <- getProcessID pid <- getProcessID
case minfo of runThreadState st $ go t pid =<< checkTransfer t
Nothing -> noop -- transfer already finished where
Just info go _ _ Nothing = noop -- transfer already finished
| transferPid info == Just pid -> noop go t pid (Just info)
| otherwise -> adjustTransfers st dstatus | transferPid info == Just pid = noop
(M.insertWith' const t info) | otherwise = adjustTransfers dstatus $
M.insertWith' const t info
{- Called when a transfer information file is removed. -} {- Called when a transfer information file is removed. -}
onDel :: Handler onDel :: Handler
onDel st dstatus file _ = case parseTransferFile file of onDel st dstatus file _ = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> adjustTransfers st dstatus (M.delete t) Just t -> runThreadState st $ adjustTransfers dstatus $ M.delete t
adjustTransfers :: ThreadState -> DaemonStatusHandle -> (M.Map Transfer TransferInfo -> M.Map Transfer TransferInfo) -> IO ()
adjustTransfers st dstatus a = runThreadState st $ modifyDaemonStatus dstatus $
\s -> s { currentTransfers = a (currentTransfers s) }

View file

@ -0,0 +1,102 @@
{- git-annex assistant data transferrer thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.Transferrer where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.TransferQueue
import Logs.Transfer
import Annex.Content
import Annex.BranchState
import Command
import qualified Command.Move
import Control.Exception as E
import Control.Concurrent
import Data.Time.Clock
import qualified Data.Map as M
{- Dispatches transfers from the queue.
-
- This is currently very simplistic, and runs only one transfer at a time.
-}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO ()
transfererThread st dstatus transferqueue = do
mypid <- getProcessID
mytid <- myThreadId
go mypid mytid
where
go mypid mytid = do
(t, info) <- getNextTransfer transferqueue
now <- getCurrentTime
let info' = info
{ startedTime = Just now
, transferPid = Just mypid
, transferThread = Just mytid
}
ifM (runThreadState st $ shouldtransfer t info')
( runTransfer st t info'
, noop
)
go mypid mytid
-- Check if the transfer is already running,
-- and if not, add it to the TransferMap.
shouldtransfer t info = do
current <- currentTransfers <$> getDaemonStatus dstatus
if M.member t current
then ifM (validtransfer t)
( do
adjustTransfers dstatus $
M.insertWith' const t info
return True
, return False
)
else return False
validtransfer t
| transferDirection t == Download =
not <$> inAnnex (transferKey t)
| otherwise = return True
{- A transfer is run in a separate thread, with a *copy* of the Annex
- state. This is necessary to avoid blocking the rest of the assistant
- on the transfer completing, and also to allow multiple transfers to run
- at once.
-
- However, it means that the transfer threads are responsible
- for doing any necessary shutdown cleanups, and that the parent
- thread's cache must be invalidated, as changes may have been made to the
- git-annex branch.
-
- Currently a minimal shutdown is done; the transfer threads are
- effectively running in oneshot mode, without committing changes to the
- git-annex branch, and transfers should never queue git commands to run.
-
- Note: It is unsafe to call getDaemonStatus inside the transfer thread.
-}
runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ()
runTransfer st t info
| transferDirection t == Download = go Command.Move.fromStart
| otherwise = go Command.Move.toStart
where
go cmd = case (transferRemote info, associatedFile info) of
(Nothing, _) -> noop
(_, Nothing) -> noop
(Just remote, Just file) ->
inthread $ void $ doCommand $
cmd remote False file (transferKey t)
inthread a = do
mvar <- newEmptyMVar
void $ forkIO $
runThreadState st a `E.finally` putMVar mvar ()
void $ takeMVar mvar -- wait for transfer thread
runThreadState st invalidateCache

View file

@ -25,6 +25,7 @@ stubInfo f = TransferInfo
{ startedTime = Nothing { startedTime = Nothing
, transferPid = Nothing , transferPid = Nothing
, transferThread = Nothing , transferThread = Nothing
, transferRemote = Nothing
, bytesComplete = Nothing , bytesComplete = Nothing
, associatedFile = f , associatedFile = f
} }
@ -33,7 +34,7 @@ stubInfo f = TransferInfo
- remotes. -} - remotes. -}
queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers q daemonstatus k f direction = queueTransfers q daemonstatus k f direction =
mapM_ (liftIO . queueTransfer q f . gentransfer) mapM_ (\r -> queue r $ gentransfer r)
=<< sufficientremotes . knownRemotes =<< sufficientremotes . knownRemotes
<$> getDaemonStatus daemonstatus <$> getDaemonStatus daemonstatus
where where
@ -53,8 +54,11 @@ queueTransfers q daemonstatus k f direction =
gentransfer r = Transfer gentransfer r = Transfer
{ transferDirection = direction { transferDirection = direction
, transferKey = k , transferKey = k
, transferRemote = Remote.uuid r , transferUUID = Remote.uuid r
} }
queue r t = liftIO $ void $ atomically $ do
let info = (stubInfo f) { transferRemote = Just r }
writeTChan q (t, info)
{- Adds a pending transfer to the end of the queue. -} {- Adds a pending transfer to the end of the queue. -}
queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()

View file

@ -186,8 +186,8 @@ transfer_list = stat "transfers in progress" $ nojson $ lift $ do
[ show (transferDirection t) ++ "ing" [ show (transferDirection t) ++ "ing"
, fromMaybe (show $ transferKey t) (associatedFile i) , fromMaybe (show $ transferKey t) (associatedFile i)
, if transferDirection t == Upload then "to" else "from" , if transferDirection t == Upload then "to" else "from"
, maybe (fromUUID $ transferRemote t) Remote.name $ , maybe (fromUUID $ transferUUID t) Remote.name $
M.lookup (transferRemote t) uuidmap M.lookup (transferUUID t) uuidmap
] ]
disk_size :: Stat disk_size :: Stat

View file

@ -22,7 +22,7 @@ import Data.Time.Clock
- of the transfer information file. -} - of the transfer information file. -}
data Transfer = Transfer data Transfer = Transfer
{ transferDirection :: Direction { transferDirection :: Direction
, transferRemote :: UUID , transferUUID :: UUID
, transferKey :: Key , transferKey :: Key
} }
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
@ -37,6 +37,7 @@ data TransferInfo = TransferInfo
{ startedTime :: Maybe UTCTime { startedTime :: Maybe UTCTime
, transferPid :: Maybe ProcessID , transferPid :: Maybe ProcessID
, transferThread :: Maybe ThreadId , transferThread :: Maybe ThreadId
, transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer , bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath , associatedFile :: Maybe FilePath
} }
@ -80,6 +81,7 @@ transfer t file a = do
<*> pure Nothing -- pid not stored in file, so omitted for speed <*> pure Nothing -- pid not stored in file, so omitted for speed
<*> pure Nothing -- threadid not stored in file, so omitted for speed <*> pure Nothing -- threadid not stored in file, so omitted for speed
<*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing -- not 0; transfer may be resuming
<*> pure Nothing
<*> pure file <*> pure file
bracketIO (prep tfile mode info) (cleanup tfile) a bracketIO (prep tfile mode info) (cleanup tfile) a
where where
@ -170,6 +172,7 @@ readTransferInfo pid s =
<*> pure (Just pid) <*> pure (Just pid)
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
<*> pure Nothing
<*> pure (if null filename then Nothing else Just filename) <*> pure (if null filename then Nothing else Just filename)
_ -> Nothing _ -> Nothing
where where