2012-11-03 18:16:17 +00:00
|
|
|
{- git-annex assistant out of band network messager interface
|
|
|
|
-
|
|
|
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
|
|
|
module Assistant.NetMessager where
|
|
|
|
|
|
|
|
import Assistant.Common
|
|
|
|
import Assistant.Types.NetMessager
|
|
|
|
|
2012-11-08 20:44:23 +00:00
|
|
|
import Control.Concurrent
|
2012-11-03 18:16:17 +00:00
|
|
|
import Control.Concurrent.STM
|
|
|
|
import Control.Concurrent.MSampleVar
|
2012-11-08 20:44:23 +00:00
|
|
|
import Control.Exception as E
|
|
|
|
import qualified Data.Set as S
|
2013-03-07 01:33:08 +00:00
|
|
|
import qualified Data.Map as M
|
2012-11-03 18:16:17 +00:00
|
|
|
|
|
|
|
sendNetMessage :: NetMessage -> Assistant ()
|
|
|
|
sendNetMessage m =
|
2012-11-08 18:06:43 +00:00
|
|
|
(atomically . flip writeTChan m) <<~ (netMessages . netMessager)
|
2012-11-03 18:16:17 +00:00
|
|
|
|
|
|
|
waitNetMessage :: Assistant (NetMessage)
|
2012-11-08 18:06:43 +00:00
|
|
|
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)
|
2012-11-03 18:16:17 +00:00
|
|
|
|
|
|
|
notifyNetMessagerRestart :: Assistant ()
|
|
|
|
notifyNetMessagerRestart =
|
2012-11-08 18:06:43 +00:00
|
|
|
flip writeSV () <<~ (netMessagerRestart . netMessager)
|
2012-11-03 18:16:17 +00:00
|
|
|
|
|
|
|
waitNetMessagerRestart :: Assistant ()
|
2012-11-08 18:06:43 +00:00
|
|
|
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
|
2012-11-08 20:44:23 +00:00
|
|
|
|
2013-03-07 01:33:08 +00:00
|
|
|
{- Store an important NetMessage for a client, and if the same message was
|
|
|
|
- already sent, remove it from sentImportantNetMessages. -}
|
|
|
|
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
|
|
|
|
storeImportantNetMessage m client matchingclient = go <<~ netMessager
|
|
|
|
where
|
|
|
|
go nm = atomically $ do
|
|
|
|
q <- takeTMVar $ importantNetMessages nm
|
|
|
|
sent <- takeTMVar $ sentImportantNetMessages nm
|
|
|
|
putTMVar (importantNetMessages nm) $
|
|
|
|
M.alter (Just . maybe (S.singleton m) (S.insert m)) client q
|
|
|
|
putTMVar (sentImportantNetMessages nm) $
|
|
|
|
M.mapWithKey removematching sent
|
|
|
|
removematching someclient s
|
|
|
|
| matchingclient someclient = S.delete m s
|
|
|
|
| otherwise = s
|
|
|
|
|
|
|
|
{- Indicates that an important NetMessage has been sent to a client. -}
|
|
|
|
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
|
|
|
|
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
|
|
|
|
where
|
|
|
|
go v = atomically $ do
|
|
|
|
sent <- takeTMVar v
|
|
|
|
putTMVar v $
|
|
|
|
M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
|
|
|
|
|
|
|
|
{- Checks for important NetMessages that have been stored for a client, and
|
|
|
|
- sent to a client. Typically the same client for both, although
|
|
|
|
- a modified or more specific client may need to be used. -}
|
|
|
|
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
|
|
|
|
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
|
|
|
|
where
|
|
|
|
go nm = atomically $ do
|
|
|
|
stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
|
|
|
|
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
|
|
|
|
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
|
|
|
|
|
2012-11-11 19:42:03 +00:00
|
|
|
{- Runs an action that runs either the send or receive side of a push.
|
2012-11-08 20:44:23 +00:00
|
|
|
-
|
|
|
|
- While the push is running, netMessagesPush will get messages put into it
|
|
|
|
- relating to this push, while any messages relating to other pushes
|
2012-11-11 19:42:03 +00:00
|
|
|
- on the same side go to netMessagesDeferred. Once the push finishes,
|
|
|
|
- those deferred messages will be fed to handledeferred for processing.
|
2012-11-08 20:44:23 +00:00
|
|
|
-}
|
2012-11-11 19:42:03 +00:00
|
|
|
runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
|
|
|
|
runPush side clientid handledeferred a = do
|
2012-11-08 20:44:23 +00:00
|
|
|
nm <- getAssistant netMessager
|
2012-11-11 19:42:03 +00:00
|
|
|
let runningv = getSide side $ netMessagerPushRunning nm
|
|
|
|
let setup = void $ atomically $ swapTMVar runningv $ Just clientid
|
2012-11-08 20:44:23 +00:00
|
|
|
let cleanup = atomically $ do
|
2012-11-11 19:42:03 +00:00
|
|
|
void $ swapTMVar runningv Nothing
|
|
|
|
emptytchan (getSide side $ netMessagesPush nm)
|
2012-11-08 20:44:23 +00:00
|
|
|
r <- E.bracket_ setup cleanup <~> a
|
|
|
|
(void . forkIO) <~> processdeferred nm
|
|
|
|
return r
|
|
|
|
where
|
|
|
|
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
|
|
|
|
processdeferred nm = do
|
2012-11-11 19:42:03 +00:00
|
|
|
s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
|
2012-11-08 20:44:23 +00:00
|
|
|
mapM_ rundeferred (S.toList s)
|
|
|
|
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
|
|
|
|
<~> handledeferred m
|
|
|
|
|
|
|
|
{- While a push is running, matching push messages are put into
|
2012-11-11 19:42:03 +00:00
|
|
|
- netMessagesPush, while others that involve the same side go to
|
2013-03-06 22:40:07 +00:00
|
|
|
- netMessagesPushDeferred.
|
2012-11-11 19:42:03 +00:00
|
|
|
-
|
|
|
|
- When no push is running involving the same side, returns False.
|
|
|
|
-
|
2012-11-10 05:34:03 +00:00
|
|
|
- To avoid bloating memory, only messages that initiate pushes are
|
2012-11-08 20:44:23 +00:00
|
|
|
- deferred.
|
|
|
|
-}
|
|
|
|
queueNetPushMessage :: NetMessage -> Assistant Bool
|
2012-11-11 19:42:03 +00:00
|
|
|
queueNetPushMessage m@(Pushing clientid stage) = do
|
2012-11-08 20:44:23 +00:00
|
|
|
nm <- getAssistant netMessager
|
|
|
|
liftIO $ atomically $ do
|
2012-11-11 19:42:03 +00:00
|
|
|
v <- readTMVar (getSide side $ netMessagerPushRunning nm)
|
|
|
|
case v of
|
|
|
|
Nothing -> return False
|
|
|
|
(Just runningclientid)
|
|
|
|
| runningclientid == clientid -> queue nm
|
|
|
|
| isPushInitiation stage -> defer nm
|
|
|
|
| otherwise -> discard
|
2012-11-08 20:44:23 +00:00
|
|
|
where
|
2012-11-11 19:42:03 +00:00
|
|
|
side = pushDestinationSide stage
|
|
|
|
queue nm = do
|
|
|
|
writeTChan (getSide side $ netMessagesPush nm) m
|
|
|
|
return True
|
2012-11-08 20:44:23 +00:00
|
|
|
defer nm = do
|
2012-11-11 19:42:03 +00:00
|
|
|
let mv = getSide side $ netMessagesPushDeferred nm
|
|
|
|
s <- takeTMVar mv
|
|
|
|
putTMVar mv $ S.insert m s
|
|
|
|
return True
|
|
|
|
discard = return True
|
|
|
|
queueNetPushMessage _ = return False
|
2012-11-08 20:44:23 +00:00
|
|
|
|
2012-11-11 19:42:03 +00:00
|
|
|
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
|
|
|
|
waitNetPushMessage side = (atomically . readTChan)
|
|
|
|
<<~ (getSide side . netMessagesPush . netMessager)
|
2012-11-13 19:36:34 +00:00
|
|
|
|