04c12aa6df
Rather than using Read/Show, which would force me to preserve data types into the future. I considered just deriving json and sending that, but I don't much like deriving json with data types that have named constructors (like Key does) because again it locks in data type details. So instead, used SimpleProtocol, with a fairly complex and unreadable protocol. But it is as efficient as the p2p protocol at least, and as future proof. (Writing my own custom json instances would have worked but I thought of it too late and don't want to do all the work twice. The only real benefit might be that aeson could be faster.) Note that, when a new protocol request type is added later, git-annex trying to use it will cause the git-annex transferrer to display a protocol error message. That seems ok; it would only happen if a new git-annex found an old version of itself in PATH or the program file. So it's unlikely, and all it can do anyway is display an error. (The error message could perhaps be improved..) This commit was sponsored by Jack Hill on Patreon.
316 lines
11 KiB
Haskell
316 lines
11 KiB
Haskell
{- git-annex assistant transfer slots
|
|
-
|
|
- Copyright 2012-2020 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
module Assistant.TransferSlots where
|
|
|
|
import Control.Concurrent.STM
|
|
|
|
import Assistant.Common
|
|
import Utility.ThreadScheduler
|
|
import Utility.NotificationBroadcaster
|
|
import Assistant.Types.TransferSlots
|
|
import Assistant.DaemonStatus
|
|
import Annex.TransferrerPool
|
|
import Types.TransferrerPool
|
|
import Assistant.Types.TransferQueue
|
|
import Assistant.TransferQueue
|
|
import Assistant.Alert
|
|
import Assistant.Alert.Utility
|
|
import Assistant.Commits
|
|
import Assistant.Drop
|
|
import Annex.Transfer (stallDetection)
|
|
import Types.Transfer
|
|
import Logs.Transfer
|
|
import Logs.Location
|
|
import qualified Git
|
|
import qualified Annex
|
|
import qualified Remote
|
|
import qualified Types.Remote as Remote
|
|
import Annex.Content
|
|
import Annex.Wanted
|
|
import Annex.Path
|
|
import Utility.Batch
|
|
import Types.NumCopies
|
|
|
|
import Data.Either
|
|
import qualified Data.Map as M
|
|
import qualified Control.Exception as E
|
|
import Control.Concurrent
|
|
import qualified Control.Concurrent.MSemN as MSemN
|
|
#ifndef mingw32_HOST_OS
|
|
import System.Posix.Process (getProcessGroupIDOf)
|
|
import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
|
|
#else
|
|
import System.Win32.Process (terminateProcessById)
|
|
#endif
|
|
|
|
type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
|
|
|
|
{- Waits until a transfer slot becomes available, then runs a
|
|
- TransferGenerator, and then runs the transfer action in its own thread.
|
|
-}
|
|
inTransferSlot :: FilePath -> BatchCommandMaker -> TransferGenerator -> Assistant ()
|
|
inTransferSlot program batchmaker gen = do
|
|
flip MSemN.wait 1 <<~ transferSlots
|
|
runTransferThread program batchmaker =<< gen
|
|
|
|
{- Runs a TransferGenerator, and its transfer action,
|
|
- without waiting for a slot to become available. -}
|
|
inImmediateTransferSlot :: FilePath -> BatchCommandMaker -> TransferGenerator -> Assistant ()
|
|
inImmediateTransferSlot program batchmaker gen = do
|
|
flip MSemN.signal (-1) <<~ transferSlots
|
|
runTransferThread program batchmaker =<< gen
|
|
|
|
{- Runs a transfer action, in an already allocated transfer slot.
|
|
- Once it finishes, frees the transfer slot.
|
|
-
|
|
- Note that the action is subject to being killed when the transfer
|
|
- is canceled or paused.
|
|
-
|
|
- A PauseTransfer exception is handled by letting the action be killed,
|
|
- then pausing the thread until a ResumeTransfer exception is raised,
|
|
- then rerunning the action.
|
|
-}
|
|
runTransferThread :: FilePath -> BatchCommandMaker -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant ()
|
|
runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots
|
|
runTransferThread program batchmaker (Just (t, info, a)) = do
|
|
d <- getAssistant id
|
|
mkcheck <- checkNetworkConnections
|
|
<$> getAssistant daemonStatusHandle
|
|
aio <- asIO1 a
|
|
tid <- liftIO $ forkIO $ runTransferThread' mkcheck program batchmaker d aio
|
|
updateTransferInfo t $ info { transferTid = Just tid }
|
|
|
|
runTransferThread' :: MkCheckTransferrer -> FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
|
|
runTransferThread' mkcheck program batchmaker d run = go
|
|
where
|
|
go = catchPauseResume $ do
|
|
p <- runAssistant d $ liftAnnex $
|
|
Annex.getState Annex.transferrerpool
|
|
withTransferrer' True mkcheck program batchmaker p run
|
|
pause = catchPauseResume $
|
|
runEvery (Seconds 86400) noop
|
|
{- Note: This must use E.try, rather than E.catch.
|
|
- When E.catch is used, and has called go in its exception
|
|
- handler, Control.Concurrent.throwTo will block sometimes
|
|
- when signaling. Using E.try avoids the problem. -}
|
|
catchPauseResume a' = do
|
|
r <- E.try a' :: IO (Either E.SomeException ())
|
|
case r of
|
|
Left e -> case E.fromException e of
|
|
Just PauseTransfer -> pause
|
|
Just ResumeTransfer -> go
|
|
_ -> done
|
|
_ -> done
|
|
done = runAssistant d $
|
|
flip MSemN.signal 1 <<~ transferSlots
|
|
|
|
{- By the time this is called, the daemonstatus's currentTransfers map should
|
|
- already have been updated to include the transfer. -}
|
|
genTransfer :: Transfer -> TransferInfo -> TransferGenerator
|
|
genTransfer t info = case transferRemote info of
|
|
Just remote -> ifM (unpluggedremovabledrive remote)
|
|
( do
|
|
-- optimisation, since the transfer would fail
|
|
liftAnnex $ recordFailedTransfer t info
|
|
void $ removeTransfer t
|
|
return Nothing
|
|
, ifM (liftAnnex $ shouldTransfer t info)
|
|
( do
|
|
debug [ "Transferring:" , describeTransfer t info ]
|
|
notifyTransfer
|
|
sd <- liftAnnex $ stallDetection remote
|
|
return $ Just (t, info, go remote sd)
|
|
, do
|
|
debug [ "Skipping unnecessary transfer:",
|
|
describeTransfer t info ]
|
|
void $ removeTransfer t
|
|
finishedTransfer t (Just info)
|
|
return Nothing
|
|
)
|
|
)
|
|
_ -> return Nothing
|
|
where
|
|
direction = transferDirection t
|
|
isdownload = direction == Download
|
|
|
|
unpluggedremovabledrive remote = Git.repoIsLocalUnknown
|
|
<$> liftAnnex (Remote.getRepo remote)
|
|
|
|
{- Alerts are only shown for successful transfers.
|
|
- Transfers can temporarily fail for many reasons,
|
|
- so there's no point in bothering the user about
|
|
- those. The assistant should recover.
|
|
-
|
|
- After a successful upload, handle dropping it from
|
|
- here, if desired. In this case, the remote it was
|
|
- uploaded to is known to have it.
|
|
-
|
|
- Also, after a successful transfer, the location
|
|
- log has changed. Indicate that a commit has been
|
|
- made, in order to queue a push of the git-annex
|
|
- branch out to remotes that did not participate
|
|
- in the transfer.
|
|
-
|
|
- If the process failed, it could have crashed,
|
|
- so remove the transfer from the list of current
|
|
- transfers, just in case it didn't stop
|
|
- in a way that lets the TransferWatcher do its
|
|
- usual cleanup. However, first check if something else is
|
|
- running the transfer, to avoid removing active transfers.
|
|
-}
|
|
go remote sd transferrer = ifM (isRight <$> performTransfer sd AssistantLevel liftAnnex (transferRemote info) t info transferrer)
|
|
( do
|
|
case associatedFile info of
|
|
AssociatedFile Nothing -> noop
|
|
AssociatedFile (Just af) -> void $
|
|
addAlert $ makeAlertFiller True $
|
|
transferFileAlert direction True (fromRawFilePath af)
|
|
unless isdownload $
|
|
handleDrops
|
|
("object uploaded to " ++ show remote)
|
|
True (transferKey t)
|
|
(associatedFile info)
|
|
[mkVerifiedCopy RecentlyVerifiedCopy remote]
|
|
void recordCommit
|
|
, whenM (liftAnnex $ isNothing <$> checkTransfer t) $
|
|
void $ removeTransfer t
|
|
)
|
|
|
|
{- Called right before a transfer begins, this is a last chance to avoid
|
|
- unnecessary transfers.
|
|
-
|
|
- For downloads, we obviously don't need to download if the already
|
|
- have the object.
|
|
-
|
|
- Smilarly, for uploads, check if the remote is known to already have
|
|
- the object.
|
|
-
|
|
- Also, uploads get queued to all remotes, in order of cost.
|
|
- This may mean, for example, that an object is uploaded over the LAN
|
|
- to a locally paired client, and once that upload is done, a more
|
|
- expensive transfer remote no longer wants the object. (Since
|
|
- all the clients have it already.) So do one last check if this is still
|
|
- preferred content.
|
|
-
|
|
- We'll also do one last preferred content check for downloads. An
|
|
- example of a case where this could be needed is if a download is queued
|
|
- for a file that gets moved out of an archive directory -- but before
|
|
- that download can happen, the file is put back in the archive.
|
|
-}
|
|
shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
|
|
shouldTransfer t info
|
|
| transferDirection t == Download =
|
|
(not <$> inAnnex key) <&&> wantGet True (Just key) file
|
|
| transferDirection t == Upload = case transferRemote info of
|
|
Nothing -> return False
|
|
Just r -> notinremote r
|
|
<&&> wantSend True (Just key) file (Remote.uuid r)
|
|
| otherwise = return False
|
|
where
|
|
key = transferKey t
|
|
file = associatedFile info
|
|
|
|
{- Trust the location log to check if the remote already has
|
|
- the key. This avoids a roundtrip to the remote. -}
|
|
notinremote r = notElem (Remote.uuid r) <$> loggedLocations key
|
|
|
|
{- Queue uploads of files downloaded to us, spreading them
|
|
- out to other reachable remotes.
|
|
-
|
|
- Downloading a file may have caused a remote to not want it;
|
|
- so check for drops from remotes.
|
|
-
|
|
- Uploading a file may cause the local repo, or some other remote to not
|
|
- want it; handle that too.
|
|
-}
|
|
finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
|
|
finishedTransfer t (Just info)
|
|
| transferDirection t == Download =
|
|
whenM (liftAnnex $ inAnnex $ transferKey t) $ do
|
|
dodrops False
|
|
void $ queueTransfersMatching (/= transferUUID t)
|
|
"newly received object"
|
|
Later (transferKey t) (associatedFile info) Upload
|
|
| otherwise = dodrops True
|
|
where
|
|
dodrops fromhere = handleDrops
|
|
("drop wanted after " ++ describeTransfer t info)
|
|
fromhere (transferKey t) (associatedFile info) []
|
|
finishedTransfer _ _ = noop
|
|
|
|
{- Pause a running transfer. -}
|
|
pauseTransfer :: Transfer -> Assistant ()
|
|
pauseTransfer = cancelTransfer True
|
|
|
|
{- Cancel a running transfer. -}
|
|
cancelTransfer :: Bool -> Transfer -> Assistant ()
|
|
cancelTransfer pause t = do
|
|
m <- getCurrentTransfers
|
|
unless pause $
|
|
{- remove queued transfer -}
|
|
void $ dequeueTransfers $ equivilantTransfer t
|
|
{- stop running transfer -}
|
|
maybe noop stop (M.lookup t m)
|
|
where
|
|
stop info = do
|
|
{- When there's a thread associated with the
|
|
- transfer, it's signaled first, to avoid it
|
|
- displaying any alert about the transfer having
|
|
- failed when the transfer process is killed. -}
|
|
liftIO $ maybe noop signalthread $ transferTid info
|
|
liftIO $ maybe noop killproc $ transferPid info
|
|
if pause
|
|
then void $ alterTransferInfo t $
|
|
\i -> i { transferPaused = True }
|
|
else void $ removeTransfer t
|
|
signalthread tid
|
|
| pause = throwTo tid PauseTransfer
|
|
| otherwise = killThread tid
|
|
killproc pid = void $ tryIO $ do
|
|
#ifndef mingw32_HOST_OS
|
|
{- In order to stop helper processes like rsync,
|
|
- kill the whole process group of the process
|
|
- running the transfer. -}
|
|
g <- getProcessGroupIDOf pid
|
|
let signal sig = void $ tryIO $ signalProcessGroup sig g
|
|
signal sigTERM
|
|
threadDelay 50000 -- 0.05 second grace period
|
|
signal sigKILL
|
|
#else
|
|
terminateProcessById pid
|
|
#endif
|
|
|
|
{- Start or resume a transfer. -}
|
|
startTransfer :: Transfer -> Assistant ()
|
|
startTransfer t = do
|
|
m <- getCurrentTransfers
|
|
maybe startqueued go (M.lookup t m)
|
|
where
|
|
go info = maybe (start info) resume $ transferTid info
|
|
startqueued = do
|
|
is <- map snd <$> getMatchingTransfers (== t)
|
|
maybe noop start $ headMaybe is
|
|
resume tid = do
|
|
alterTransferInfo t $ \i -> i { transferPaused = False }
|
|
liftIO $ throwTo tid ResumeTransfer
|
|
start info = do
|
|
program <- liftIO programPath
|
|
batchmaker <- liftIO getBatchCommandMaker
|
|
inImmediateTransferSlot program batchmaker $
|
|
genTransfer t info
|
|
|
|
getCurrentTransfers :: Assistant TransferMap
|
|
getCurrentTransfers = currentTransfers <$> getDaemonStatus
|
|
|
|
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
|
|
checkNetworkConnections dstatushandle = do
|
|
dstatus <- atomically $ readTVar dstatushandle
|
|
h <- newNotificationHandle False (networkConnectedNotifier dstatus)
|
|
return $ not <$> checkNotification h
|