3266ad3ff7
However, the test suite fails some quickchecks, so this branch is not yet in a mergeable state.
300 lines
10 KiB
Haskell
300 lines
10 KiB
Haskell
{- git-annex assistant transfer slots
|
|
-
|
|
- Copyright 2012 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
module Assistant.TransferSlots where
|
|
|
|
import Assistant.Common
|
|
import Utility.ThreadScheduler
|
|
import Assistant.Types.TransferSlots
|
|
import Assistant.DaemonStatus
|
|
import Assistant.TransferrerPool
|
|
import Assistant.Types.TransferrerPool
|
|
import Assistant.Types.TransferQueue
|
|
import Assistant.TransferQueue
|
|
import Assistant.Alert
|
|
import Assistant.Alert.Utility
|
|
import Assistant.Commits
|
|
import Assistant.Drop
|
|
import Types.Transfer
|
|
import Logs.Transfer
|
|
import Logs.Location
|
|
import qualified Git
|
|
import qualified Remote
|
|
import qualified Types.Remote as Remote
|
|
import Annex.Content
|
|
import Annex.Wanted
|
|
import Annex.Path
|
|
import Utility.Batch
|
|
import Types.NumCopies
|
|
|
|
import qualified Data.Map as M
|
|
import qualified Control.Exception as E
|
|
import Control.Concurrent
|
|
import qualified Control.Concurrent.MSemN as MSemN
|
|
#ifndef mingw32_HOST_OS
|
|
import System.Posix.Process (getProcessGroupIDOf)
|
|
import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
|
|
#else
|
|
import System.Win32.Process (terminateProcessById)
|
|
#endif
|
|
|
|
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
|
|
|
|
{- Waits until a transfer slot becomes available, then runs a
|
|
- TransferGenerator, and then runs the transfer action in its own thread.
|
|
-}
|
|
inTransferSlot :: FilePath -> BatchCommandMaker -> TransferGenerator -> Assistant ()
|
|
inTransferSlot program batchmaker gen = do
|
|
flip MSemN.wait 1 <<~ transferSlots
|
|
runTransferThread program batchmaker =<< gen
|
|
|
|
{- Runs a TransferGenerator, and its transfer action,
|
|
- without waiting for a slot to become available. -}
|
|
inImmediateTransferSlot :: FilePath -> BatchCommandMaker -> TransferGenerator -> Assistant ()
|
|
inImmediateTransferSlot program batchmaker gen = do
|
|
flip MSemN.signal (-1) <<~ transferSlots
|
|
runTransferThread program batchmaker =<< gen
|
|
|
|
{- Runs a transfer action, in an already allocated transfer slot.
|
|
- Once it finishes, frees the transfer slot.
|
|
-
|
|
- Note that the action is subject to being killed when the transfer
|
|
- is canceled or paused.
|
|
-
|
|
- A PauseTransfer exception is handled by letting the action be killed,
|
|
- then pausing the thread until a ResumeTransfer exception is raised,
|
|
- then rerunning the action.
|
|
-}
|
|
runTransferThread :: FilePath -> BatchCommandMaker -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant ()
|
|
runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots
|
|
runTransferThread program batchmaker (Just (t, info, a)) = do
|
|
d <- getAssistant id
|
|
aio <- asIO1 a
|
|
tid <- liftIO $ forkIO $ runTransferThread' program batchmaker d aio
|
|
updateTransferInfo t $ info { transferTid = Just tid }
|
|
|
|
runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
|
|
runTransferThread' program batchmaker d run = go
|
|
where
|
|
go = catchPauseResume $
|
|
withTransferrer program batchmaker (transferrerPool d)
|
|
run
|
|
pause = catchPauseResume $
|
|
runEvery (Seconds 86400) noop
|
|
{- Note: This must use E.try, rather than E.catch.
|
|
- When E.catch is used, and has called go in its exception
|
|
- handler, Control.Concurrent.throwTo will block sometimes
|
|
- when signaling. Using E.try avoids the problem. -}
|
|
catchPauseResume a' = do
|
|
r <- E.try a' :: IO (Either E.SomeException ())
|
|
case r of
|
|
Left e -> case E.fromException e of
|
|
Just PauseTransfer -> pause
|
|
Just ResumeTransfer -> go
|
|
_ -> done
|
|
_ -> done
|
|
done = runAssistant d $
|
|
flip MSemN.signal 1 <<~ transferSlots
|
|
|
|
{- By the time this is called, the daemonstatus's currentTransfers map should
|
|
- already have been updated to include the transfer. -}
|
|
genTransfer :: Transfer -> TransferInfo -> TransferGenerator
|
|
genTransfer t info = case transferRemote info of
|
|
Just remote -> ifM (unpluggedremovabledrive remote)
|
|
( do
|
|
-- optimisation, since the transfer would fail
|
|
liftAnnex $ recordFailedTransfer t info
|
|
void $ removeTransfer t
|
|
return Nothing
|
|
, ifM (liftAnnex $ shouldTransfer t info)
|
|
( do
|
|
debug [ "Transferring:" , describeTransfer t info ]
|
|
notifyTransfer
|
|
return $ Just (t, info, go remote)
|
|
, do
|
|
debug [ "Skipping unnecessary transfer:",
|
|
describeTransfer t info ]
|
|
void $ removeTransfer t
|
|
finishedTransfer t (Just info)
|
|
return Nothing
|
|
)
|
|
)
|
|
_ -> return Nothing
|
|
where
|
|
direction = transferDirection t
|
|
isdownload = direction == Download
|
|
|
|
unpluggedremovabledrive remote = Git.repoIsLocalUnknown
|
|
<$> liftAnnex (Remote.getRepo remote)
|
|
|
|
{- Alerts are only shown for successful transfers.
|
|
- Transfers can temporarily fail for many reasons,
|
|
- so there's no point in bothering the user about
|
|
- those. The assistant should recover.
|
|
-
|
|
- After a successful upload, handle dropping it from
|
|
- here, if desired. In this case, the remote it was
|
|
- uploaded to is known to have it.
|
|
-
|
|
- Also, after a successful transfer, the location
|
|
- log has changed. Indicate that a commit has been
|
|
- made, in order to queue a push of the git-annex
|
|
- branch out to remotes that did not participate
|
|
- in the transfer.
|
|
-
|
|
- If the process failed, it could have crashed,
|
|
- so remove the transfer from the list of current
|
|
- transfers, just in case it didn't stop
|
|
- in a way that lets the TransferWatcher do its
|
|
- usual cleanup. However, first check if something else is
|
|
- running the transfer, to avoid removing active transfers.
|
|
-}
|
|
go remote transferrer = ifM (liftIO $ performTransfer transferrer t info)
|
|
( do
|
|
case associatedFile info of
|
|
AssociatedFile Nothing -> noop
|
|
AssociatedFile (Just af) -> void $
|
|
addAlert $ makeAlertFiller True $
|
|
transferFileAlert direction True (fromRawFilePath af)
|
|
unless isdownload $
|
|
handleDrops
|
|
("object uploaded to " ++ show remote)
|
|
True (transferKey t)
|
|
(associatedFile info)
|
|
[mkVerifiedCopy RecentlyVerifiedCopy remote]
|
|
void recordCommit
|
|
, whenM (liftAnnex $ isNothing <$> checkTransfer t) $
|
|
void $ removeTransfer t
|
|
)
|
|
|
|
{- Called right before a transfer begins, this is a last chance to avoid
|
|
- unnecessary transfers.
|
|
-
|
|
- For downloads, we obviously don't need to download if the already
|
|
- have the object.
|
|
-
|
|
- Smilarly, for uploads, check if the remote is known to already have
|
|
- the object.
|
|
-
|
|
- Also, uploads get queued to all remotes, in order of cost.
|
|
- This may mean, for example, that an object is uploaded over the LAN
|
|
- to a locally paired client, and once that upload is done, a more
|
|
- expensive transfer remote no longer wants the object. (Since
|
|
- all the clients have it already.) So do one last check if this is still
|
|
- preferred content.
|
|
-
|
|
- We'll also do one last preferred content check for downloads. An
|
|
- example of a case where this could be needed is if a download is queued
|
|
- for a file that gets moved out of an archive directory -- but before
|
|
- that download can happen, the file is put back in the archive.
|
|
-}
|
|
shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
|
|
shouldTransfer t info
|
|
| transferDirection t == Download =
|
|
(not <$> inAnnex key) <&&> wantGet True (Just key) file
|
|
| transferDirection t == Upload = case transferRemote info of
|
|
Nothing -> return False
|
|
Just r -> notinremote r
|
|
<&&> wantSend True (Just key) file (Remote.uuid r)
|
|
| otherwise = return False
|
|
where
|
|
key = transferKey t
|
|
file = associatedFile info
|
|
|
|
{- Trust the location log to check if the remote already has
|
|
- the key. This avoids a roundtrip to the remote. -}
|
|
notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
|
|
|
|
{- Queue uploads of files downloaded to us, spreading them
|
|
- out to other reachable remotes.
|
|
-
|
|
- Downloading a file may have caused a remote to not want it;
|
|
- so check for drops from remotes.
|
|
-
|
|
- Uploading a file may cause the local repo, or some other remote to not
|
|
- want it; handle that too.
|
|
-}
|
|
finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
|
|
finishedTransfer t (Just info)
|
|
| transferDirection t == Download =
|
|
whenM (liftAnnex $ inAnnex $ transferKey t) $ do
|
|
dodrops False
|
|
void $ queueTransfersMatching (/= transferUUID t)
|
|
"newly received object"
|
|
Later (transferKey t) (associatedFile info) Upload
|
|
| otherwise = dodrops True
|
|
where
|
|
dodrops fromhere = handleDrops
|
|
("drop wanted after " ++ describeTransfer t info)
|
|
fromhere (transferKey t) (associatedFile info) []
|
|
finishedTransfer _ _ = noop
|
|
|
|
{- Pause a running transfer. -}
|
|
pauseTransfer :: Transfer -> Assistant ()
|
|
pauseTransfer = cancelTransfer True
|
|
|
|
{- Cancel a running transfer. -}
|
|
cancelTransfer :: Bool -> Transfer -> Assistant ()
|
|
cancelTransfer pause t = do
|
|
m <- getCurrentTransfers
|
|
unless pause $
|
|
{- remove queued transfer -}
|
|
void $ dequeueTransfers $ equivilantTransfer t
|
|
{- stop running transfer -}
|
|
maybe noop stop (M.lookup t m)
|
|
where
|
|
stop info = do
|
|
{- When there's a thread associated with the
|
|
- transfer, it's signaled first, to avoid it
|
|
- displaying any alert about the transfer having
|
|
- failed when the transfer process is killed. -}
|
|
liftIO $ maybe noop signalthread $ transferTid info
|
|
liftIO $ maybe noop killproc $ transferPid info
|
|
if pause
|
|
then void $ alterTransferInfo t $
|
|
\i -> i { transferPaused = True }
|
|
else void $ removeTransfer t
|
|
signalthread tid
|
|
| pause = throwTo tid PauseTransfer
|
|
| otherwise = killThread tid
|
|
killproc pid = void $ tryIO $ do
|
|
#ifndef mingw32_HOST_OS
|
|
{- In order to stop helper processes like rsync,
|
|
- kill the whole process group of the process
|
|
- running the transfer. -}
|
|
g <- getProcessGroupIDOf pid
|
|
let signal sig = void $ tryIO $ signalProcessGroup sig g
|
|
signal sigTERM
|
|
threadDelay 50000 -- 0.05 second grace period
|
|
signal sigKILL
|
|
#else
|
|
terminateProcessById pid
|
|
#endif
|
|
|
|
{- Start or resume a transfer. -}
|
|
startTransfer :: Transfer -> Assistant ()
|
|
startTransfer t = do
|
|
m <- getCurrentTransfers
|
|
maybe startqueued go (M.lookup t m)
|
|
where
|
|
go info = maybe (start info) resume $ transferTid info
|
|
startqueued = do
|
|
is <- map snd <$> getMatchingTransfers (== t)
|
|
maybe noop start $ headMaybe is
|
|
resume tid = do
|
|
alterTransferInfo t $ \i -> i { transferPaused = False }
|
|
liftIO $ throwTo tid ResumeTransfer
|
|
start info = do
|
|
program <- liftIO programPath
|
|
batchmaker <- liftIO getBatchCommandMaker
|
|
inImmediateTransferSlot program batchmaker $
|
|
genTransfer t info
|
|
|
|
getCurrentTransfers :: Assistant TransferMap
|
|
getCurrentTransfers = currentTransfers <$> getDaemonStatus
|