Merge branch 'master' of https://github.com/bgamari/git-annex into safesemaphore

This commit is contained in:
Joey Hess 2012-10-05 17:41:13 -04:00
commit e137edf757
4 changed files with 23 additions and 20 deletions

View file

@ -16,9 +16,10 @@ import Logs.Transfer
import qualified Control.Exception as E import qualified Control.Exception as E
import Control.Concurrent import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
import Data.Typeable import Data.Typeable
type TransferSlots = QSemN type TransferSlots = MSemN.MSemN Int
{- A special exception that can be thrown to pause or resume a transfer, while {- A special exception that can be thrown to pause or resume a transfer, while
- keeping its slot in use. -} - keeping its slot in use. -}
@ -39,21 +40,21 @@ numSlots :: Int
numSlots = 1 numSlots = 1
newTransferSlots :: IO TransferSlots newTransferSlots :: IO TransferSlots
newTransferSlots = newQSemN numSlots newTransferSlots = MSemN.new numSlots
{- Waits until a transfer slot becomes available, then runs a {- Waits until a transfer slot becomes available, then runs a
- TransferGenerator, and then runs the transfer action in its own thread. - TransferGenerator, and then runs the transfer action in its own thread.
-} -}
inTransferSlot :: TransferSlotRunner inTransferSlot :: TransferSlotRunner
inTransferSlot dstatus s gen = do inTransferSlot dstatus s gen = do
waitQSemN s 1 MSemN.wait s 1
runTransferThread dstatus s =<< gen runTransferThread dstatus s =<< gen
{- Runs a TransferGenerator, and its transfer action, {- Runs a TransferGenerator, and its transfer action,
- without waiting for a slot to become available. -} - without waiting for a slot to become available. -}
inImmediateTransferSlot :: TransferSlotRunner inImmediateTransferSlot :: TransferSlotRunner
inImmediateTransferSlot dstatus s gen = do inImmediateTransferSlot dstatus s gen = do
signalQSemN s (-1) MSemN.signal s (-1)
runTransferThread dstatus s =<< gen runTransferThread dstatus s =<< gen
{- Runs a transfer action, in an already allocated transfer slot. {- Runs a transfer action, in an already allocated transfer slot.
@ -67,7 +68,7 @@ inImmediateTransferSlot dstatus s gen = do
- then rerunning the action. - then rerunning the action.
-} -}
runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO () runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO ()
runTransferThread _ s Nothing = signalQSemN s 1 runTransferThread _ s Nothing = MSemN.signal s 1
runTransferThread dstatus s (Just (t, info, a)) = do runTransferThread dstatus s (Just (t, info, a)) = do
tid <- forkIO go tid <- forkIO go
updateTransferInfo dstatus t $ info { transferTid = Just tid } updateTransferInfo dstatus t $ info { transferTid = Just tid }
@ -86,4 +87,4 @@ runTransferThread dstatus s (Just (t, info, a)) = do
Just ResumeTransfer -> go Just ResumeTransfer -> go
_ -> done _ -> done
_ -> done _ -> done
done = signalQSemN s 1 done = MSemN.signal s 1

View file

@ -38,6 +38,7 @@ import Types.Key
import qualified Fields import qualified Fields
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.MSampleVar
import System.Process (std_in, std_err) import System.Process (std_in, std_err)
remote :: RemoteType remote :: RemoteType
@ -273,9 +274,9 @@ copyFromRemote r key file dest
: maybe [] (\f -> [(Fields.associatedFile, f)]) file : maybe [] (\f -> [(Fields.associatedFile, f)]) file
Just (cmd, params) <- git_annex_shell r "transferinfo" Just (cmd, params) <- git_annex_shell r "transferinfo"
[Param $ key2file key] fields [Param $ key2file key] fields
v <- liftIO $ newEmptySampleVar v <- liftIO $ newEmptySV
tid <- liftIO $ forkIO $ void $ tryIO $ do tid <- liftIO $ forkIO $ void $ tryIO $ do
bytes <- readSampleVar v bytes <- readSV v
p <- createProcess $ p <- createProcess $
(proc cmd (toCommand params)) (proc cmd (toCommand params))
{ std_in = CreatePipe { std_in = CreatePipe
@ -288,8 +289,8 @@ copyFromRemote r key file dest
hFlush h hFlush h
send bytes send bytes
forever $ forever $
send =<< readSampleVar v send =<< readSV v
let feeder = writeSampleVar v let feeder = writeSV v
bracketIO noop (const $ tryIO $ killThread tid) (a feeder) bracketIO noop (const $ tryIO $ killThread tid) (a feeder)
copyFromRemoteCheap :: Git.Repo -> Key -> FilePath -> Annex Bool copyFromRemoteCheap :: Git.Repo -> Key -> FilePath -> Annex Bool

View file

@ -26,10 +26,10 @@ module Utility.NotificationBroadcaster (
import Common import Common
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.SampleVar import Control.Concurrent.MSampleVar
{- One SampleVar per client. The TMVar is never empty, so never blocks. -} {- One MSampleVar per client. The TMVar is never empty, so never blocks. -}
type NotificationBroadcaster = TMVar [SampleVar ()] type NotificationBroadcaster = TMVar [MSampleVar ()]
newtype NotificationId = NotificationId Int newtype NotificationId = NotificationId Int
deriving (Read, Show, Eq, Ord) deriving (Read, Show, Eq, Ord)
@ -47,7 +47,7 @@ newNotificationHandle b = NotificationHandle
<*> addclient <*> addclient
where where
addclient = do addclient = do
s <- newEmptySampleVar s <- newEmptySV
atomically $ do atomically $ do
l <- takeTMVar b l <- takeTMVar b
putTMVar b $ l ++ [s] putTMVar b $ l ++ [s]
@ -67,11 +67,11 @@ sendNotification b = do
l <- atomically $ readTMVar b l <- atomically $ readTMVar b
mapM_ notify l mapM_ notify l
where where
notify s = writeSampleVar s () notify s = writeSV s ()
{- Used by a client to block until a new notification is available since {- Used by a client to block until a new notification is available since
- the last time it tried. -} - the last time it tried. -}
waitNotification :: NotificationHandle -> IO () waitNotification :: NotificationHandle -> IO ()
waitNotification (NotificationHandle b (NotificationId i)) = do waitNotification (NotificationHandle b (NotificationId i)) = do
l <- atomically $ readTMVar b l <- atomically $ readTMVar b
readSampleVar (l !! i) readSV (l !! i)

View file

@ -1,5 +1,5 @@
Name: git-annex Name: git-annex
Version: 3.20121001 Version: 3.20121002
Cabal-Version: >= 1.8 Cabal-Version: >= 1.8
License: GPL License: GPL
Maintainer: Joey Hess <joey@kitenet.net> Maintainer: Joey Hess <joey@kitenet.net>
@ -48,8 +48,9 @@ Executable git-annex
Build-Depends: MissingH, hslogger, directory, filepath, Build-Depends: MissingH, hslogger, directory, filepath,
unix, containers, utf8-string, network, mtl, bytestring, old-locale, time, unix, containers, utf8-string, network, mtl, bytestring, old-locale, time,
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base, base >= 4.5 && < 4.7, monad-control, transformers-base, lifted-base,
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process,
SafeSemaphore
-- Need to list these because they're generated from .hsc files. -- Need to list these because they're generated from .hsc files.
Other-Modules: Utility.Touch Utility.Mounts Other-Modules: Utility.Touch Utility.Mounts
Include-Dirs: Utility Include-Dirs: Utility
@ -95,7 +96,7 @@ Test-Suite test
Build-Depends: testpack, HUnit, MissingH, hslogger, directory, filepath, Build-Depends: testpack, HUnit, MissingH, hslogger, directory, filepath,
unix, containers, utf8-string, network, mtl, bytestring, old-locale, time, unix, containers, utf8-string, network, mtl, bytestring, old-locale, time,
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base, base >= 4.5 && < 4.7, monad-control, transformers-base, lifted-base,
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process
Other-Modules: Utility.Touch Other-Modules: Utility.Touch
Include-Dirs: Utility Include-Dirs: Utility