Merge branch 'safesemaphore'
Conflicts: debian/changelog git-annex.cabal
This commit is contained in:
commit
14b376d440
7 changed files with 25 additions and 17 deletions
|
@ -16,9 +16,10 @@ import Logs.Transfer
|
||||||
|
|
||||||
import qualified Control.Exception as E
|
import qualified Control.Exception as E
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import qualified Control.Concurrent.MSemN as MSemN
|
||||||
import Data.Typeable
|
import Data.Typeable
|
||||||
|
|
||||||
type TransferSlots = QSemN
|
type TransferSlots = MSemN.MSemN Int
|
||||||
|
|
||||||
{- A special exception that can be thrown to pause or resume a transfer, while
|
{- A special exception that can be thrown to pause or resume a transfer, while
|
||||||
- keeping its slot in use. -}
|
- keeping its slot in use. -}
|
||||||
|
@ -39,21 +40,21 @@ numSlots :: Int
|
||||||
numSlots = 1
|
numSlots = 1
|
||||||
|
|
||||||
newTransferSlots :: IO TransferSlots
|
newTransferSlots :: IO TransferSlots
|
||||||
newTransferSlots = newQSemN numSlots
|
newTransferSlots = MSemN.new numSlots
|
||||||
|
|
||||||
{- Waits until a transfer slot becomes available, then runs a
|
{- Waits until a transfer slot becomes available, then runs a
|
||||||
- TransferGenerator, and then runs the transfer action in its own thread.
|
- TransferGenerator, and then runs the transfer action in its own thread.
|
||||||
-}
|
-}
|
||||||
inTransferSlot :: TransferSlotRunner
|
inTransferSlot :: TransferSlotRunner
|
||||||
inTransferSlot dstatus s gen = do
|
inTransferSlot dstatus s gen = do
|
||||||
waitQSemN s 1
|
MSemN.wait s 1
|
||||||
runTransferThread dstatus s =<< gen
|
runTransferThread dstatus s =<< gen
|
||||||
|
|
||||||
{- Runs a TransferGenerator, and its transfer action,
|
{- Runs a TransferGenerator, and its transfer action,
|
||||||
- without waiting for a slot to become available. -}
|
- without waiting for a slot to become available. -}
|
||||||
inImmediateTransferSlot :: TransferSlotRunner
|
inImmediateTransferSlot :: TransferSlotRunner
|
||||||
inImmediateTransferSlot dstatus s gen = do
|
inImmediateTransferSlot dstatus s gen = do
|
||||||
signalQSemN s (-1)
|
MSemN.signal s (-1)
|
||||||
runTransferThread dstatus s =<< gen
|
runTransferThread dstatus s =<< gen
|
||||||
|
|
||||||
{- Runs a transfer action, in an already allocated transfer slot.
|
{- Runs a transfer action, in an already allocated transfer slot.
|
||||||
|
@ -67,7 +68,7 @@ inImmediateTransferSlot dstatus s gen = do
|
||||||
- then rerunning the action.
|
- then rerunning the action.
|
||||||
-}
|
-}
|
||||||
runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO ()
|
runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO ()
|
||||||
runTransferThread _ s Nothing = signalQSemN s 1
|
runTransferThread _ s Nothing = MSemN.signal s 1
|
||||||
runTransferThread dstatus s (Just (t, info, a)) = do
|
runTransferThread dstatus s (Just (t, info, a)) = do
|
||||||
tid <- forkIO go
|
tid <- forkIO go
|
||||||
updateTransferInfo dstatus t $ info { transferTid = Just tid }
|
updateTransferInfo dstatus t $ info { transferTid = Just tid }
|
||||||
|
@ -86,4 +87,4 @@ runTransferThread dstatus s (Just (t, info, a)) = do
|
||||||
Just ResumeTransfer -> go
|
Just ResumeTransfer -> go
|
||||||
_ -> done
|
_ -> done
|
||||||
_ -> done
|
_ -> done
|
||||||
done = signalQSemN s 1
|
done = MSemN.signal s 1
|
||||||
|
|
|
@ -39,6 +39,7 @@ import Types.Key
|
||||||
import qualified Fields
|
import qualified Fields
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.MSampleVar
|
||||||
import System.Process (std_in, std_err)
|
import System.Process (std_in, std_err)
|
||||||
|
|
||||||
remote :: RemoteType
|
remote :: RemoteType
|
||||||
|
@ -290,9 +291,9 @@ copyFromRemote r key file dest
|
||||||
: maybe [] (\f -> [(Fields.associatedFile, f)]) file
|
: maybe [] (\f -> [(Fields.associatedFile, f)]) file
|
||||||
Just (cmd, params) <- git_annex_shell r "transferinfo"
|
Just (cmd, params) <- git_annex_shell r "transferinfo"
|
||||||
[Param $ key2file key] fields
|
[Param $ key2file key] fields
|
||||||
v <- liftIO $ newEmptySampleVar
|
v <- liftIO $ newEmptySV
|
||||||
tid <- liftIO $ forkIO $ void $ tryIO $ do
|
tid <- liftIO $ forkIO $ void $ tryIO $ do
|
||||||
bytes <- readSampleVar v
|
bytes <- readSV v
|
||||||
p <- createProcess $
|
p <- createProcess $
|
||||||
(proc cmd (toCommand params))
|
(proc cmd (toCommand params))
|
||||||
{ std_in = CreatePipe
|
{ std_in = CreatePipe
|
||||||
|
@ -305,8 +306,8 @@ copyFromRemote r key file dest
|
||||||
hFlush h
|
hFlush h
|
||||||
send bytes
|
send bytes
|
||||||
forever $
|
forever $
|
||||||
send =<< readSampleVar v
|
send =<< readSV v
|
||||||
let feeder = writeSampleVar v
|
let feeder = writeSV v
|
||||||
bracketIO noop (const $ tryIO $ killThread tid) (a feeder)
|
bracketIO noop (const $ tryIO $ killThread tid) (a feeder)
|
||||||
|
|
||||||
copyFromRemoteCheap :: Git.Repo -> Key -> FilePath -> Annex Bool
|
copyFromRemoteCheap :: Git.Repo -> Key -> FilePath -> Annex Bool
|
||||||
|
|
|
@ -26,10 +26,10 @@ module Utility.NotificationBroadcaster (
|
||||||
import Common
|
import Common
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.SampleVar
|
import Control.Concurrent.MSampleVar
|
||||||
|
|
||||||
{- One SampleVar per client. The TMVar is never empty, so never blocks. -}
|
{- One MSampleVar per client. The TMVar is never empty, so never blocks. -}
|
||||||
type NotificationBroadcaster = TMVar [SampleVar ()]
|
type NotificationBroadcaster = TMVar [MSampleVar ()]
|
||||||
|
|
||||||
newtype NotificationId = NotificationId Int
|
newtype NotificationId = NotificationId Int
|
||||||
deriving (Read, Show, Eq, Ord)
|
deriving (Read, Show, Eq, Ord)
|
||||||
|
@ -47,7 +47,7 @@ newNotificationHandle b = NotificationHandle
|
||||||
<*> addclient
|
<*> addclient
|
||||||
where
|
where
|
||||||
addclient = do
|
addclient = do
|
||||||
s <- newEmptySampleVar
|
s <- newEmptySV
|
||||||
atomically $ do
|
atomically $ do
|
||||||
l <- takeTMVar b
|
l <- takeTMVar b
|
||||||
putTMVar b $ l ++ [s]
|
putTMVar b $ l ++ [s]
|
||||||
|
@ -67,11 +67,11 @@ sendNotification b = do
|
||||||
l <- atomically $ readTMVar b
|
l <- atomically $ readTMVar b
|
||||||
mapM_ notify l
|
mapM_ notify l
|
||||||
where
|
where
|
||||||
notify s = writeSampleVar s ()
|
notify s = writeSV s ()
|
||||||
|
|
||||||
{- Used by a client to block until a new notification is available since
|
{- Used by a client to block until a new notification is available since
|
||||||
- the last time it tried. -}
|
- the last time it tried. -}
|
||||||
waitNotification :: NotificationHandle -> IO ()
|
waitNotification :: NotificationHandle -> IO ()
|
||||||
waitNotification (NotificationHandle b (NotificationId i)) = do
|
waitNotification (NotificationHandle b (NotificationId i)) = do
|
||||||
l <- atomically $ readTMVar b
|
l <- atomically $ readTMVar b
|
||||||
readSampleVar (l !! i)
|
readSV (l !! i)
|
||||||
|
|
3
debian/changelog
vendored
3
debian/changelog
vendored
|
@ -4,6 +4,9 @@ git-annex (3.20121018) UNRELEASED; urgency=low
|
||||||
* Preferred content path matching bugfix.
|
* Preferred content path matching bugfix.
|
||||||
* Preferred content expressions cannot use "in=".
|
* Preferred content expressions cannot use "in=".
|
||||||
* Preferred content expressions can use "present".
|
* Preferred content expressions can use "present".
|
||||||
|
* Depend on and use the Haskell SafeSemaphore library, which provides
|
||||||
|
exception-safe versions of SampleVar and QSemN.
|
||||||
|
Thanks, Ben Gamari for an excellent patch set.
|
||||||
|
|
||||||
-- Joey Hess <joeyh@debian.org> Wed, 17 Oct 2012 14:24:10 -0400
|
-- Joey Hess <joeyh@debian.org> Wed, 17 Oct 2012 14:24:10 -0400
|
||||||
|
|
||||||
|
|
1
debian/control
vendored
1
debian/control
vendored
|
@ -39,6 +39,7 @@ Build-Depends:
|
||||||
libghc-crypto-api-dev,
|
libghc-crypto-api-dev,
|
||||||
libghc-network-multicast-dev,
|
libghc-network-multicast-dev,
|
||||||
libghc-network-info-dev,
|
libghc-network-info-dev,
|
||||||
|
libghc-safesemaphore-dev,
|
||||||
ikiwiki,
|
ikiwiki,
|
||||||
perlmagick,
|
perlmagick,
|
||||||
git,
|
git,
|
||||||
|
|
|
@ -18,6 +18,7 @@ quite a lot.
|
||||||
* [bloomfilter](http://hackage.haskell.org/package/bloomfilter)
|
* [bloomfilter](http://hackage.haskell.org/package/bloomfilter)
|
||||||
* [edit-distance](http://hackage.haskell.org/package/edit-distance)
|
* [edit-distance](http://hackage.haskell.org/package/edit-distance)
|
||||||
* [hS3](http://hackage.haskell.org/package/hS3) (optional)
|
* [hS3](http://hackage.haskell.org/package/hS3) (optional)
|
||||||
|
* [SafeSemaphore](http://hackage.haskell.org/package/SafeSemaphore)
|
||||||
* Optional haskell stuff, used by the [[assistant]] and its webapp (edit Makefile to disable)
|
* Optional haskell stuff, used by the [[assistant]] and its webapp (edit Makefile to disable)
|
||||||
* [stm](http://hackage.haskell.org/package/stm)
|
* [stm](http://hackage.haskell.org/package/stm)
|
||||||
(version 2.3 or newer)
|
(version 2.3 or newer)
|
||||||
|
|
|
@ -50,7 +50,8 @@ Executable git-annex
|
||||||
bytestring, old-locale, time,
|
bytestring, old-locale, time,
|
||||||
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
|
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
|
||||||
base (>= 4.5 && < 4.7), monad-control, transformers-base, lifted-base,
|
base (>= 4.5 && < 4.7), monad-control, transformers-base, lifted-base,
|
||||||
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process
|
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, process,
|
||||||
|
SafeSemaphore
|
||||||
-- Need to list these because they're generated from .hsc files.
|
-- Need to list these because they're generated from .hsc files.
|
||||||
Other-Modules: Utility.Touch Utility.Mounts
|
Other-Modules: Utility.Touch Utility.Mounts
|
||||||
Include-Dirs: Utility
|
Include-Dirs: Utility
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue