add thread id field to transferinfo

Also converted its timestand to posix seconds, like is used in the other
log files.
This commit is contained in:
Joey Hess 2012-07-18 18:42:41 -04:00
parent f520a2c103
commit eea0a3616c
4 changed files with 25 additions and 17 deletions

View file

@ -12,8 +12,6 @@ import qualified Annex
import Control.Concurrent import Control.Concurrent
import Data.Tuple import Data.Tuple
import System.Posix.Types
import System.Posix.Process
{- The Annex state is stored in a MVar, so that threaded actions can access {- The Annex state is stored in a MVar, so that threaded actions can access
- it. -} - it. -}
@ -39,14 +37,14 @@ withThreadState a = do
runThreadState :: ThreadState -> Annex a -> IO a runThreadState :: ThreadState -> Annex a -> IO a
runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
{- Runs an Annex action in a separate process, using a copy of the state {- Runs an Annex action in a separate thread, using a copy of the state
- from the MVar. - from the MVar.
- -
- It's up to the action to perform any necessary shutdown tasks in order - It's up to the action to perform any necessary shutdown tasks in order
- for state to not be lost. And it's up to the caller to resynchronise - for state to not be lost. And it's up to the caller to resynchronise
- with any changes the action makes to eg, the git-annex branch. - with any changes the action makes to eg, the git-annex branch.
-} -}
unsafeForkProcessThreadState :: ThreadState -> Annex a -> IO ProcessID unsafeForkIOThreadState :: ThreadState -> Annex a -> IO ThreadId
unsafeForkProcessThreadState mvar a = do unsafeForkIOThreadState mvar a = do
state <- readMVar mvar state <- readMVar mvar
forkProcess $ void $ Annex.eval state a forkIO $ void $ Annex.eval state a

View file

@ -18,6 +18,7 @@ import Logs.Location
import Annex.Content import Annex.Content
import qualified Remote import qualified Remote
import Data.Time.Clock.POSIX
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M import qualified Data.Map as M
@ -58,12 +59,12 @@ shouldTransfer dstatus t info =
| otherwise = return False | otherwise = return False
key = transferKey t key = transferKey t
{- A transfer is run in a separate process, with a *copy* of the Annex {- A transfer is run in a separate thread, with a *copy* of the Annex
- state. This is necessary to avoid blocking the rest of the assistant - state. This is necessary to avoid blocking the rest of the assistant
- on the transfer completing, and also to allow multiple transfers to run - on the transfer completing, and also to allow multiple transfers to run
- at once. - at once. This requires GHC's threaded runtime to work!
- -
- However, it means that the transfer processes are responsible - The copy of state means that the transfer processes are responsible
- for doing any necessary shutdown cleanups, and that the parent - for doing any necessary shutdown cleanups, and that the parent
- thread's cache must be invalidated once a transfer completes, as - thread's cache must be invalidated once a transfer completes, as
- changes may have been made to the git-annex branch. - changes may have been made to the git-annex branch.
@ -73,15 +74,14 @@ runTransfer st dstatus slots t info = case (transferRemote info, associatedFile
(Nothing, _) -> noop (Nothing, _) -> noop
(_, Nothing) -> noop (_, Nothing) -> noop
(Just remote, Just file) -> do (Just remote, Just file) -> do
pid <- inTransferSlot slots $ tid <- inTransferSlot slots $
unsafeForkProcessThreadState st $ unsafeForkIOThreadState st $
transferprocess remote file transferprocess remote file
now <- getCurrentTime now <- getCurrentTime
runThreadState st $ adjustTransfers dstatus $ runThreadState st $ adjustTransfers dstatus $
M.insertWith' const t info M.insertWith' const t info
{ startedTime = Just now { startedTime = Just $ utcTimeToPOSIXSeconds now
, transferPid = Just pid , transferTid = Just tid
, shouldWait = True
} }
where where
isdownload = transferDirection t == Download isdownload = transferDirection t == Download

View file

@ -24,6 +24,7 @@ stubInfo :: AssociatedFile -> TransferInfo
stubInfo f = TransferInfo stubInfo f = TransferInfo
{ startedTime = Nothing { startedTime = Nothing
, transferPid = Nothing , transferPid = Nothing
, transferTid = Nothing
, transferRemote = Nothing , transferRemote = Nothing
, bytesComplete = Nothing , bytesComplete = Nothing
, associatedFile = f , associatedFile = f

View file

@ -16,6 +16,10 @@ import qualified Fields
import System.Posix.Types import System.Posix.Types
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
import Control.Concurrent
{- Enough information to uniquely identify a transfer, used as the filename {- Enough information to uniquely identify a transfer, used as the filename
- of the transfer information file. -} - of the transfer information file. -}
@ -33,8 +37,9 @@ data Transfer = Transfer
- of some repository, that was acted on to initiate the transfer. - of some repository, that was acted on to initiate the transfer.
-} -}
data TransferInfo = TransferInfo data TransferInfo = TransferInfo
{ startedTime :: Maybe UTCTime { startedTime :: Maybe POSIXTime
, transferPid :: Maybe ProcessID , transferPid :: Maybe ProcessID
, transferTid :: Maybe ThreadId
, transferRemote :: Maybe Remote , transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer , bytesComplete :: Maybe Integer
, associatedFile :: Maybe FilePath , associatedFile :: Maybe FilePath
@ -76,8 +81,9 @@ transfer t file a = do
createAnnexDirectory $ takeDirectory tfile createAnnexDirectory $ takeDirectory tfile
mode <- annexFileMode mode <- annexFileMode
info <- liftIO $ TransferInfo info <- liftIO $ TransferInfo
<$> (Just <$> getCurrentTime) <$> (Just . utcTimeToPOSIXSeconds <$> getCurrentTime)
<*> pure Nothing -- pid not stored in file, so omitted for speed <*> pure Nothing -- pid not stored in file, so omitted for speed
<*> pure Nothing -- tid ditto
<*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing -- not 0; transfer may be resuming
<*> pure Nothing <*> pure Nothing
<*> pure file <*> pure file
@ -168,13 +174,16 @@ readTransferInfo :: ProcessID -> String -> Maybe TransferInfo
readTransferInfo pid s = readTransferInfo pid s =
case bits of case bits of
[time] -> TransferInfo [time] -> TransferInfo
<$> readish time <$> parsetime time
<*> pure (Just pid) <*> pure (Just pid)
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
<*> pure Nothing
<*> pure (if null filename then Nothing else Just filename) <*> pure (if null filename then Nothing else Just filename)
<*> pure False <*> pure False
_ -> Nothing _ -> Nothing
where where
(bits, filebits) = splitAt 1 $ lines s (bits, filebits) = splitAt 1 $ lines s
filename = join "\n" filebits filename = join "\n" filebits
parsetime t = Just . utcTimeToPOSIXSeconds
<$> parseTime defaultTimeLocale "%s%Qs" t