bf86b5ca16
Made fromDirect check that a file in the tree has good content (and is not a broken symlink either) before copying it to another file that has the same key. Made replaceFile clean up the temp file if the action that creates it, or the file replacement action fails.
379 lines
12 KiB
Haskell
379 lines
12 KiB
Haskell
{- git-annex transfer information files and lock files
|
|
-
|
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
module Logs.Transfer where
|
|
|
|
import Common.Annex
|
|
import Annex.Perms
|
|
import Annex.Exception
|
|
import qualified Git
|
|
import Types.Remote
|
|
import Types.Key
|
|
import Utility.Metered
|
|
import Utility.Percentage
|
|
import Utility.QuickCheck
|
|
|
|
import System.Posix.Types
|
|
import Data.Time.Clock
|
|
import Data.Time.Clock.POSIX
|
|
import Data.Time
|
|
import System.Locale
|
|
import Control.Concurrent
|
|
|
|
{- Identifies one transfer: the direction content is moving, the UUID it
 - is being exchanged with, and the key involved. The name of the transfer
 - information file is built from these three values. -}
data Transfer = Transfer
    { transferDirection :: Direction
    , transferUUID :: UUID
    , transferKey :: Key
    }
    deriving (Eq, Ord, Read, Show)
|
|
|
|
{- What is known about a Transfer; this is what gets stored in the
 - transfer information file.
 -
 - The associatedFile is not necessarily a file in the local git
 - repository. It is whatever file, possibly relative to some directory
 - of some repository, was acted on to start the transfer.
 -}
data TransferInfo = TransferInfo
    { startedTime :: Maybe POSIXTime
    , transferPid :: Maybe ProcessID -- not written to the info file
    , transferTid :: Maybe ThreadId -- not written to the info file
    , transferRemote :: Maybe Remote -- not written to the info file
    , bytesComplete :: Maybe Integer
    , associatedFile :: Maybe FilePath
    , transferPaused :: Bool -- always False when read back from a file
    }
    deriving (Show, Eq, Ord)
|
|
|
|
{- A TransferInfo with every optional field unset and not paused. -}
stubTransferInfo :: TransferInfo
stubTransferInfo = TransferInfo
    { startedTime = Nothing
    , transferPid = Nothing
    , transferTid = Nothing
    , transferRemote = Nothing
    , bytesComplete = Nothing
    , associatedFile = Nothing
    , transferPaused = False
    }
|
|
|
|
{- Which way content moves in a transfer. -}
data Direction
    = Upload
    | Download
    deriving (Eq, Ord, Read, Show)
|
|
|
|
{- The lower case name of a Direction, as used in directory names. -}
showLcDirection :: Direction -> String
showLcDirection direction = case direction of
    Upload -> "upload"
    Download -> "download"
|
|
|
|
{- Parses the lower case name of a Direction; anything else is Nothing. -}
readLcDirection :: String -> Maybe Direction
readLcDirection s = lookup s
    [ ("upload", Upload)
    , ("download", Download)
    ]
|
|
|
|
{- A one line description of a transfer: its direction, UUID, the
 - associated file (or the key's file name when there is none), and the
 - bytes completed so far. -}
describeTransfer :: Transfer -> TransferInfo -> String
describeTransfer t info = unwords
    [ show (transferDirection t)
    , show (transferUUID t)
    , subject
    , show (bytesComplete info)
    ]
  where
    subject = case associatedFile info of
        Just f -> f
        Nothing -> key2file (transferKey t)
|
|
|
|
{- Whether two transfers accomplish the same task: either both are
 - downloads of the same key, or they are the same transfer. -}
equivilantTransfer :: Transfer -> Transfer -> Bool
equivilantTransfer t1 t2 = (bothdownloads && samekey) || t1 == t2
  where
    bothdownloads = all ((== Download) . transferDirection) [t1, t2]
    samekey = transferKey t1 == transferKey t2
|
|
|
|
{- How far along a transfer is. Nothing when the key's size is not known;
 - a missing bytesComplete counts as 0. -}
percentComplete :: Transfer -> TransferInfo -> Maybe Percentage
percentComplete t info = do
    total <- keySize (transferKey t)
    return $ percentage total (fromMaybe 0 (bytesComplete info))
|
|
|
|
{- Decides whether a failed transfer should be retried, given the
 - TransferInfo from before the attempt and the one from after it. -}
type RetryDecider = TransferInfo -> TransferInfo -> Bool
|
|
|
|
{- Never retries. -}
noRetry :: RetryDecider
noRetry _old _new = False
|
|
|
|
{- Retries a failed transfer only if it made forward progress, that is,
 - bytesComplete is larger after the attempt than it was before. -}
forwardRetry :: RetryDecider
forwardRetry old new = bytesComplete new > bytesComplete old
|
|
|
|
{- Runs an upload of a key to the given UUID, via runTransfer. -}
upload :: UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex Bool) -> Annex Bool
upload u key = runTransfer $ Transfer
    { transferDirection = Upload
    , transferUUID = u
    , transferKey = key
    }
|
|
|
|
{- Runs a download of a key from the given UUID, via runTransfer. -}
download :: UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex Bool) -> Annex Bool
download u key = runTransfer $ Transfer
    { transferDirection = Download
    , transferUUID = u
    , transferKey = key
    }
|
|
|
|
{- Runs a transfer action. Creates and locks the lock file while the
 - action is running, and stores info in the transfer information
 - file. Will throw an error if the transfer is already in progress.
 -
 - The lock is taken once and held across every retry of the action.
 - The lock file and the transfer information file are only removed,
 - and the lock only released, after the last attempt has finished.
 -
 - If the transfer action returns False, the transfer info is
 - left in the failedTransferDir.
 -
 - An upload can be run from a read-only filesystem, and in this case
 - no transfer information or lock file is used.
 -}
runTransfer :: Transfer -> Maybe FilePath -> RetryDecider -> (MeterUpdate -> Annex Bool) -> Annex Bool
runTransfer t file shouldretry a = do
    info <- liftIO $ startTransferInfo file
    (meter, tfile, metervar) <- mkProgressUpdater t info
    mode <- annexFileMode
    -- The bracket wraps the whole retry loop, not a single attempt.
    -- Otherwise the first failure would remove the lock file and close
    -- its fd, leaving retries unlocked and closing the fd a second time.
    ok <- bracketIO (prep tfile mode info) (cleanup tfile) $ const $
        retry info metervar (a meter)
    unless ok $ recordFailedTransfer t info
    return ok
  where
    {- Opens and locks the lock file, and writes the initial transfer
     - info. Yields Nothing when the lock file cannot be opened. -}
    prep tfile mode info = do
#ifndef __WINDOWS__
        mfd <- catchMaybeIO $
            openFd (transferLockFile tfile) ReadWrite (Just mode)
                defaultFileFlags { trunc = True }
        case mfd of
            Nothing -> return mfd
            Just fd -> do
                locked <- catchMaybeIO $
                    setLock fd (WriteLock, AbsoluteSeek, 0, 0)
                when (isNothing locked) $ do
                    -- don't leak the fd of a lock that is held elsewhere
                    void $ tryIO $ closeFd fd
                    error "transfer already in progress"
                void $ tryIO $ writeTransferInfoFile info tfile
                return mfd
#else
        catchMaybeIO $ do
            writeFile (transferLockFile tfile) ""
            writeTransferInfoFile info tfile
#endif
    {- Removes the info and lock files, and releases the lock. -}
    cleanup _ Nothing = noop
    cleanup tfile (Just fd) = do
        void $ tryIO $ removeFile tfile
        void $ tryIO $ removeFile $ transferLockFile tfile
#ifndef __WINDOWS__
        closeFd fd
#endif
    {- Runs the action, and when it throws, asks shouldretry whether to
     - run it again, based on the bytes completed so far. -}
    retry oldinfo metervar run = do
        v <- tryAnnex run
        case v of
            Right b -> return b
            Left _ -> do
                b <- getbytescomplete metervar
                let newinfo = oldinfo { bytesComplete = Just b }
                if shouldretry oldinfo newinfo
                    then retry newinfo metervar run
                    else return False
    {- Uploads read progress from the meter's MVar; downloads use the
     - size of the key's temp file, or 0 if it cannot be statted. -}
    getbytescomplete metervar
        | transferDirection t == Upload =
            liftIO $ readMVar metervar
        | otherwise = do
            f <- fromRepo $ gitAnnexTmpLocation (transferKey t)
            liftIO $ catchDefaultIO 0 $
                fromIntegral . fileSize <$> getFileStatus f
|
|
|
|
{- Builds the callback that is called as a transfer progresses to update
 - the transfer info file. Returned alongside it are the file that gets
 - updated, and an MVar holding the last recorded number of bytesComplete. -}
mkProgressUpdater :: Transfer -> TransferInfo -> Annex (MeterUpdate, FilePath, MVar Integer)
mkProgressUpdater t info = do
    tfile <- fromRepo $ transferFile t
    -- failure to create the directory is deliberately ignored
    void $ tryAnnex $ createAnnexDirectory $ takeDirectory tfile
    mvar <- liftIO $ newMVar 0
    return (liftIO . updater tfile mvar, tfile, mvar)
  where
    updater tfile mvar b =
        modifyMVar_ mvar (record tfile (fromBytesProcessed b))
    {- Only a change of at least mindelta is written out and remembered;
     - smaller changes leave the recorded value as it was. -}
    record tfile newbytes oldbytes
        | newbytes - oldbytes >= mindelta = do
            void $ tryIO $ writeTransferInfoFile
                (info { bytesComplete = Just newbytes }) tfile
            return newbytes
        | otherwise = return oldbytes
    {- The smallest change in bytesComplete worth rewriting the info
     - file for: 1% of the keySize, rounded down, or arbitrarily
     - 100 kb when the size is not known. -}
    mindelta = maybe (100 * 1024) (`div` 100) (keySize (transferKey t))
|
|
|
|
{- The TransferInfo for a transfer that is starting right now. -}
startTransferInfo :: Maybe FilePath -> IO TransferInfo
startTransferInfo file = do
    now <- getCurrentTime
    return $ TransferInfo
        { startedTime = Just (utcTimeToPOSIXSeconds now)
        , transferPid = Nothing -- pid not stored in file, so omitted for speed
        , transferTid = Nothing -- tid ditto
        , transferRemote = Nothing
        , bytesComplete = Nothing -- not 0; transfer may be resuming
        , associatedFile = file
        , transferPaused = False
        }
|
|
|
|
{- Checks whether a transfer is still running, and if so returns its
 - TransferInfo. -}
checkTransfer :: Transfer -> Annex (Maybe TransferInfo)
checkTransfer t = do
    tfile <- fromRepo $ transferFile t
#ifndef __WINDOWS__
    mode <- annexFileMode
    liftIO $ do
        mfd <- catchMaybeIO $
            openFd (transferLockFile tfile) ReadOnly (Just mode) defaultFileFlags
        case mfd of
            -- failed to open file; not running
            Nothing -> return Nothing
            Just fd -> do
                holder <- getLock fd (WriteLock, AbsoluteSeek, 0, 0)
                closeFd fd
                -- the pid of the process holding the lock goes into the result
                maybe (return Nothing) (readinfo tfile . Just . fst) holder
#else
    liftIO $ do
        lockpresent <- doesFileExist $ transferLockFile tfile
        if lockpresent
            then readinfo tfile Nothing
            else return Nothing
#endif
  where
    readinfo tfile mpid = catchDefaultIO Nothing $
        readTransferInfoFile mpid tfile
|
|
|
|
{- Gets all currently running transfers.
 -
 - Every transfer information file under the download and upload transfer
 - directories is considered; only those that checkTransfer reports as
 - still running are returned. -}
getTransfers :: Annex [(Transfer, TransferInfo)]
getTransfers = do
    transfers <- mapMaybe parseTransferFile . concat <$> findfiles
    infos <- mapM checkTransfer transfers
    -- A failed pattern match in the generator just skips that pair, so
    -- transfers without info are dropped without any partial function.
    return [ (t, i) | (t, Just i) <- zip transfers infos ]
  where
    findfiles = liftIO . mapM dirContentsRecursive
        =<< mapM (fromRepo . transferDir) [Download, Upload]
|
|
|
|
{- Gets the failed transfers recorded for a given remote UUID, in both
 - directions. Files that do not parse as a transfer, or whose info
 - cannot be read, are skipped. -}
getFailedTransfers :: UUID -> Annex [(Transfer, TransferInfo)]
getFailedTransfers u = do
    files <- concat <$> findfiles
    catMaybes <$> liftIO (mapM getpair files)
  where
    getpair f = do
        mi <- readTransferInfoFile Nothing f
        return $ (,) <$> parseTransferFile f <*> mi
    findfiles = do
        dirs <- mapM (fromRepo . failedTransferDir u) [Download, Upload]
        liftIO $ mapM dirContentsRecursive dirs
|
|
|
|
{- Removes the record of a failed transfer. Failure to remove the file is
 - ignored. -}
removeFailedTransfer :: Transfer -> Annex ()
removeFailedTransfer t =
    liftIO . void . tryIO . removeFile =<< fromRepo (failedTransferFile t)
|
|
|
|
{- Records a failed transfer by writing its info to the failed transfer
 - file, creating the containing directory first. -}
recordFailedTransfer :: Transfer -> TransferInfo -> Annex ()
recordFailedTransfer t info = do
    dest <- fromRepo (failedTransferFile t)
    createAnnexDirectory (takeDirectory dest)
    liftIO (writeTransferInfoFile info dest)
|
|
|
|
{- The transfer information file to use for a given Transfer: it lives
 - in the direction's transfer directory, under the UUID (with any '/'
 - removed), and is named after the key. -}
transferFile :: Transfer -> Git.Repo -> FilePath
transferFile t r = base </> uuidname </> keyFile (transferKey t)
  where
    base = transferDir (transferDirection t) r
    uuidname = filter (/= '/') $ fromUUID $ transferUUID t
|
|
|
|
{- The transfer information file used to record a failed Transfer: it is
 - named after the key, inside the failedTransferDir. -}
failedTransferFile :: Transfer -> Git.Repo -> FilePath
failedTransferFile t r = dir </> keyFile (transferKey t)
  where
    dir = failedTransferDir (transferUUID t) (transferDirection t) r
|
|
|
|
{- The lock file that goes with a transfer info file: it is in the same
 - directory, with "lck." prepended to the file name. -}
transferLockFile :: FilePath -> FilePath
transferLockFile infofile = dir </> ("lck." ++ name)
  where
    (dir, name) = splitFileName infofile
|
|
|
|
{- Parses the name of a transfer information file back to a Transfer.
 - The last three path components are the direction, UUID, and key.
 - Lock files never parse. -}
parseTransferFile :: FilePath -> Maybe Transfer
parseTransferFile file = case reverse (splitDirectories file) of
    (key : u : direction : _)
        | not islockfile -> do
            d <- readLcDirection direction
            k <- fileKey key
            return $ Transfer d (toUUID u) k
    _ -> Nothing
  where
    islockfile = "lck." `isPrefixOf` takeFileName file
|
|
|
|
{- Writes a TransferInfo to a transfer information file, replacing any
 - previous content.
 -
 - The handle is closed even when setting the encoding or writing fails,
 - and the IOException from that failure is then rethrown. -}
writeTransferInfoFile :: TransferInfo -> FilePath -> IO ()
writeTransferInfoFile info tfile = do
    h <- openFile tfile WriteMode
    r <- tryIO $ do
        fileEncoding h
        hPutStr h $ writeTransferInfo info
    case r of
        Right () -> hClose h
        Left e -> do
            -- close best-effort, so the original error is the one reported
            void $ tryIO $ hClose h
            ioError e
|
|
|
|
{- The file format is a header line holding the startedTime, followed by
 - a space and the bytesComplete value when there is one. The next line
 - is the associatedFile; it comes last because its content is arbitrary.
 -
 - The transferPid is not included; instead it is obtained by looking
 - at the process that locks the file.
 -}
writeTransferInfo :: TransferInfo -> String
writeTransferInfo info = unlines [header, fromMaybe "" (associatedFile info)]
  where
    header = started ++ progress
    started = maybe "" show (startedTime info)
    progress = maybe "" ((' ' :) . show) (bytesComplete info)
|
|
|
|
{- Reads and parses a transfer information file. The given pid, if any,
 - is put into the resulting TransferInfo.
 -
 - Returns Nothing when the file cannot be opened or read. The handle is
 - closed in either case, including when reading fails part way. -}
readTransferInfoFile :: Maybe ProcessID -> FilePath -> IO (Maybe TransferInfo)
readTransferInfoFile mpid tfile = catchDefaultIO Nothing $ do
    h <- openFile tfile ReadMode
    r <- tryIO $ do
        fileEncoding h
        hGetContentsStrict h
    -- close before looking at the result, so a failed read does not
    -- leave the handle open
    hClose h
    return $ either (const Nothing) (readTransferInfo mpid) r
|
|
|
|
{- Parses the content of a transfer information file.
 -
 - Parsing is lenient: a header field that is missing or does not parse
 - is not a failure, it just leaves that field unset. The given pid is
 - used as the transferPid. -}
readTransferInfo :: Maybe ProcessID -> String -> Maybe TransferInfo
readTransferInfo mpid s = Just $ TransferInfo
    { startedTime = parsePOSIXTime =<< headMaybe bits
    , transferPid = mpid
    , transferTid = Nothing
    , transferRemote = Nothing
    , bytesComplete = readish =<< headMaybe (drop 1 bits)
    , associatedFile = if null filename then Nothing else Just filename
    , transferPaused = False
    }
  where
    (firstline, rest) = separate (== '\n') s
    -- everything after the header line, less one trailing newline
    filename = if end rest == "\n" then beginning rest else rest
    bits = split " " firstline
|
|
|
|
{- Parses a POSIXTime written as seconds since the epoch, with an optional
 - fractional part, and a trailing "s". -}
parsePOSIXTime :: String -> Maybe POSIXTime
parsePOSIXTime = fmap utcTimeToPOSIXSeconds . parseTime defaultTimeLocale "%s%Qs"
|
|
|
|
{- The directory that holds the transfer information files for a given
 - Direction. -}
transferDir :: Direction -> Git.Repo -> FilePath
transferDir direction r = combine (gitAnnexTransferDir r) (showLcDirection direction)
|
|
|
|
{- The directory that holds the failed transfer information files for a
 - given Direction and UUID. Any '/' in the UUID is removed. -}
failedTransferDir :: UUID -> Direction -> Git.Repo -> FilePath
failedTransferDir u direction r = base </> "failed" </> dirname </> uuidname
  where
    base = gitAnnexTransferDir r
    dirname = showLcDirection direction
    uuidname = filter (/= '/') (fromUUID u)
|
|
|
|
{- Generates TransferInfo values for testing. The ThreadId and Remote
 - fields are always left unset. -}
instance Arbitrary TransferInfo where
    arbitrary = do
        started <- arbitrary
        pid <- arbitrary
        bytes <- arbitrary
        -- associated file cannot be empty (but can be Nothing)
        afile <- arbitrary `suchThat` (/= Just "")
        paused <- arbitrary
        return $ TransferInfo
            { startedTime = started
            , transferPid = pid
            , transferTid = Nothing -- cannot generate a ThreadID
            , transferRemote = Nothing -- remote not needed
            , bytesComplete = bytes
            , associatedFile = afile
            , transferPaused = paused
            }
|
|
|
|
{- Writing a TransferInfo and reading it back yields the same value,
 - apart from transferPaused, which always reads back as False. Values
 - with a remote or tid set pass trivially, as those are not stored. -}
prop_read_write_transferinfo :: TransferInfo -> Bool
prop_read_write_transferinfo info = notstored || roundtrip == Just expected
  where
    notstored = isJust (transferRemote info) || isJust (transferTid info)
    expected = info { transferPaused = False }
    roundtrip = readTransferInfo (transferPid info) (writeTransferInfo info)
|
|
|