maintain pools of running transferkeys processes (untested)

This commit is contained in:
Joey Hess 2013-03-19 18:46:29 -04:00
parent ef3221181d
commit b6d691aff7
7 changed files with 178 additions and 86 deletions

View file

@ -35,6 +35,7 @@ import Assistant.Types.DaemonStatus
import Assistant.Types.ScanRemotes
import Assistant.Types.TransferQueue
import Assistant.Types.TransferSlots
import Assistant.Types.TransferrerPool
import Assistant.Types.Pushes
import Assistant.Types.BranchChange
import Assistant.Types.Commits
@ -62,6 +63,7 @@ data AssistantData = AssistantData
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
, transferrerPool :: TransferrerPool
, failedPushMap :: FailedPushMap
, commitChan :: CommitChan
, changeChan :: ChangeChan
@ -78,6 +80,7 @@ newAssistantData st dstatus = AssistantData
<*> newScanRemoteMap
<*> newTransferQueue
<*> newTransferSlots
<*> newTransferrerPool
<*> newFailedPushMap
<*> newCommitChan
<*> newChangeChan

View file

@ -14,25 +14,23 @@ import Assistant.TransferSlots
import Assistant.Alert
import Assistant.Commits
import Assistant.Drop
import Assistant.TransferrerPool
import Logs.Transfer
import Logs.Location
import Annex.Content
import qualified Remote
import qualified Types.Remote as Remote
import qualified Git
import Types.Key
import Locations.UserConfig
import Assistant.Threads.TransferWatcher
import Annex.Wanted
import System.Process (create_group)
{- Dispatches transfers from the queue. -}
transfererThread :: NamedThread
transfererThread = namedThread "Transferrer" $ do
program <- liftIO readProgramFile
forever $ inTransferSlot $
maybe (return Nothing) (uncurry $ startTransfer program)
forever $ inTransferSlot program $
maybe (return Nothing) (uncurry $ genTransfer)
=<< getNextTransfer notrunning
where
{- Skip transfers that are already running. -}
@ -40,12 +38,8 @@ transfererThread = namedThread "Transferrer" $ do
{- By the time this is called, the daemonstatus's currentTransfers map should
- already have been updated to include the transfer. -}
startTransfer
:: FilePath
-> Transfer
-> TransferInfo
-> Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
startTransfer program t info = case (transferRemote info, associatedFile info) of
genTransfer :: Transfer -> TransferInfo -> TransferGenerator
genTransfer t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file)
| Git.repoIsLocalUnknown (Remote.repo remote) -> do
-- optimisation for removable drives not plugged in
@ -56,7 +50,7 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
( do
debug [ "Transferring:" , describeTransfer t info ]
notifyTransfer
return $ Just (t, info, transferprocess remote file)
return $ Just (t, info, go remote file)
, do
debug [ "Skipping unnecessary transfer:",
describeTransfer t info ]
@ -69,57 +63,40 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
direction = transferDirection t
isdownload = direction == Download
transferprocess remote file = void $ do
(_, _, _, pid)
<- liftIO $ createProcess
(proc program $ toCommand params)
{ create_group = True }
{- Alerts are only shown for successful transfers.
- Transfers can temporarily fail for many reasons,
- so there's no point in bothering the user about
- those. The assistant should recover.
-
- After a successful upload, handle dropping it from
- here, if desired. In this case, the remote it was
- uploaded to is known to have it.
-
- Also, after a successful transfer, the location
- log has changed. Indicate that a commit has been
- made, in order to queue a push of the git-annex
- branch out to remotes that did not participate
- in the transfer.
-
- If the process failed, it could have crashed,
- so remove the transfer from the list of current
- transfers, just in case it didn't stop
- in a way that lets the TransferWatcher do its
- usual cleanup.
-}
ifM (liftIO $ (==) ExitSuccess <$> waitForProcess pid)
( do
void $ addAlert $ makeAlertFiller True $
transferFileAlert direction True file
unless isdownload $
handleDrops
("object uploaded to " ++ show remote)
True (transferKey t)
(associatedFile info)
(Just remote)
recordCommit
, void $ removeTransfer t
)
where
params =
[ Param "transferkey"
, Param "--quiet"
, Param $ key2file $ transferKey t
, Param $ if isdownload
then "--from"
else "--to"
, Param $ Remote.name remote
, Param "--file"
, File file
]
{- Alerts are only shown for successful transfers.
- Transfers can temporarily fail for many reasons,
- so there's no point in bothering the user about
- those. The assistant should recover.
-
- After a successful upload, handle dropping it from
- here, if desired. In this case, the remote it was
- uploaded to is known to have it.
-
- Also, after a successful transfer, the location
- log has changed. Indicate that a commit has been
- made, in order to queue a push of the git-annex
- branch out to remotes that did not participate
- in the transfer.
-
- If the process failed, it could have crashed,
- so remove the transfer from the list of current
- transfers, just in case it didn't stop
- in a way that lets the TransferWatcher do its
- usual cleanup.
-}
go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
( do
void $ addAlert $ makeAlertFiller True $
transferFileAlert direction True file
unless isdownload $
handleDrops
("object uploaded to " ++ show remote)
True (transferKey t)
(associatedFile info)
(Just remote)
void $ recordCommit
, void $ removeTransfer t
)
{- Called right before a transfer begins, this is a last chance to avoid
- unnecessary transfers.

View file

@ -11,28 +11,30 @@ import Assistant.Common
import Utility.ThreadScheduler
import Assistant.Types.TransferSlots
import Assistant.DaemonStatus
import Assistant.TransferrerPool
import Assistant.Types.TransferrerPool
import Logs.Transfer
import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
{- Waits until a transfer slot becomes available, then runs a
- TransferGenerator, and then runs the transfer action in its own thread.
-}
inTransferSlot :: TransferGenerator -> Assistant ()
inTransferSlot gen = do
inTransferSlot :: FilePath -> TransferGenerator -> Assistant ()
inTransferSlot program gen = do
flip MSemN.wait 1 <<~ transferSlots
runTransferThread =<< gen
runTransferThread program =<< gen
{- Runs a TransferGenerator, and its transfer action,
- without waiting for a slot to become available. -}
inImmediateTransferSlot :: TransferGenerator -> Assistant ()
inImmediateTransferSlot gen = do
inImmediateTransferSlot :: FilePath -> TransferGenerator -> Assistant ()
inImmediateTransferSlot program gen = do
flip MSemN.signal (-1) <<~ transferSlots
runTransferThread =<< gen
runTransferThread program =<< gen
{- Runs a transfer action, in an already allocated transfer slot.
- Once it finishes, frees the transfer slot.
@ -44,19 +46,22 @@ inImmediateTransferSlot gen = do
- then pausing the thread until a ResumeTransfer exception is raised,
- then rerunning the action.
-}
runTransferThread :: Maybe (Transfer, TransferInfo, Assistant ()) -> Assistant ()
runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots
runTransferThread (Just (t, info, a)) = do
runTransferThread :: FilePath -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant ()
runTransferThread _ Nothing = flip MSemN.signal 1 <<~ transferSlots
runTransferThread program (Just (t, info, a)) = do
d <- getAssistant id
aio <- asIO a
tid <- liftIO $ forkIO $ runTransferThread' d aio
aio <- asIO1 a
tid <- liftIO $ forkIO $ runTransferThread' program d aio
updateTransferInfo t $ info { transferTid = Just tid }
runTransferThread' :: AssistantData -> IO () -> IO ()
runTransferThread' d a = go
runTransferThread' :: FilePath -> AssistantData -> (Transferrer -> IO ()) -> IO ()
runTransferThread' program d run = go
where
go = catchPauseResume a
pause = catchPauseResume $ runEvery (Seconds 86400) noop
go = catchPauseResume $
withTransferrer program (transferrerPool d)
run
pause = catchPauseResume $
runEvery (Seconds 86400) noop
{- Note: This must use E.try, rather than E.catch.
- When E.catch is used, and has called go in its exception
- handler, Control.Concurrent.throwTo will block sometimes

View file

@ -0,0 +1,81 @@
{- A pool of "git-annex transferkeys" processes
-
- Copyright 2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.TransferrerPool where
import Assistant.Common
import Assistant.Types.TransferrerPool
import Logs.Transfer
import qualified Command.TransferKeys as T
import Control.Concurrent.STM
import System.Process (create_group)
import Control.Exception (throw)
import Control.Concurrent
import Types.Remote (AssociatedFile)
{- Runs an action with a Transferrer from the pool. -}
withTransferrer :: FilePath -> TransferrerPool -> (Transferrer -> IO a) -> IO a
withTransferrer program pool a = do
t <- maybe (mkTransferrer program) (checkTransferrer program)
=<< atomically (tryReadTChan pool)
v <- tryNonAsync $ a t
unlessM (putback t) $
void $ forkIO $ stopTransferrer t
either throw return v
where
putback t = atomically $ ifM (isEmptyTChan pool)
( do
writeTChan pool t
return True
, return False
)
{- Requests that a Transferrer perform a Transfer, and waits for it to
- finish. -}
performTransfer :: Transferrer -> Transfer -> AssociatedFile -> IO Bool
performTransfer transferrer t f = catchBoolIO $ do
T.sendRequest t f (transferrerWrite transferrer)
T.readResponse (transferrerRead transferrer)
{- Starts a new git-annex transferkeys process, setting up a pipe
- that will be used to communicate with it. -}
mkTransferrer :: FilePath -> IO Transferrer
mkTransferrer program = do
(myread, twrite) <- createPipe
(tread, mywrite) <- createPipe
mapM_ (\fd -> setFdOption fd CloseOnExec True) [myread, mywrite]
let params =
[ Param "transferkeys"
, Param "--readfd", Param $ show tread
, Param "--writefd", Param $ show twrite
]
{- It's put into its own group so that the whole group can be
- killed to stop a transfer. -}
(_, _, _, pid) <- createProcess (proc program $ toCommand params)
{ create_group = True }
closeFd twrite
closeFd tread
myreadh <- fdToHandle myread
mywriteh <- fdToHandle mywrite
return $ Transferrer
{ transferrerRead = myreadh
, transferrerWrite = mywriteh
, transferrerHandle = pid
}
{- Checks if a Transferrer is still running. If not, makes a new one. -}
checkTransferrer :: FilePath -> Transferrer -> IO Transferrer
checkTransferrer program t = maybe (return t) (const $ mkTransferrer program)
=<< getProcessExitCode (transferrerHandle t)
{- Closing the fds will stop the transferrer. -}
stopTransferrer :: Transferrer -> IO ()
stopTransferrer t = do
hClose $ transferrerRead t
hClose $ transferrerWrite t
void $ waitForProcess $ transferrerHandle t

View file

@ -0,0 +1,23 @@
{- A pool of "git-annex transferkeys" processes
-
- Copyright 2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Types.TransferrerPool where
import Common.Annex
import Control.Concurrent.STM
type TransferrerPool = TChan Transferrer
data Transferrer = Transferrer
{ transferrerRead :: Handle
, transferrerWrite :: Handle
, transferrerHandle :: ProcessHandle
}
newTransferrerPool :: IO TransferrerPool
newTransferrerPool = newTChanIO

View file

@ -117,8 +117,8 @@ startTransfer t = do
liftIO $ throwTo tid ResumeTransfer
start info = liftAssistant $ do
program <- liftIO readProgramFile
inImmediateTransferSlot $
Transferrer.startTransfer program t info
inImmediateTransferSlot program $
Transferrer.genTransfer t info
getCurrentTransfers :: Handler TransferMap
getCurrentTransfers = currentTransfers <$> liftAssistant getDaemonStatus

View file

@ -86,16 +86,19 @@ runRequests readh writeh a = go =<< readrequests
hPutStrLn writeh $ serialize b
hFlush writeh
sendRequest :: TransferRequest -> Handle -> IO ()
sendRequest (TransferRequest d r k f) h = do
sendRequest :: Transfer -> AssociatedFile -> Handle -> IO ()
sendRequest t f h = do
hPutStr h $ join fieldSep
[ serialize d
, serialize $ Remote.uuid r
, serialize k
[ serialize (transferDirection t)
, serialize (transferUUID t)
, serialize (transferKey t)
, serialize f
]
hFlush h
readResponse :: Handle -> IO Bool
readResponse h = fromMaybe False . deserialize <$> hGetLine h
fieldSep :: String
fieldSep = "\0"