git-annex/Assistant/NetMessager.hs

96 lines
3.1 KiB
Haskell
Raw Normal View History

2012-11-03 18:16:17 +00:00
{- git-annex assistant out of band network messager interface
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
import Control.Concurrent
2012-11-03 18:16:17 +00:00
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
2012-11-03 18:16:17 +00:00
{- Queues a message by writing it to the netMessages channel of the
 - NetMessager. -}
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m = enqueue <<~ (netMessages . netMessager)
  where
    enqueue chan = atomically $ writeTChan chan m
2012-11-03 18:16:17 +00:00
{- Takes the next message from the netMessages channel of the
 - NetMessager, blocking until one is available. -}
waitNetMessage :: Assistant NetMessage
waitNetMessage = dequeue <<~ (netMessages . netMessager)
  where
    dequeue chan = atomically $ readTChan chan
2012-11-03 18:16:17 +00:00
{- Writes () to the netMessagerRestart sample variable, signalling
 - whoever is waiting in waitNetMessagerRestart. -}
notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart = signal <<~ (netMessagerRestart . netMessager)
  where
    signal sv = writeSV sv ()
2012-11-03 18:16:17 +00:00
{- Reads the netMessagerRestart sample variable, which is written by
 - notifyNetMessagerRestart. -}
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ restartvar
  where
    restartvar = netMessagerRestart . netMessager
{- Reads the current PushRunning state, leaving the TMVar that holds
 - it filled. -}
getPushRunning :: Assistant PushRunning
getPushRunning = peek <<~ (netMessagerPushRunning . netMessager)
  where
    peek runningv = atomically $ readTMVar runningv
{- Runs an action that performs either the sending or the receiving
 - end of a push.
 -
 - For the duration of the action, netMessagerPushRunning holds the
 - supplied PushRunning value, so queueNetPushMessage routes messages
 - belonging to this push into netMessagesPush and sets aside messages
 - for other pushes in netMessagesDeferredPush.
 -
 - When the action ends (normally or by an exception), the state is
 - reset to NoPushRunning and whatever is still queued in
 - netMessagesPush is discarded.
 -
 - After the action returns normally, the deferred messages are removed
 - from netMessagesDeferredPush and fed to handledeferred, one by one,
 - in a separate thread. Exceptions thrown while handling a deferred
 - message are caught and ignored.
 -}
runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush v handledeferred a = do
    nm <- getAssistant netMessager
    let runningv = netMessagerPushRunning nm
        markrunning = void $ atomically $ swapTMVar runningv v
        markidle = atomically $ do
            void $ swapTMVar runningv NoPushRunning
            drain (netMessagesPush nm)
    result <- E.bracket_ markrunning markidle <~> a
    (void . forkIO) <~> flushdeferred nm
    return result
  where
    -- Reads from the channel until it is empty, throwing the
    -- messages away.
    drain chan = do
        next <- tryReadTChan chan
        case next of
            Nothing -> noop
            Just _ -> drain chan

    -- Atomically takes the whole deferred set, leaving an empty one
    -- behind, and then handles each message in turn.
    flushdeferred nm = do
        pending <- liftIO $ atomically $
            swapTMVar (netMessagesDeferredPush nm) S.empty
        mapM_ deliver (S.toList pending)

    deliver m = swallow <~> handledeferred m

    -- A failure to handle one deferred message must not prevent the
    -- remaining ones from being handled.
    swallow :: IO () -> IO ()
    swallow = void . (E.try :: IO () -> IO (Either SomeException ()))
{- Routes a push-related message according to the push that is
 - currently running.
 -
 - A message whose client ID matches the running push is written to
 - netMessagesPush. Any other message is only kept if it is a
 - PushRequest or a StartingPush, in which case it is added to the
 - netMessagesDeferredPush set; this limits how much memory deferred
 - messages can use. Messages of other kinds are not stored anywhere
 - by this function.
 -
 - Returns True whenever a push is running (whether or not the message
 - was stored), and False when no push is running.
 -}
queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m = do
    nm <- getAssistant netMessager
    liftIO $ atomically $
        readTMVar (netMessagerPushRunning nm) >>= route nm
  where
    route _ NoPushRunning = return False
    route nm (SendPushRunning cid) = dispatch nm cid
    route nm (ReceivePushRunning cid) = dispatch nm cid

    dispatch nm cid = do
        if getClientID m == Just cid
            then writeTChan (netMessagesPush nm) m
            else deferifwanted nm
        return True

    deferifwanted nm = case m of
        PushRequest _ -> stash nm
        StartingPush _ -> stash nm
        _ -> noop

    stash nm = do
        let deferredv = netMessagesDeferredPush nm
        pending <- takeTMVar deferredv
        putTMVar deferredv (S.insert m pending)
{- Takes the next message from the netMessagesPush channel of the
 - NetMessager, blocking until one is available. -}
waitNetPushMessage :: Assistant NetMessage
waitNetPushMessage = dequeue <<~ (netMessagesPush . netMessager)
  where
    dequeue chan = atomically $ readTChan chan