remove xmpp support
I've long considered the XMPP support in git-annex a wart. It's nice to remove it. (This also removes the NetMessager, which was only used for XMPP, and the daemonstatus's desynced list (likewise).) Existing XMPP remotes should be ignored by git-annex. This commit was sponsored by Brock Spratlen on Patreon.
This commit is contained in:
parent
a7fd200440
commit
d58148031b
64 changed files with 38 additions and 2827 deletions
|
@ -12,7 +12,6 @@ module Assistant.DaemonStatus where
|
|||
import Assistant.Common
|
||||
import Assistant.Alert.Utility
|
||||
import Utility.Tmp
|
||||
import Assistant.Types.NetMessager
|
||||
import Utility.NotificationBroadcaster
|
||||
import Types.Transfer
|
||||
import Logs.Transfer
|
||||
|
@ -20,14 +19,12 @@ import Logs.Trust
|
|||
import Logs.TimeStamp
|
||||
import qualified Remote
|
||||
import qualified Types.Remote as Remote
|
||||
import qualified Git
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import System.Posix.Types
|
||||
import Data.Time.Clock.POSIX
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
|
||||
getDaemonStatus :: Assistant DaemonStatus
|
||||
getDaemonStatus = (atomically . readTVar) <<~ daemonStatusHandle
|
||||
|
@ -264,6 +261,3 @@ alertDuring :: Alert -> Assistant a -> Assistant a
|
|||
alertDuring alert a = do
|
||||
i <- addAlert $ alert { alertClass = Activity }
|
||||
removeAlert i `after` a
|
||||
|
||||
getXMPPClientID :: Remote -> ClientID
|
||||
getXMPPClientID r = T.pack $ drop (length "xmpp::") (Git.repoLocation (Remote.repo r))
|
||||
|
|
|
@ -40,8 +40,6 @@ import Assistant.Types.BranchChange
|
|||
import Assistant.Types.Commits
|
||||
import Assistant.Types.Changes
|
||||
import Assistant.Types.RepoProblem
|
||||
import Assistant.Types.Buddies
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.Types.ThreadName
|
||||
import Assistant.Types.RemoteControl
|
||||
import Assistant.Types.CredPairCache
|
||||
|
@ -68,8 +66,6 @@ data AssistantData = AssistantData
|
|||
, changePool :: ChangePool
|
||||
, repoProblemChan :: RepoProblemChan
|
||||
, branchChangeHandle :: BranchChangeHandle
|
||||
, buddyList :: BuddyList
|
||||
, netMessager :: NetMessager
|
||||
, remoteControl :: RemoteControl
|
||||
, credPairCache :: CredPairCache
|
||||
}
|
||||
|
@ -88,8 +84,6 @@ newAssistantData st dstatus = AssistantData
|
|||
<*> newChangePool
|
||||
<*> newRepoProblemChan
|
||||
<*> newBranchChangeHandle
|
||||
<*> newBuddyList
|
||||
<*> newNetMessager
|
||||
<*> newRemoteControl
|
||||
<*> newCredPairCache
|
||||
|
||||
|
|
|
@ -1,180 +0,0 @@
|
|||
{- git-annex assistant out of band network messager interface
|
||||
-
|
||||
- Copyright 2012-2013 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Assistant.NetMessager where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.Types.NetMessager
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.MSampleVar
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.DList as D
|
||||
|
||||
sendNetMessage :: NetMessage -> Assistant ()
|
||||
sendNetMessage m =
|
||||
(atomically . flip writeTChan m) <<~ (netMessages . netMessager)
|
||||
|
||||
waitNetMessage :: Assistant (NetMessage)
|
||||
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)
|
||||
|
||||
notifyNetMessagerRestart :: Assistant ()
|
||||
notifyNetMessagerRestart =
|
||||
flip writeSV () <<~ (netMessagerRestart . netMessager)
|
||||
|
||||
{- This can be used to get an early indication if the network has
|
||||
- changed, to immediately restart a connection. However, that is not
|
||||
- available on all systems, so clients also need to deal with
|
||||
- restarting dropped connections in the usual way. -}
|
||||
waitNetMessagerRestart :: Assistant ()
|
||||
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
|
||||
|
||||
{- Store a new important NetMessage for a client, and if an equivilant
|
||||
- older message is already stored, remove it from both importantNetMessages
|
||||
- and sentImportantNetMessages. -}
|
||||
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
|
||||
storeImportantNetMessage m client matchingclient = go <<~ netMessager
|
||||
where
|
||||
go nm = atomically $ do
|
||||
q <- takeTMVar $ importantNetMessages nm
|
||||
sent <- takeTMVar $ sentImportantNetMessages nm
|
||||
putTMVar (importantNetMessages nm) $
|
||||
M.alter (Just . maybe (S.singleton m) (S.insert m)) client $
|
||||
M.mapWithKey removematching q
|
||||
putTMVar (sentImportantNetMessages nm) $
|
||||
M.mapWithKey removematching sent
|
||||
removematching someclient s
|
||||
| matchingclient someclient = S.filter (not . equivilantImportantNetMessages m) s
|
||||
| otherwise = s
|
||||
|
||||
{- Indicates that an important NetMessage has been sent to a client. -}
|
||||
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
|
||||
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
|
||||
where
|
||||
go v = atomically $ do
|
||||
sent <- takeTMVar v
|
||||
putTMVar v $
|
||||
M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
|
||||
|
||||
{- Checks for important NetMessages that have been stored for a client, and
|
||||
- sent to a client. Typically the same client for both, although
|
||||
- a modified or more specific client may need to be used. -}
|
||||
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
|
||||
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
|
||||
where
|
||||
go nm = atomically $ do
|
||||
stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
|
||||
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
|
||||
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
|
||||
|
||||
{- Queues a push initiation message in the queue for the appropriate
|
||||
- side of the push but only if there is not already an initiation message
|
||||
- from the same client in the queue. -}
|
||||
queuePushInitiation :: NetMessage -> Assistant ()
|
||||
queuePushInitiation msg@(Pushing clientid stage) = do
|
||||
tv <- getPushInitiationQueue side
|
||||
liftIO $ atomically $ do
|
||||
r <- tryTakeTMVar tv
|
||||
case r of
|
||||
Nothing -> putTMVar tv [msg]
|
||||
Just l -> do
|
||||
let !l' = msg : filter differentclient l
|
||||
putTMVar tv l'
|
||||
where
|
||||
side = pushDestinationSide stage
|
||||
differentclient (Pushing cid _) = cid /= clientid
|
||||
differentclient _ = True
|
||||
queuePushInitiation _ = noop
|
||||
|
||||
{- Waits for a push inititation message to be received, and runs
|
||||
- function to select a message from the queue. -}
|
||||
waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage
|
||||
waitPushInitiation side selector = do
|
||||
tv <- getPushInitiationQueue side
|
||||
liftIO $ atomically $ do
|
||||
q <- takeTMVar tv
|
||||
if null q
|
||||
then retry
|
||||
else do
|
||||
let (msg, !q') = selector q
|
||||
unless (null q') $
|
||||
putTMVar tv q'
|
||||
return msg
|
||||
|
||||
{- Stores messages for a push into the appropriate inbox.
|
||||
-
|
||||
- To avoid overflow, only 1000 messages max are stored in any
|
||||
- inbox, which should be far more than necessary.
|
||||
-
|
||||
- TODO: If we have more than 100 inboxes for different clients,
|
||||
- discard old ones that are not currently being used by any push.
|
||||
-}
|
||||
storeInbox :: NetMessage -> Assistant ()
|
||||
storeInbox msg@(Pushing clientid stage) = do
|
||||
inboxes <- getInboxes side
|
||||
stored <- liftIO $ atomically $ do
|
||||
m <- readTVar inboxes
|
||||
let update = \v -> do
|
||||
writeTVar inboxes $
|
||||
M.insertWith' const clientid v m
|
||||
return True
|
||||
case M.lookup clientid m of
|
||||
Nothing -> update (1, tostore)
|
||||
Just (sz, l)
|
||||
| sz > 1000 -> return False
|
||||
| otherwise ->
|
||||
let !sz' = sz + 1
|
||||
!l' = D.append l tostore
|
||||
in update (sz', l')
|
||||
if stored
|
||||
then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
|
||||
else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
|
||||
where
|
||||
side = pushDestinationSide stage
|
||||
tostore = D.singleton msg
|
||||
storeInbox _ = noop
|
||||
|
||||
{- Gets the new message for a push from its inbox.
|
||||
- Blocks until a message has been received. -}
|
||||
waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
|
||||
waitInbox clientid side = do
|
||||
inboxes <- getInboxes side
|
||||
liftIO $ atomically $ do
|
||||
m <- readTVar inboxes
|
||||
case M.lookup clientid m of
|
||||
Nothing -> retry
|
||||
Just (sz, dl)
|
||||
| sz < 1 -> retry
|
||||
| otherwise -> do
|
||||
let msg = D.head dl
|
||||
let dl' = D.tail dl
|
||||
let !sz' = sz - 1
|
||||
writeTVar inboxes $
|
||||
M.insertWith' const clientid (sz', dl') m
|
||||
return msg
|
||||
|
||||
emptyInbox :: ClientID -> PushSide -> Assistant ()
|
||||
emptyInbox clientid side = do
|
||||
inboxes <- getInboxes side
|
||||
liftIO $ atomically $
|
||||
modifyTVar' inboxes $
|
||||
M.delete clientid
|
||||
|
||||
getInboxes :: PushSide -> Assistant Inboxes
|
||||
getInboxes side =
|
||||
getSide side . netMessagerInboxes <$> getAssistant netMessager
|
||||
|
||||
getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage])
|
||||
getPushInitiationQueue side =
|
||||
getSide side . netMessagerPushInitiations <$> getAssistant netMessager
|
||||
|
||||
netMessagerDebug :: ClientID -> [String] -> Assistant ()
|
||||
netMessagerDebug clientid l = debug $
|
||||
"NetMessager" : l ++ [show $ logClientID clientid]
|
|
@ -9,8 +9,6 @@ module Assistant.Sync where
|
|||
|
||||
import Assistant.Common
|
||||
import Assistant.Pushes
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.Alert
|
||||
import Assistant.Alert.Utility
|
||||
import Assistant.DaemonStatus
|
||||
|
@ -20,7 +18,6 @@ import qualified Command.Sync
|
|||
import Utility.Parallel
|
||||
import qualified Git
|
||||
import qualified Git.Command
|
||||
import qualified Git.Ref
|
||||
import qualified Git.Merge
|
||||
import qualified Remote
|
||||
import qualified Types.Remote as Remote
|
||||
|
@ -40,7 +37,6 @@ import Types.Transfer
|
|||
|
||||
import Data.Time.Clock
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
import Control.Concurrent
|
||||
|
||||
{- Syncs with remotes that may have been disconnected for a while.
|
||||
|
@ -51,21 +47,14 @@ import Control.Concurrent
|
|||
- the remotes have diverged from the local git-annex branch. Otherwise,
|
||||
- it's sufficient to requeue failed transfers.
|
||||
-
|
||||
- XMPP remotes are also signaled that we can push to them, and we request
|
||||
- they push to us. Since XMPP pushes run ansynchronously, any scan of the
|
||||
- XMPP remotes has to be deferred until they're done pushing to us, so
|
||||
- all XMPP remotes are marked as possibly desynced.
|
||||
-
|
||||
- Also handles signaling any connectRemoteNotifiers, after the syncing is
|
||||
- done.
|
||||
-}
|
||||
reconnectRemotes :: Bool -> [Remote] -> Assistant ()
|
||||
reconnectRemotes _ [] = noop
|
||||
reconnectRemotes notifypushes rs = void $ do
|
||||
reconnectRemotes :: [Remote] -> Assistant ()
|
||||
reconnectRemotes [] = noop
|
||||
reconnectRemotes rs = void $ do
|
||||
rs' <- liftIO $ filterM (Remote.checkAvailable True) rs
|
||||
unless (null rs') $ do
|
||||
modifyDaemonStatus_ $ \s -> s
|
||||
{ desynced = S.union (S.fromList $ map Remote.uuid xmppremotes) (desynced s) }
|
||||
failedrs <- syncAction rs' (const go)
|
||||
forM_ failedrs $ \r ->
|
||||
whenM (liftIO $ Remote.checkAvailable False r) $
|
||||
|
@ -73,7 +62,7 @@ reconnectRemotes notifypushes rs = void $ do
|
|||
mapM_ signal $ filter (`notElem` failedrs) rs'
|
||||
where
|
||||
gitremotes = filter (notspecialremote . Remote.repo) rs
|
||||
(xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
|
||||
(_xmppremotes, nonxmppremotes) = partition Remote.isXMPPRemote rs
|
||||
notspecialremote r
|
||||
| Git.repoIsUrl r = True
|
||||
| Git.repoIsLocal r = True
|
||||
|
@ -82,7 +71,7 @@ reconnectRemotes notifypushes rs = void $ do
|
|||
sync currentbranch@(Just _, _) = do
|
||||
(failedpull, diverged) <- manualPull currentbranch gitremotes
|
||||
now <- liftIO getCurrentTime
|
||||
failedpush <- pushToRemotes' now notifypushes gitremotes
|
||||
failedpush <- pushToRemotes' now gitremotes
|
||||
return (nub $ failedpull ++ failedpush, diverged)
|
||||
{- No local branch exists yet, but we can try pulling. -}
|
||||
sync (Nothing, _) = manualPull (Nothing, Nothing) gitremotes
|
||||
|
@ -102,9 +91,6 @@ reconnectRemotes notifypushes rs = void $ do
|
|||
- as "git annex sync", except in parallel, and will co-exist with use of
|
||||
- "git annex sync".
|
||||
-
|
||||
- After the pushes to normal git remotes, also signals XMPP clients that
|
||||
- they can request an XMPP push.
|
||||
-
|
||||
- Avoids running possibly long-duration commands in the Annex monad, so
|
||||
- as not to block other threads.
|
||||
-
|
||||
|
@ -122,27 +108,21 @@ reconnectRemotes notifypushes rs = void $ do
|
|||
-
|
||||
- Returns any remotes that it failed to push to.
|
||||
-}
|
||||
pushToRemotes :: Bool -> [Remote] -> Assistant [Remote]
|
||||
pushToRemotes notifypushes remotes = do
|
||||
pushToRemotes :: [Remote] -> Assistant [Remote]
|
||||
pushToRemotes remotes = do
|
||||
now <- liftIO getCurrentTime
|
||||
let remotes' = filter (not . remoteAnnexReadOnly . Remote.gitconfig) remotes
|
||||
syncAction remotes' (pushToRemotes' now notifypushes)
|
||||
pushToRemotes' :: UTCTime -> Bool -> [Remote] -> Assistant [Remote]
|
||||
pushToRemotes' now notifypushes remotes = do
|
||||
syncAction remotes' (pushToRemotes' now)
|
||||
pushToRemotes' :: UTCTime -> [Remote] -> Assistant [Remote]
|
||||
pushToRemotes' now remotes = do
|
||||
(g, branch, u) <- liftAnnex $ do
|
||||
Annex.Branch.commit "update"
|
||||
(,,)
|
||||
<$> gitRepo
|
||||
<*> join Command.Sync.getCurrBranch
|
||||
<*> getUUID
|
||||
let (xmppremotes, normalremotes) = partition Remote.isXMPPRemote remotes
|
||||
let (_xmppremotes, normalremotes) = partition Remote.isXMPPRemote remotes
|
||||
ret <- go True branch g u normalremotes
|
||||
unless (null xmppremotes) $ do
|
||||
shas <- liftAnnex $ map fst <$>
|
||||
inRepo (Git.Ref.matchingWithHEAD
|
||||
[Annex.Branch.fullname, Git.Ref.headRef])
|
||||
forM_ xmppremotes $ \r -> sendNetMessage $
|
||||
Pushing (getXMPPClientID r) (CanPush u shas)
|
||||
return ret
|
||||
where
|
||||
go _ (Nothing, _) _ _ _ = return [] -- no branch, so nothing to do
|
||||
|
@ -152,11 +132,7 @@ pushToRemotes' now notifypushes remotes = do
|
|||
(succeeded, failed) <- parallelPush g rs (push branch)
|
||||
updatemap succeeded []
|
||||
if null failed
|
||||
then do
|
||||
when notifypushes $
|
||||
sendNetMessage $ NotifyPush $
|
||||
map Remote.uuid succeeded
|
||||
return failed
|
||||
then return []
|
||||
else if shouldretry
|
||||
then retry currbranch g u failed
|
||||
else fallback branch g u failed
|
||||
|
@ -175,9 +151,6 @@ pushToRemotes' now notifypushes remotes = do
|
|||
debug ["fallback pushing to", show rs]
|
||||
(succeeded, failed) <- parallelPush g rs (taggedPush u Nothing branch)
|
||||
updatemap succeeded failed
|
||||
when (notifypushes && (not $ null succeeded)) $
|
||||
sendNetMessage $ NotifyPush $
|
||||
map Remote.uuid succeeded
|
||||
return failed
|
||||
|
||||
push branch remote = Command.Sync.pushBranch remote branch
|
||||
|
@ -195,10 +168,6 @@ parallelPush g rs a = do
|
|||
{- Displays an alert while running an action that syncs with some remotes,
|
||||
- and returns any remotes that it failed to sync with.
|
||||
-
|
||||
- XMPP remotes are handled specially; since the action can only start
|
||||
- an async process for them, they are not included in the alert, but are
|
||||
- still passed to the action.
|
||||
-
|
||||
- Readonly remotes are also hidden (to hide the web special remote).
|
||||
-}
|
||||
syncAction :: [Remote] -> ([Remote] -> Assistant [Remote]) -> Assistant [Remote]
|
||||
|
@ -222,15 +191,11 @@ syncAction rs a
|
|||
- remotes that it failed to pull from, and a Bool indicating
|
||||
- whether the git-annex branches of the remotes and local had
|
||||
- diverged before the pull.
|
||||
-
|
||||
- After pulling from the normal git remotes, requests pushes from any
|
||||
- XMPP remotes. However, those pushes will run asynchronously, so their
|
||||
- results are not included in the return data.
|
||||
-}
|
||||
manualPull :: Command.Sync.CurrBranch -> [Remote] -> Assistant ([Remote], Bool)
|
||||
manualPull currentbranch remotes = do
|
||||
g <- liftAnnex gitRepo
|
||||
let (xmppremotes, normalremotes) = partition Remote.isXMPPRemote remotes
|
||||
let (_xmppremotes, normalremotes) = partition Remote.isXMPPRemote remotes
|
||||
failed <- forM normalremotes $ \r -> do
|
||||
g' <- liftAnnex $ sshOptionsTo (Remote.repo r) (Remote.gitconfig r) g
|
||||
ifM (liftIO $ Git.Command.runBool [Param "fetch", Param $ Remote.name r] g')
|
||||
|
@ -240,9 +205,6 @@ manualPull currentbranch remotes = do
|
|||
haddiverged <- liftAnnex Annex.Branch.forceUpdate
|
||||
forM_ normalremotes $ \r ->
|
||||
liftAnnex $ Command.Sync.mergeRemote r currentbranch mergeConfig
|
||||
u <- liftAnnex getUUID
|
||||
forM_ xmppremotes $ \r ->
|
||||
sendNetMessage $ Pushing (getXMPPClientID r) (PushRequest u)
|
||||
return (catMaybes failed, haddiverged)
|
||||
|
||||
mergeConfig :: [Git.Merge.MergeConfig]
|
||||
|
@ -257,7 +219,7 @@ syncRemote :: Remote -> Assistant ()
|
|||
syncRemote remote = do
|
||||
updateSyncRemotes
|
||||
thread <- asIO $ do
|
||||
reconnectRemotes False [remote]
|
||||
reconnectRemotes [remote]
|
||||
addScanRemotes True [remote]
|
||||
void $ liftIO $ forkIO $ thread
|
||||
|
||||
|
|
|
@ -10,8 +10,6 @@ module Assistant.Threads.Merger where
|
|||
import Assistant.Common
|
||||
import Assistant.TransferQueue
|
||||
import Assistant.BranchChange
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.ScanRemotes
|
||||
import Assistant.Sync
|
||||
import Utility.DirWatcher
|
||||
import Utility.DirWatcher.Types
|
||||
|
@ -19,11 +17,6 @@ import qualified Annex.Branch
|
|||
import qualified Git
|
||||
import qualified Git.Branch
|
||||
import qualified Command.Sync
|
||||
import Annex.TaggedPush
|
||||
import Remote (remoteFromUUID)
|
||||
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
|
||||
{- This thread watches for changes to .git/refs/, and handles incoming
|
||||
- pushes. -}
|
||||
|
@ -70,8 +63,7 @@ onChange file
|
|||
branchChanged
|
||||
diverged <- liftAnnex Annex.Branch.forceUpdate
|
||||
when diverged $
|
||||
unlessM handleDesynced $
|
||||
queueDeferredDownloads "retrying deferred download" Later
|
||||
queueDeferredDownloads "retrying deferred download" Later
|
||||
| "/synced/" `isInfixOf` file =
|
||||
mergecurrent =<< liftAnnex (join Command.Sync.getCurrBranch)
|
||||
| otherwise = noop
|
||||
|
@ -91,22 +83,6 @@ onChange file
|
|||
changedbranch
|
||||
mergecurrent _ = noop
|
||||
|
||||
handleDesynced = case fromTaggedBranch changedbranch of
|
||||
Nothing -> return False
|
||||
Just (u, info) -> do
|
||||
mr <- liftAnnex $ remoteFromUUID u
|
||||
case mr of
|
||||
Nothing -> return False
|
||||
Just r -> do
|
||||
s <- desynced <$> getDaemonStatus
|
||||
if S.member u s || Just (T.unpack $ getXMPPClientID r) == info
|
||||
then do
|
||||
modifyDaemonStatus_ $ \st -> st
|
||||
{ desynced = S.delete u s }
|
||||
addScanRemotes True [r]
|
||||
return True
|
||||
else return False
|
||||
|
||||
equivBranches :: Git.Ref -> Git.Ref -> Bool
|
||||
equivBranches x y = base x == base y
|
||||
where
|
||||
|
|
|
@ -146,7 +146,7 @@ handleMount urlrenderer dir = do
|
|||
debug ["detected mount of", dir]
|
||||
rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir
|
||||
mapM_ (fsckNudge urlrenderer . Just) rs
|
||||
reconnectRemotes True rs
|
||||
reconnectRemotes rs
|
||||
|
||||
{- Finds remotes located underneath the mount point.
|
||||
-
|
||||
|
|
|
@ -22,7 +22,6 @@ import Assistant.RemoteControl
|
|||
import Utility.DBus
|
||||
import DBus.Client
|
||||
import DBus
|
||||
import Assistant.NetMessager
|
||||
#else
|
||||
#ifdef linux_HOST_OS
|
||||
#warning Building without dbus support; will poll for network connection changes
|
||||
|
@ -44,9 +43,8 @@ netWatcherThread = thread noop
|
|||
- while (despite the local network staying up), are synced with
|
||||
- periodically.
|
||||
-
|
||||
- Note that it does not call notifyNetMessagerRestart, or
|
||||
- signal the RemoteControl, because it doesn't know that the
|
||||
- network has changed.
|
||||
- Note that it does not signal the RemoteControl, because it doesn't
|
||||
- know that the network has changed.
|
||||
-}
|
||||
netWatcherFallbackThread :: NamedThread
|
||||
netWatcherFallbackThread = namedThread "NetWatcherFallback" $
|
||||
|
@ -76,7 +74,6 @@ dbusThread = do
|
|||
sendRemoteControl LOSTNET
|
||||
connchange True = do
|
||||
debug ["detected network connection"]
|
||||
notifyNetMessagerRestart
|
||||
handleConnection
|
||||
sendRemoteControl RESUME
|
||||
onerr e _ = do
|
||||
|
@ -197,7 +194,7 @@ listenWicdConnections client setconnected = do
|
|||
handleConnection :: Assistant ()
|
||||
handleConnection = do
|
||||
liftIO . sendNotification . networkConnectedNotifier =<< getDaemonStatus
|
||||
reconnectRemotes True =<< networkRemotes
|
||||
reconnectRemotes =<< networkRemotes
|
||||
|
||||
{- Network remotes to sync with. -}
|
||||
networkRemotes :: Assistant [Remote]
|
||||
|
|
|
@ -24,7 +24,7 @@ pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do
|
|||
topush <- getFailedPushesBefore (fromIntegral halfhour)
|
||||
unless (null topush) $ do
|
||||
debug ["retrying", show (length topush), "failed pushes"]
|
||||
void $ pushToRemotes True topush
|
||||
void $ pushToRemotes topush
|
||||
where
|
||||
halfhour = 1800
|
||||
|
||||
|
@ -35,7 +35,7 @@ pushThread = namedThread "Pusher" $ runEvery (Seconds 2) <~> do
|
|||
-- Next, wait until at least one commit has been made
|
||||
void getCommits
|
||||
-- Now see if now's a good time to push.
|
||||
void $ pushToRemotes True =<< pushTargets
|
||||
void $ pushToRemotes =<< pushTargets
|
||||
|
||||
{- We want to avoid pushing to remotes that are marked readonly.
|
||||
-
|
||||
|
|
|
@ -76,7 +76,7 @@ transferScannerThread urlrenderer = namedThread "TransferScanner" $ do
|
|||
- to determine if the remote has been emptied.
|
||||
-}
|
||||
startupScan = do
|
||||
reconnectRemotes True =<< syncGitRemotes <$> getDaemonStatus
|
||||
reconnectRemotes =<< syncGitRemotes <$> getDaemonStatus
|
||||
addScanRemotes True =<< syncDataRemotes <$> getDaemonStatus
|
||||
|
||||
{- This is a cheap scan for failed transfers involving a remote. -}
|
||||
|
|
|
@ -26,7 +26,6 @@ import Assistant.WebApp.Configurators.Pairing
|
|||
import Assistant.WebApp.Configurators.AWS
|
||||
import Assistant.WebApp.Configurators.IA
|
||||
import Assistant.WebApp.Configurators.WebDAV
|
||||
import Assistant.WebApp.Configurators.XMPP
|
||||
import Assistant.WebApp.Configurators.Preferences
|
||||
import Assistant.WebApp.Configurators.Unused
|
||||
import Assistant.WebApp.Configurators.Edit
|
||||
|
|
|
@ -1,375 +0,0 @@
|
|||
{- git-annex XMPP client
|
||||
-
|
||||
- Copyright 2012, 2013 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.XMPPClient where
|
||||
|
||||
import Assistant.Common hiding (ProtocolError)
|
||||
import Assistant.XMPP
|
||||
import Assistant.XMPP.Client
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.Types.Buddies
|
||||
import Assistant.XMPP.Buddies
|
||||
import Assistant.Sync
|
||||
import Assistant.DaemonStatus
|
||||
import qualified Remote
|
||||
import Utility.ThreadScheduler
|
||||
import Assistant.WebApp (UrlRenderer)
|
||||
import Assistant.WebApp.Types hiding (liftAssistant)
|
||||
import Assistant.Alert
|
||||
import Assistant.Pairing
|
||||
import Assistant.XMPP.Git
|
||||
import Annex.UUID
|
||||
import Logs.UUID
|
||||
import qualified Command.Sync
|
||||
|
||||
import Network.Protocol.XMPP
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.STM.TMVar
|
||||
import Control.Concurrent.STM (atomically)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Map as M
|
||||
import Data.Time.Clock
|
||||
import Control.Concurrent.Async
|
||||
|
||||
xmppClientThread :: UrlRenderer -> NamedThread
|
||||
xmppClientThread urlrenderer = namedThread "XMPPClient" $
|
||||
restartableClient . xmppClient urlrenderer =<< getAssistant id
|
||||
|
||||
{- Runs the client, handing restart events. -}
|
||||
restartableClient :: (XMPPCreds -> UUID -> IO ()) -> Assistant ()
|
||||
restartableClient a = forever $ go =<< liftAnnex getXMPPCreds
|
||||
where
|
||||
go Nothing = waitNetMessagerRestart
|
||||
go (Just creds) = do
|
||||
xmppuuid <- maybe NoUUID Remote.uuid . headMaybe
|
||||
. filter Remote.isXMPPRemote . syncRemotes
|
||||
<$> getDaemonStatus
|
||||
tid <- liftIO $ forkIO $ a creds xmppuuid
|
||||
waitNetMessagerRestart
|
||||
liftIO $ killThread tid
|
||||
|
||||
xmppClient :: UrlRenderer -> AssistantData -> XMPPCreds -> UUID -> IO ()
|
||||
xmppClient urlrenderer d creds xmppuuid =
|
||||
retry (runclient creds) =<< getCurrentTime
|
||||
where
|
||||
liftAssistant = runAssistant d
|
||||
inAssistant = liftIO . liftAssistant
|
||||
|
||||
{- When the client exits, it's restarted;
|
||||
- if it keeps failing, back off to wait 5 minutes before
|
||||
- trying it again. -}
|
||||
retry client starttime = do
|
||||
{- The buddy list starts empty each time
|
||||
- the client connects, so that stale info
|
||||
- is not retained. -}
|
||||
liftAssistant $
|
||||
updateBuddyList (const noBuddies) <<~ buddyList
|
||||
void client
|
||||
liftAssistant $ do
|
||||
modifyDaemonStatus_ $ \s -> s
|
||||
{ xmppClientID = Nothing }
|
||||
changeCurrentlyConnected $ S.delete xmppuuid
|
||||
|
||||
now <- getCurrentTime
|
||||
if diffUTCTime now starttime > 300
|
||||
then do
|
||||
liftAssistant $ debug ["connection lost; reconnecting"]
|
||||
retry client now
|
||||
else do
|
||||
liftAssistant $ debug ["connection failed; will retry"]
|
||||
threadDelaySeconds (Seconds 300)
|
||||
retry client =<< getCurrentTime
|
||||
|
||||
runclient c = liftIO $ connectXMPP c $ \jid -> do
|
||||
selfjid <- bindJID jid
|
||||
putStanza gitAnnexSignature
|
||||
|
||||
inAssistant $ do
|
||||
modifyDaemonStatus_ $ \s -> s
|
||||
{ xmppClientID = Just $ xmppJID creds }
|
||||
changeCurrentlyConnected $ S.insert xmppuuid
|
||||
debug ["connected", logJid selfjid]
|
||||
|
||||
lasttraffic <- liftIO $ atomically . newTMVar =<< getCurrentTime
|
||||
|
||||
sender <- xmppSession $ sendnotifications selfjid
|
||||
receiver <- xmppSession $ receivenotifications selfjid lasttraffic
|
||||
pinger <- xmppSession $ sendpings selfjid lasttraffic
|
||||
{- Run all 3 threads concurrently, until
|
||||
- any of them throw an exception.
|
||||
- Then kill all 3 threads, and rethrow the
|
||||
- exception.
|
||||
-
|
||||
- If this thread gets an exception, the 3 threads
|
||||
- will also be killed. -}
|
||||
liftIO $ pinger `concurrently` sender `concurrently` receiver
|
||||
|
||||
sendnotifications selfjid = forever $
|
||||
join $ inAssistant $ relayNetMessage selfjid
|
||||
receivenotifications selfjid lasttraffic = forever $ do
|
||||
l <- decodeStanza selfjid <$> getStanza
|
||||
void $ liftIO $ atomically . swapTMVar lasttraffic =<< getCurrentTime
|
||||
inAssistant $ debug
|
||||
["received:", show $ map logXMPPEvent l]
|
||||
mapM_ (handlemsg selfjid) l
|
||||
sendpings selfjid lasttraffic = forever $ do
|
||||
putStanza pingstanza
|
||||
|
||||
startping <- liftIO getCurrentTime
|
||||
liftIO $ threadDelaySeconds (Seconds 120)
|
||||
t <- liftIO $ atomically $ readTMVar lasttraffic
|
||||
when (t < startping) $ do
|
||||
inAssistant $ debug ["ping timeout"]
|
||||
error "ping timeout"
|
||||
where
|
||||
{- XEP-0199 says that the server will respond with either
|
||||
- a ping response or an error message. Either will
|
||||
- cause traffic, so good enough. -}
|
||||
pingstanza = xmppPing selfjid
|
||||
|
||||
handlemsg selfjid (PresenceMessage p) = do
|
||||
void $ inAssistant $
|
||||
updateBuddyList (updateBuddies p) <<~ buddyList
|
||||
resendImportantMessages selfjid p
|
||||
handlemsg _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
|
||||
handlemsg _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
|
||||
handlemsg selfjid (GotNetMessage (PairingNotification stage c u)) =
|
||||
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
|
||||
handlemsg _ (GotNetMessage m@(Pushing _ pushstage))
|
||||
| isPushNotice pushstage = inAssistant $ handlePushNotice m
|
||||
| isPushInitiation pushstage = inAssistant $ queuePushInitiation m
|
||||
| otherwise = inAssistant $ storeInbox m
|
||||
handlemsg _ (Ignorable _) = noop
|
||||
handlemsg _ (Unknown _) = noop
|
||||
handlemsg _ (ProtocolError _) = noop
|
||||
|
||||
resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do
|
||||
let c = formatJID jid
|
||||
(stored, sent) <- inAssistant $
|
||||
checkImportantNetMessages (formatJID (baseJID jid), c)
|
||||
forM_ (S.toList $ S.difference stored sent) $ \msg -> do
|
||||
let msg' = readdressNetMessage msg c
|
||||
inAssistant $ debug
|
||||
[ "sending to new client:"
|
||||
, logJid jid
|
||||
, show $ logNetMessage msg'
|
||||
]
|
||||
join $ inAssistant $ convertNetMsg msg' selfjid
|
||||
inAssistant $ sentImportantNetMessage msg c
|
||||
resendImportantMessages _ _ = noop
|
||||
|
||||
data XMPPEvent
|
||||
= GotNetMessage NetMessage
|
||||
| PresenceMessage Presence
|
||||
| Ignorable ReceivedStanza
|
||||
| Unknown ReceivedStanza
|
||||
| ProtocolError ReceivedStanza
|
||||
deriving Show
|
||||
|
||||
logXMPPEvent :: XMPPEvent -> String
|
||||
logXMPPEvent (GotNetMessage m) = logNetMessage m
|
||||
logXMPPEvent (PresenceMessage p) = logPresence p
|
||||
logXMPPEvent (Ignorable (ReceivedPresence p)) = "Ignorable " ++ logPresence p
|
||||
logXMPPEvent (Ignorable _) = "Ignorable message"
|
||||
logXMPPEvent (Unknown _) = "Unknown message"
|
||||
logXMPPEvent (ProtocolError _) = "Protocol error message"
|
||||
|
||||
logPresence :: Presence -> String
|
||||
logPresence (p@Presence { presenceFrom = Just jid }) = unwords
|
||||
[ "Presence from"
|
||||
, logJid jid
|
||||
, show $ extractGitAnnexTag p
|
||||
]
|
||||
logPresence _ = "Presence from unknown"
|
||||
|
||||
logJid :: JID -> String
|
||||
logJid jid =
|
||||
let name = T.unpack (buddyName jid)
|
||||
resource = maybe "" (T.unpack . strResource) (jidResource jid)
|
||||
in take 1 name ++ show (length name) ++ "/" ++ resource
|
||||
|
||||
logClient :: Client -> String
|
||||
logClient (Client jid) = logJid jid
|
||||
|
||||
{- Decodes an XMPP stanza into one or more events. -}
|
||||
decodeStanza :: JID -> ReceivedStanza -> [XMPPEvent]
|
||||
decodeStanza selfjid s@(ReceivedPresence p)
|
||||
| presenceType p == PresenceError = [ProtocolError s]
|
||||
| isNothing (presenceFrom p) = [Ignorable s]
|
||||
| presenceFrom p == Just selfjid = [Ignorable s]
|
||||
| otherwise = maybe [PresenceMessage p] decode (gitAnnexTagInfo p)
|
||||
where
|
||||
decode i
|
||||
| tagAttr i == pushAttr = impliedp $ GotNetMessage $ NotifyPush $
|
||||
decodePushNotification (tagValue i)
|
||||
| tagAttr i == queryAttr = impliedp $ GotNetMessage QueryPresence
|
||||
| otherwise = [Unknown s]
|
||||
{- Things sent via presence imply a presence message,
|
||||
- along with their real meaning. -}
|
||||
impliedp v = [PresenceMessage p, v]
|
||||
decodeStanza selfjid s@(ReceivedMessage m)
|
||||
| isNothing (messageFrom m) = [Ignorable s]
|
||||
| messageFrom m == Just selfjid = [Ignorable s]
|
||||
| messageType m == MessageError = [ProtocolError s]
|
||||
| otherwise = [fromMaybe (Unknown s) (GotNetMessage <$> decodeMessage m)]
|
||||
decodeStanza _ s = [Unknown s]
|
||||
|
||||
{- Waits for a NetMessager message to be sent, and relays it to XMPP.
|
||||
-
|
||||
- Chat messages must be directed to specific clients, not a base
|
||||
- account JID, due to git-annex clients using a negative presence priority.
|
||||
- PairingNotification messages are always directed at specific
|
||||
- clients, but Pushing messages are sometimes not, and need to be exploded
|
||||
- out to specific clients.
|
||||
-
|
||||
- Important messages, not directed at any specific client,
|
||||
- are cached to be sent later when additional clients connect.
|
||||
-}
|
||||
relayNetMessage :: JID -> Assistant (XMPP ())
|
||||
relayNetMessage selfjid = do
|
||||
msg <- waitNetMessage
|
||||
debug ["sending:", logNetMessage msg]
|
||||
a1 <- handleImportant msg
|
||||
a2 <- convert msg
|
||||
return (a1 >> a2)
|
||||
where
|
||||
handleImportant msg = case parseJID =<< isImportantNetMessage msg of
|
||||
Just tojid
|
||||
| tojid == baseJID tojid -> do
|
||||
storeImportantNetMessage msg (formatJID tojid) $
|
||||
\c -> (baseJID <$> parseJID c) == Just tojid
|
||||
return $ putStanza presenceQuery
|
||||
_ -> return noop
|
||||
convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid ->
|
||||
if tojid == baseJID tojid
|
||||
then do
|
||||
clients <- maybe [] (S.toList . buddyAssistants)
|
||||
<$> getBuddy (genBuddyKey tojid) <<~ buddyList
|
||||
debug ["exploded undirected message to clients", unwords $ map logClient clients]
|
||||
return $ forM_ clients $ \(Client jid) ->
|
||||
putStanza $ pushMessage pushstage jid selfjid
|
||||
else do
|
||||
debug ["to client:", logJid tojid]
|
||||
return $ putStanza $ pushMessage pushstage tojid selfjid
|
||||
convert msg = convertNetMsg msg selfjid
|
||||
|
||||
{- Converts a NetMessage to an XMPP action. -}
|
||||
convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ())
|
||||
convertNetMsg msg selfjid = convert msg
|
||||
where
|
||||
convert (NotifyPush us) = return $ putStanza $ pushNotification us
|
||||
convert QueryPresence = return $ putStanza presenceQuery
|
||||
convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do
|
||||
changeBuddyPairing tojid True
|
||||
return $ putStanza $ pairingNotification stage u tojid selfjid
|
||||
convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid ->
|
||||
return $ putStanza $ pushMessage pushstage tojid selfjid
|
||||
|
||||
withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> Assistant (XMPP ())
|
||||
withOtherClient selfjid c a = case parseJID c of
|
||||
Nothing -> return noop
|
||||
Just tojid
|
||||
| tojid == selfjid -> return noop
|
||||
| otherwise -> a tojid
|
||||
|
||||
withClient :: ClientID -> (JID -> XMPP ()) -> XMPP ()
|
||||
withClient c a = maybe noop a $ parseJID c
|
||||
|
||||
{- Returns an IO action that runs a XMPP action in a separate thread,
|
||||
- using a session to allow it to access the same XMPP client. -}
|
||||
xmppSession :: XMPP () -> XMPP (IO ())
|
||||
xmppSession a = do
|
||||
s <- getSession
|
||||
return $ void $ runXMPP s a
|
||||
|
||||
{- We only pull from one remote out of the set listed in the push
|
||||
- notification, as an optimisation.
|
||||
-
|
||||
- Note that it might be possible (though very unlikely) for the push
|
||||
- notification to take a while to be sent, and multiple pushes happen
|
||||
- before it is sent, so it includes multiple remotes that were pushed
|
||||
- to at different times.
|
||||
-
|
||||
- It could then be the case that the remote we choose had the earlier
|
||||
- push sent to it, but then failed to get the later push, and so is not
|
||||
- fully up-to-date. If that happens, the pushRetryThread will come along
|
||||
- and retry the push, and we'll get another notification once it succeeds,
|
||||
- and pull again. -}
|
||||
pull :: [UUID] -> Assistant ()
|
||||
pull [] = noop
|
||||
pull us = do
|
||||
rs <- filter matching . syncGitRemotes <$> getDaemonStatus
|
||||
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
|
||||
pullone rs =<< liftAnnex (join Command.Sync.getCurrBranch)
|
||||
where
|
||||
matching r = Remote.uuid r `S.member` s
|
||||
s = S.fromList us
|
||||
|
||||
pullone [] _ = noop
|
||||
pullone (r:rs) branch =
|
||||
unlessM (null . fst <$> manualPull branch [r]) $
|
||||
pullone rs branch
|
||||
|
||||
{- PairReq from another client using our JID is automatically
|
||||
- accepted. This is so pairing devices all using the same XMPP
|
||||
- account works without confirmations.
|
||||
-
|
||||
- Also, autoaccept PairReq from the same JID of any repo we've
|
||||
- already paired with, as long as the UUID in the PairReq is
|
||||
- one we know about.
|
||||
-}
|
||||
pairMsgReceived :: UrlRenderer -> PairStage -> UUID -> JID -> JID -> Assistant ()
|
||||
pairMsgReceived urlrenderer PairReq theiruuid selfjid theirjid
|
||||
| baseJID selfjid == baseJID theirjid = autoaccept
|
||||
| otherwise = do
|
||||
knownjids <- mapMaybe (parseJID . getXMPPClientID)
|
||||
. filter Remote.isXMPPRemote . syncRemotes <$> getDaemonStatus
|
||||
um <- liftAnnex uuidMap
|
||||
if elem (baseJID theirjid) knownjids && M.member theiruuid um
|
||||
then autoaccept
|
||||
else showalert
|
||||
|
||||
where
|
||||
autoaccept = do
|
||||
selfuuid <- liftAnnex getUUID
|
||||
sendNetMessage $
|
||||
PairingNotification PairAck (formatJID theirjid) selfuuid
|
||||
finishXMPPPairing theirjid theiruuid
|
||||
-- Show an alert to let the user decide if they want to pair.
|
||||
showalert = do
|
||||
button <- mkAlertButton True (T.pack "Respond") urlrenderer $
|
||||
ConfirmXMPPPairFriendR $
|
||||
PairKey theiruuid $ formatJID theirjid
|
||||
void $ addAlert $ pairRequestReceivedAlert
|
||||
(T.unpack $ buddyName theirjid)
|
||||
button
|
||||
|
||||
{- PairAck must come from one of the buddies we are pairing with;
|
||||
- don't pair with just anyone. -}
|
||||
pairMsgReceived _ PairAck theiruuid _selfjid theirjid =
|
||||
whenM (isBuddyPairing theirjid) $ do
|
||||
changeBuddyPairing theirjid False
|
||||
selfuuid <- liftAnnex getUUID
|
||||
sendNetMessage $
|
||||
PairingNotification PairDone (formatJID theirjid) selfuuid
|
||||
finishXMPPPairing theirjid theiruuid
|
||||
|
||||
pairMsgReceived _ PairDone _theiruuid _selfjid theirjid =
|
||||
changeBuddyPairing theirjid False
|
||||
|
||||
isBuddyPairing :: JID -> Assistant Bool
|
||||
isBuddyPairing jid = maybe False buddyPairing <$>
|
||||
getBuddy (genBuddyKey jid) <<~ buddyList
|
||||
|
||||
changeBuddyPairing :: JID -> Bool -> Assistant ()
|
||||
changeBuddyPairing jid ispairing =
|
||||
updateBuddyList (M.adjust set key) <<~ buddyList
|
||||
where
|
||||
key = genBuddyKey jid
|
||||
set b = b { buddyPairing = ispairing }
|
|
@ -1,82 +0,0 @@
|
|||
{- git-annex XMPP pusher threads
|
||||
-
|
||||
- This is a pair of threads. One handles git send-pack,
|
||||
- and the other git receive-pack. Each thread can be running at most
|
||||
- one such operation at a time.
|
||||
-
|
||||
- Why not use a single thread? Consider two clients A and B.
|
||||
- If both decide to run a receive-pack at the same time to the other,
|
||||
- they would deadlock with only one thread. For larger numbers of
|
||||
- clients, the two threads are also sufficient.
|
||||
-
|
||||
- Copyright 2013 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.XMPPPusher where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.WebApp (UrlRenderer)
|
||||
import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
|
||||
import Assistant.XMPP.Git
|
||||
|
||||
import Control.Exception as E
|
||||
|
||||
xmppSendPackThread :: UrlRenderer -> NamedThread
|
||||
xmppSendPackThread = pusherThread "XMPPSendPack" SendPack
|
||||
|
||||
xmppReceivePackThread :: UrlRenderer -> NamedThread
|
||||
xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack
|
||||
|
||||
pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread
|
||||
pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing
|
||||
where
|
||||
go lastpushedto = do
|
||||
msg <- waitPushInitiation side $ selectNextPush lastpushedto
|
||||
debug ["started running push", logNetMessage msg]
|
||||
|
||||
runpush <- asIO $ runPush checker msg
|
||||
r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID)))
|
||||
let successful = case r of
|
||||
Right (Just _) -> True
|
||||
_ -> False
|
||||
|
||||
{- Empty the inbox, because stuff may have
|
||||
- been left in it if the push failed. -}
|
||||
let justpushedto = getclient msg
|
||||
maybe noop (`emptyInbox` side) justpushedto
|
||||
|
||||
debug ["finished running push", logNetMessage msg, show successful]
|
||||
go $ if successful then justpushedto else lastpushedto
|
||||
|
||||
checker = checkCloudRepos urlrenderer
|
||||
|
||||
getclient (Pushing cid _) = Just cid
|
||||
getclient _ = Nothing
|
||||
|
||||
{- Select the next push to run from the queue.
|
||||
- The queue cannot be empty!
|
||||
-
|
||||
- We prefer to select the most recently added push, because its requestor
|
||||
- is more likely to still be connected.
|
||||
-
|
||||
- When passed the ID of a client we just pushed to, we prefer to not
|
||||
- immediately push again to that same client. This avoids one client
|
||||
- drowing out others. So pushes from the client we just pushed to are
|
||||
- relocated to the beginning of the list, to be processed later.
|
||||
-}
|
||||
selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage])
|
||||
selectNextPush _ (m:[]) = (m, []) -- common case
|
||||
selectNextPush _ [] = error "selectNextPush: empty list"
|
||||
selectNextPush lastpushedto l = go [] l
|
||||
where
|
||||
go (r:ejected) [] = (r, ejected)
|
||||
go rejected (m:ms) = case m of
|
||||
(Pushing clientid _)
|
||||
| Just clientid /= lastpushedto -> (m, rejected ++ ms)
|
||||
_ -> go (m:rejected) ms
|
||||
go [] [] = error "empty push queue"
|
||||
|
|
@ -1,80 +0,0 @@
|
|||
{- git-annex assistant buddies
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Assistant.Types.Buddies where
|
||||
|
||||
import Annex.Common
|
||||
|
||||
import qualified Data.Map as M
|
||||
import Control.Concurrent.STM
|
||||
import Utility.NotificationBroadcaster
|
||||
import Data.Text as T
|
||||
|
||||
{- For simplicity, dummy types are defined even when XMPP is disabled. -}
|
||||
#ifdef WITH_XMPP
|
||||
import Network.Protocol.XMPP
|
||||
import Data.Set as S
|
||||
import Data.Ord
|
||||
|
||||
newtype Client = Client JID
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Ord Client where
|
||||
compare = comparing show
|
||||
|
||||
data Buddy = Buddy
|
||||
{ buddyPresent :: S.Set Client
|
||||
, buddyAway :: S.Set Client
|
||||
, buddyAssistants :: S.Set Client
|
||||
, buddyPairing :: Bool
|
||||
}
|
||||
#else
|
||||
data Buddy = Buddy
|
||||
#endif
|
||||
deriving (Eq, Show)
|
||||
|
||||
data BuddyKey = BuddyKey T.Text
|
||||
deriving (Eq, Ord, Show, Read)
|
||||
|
||||
data PairKey = PairKey UUID T.Text
|
||||
deriving (Eq, Ord, Show, Read)
|
||||
|
||||
type Buddies = M.Map BuddyKey Buddy
|
||||
|
||||
{- A list of buddies, and a way to notify when it changes. -}
|
||||
type BuddyList = (TMVar Buddies, NotificationBroadcaster)
|
||||
|
||||
noBuddies :: Buddies
|
||||
noBuddies = M.empty
|
||||
|
||||
newBuddyList :: IO BuddyList
|
||||
newBuddyList = (,)
|
||||
<$> atomically (newTMVar noBuddies)
|
||||
<*> newNotificationBroadcaster
|
||||
|
||||
getBuddyList :: BuddyList -> IO [Buddy]
|
||||
getBuddyList (v, _) = M.elems <$> atomically (readTMVar v)
|
||||
|
||||
getBuddy :: BuddyKey -> BuddyList -> IO (Maybe Buddy)
|
||||
getBuddy k (v, _) = M.lookup k <$> atomically (readTMVar v)
|
||||
|
||||
getBuddyBroadcaster :: BuddyList -> NotificationBroadcaster
|
||||
getBuddyBroadcaster (_, h) = h
|
||||
|
||||
{- Applies a function to modify the buddy list, and if it's changed,
|
||||
- sends notifications to any listeners. -}
|
||||
updateBuddyList :: (Buddies -> Buddies) -> BuddyList -> IO ()
|
||||
updateBuddyList a (v, caster) = do
|
||||
changed <- atomically $ do
|
||||
buds <- takeTMVar v
|
||||
let buds' = a buds
|
||||
putTMVar v buds'
|
||||
return $ buds /= buds'
|
||||
when changed $
|
||||
sendNotification caster
|
|
@ -12,7 +12,6 @@ import Assistant.Pairing
|
|||
import Utility.NotificationBroadcaster
|
||||
import Types.Transfer
|
||||
import Assistant.Types.ThreadName
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.Types.Alert
|
||||
import Utility.Url
|
||||
|
||||
|
@ -54,8 +53,6 @@ data DaemonStatus = DaemonStatus
|
|||
, syncingToCloudRemote :: Bool
|
||||
-- Set of uuids of remotes that are currently connected.
|
||||
, currentlyConnectedRemotes :: S.Set UUID
|
||||
-- List of uuids of remotes that we may have gotten out of sync with.
|
||||
, desynced :: S.Set UUID
|
||||
-- Pairing request that is in progress.
|
||||
, pairingInProgress :: Maybe PairingInProgress
|
||||
-- Broadcasts notifications about all changes to the DaemonStatus.
|
||||
|
@ -77,9 +74,6 @@ data DaemonStatus = DaemonStatus
|
|||
, globalRedirUrl :: Maybe URLString
|
||||
-- Actions to run after a Key is transferred.
|
||||
, transferHook :: M.Map Key (Transfer -> IO ())
|
||||
-- When the XMPP client is connected, this will contain the XMPP
|
||||
-- address.
|
||||
, xmppClientID :: Maybe ClientID
|
||||
-- MVars to signal when a remote gets connected.
|
||||
, connectRemoteNotifiers :: M.Map UUID [MVar ()]
|
||||
}
|
||||
|
@ -105,7 +99,6 @@ newDaemonStatus = DaemonStatus
|
|||
<*> pure []
|
||||
<*> pure False
|
||||
<*> pure S.empty
|
||||
<*> pure S.empty
|
||||
<*> pure Nothing
|
||||
<*> newNotificationBroadcaster
|
||||
<*> newNotificationBroadcaster
|
||||
|
@ -117,5 +110,4 @@ newDaemonStatus = DaemonStatus
|
|||
<*> newNotificationBroadcaster
|
||||
<*> pure Nothing
|
||||
<*> pure M.empty
|
||||
<*> pure Nothing
|
||||
<*> pure M.empty
|
||||
|
|
|
@ -1,155 +0,0 @@
|
|||
{- git-annex assistant out of band network messager types
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Types.NetMessager where
|
||||
|
||||
import Annex.Common
|
||||
import Assistant.Pairing
|
||||
import Git.Types
|
||||
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.Encoding as T
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.DList as D
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.MSampleVar
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.Text (Text)
|
||||
|
||||
{- Messages that can be sent out of band by a network messager. -}
|
||||
data NetMessage
|
||||
-- indicate that pushes have been made to the repos with these uuids
|
||||
= NotifyPush [UUID]
|
||||
-- requests other clients to inform us of their presence
|
||||
| QueryPresence
|
||||
-- notification about a stage in the pairing process,
|
||||
-- involving a client, and a UUID.
|
||||
| PairingNotification PairStage ClientID UUID
|
||||
-- used for git push over the network messager
|
||||
| Pushing ClientID PushStage
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
{- Something used to identify the client, or clients to send the message to. -}
|
||||
type ClientID = Text
|
||||
|
||||
data PushStage
|
||||
-- indicates that we have data to push over the out of band network
|
||||
= CanPush UUID [Sha]
|
||||
-- request that a git push be sent over the out of band network
|
||||
| PushRequest UUID
|
||||
-- indicates that a push is starting
|
||||
| StartingPush UUID
|
||||
-- a chunk of output of git receive-pack
|
||||
| ReceivePackOutput SequenceNum ByteString
|
||||
-- a chuck of output of git send-pack
|
||||
| SendPackOutput SequenceNum ByteString
|
||||
-- sent when git receive-pack exits, with its exit code
|
||||
| ReceivePackDone ExitCode
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
{- A sequence number. Incremented by one per packet in a sequence,
|
||||
- starting with 1 for the first packet. 0 means sequence numbers are
|
||||
- not being used. -}
|
||||
type SequenceNum = Int
|
||||
|
||||
{- NetMessages that are important (and small), and should be stored to be
|
||||
- resent when new clients are seen. -}
|
||||
isImportantNetMessage :: NetMessage -> Maybe ClientID
|
||||
isImportantNetMessage (Pushing c (CanPush _ _)) = Just c
|
||||
isImportantNetMessage (Pushing c (PushRequest _)) = Just c
|
||||
isImportantNetMessage _ = Nothing
|
||||
|
||||
{- Checks if two important NetMessages are equivilant.
|
||||
- That is to say, assuming they were sent to the same client,
|
||||
- would it do the same thing for one as for the other? -}
|
||||
equivilantImportantNetMessages :: NetMessage -> NetMessage -> Bool
|
||||
equivilantImportantNetMessages (Pushing _ (CanPush _ _)) (Pushing _ (CanPush _ _)) = True
|
||||
equivilantImportantNetMessages (Pushing _ (PushRequest _)) (Pushing _ (PushRequest _)) = True
|
||||
equivilantImportantNetMessages _ _ = False
|
||||
|
||||
readdressNetMessage :: NetMessage -> ClientID -> NetMessage
|
||||
readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid
|
||||
readdressNetMessage (Pushing _ stage) c = Pushing c stage
|
||||
readdressNetMessage m _ = m
|
||||
|
||||
{- Convert a NetMessage to something that can be logged. -}
|
||||
logNetMessage :: NetMessage -> String
|
||||
logNetMessage (Pushing c stage) = show $ Pushing (logClientID c) $
|
||||
case stage of
|
||||
ReceivePackOutput n _ -> ReceivePackOutput n elided
|
||||
SendPackOutput n _ -> SendPackOutput n elided
|
||||
s -> s
|
||||
where
|
||||
elided = T.encodeUtf8 $ T.pack "<elided>"
|
||||
logNetMessage (PairingNotification stage c uuid) =
|
||||
show $ PairingNotification stage (logClientID c) uuid
|
||||
logNetMessage m = show m
|
||||
|
||||
logClientID :: ClientID -> ClientID
|
||||
logClientID c = T.concat [T.take 1 c, T.pack $ show $ T.length c]
|
||||
|
||||
{- Things that initiate either side of a push, but do not actually send data. -}
|
||||
isPushInitiation :: PushStage -> Bool
|
||||
isPushInitiation (PushRequest _) = True
|
||||
isPushInitiation (StartingPush _) = True
|
||||
isPushInitiation _ = False
|
||||
|
||||
isPushNotice :: PushStage -> Bool
|
||||
isPushNotice (CanPush _ _) = True
|
||||
isPushNotice _ = False
|
||||
|
||||
data PushSide = SendPack | ReceivePack
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
pushDestinationSide :: PushStage -> PushSide
|
||||
pushDestinationSide (CanPush _ _) = ReceivePack
|
||||
pushDestinationSide (PushRequest _) = SendPack
|
||||
pushDestinationSide (StartingPush _) = ReceivePack
|
||||
pushDestinationSide (ReceivePackOutput _ _) = SendPack
|
||||
pushDestinationSide (SendPackOutput _ _) = ReceivePack
|
||||
pushDestinationSide (ReceivePackDone _) = SendPack
|
||||
|
||||
type SideMap a = PushSide -> a
|
||||
|
||||
mkSideMap :: STM a -> IO (SideMap a)
|
||||
mkSideMap gen = do
|
||||
(sp, rp) <- atomically $ (,) <$> gen <*> gen
|
||||
return $ lookupside sp rp
|
||||
where
|
||||
lookupside sp _ SendPack = sp
|
||||
lookupside _ rp ReceivePack = rp
|
||||
|
||||
getSide :: PushSide -> SideMap a -> a
|
||||
getSide side m = m side
|
||||
|
||||
type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))
|
||||
|
||||
data NetMessager = NetMessager
|
||||
-- outgoing messages
|
||||
{ netMessages :: TChan NetMessage
|
||||
-- important messages for each client
|
||||
, importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
|
||||
-- important messages that are believed to have been sent to a client
|
||||
, sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
|
||||
-- write to this to restart the net messager
|
||||
, netMessagerRestart :: MSampleVar ()
|
||||
-- queue of incoming messages that request the initiation of pushes
|
||||
, netMessagerPushInitiations :: SideMap (TMVar [NetMessage])
|
||||
-- incoming messages containing data for a running
|
||||
-- (or not yet started) push
|
||||
, netMessagerInboxes :: SideMap Inboxes
|
||||
}
|
||||
|
||||
newNetMessager :: IO NetMessager
|
||||
newNetMessager = NetMessager
|
||||
<$> atomically newTChan
|
||||
<*> atomically (newTMVar M.empty)
|
||||
<*> atomically (newTMVar M.empty)
|
||||
<*> newEmptySV
|
||||
<*> mkSideMap newEmptyTMVar
|
||||
<*> mkSideMap (newTVar M.empty)
|
|
@ -5,26 +5,18 @@
|
|||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE QuasiQuotes, TemplateHaskell, OverloadedStrings, CPP #-}
|
||||
{-# LANGUAGE QuasiQuotes, TemplateHaskell, OverloadedStrings #-}
|
||||
|
||||
module Assistant.WebApp.Configurators where
|
||||
|
||||
import Assistant.WebApp.Common
|
||||
import Assistant.WebApp.RepoList
|
||||
#ifdef WITH_XMPP
|
||||
import Assistant.XMPP.Client
|
||||
#endif
|
||||
|
||||
{- The main configuration screen. -}
|
||||
getConfigurationR :: Handler Html
|
||||
getConfigurationR = ifM inFirstRun
|
||||
( redirect FirstRepositoryR
|
||||
, page "Configuration" (Just Configuration) $ do
|
||||
#ifdef WITH_XMPP
|
||||
xmppconfigured <- liftAnnex $ isJust <$> getXMPPCreds
|
||||
#else
|
||||
let xmppconfigured = False
|
||||
#endif
|
||||
$(widgetFile "configurators/main")
|
||||
)
|
||||
|
||||
|
@ -39,9 +31,6 @@ makeMiscRepositories = $(widgetFile "configurators/addrepository/misc")
|
|||
makeCloudRepositories :: Widget
|
||||
makeCloudRepositories = $(widgetFile "configurators/addrepository/cloud")
|
||||
|
||||
makeXMPPConnection :: Widget
|
||||
makeXMPPConnection = $(widgetFile "configurators/addrepository/xmppconnection")
|
||||
|
||||
makeSshRepository :: Widget
|
||||
makeSshRepository = $(widgetFile "configurators/addrepository/ssh")
|
||||
|
||||
|
|
|
@ -37,16 +37,8 @@ notCurrentRepo uuid a = do
|
|||
go Nothing = error "Unknown UUID"
|
||||
go (Just _) = a
|
||||
|
||||
handleXMPPRemoval :: UUID -> Handler Html -> Handler Html
|
||||
handleXMPPRemoval uuid nonxmpp = do
|
||||
remote <- fromMaybe (error "unknown remote")
|
||||
<$> liftAnnex (Remote.remoteFromUUID uuid)
|
||||
if Remote.isXMPPRemote remote
|
||||
then deletionPage $ $(widgetFile "configurators/delete/xmpp")
|
||||
else nonxmpp
|
||||
|
||||
getDeleteRepositoryR :: UUID -> Handler Html
|
||||
getDeleteRepositoryR uuid = notCurrentRepo uuid $ handleXMPPRemoval uuid $ do
|
||||
getDeleteRepositoryR uuid = notCurrentRepo uuid $ do
|
||||
deletionPage $ do
|
||||
reponame <- liftAnnex $ Remote.prettyUUID uuid
|
||||
$(widgetFile "configurators/delete/start")
|
||||
|
|
|
@ -12,7 +12,6 @@ module Assistant.WebApp.Configurators.Pairing where
|
|||
|
||||
import Assistant.Pairing
|
||||
import Assistant.WebApp.Common
|
||||
import Assistant.Types.Buddies
|
||||
import Annex.UUID
|
||||
#ifdef WITH_PAIRING
|
||||
import Assistant.Pairing.Network
|
||||
|
@ -22,17 +21,6 @@ import Assistant.Alert
|
|||
import Assistant.DaemonStatus
|
||||
import Utility.Verifiable
|
||||
#endif
|
||||
#ifdef WITH_XMPP
|
||||
import Assistant.XMPP.Client
|
||||
import Assistant.XMPP.Buddies
|
||||
import Assistant.XMPP.Git
|
||||
import Network.Protocol.XMPP
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.NetMessager
|
||||
import Assistant.WebApp.RepoList
|
||||
import Assistant.WebApp.Configurators
|
||||
import Assistant.WebApp.Configurators.XMPP
|
||||
#endif
|
||||
import Utility.UserInfo
|
||||
import Git
|
||||
|
||||
|
@ -44,84 +32,6 @@ import Data.Char
|
|||
import qualified Control.Exception as E
|
||||
import Control.Concurrent
|
||||
#endif
|
||||
#ifdef WITH_XMPP
|
||||
import qualified Data.Set as S
|
||||
#endif
|
||||
|
||||
getStartXMPPPairFriendR :: Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
getStartXMPPPairFriendR = ifM (isJust <$> liftAnnex getXMPPCreds)
|
||||
( do
|
||||
{- Ask buddies to send presence info, to get
|
||||
- the buddy list populated. -}
|
||||
liftAssistant $ sendNetMessage QueryPresence
|
||||
pairPage $
|
||||
$(widgetFile "configurators/pairing/xmpp/friend/prompt")
|
||||
, do
|
||||
-- go get XMPP configured, then come back
|
||||
redirect XMPPConfigForPairFriendR
|
||||
)
|
||||
#else
|
||||
getStartXMPPPairFriendR = noXMPPPairing
|
||||
|
||||
noXMPPPairing :: Handler Html
|
||||
noXMPPPairing = noPairing "XMPP"
|
||||
#endif
|
||||
|
||||
getStartXMPPPairSelfR :: Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
getStartXMPPPairSelfR = go =<< liftAnnex getXMPPCreds
|
||||
where
|
||||
go Nothing = do
|
||||
-- go get XMPP configured, then come back
|
||||
redirect XMPPConfigForPairSelfR
|
||||
go (Just creds) = do
|
||||
{- Ask buddies to send presence info, to get
|
||||
- the buddy list populated. -}
|
||||
liftAssistant $ sendNetMessage QueryPresence
|
||||
let account = xmppJID creds
|
||||
pairPage $
|
||||
$(widgetFile "configurators/pairing/xmpp/self/prompt")
|
||||
#else
|
||||
getStartXMPPPairSelfR = noXMPPPairing
|
||||
#endif
|
||||
|
||||
getRunningXMPPPairFriendR :: BuddyKey -> Handler Html
|
||||
getRunningXMPPPairFriendR = sendXMPPPairRequest . Just
|
||||
|
||||
getRunningXMPPPairSelfR :: Handler Html
|
||||
getRunningXMPPPairSelfR = sendXMPPPairRequest Nothing
|
||||
|
||||
{- Sends a XMPP pair request, to a buddy or to self. -}
|
||||
sendXMPPPairRequest :: Maybe BuddyKey -> Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
sendXMPPPairRequest mbid = do
|
||||
bid <- maybe getself return mbid
|
||||
buddy <- liftAssistant $ getBuddy bid <<~ buddyList
|
||||
go $ S.toList . buddyAssistants <$> buddy
|
||||
where
|
||||
go (Just (clients@((Client exemplar):_))) = do
|
||||
u <- liftAnnex getUUID
|
||||
liftAssistant $ forM_ clients $ \(Client c) -> sendNetMessage $
|
||||
PairingNotification PairReq (formatJID c) u
|
||||
xmppPairStatus True $
|
||||
if selfpair then Nothing else Just exemplar
|
||||
go _
|
||||
{- Nudge the user to turn on their other device. -}
|
||||
| selfpair = do
|
||||
liftAssistant $ sendNetMessage QueryPresence
|
||||
pairPage $
|
||||
$(widgetFile "configurators/pairing/xmpp/self/retry")
|
||||
{- Buddy could have logged out, etc.
|
||||
- Go back to buddy list. -}
|
||||
| otherwise = redirect StartXMPPPairFriendR
|
||||
selfpair = isNothing mbid
|
||||
getself = maybe (error "XMPP not configured")
|
||||
(return . BuddyKey . xmppJID)
|
||||
=<< liftAnnex getXMPPCreds
|
||||
#else
|
||||
sendXMPPPairRequest _ = noXMPPPairing
|
||||
#endif
|
||||
|
||||
{- Starts local pairing. -}
|
||||
getStartLocalPairR :: Handler Html
|
||||
|
@ -158,41 +68,6 @@ postFinishLocalPairR msg = promptSecret (Just msg) $ \_ secret -> do
|
|||
postFinishLocalPairR _ = noLocalPairing
|
||||
#endif
|
||||
|
||||
getConfirmXMPPPairFriendR :: PairKey -> Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
getConfirmXMPPPairFriendR pairkey@(PairKey _ t) = case parseJID t of
|
||||
Nothing -> error "bad JID"
|
||||
Just theirjid -> pairPage $ do
|
||||
let name = buddyName theirjid
|
||||
$(widgetFile "configurators/pairing/xmpp/friend/confirm")
|
||||
#else
|
||||
getConfirmXMPPPairFriendR _ = noXMPPPairing
|
||||
#endif
|
||||
|
||||
getFinishXMPPPairFriendR :: PairKey -> Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
getFinishXMPPPairFriendR (PairKey theiruuid t) = case parseJID t of
|
||||
Nothing -> error "bad JID"
|
||||
Just theirjid -> do
|
||||
selfuuid <- liftAnnex getUUID
|
||||
liftAssistant $ do
|
||||
sendNetMessage $
|
||||
PairingNotification PairAck (formatJID theirjid) selfuuid
|
||||
finishXMPPPairing theirjid theiruuid
|
||||
xmppPairStatus False $ Just theirjid
|
||||
#else
|
||||
getFinishXMPPPairFriendR _ = noXMPPPairing
|
||||
#endif
|
||||
|
||||
{- Displays a page indicating pairing status and
|
||||
- prompting to set up cloud repositories. -}
|
||||
#ifdef WITH_XMPP
|
||||
xmppPairStatus :: Bool -> Maybe JID -> Handler Html
|
||||
xmppPairStatus inprogress theirjid = pairPage $ do
|
||||
let friend = buddyName <$> theirjid
|
||||
$(widgetFile "configurators/pairing/xmpp/end")
|
||||
#endif
|
||||
|
||||
getRunningLocalPairR :: SecretReminder -> Handler Html
|
||||
#ifdef WITH_PAIRING
|
||||
getRunningLocalPairR s = pairPage $ do
|
||||
|
|
|
@ -1,226 +0,0 @@
|
|||
{- git-annex assistant XMPP configuration
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE TypeFamilies, QuasiQuotes, TemplateHaskell, OverloadedStrings, FlexibleContexts #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Assistant.WebApp.Configurators.XMPP where
|
||||
|
||||
import Assistant.WebApp.Common
|
||||
import Assistant.WebApp.Notifications
|
||||
import Utility.NotificationBroadcaster
|
||||
#ifdef WITH_XMPP
|
||||
import qualified Remote
|
||||
import Assistant.XMPP.Client
|
||||
import Assistant.XMPP.Buddies
|
||||
import Assistant.Types.Buddies
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Alert
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.WebApp.RepoList
|
||||
import Assistant.WebApp.Configurators
|
||||
import Assistant.XMPP
|
||||
import qualified Git.Remote.Remove
|
||||
import Remote.List
|
||||
import Creds
|
||||
#endif
|
||||
|
||||
#ifdef WITH_XMPP
|
||||
import Network.Protocol.XMPP
|
||||
import Network
|
||||
import qualified Data.Text as T
|
||||
#endif
|
||||
|
||||
{- When appropriate, displays an alert suggesting to configure a cloud repo
|
||||
- to suppliment an XMPP remote. -}
|
||||
checkCloudRepos :: UrlRenderer -> Remote -> Assistant ()
|
||||
#ifdef WITH_XMPP
|
||||
checkCloudRepos urlrenderer r =
|
||||
unlessM (syncingToCloudRemote <$> getDaemonStatus) $ do
|
||||
buddyname <- getBuddyName $ Remote.uuid r
|
||||
button <- mkAlertButton True "Add a cloud repository" urlrenderer $
|
||||
NeedCloudRepoR $ Remote.uuid r
|
||||
void $ addAlert $ cloudRepoNeededAlert buddyname button
|
||||
#else
|
||||
checkCloudRepos _ _ = noop
|
||||
#endif
|
||||
|
||||
#ifdef WITH_XMPP
|
||||
{- Returns the name of the friend corresponding to a
|
||||
- repository's UUID, but not if it's our name. -}
|
||||
getBuddyName :: UUID -> Assistant (Maybe String)
|
||||
getBuddyName u = go =<< getclientjid
|
||||
where
|
||||
go Nothing = return Nothing
|
||||
go (Just myjid) = (T.unpack . buddyName <$>)
|
||||
. headMaybe
|
||||
. filter (\j -> baseJID j /= baseJID myjid)
|
||||
. map fst
|
||||
. filter (\(_, r) -> Remote.uuid r == u)
|
||||
<$> getXMPPRemotes
|
||||
getclientjid = maybe Nothing parseJID . xmppClientID
|
||||
<$> getDaemonStatus
|
||||
#endif
|
||||
|
||||
getNeedCloudRepoR :: UUID -> Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
getNeedCloudRepoR for = page "Cloud repository needed" (Just Configuration) $ do
|
||||
buddyname <- liftAssistant $ getBuddyName for
|
||||
$(widgetFile "configurators/xmpp/needcloudrepo")
|
||||
#else
|
||||
getNeedCloudRepoR _ = xmppPage $
|
||||
$(widgetFile "configurators/xmpp/disabled")
|
||||
#endif
|
||||
|
||||
getXMPPConfigR :: Handler Html
|
||||
getXMPPConfigR = postXMPPConfigR
|
||||
|
||||
postXMPPConfigR :: Handler Html
|
||||
postXMPPConfigR = xmppform DashboardR
|
||||
|
||||
getXMPPConfigForPairFriendR :: Handler Html
|
||||
getXMPPConfigForPairFriendR = postXMPPConfigForPairFriendR
|
||||
|
||||
postXMPPConfigForPairFriendR :: Handler Html
|
||||
postXMPPConfigForPairFriendR = xmppform StartXMPPPairFriendR
|
||||
|
||||
getXMPPConfigForPairSelfR :: Handler Html
|
||||
getXMPPConfigForPairSelfR = postXMPPConfigForPairSelfR
|
||||
|
||||
postXMPPConfigForPairSelfR :: Handler Html
|
||||
postXMPPConfigForPairSelfR = xmppform StartXMPPPairSelfR
|
||||
|
||||
xmppform :: Route WebApp -> Handler Html
|
||||
#ifdef WITH_XMPP
|
||||
xmppform next = xmppPage $ do
|
||||
((result, form), enctype) <- liftH $ do
|
||||
oldcreds <- liftAnnex getXMPPCreds
|
||||
runFormPostNoToken $ renderBootstrap3 bootstrapFormLayout $ xmppAForm $
|
||||
creds2Form <$> oldcreds
|
||||
let showform problem = $(widgetFile "configurators/xmpp")
|
||||
case result of
|
||||
FormSuccess f -> either (showform . Just) (liftH . storecreds)
|
||||
=<< liftIO (validateForm f)
|
||||
_ -> showform Nothing
|
||||
where
|
||||
storecreds creds = do
|
||||
void $ liftAnnex $ setXMPPCreds creds
|
||||
liftAssistant notifyNetMessagerRestart
|
||||
redirect next
|
||||
#else
|
||||
xmppform _ = xmppPage $
|
||||
$(widgetFile "configurators/xmpp/disabled")
|
||||
#endif
|
||||
|
||||
{- Called by client to get a list of buddies.
|
||||
-
|
||||
- Returns a div, which will be inserted into the calling page.
|
||||
-}
|
||||
getBuddyListR :: NotificationId -> Handler Html
|
||||
getBuddyListR nid = do
|
||||
waitNotifier getBuddyListBroadcaster nid
|
||||
|
||||
p <- widgetToPageContent buddyListDisplay
|
||||
withUrlRenderer $ [hamlet|^{pageBody p}|]
|
||||
|
||||
buddyListDisplay :: Widget
|
||||
buddyListDisplay = do
|
||||
autoUpdate ident NotifierBuddyListR (10 :: Int) (10 :: Int)
|
||||
#ifdef WITH_XMPP
|
||||
myjid <- liftAssistant $ xmppClientID <$> getDaemonStatus
|
||||
let isself (BuddyKey b) = Just b == myjid
|
||||
buddies <- liftAssistant $ do
|
||||
pairedwith <- map fst <$> getXMPPRemotes
|
||||
catMaybes . map (buddySummary pairedwith)
|
||||
<$> (getBuddyList <<~ buddyList)
|
||||
$(widgetFile "configurators/xmpp/buddylist")
|
||||
#else
|
||||
noop
|
||||
#endif
|
||||
where
|
||||
ident = "buddylist"
|
||||
|
||||
#ifdef WITH_XMPP
|
||||
|
||||
getXMPPRemotes :: Assistant [(JID, Remote)]
|
||||
getXMPPRemotes = catMaybes . map pair . filter Remote.isXMPPRemote . syncGitRemotes
|
||||
<$> getDaemonStatus
|
||||
where
|
||||
pair r = maybe Nothing (\jid -> Just (jid, r)) $
|
||||
parseJID $ getXMPPClientID r
|
||||
|
||||
data XMPPForm = XMPPForm
|
||||
{ formJID :: Text
|
||||
, formPassword :: Text }
|
||||
|
||||
creds2Form :: XMPPCreds -> XMPPForm
|
||||
creds2Form c = XMPPForm (xmppJID c) (xmppPassword c)
|
||||
|
||||
xmppAForm :: (Maybe XMPPForm) -> MkAForm XMPPForm
|
||||
xmppAForm d = XMPPForm
|
||||
<$> areq jidField (bfs "Jabber address") (formJID <$> d)
|
||||
<*> areq passwordField (bfs "Password") Nothing
|
||||
|
||||
jidField :: MkField Text
|
||||
jidField = checkBool (isJust . parseJID) bad textField
|
||||
where
|
||||
bad :: Text
|
||||
bad = "This should look like an email address.."
|
||||
|
||||
validateForm :: XMPPForm -> IO (Either String XMPPCreds)
|
||||
validateForm f = do
|
||||
let jid = fromMaybe (error "bad JID") $ parseJID (formJID f)
|
||||
let username = fromMaybe "" (strNode <$> jidNode jid)
|
||||
testXMPP $ XMPPCreds
|
||||
{ xmppUsername = username
|
||||
, xmppPassword = formPassword f
|
||||
, xmppHostname = T.unpack $ strDomain $ jidDomain jid
|
||||
, xmppPort = 5222
|
||||
, xmppJID = formJID f
|
||||
}
|
||||
|
||||
testXMPP :: XMPPCreds -> IO (Either String XMPPCreds)
|
||||
testXMPP creds = do
|
||||
(good, bad) <- partition (either (const False) (const True) . snd)
|
||||
<$> connectXMPP creds (const noop)
|
||||
case good of
|
||||
(((h, PortNumber p), _):_) -> return $ Right $ creds
|
||||
{ xmppHostname = h
|
||||
, xmppPort = fromIntegral p
|
||||
}
|
||||
(((h, _), _):_) -> return $ Right $ creds
|
||||
{ xmppHostname = h
|
||||
}
|
||||
_ -> return $ Left $ intercalate "; " $ map formatlog bad
|
||||
where
|
||||
formatlog ((h, p), Left e) = "host " ++ h ++ ":" ++ showport p ++ " failed: " ++ show e
|
||||
formatlog _ = ""
|
||||
|
||||
showport (PortNumber n) = show n
|
||||
showport (Service s) = s
|
||||
showport (UnixSocket s) = s
|
||||
#endif
|
||||
|
||||
getDisconnectXMPPR :: Handler Html
|
||||
getDisconnectXMPPR = do
|
||||
#ifdef WITH_XMPP
|
||||
rs <- filter Remote.isXMPPRemote . syncRemotes
|
||||
<$> liftAssistant getDaemonStatus
|
||||
liftAnnex $ do
|
||||
mapM_ (inRepo . Git.Remote.Remove.remove . Remote.name) rs
|
||||
void remoteListRefresh
|
||||
removeCreds xmppCredsFile
|
||||
liftAssistant $ do
|
||||
updateSyncRemotes
|
||||
notifyNetMessagerRestart
|
||||
redirect DashboardR
|
||||
#else
|
||||
xmppPage $ $(widgetFile "configurators/xmpp/disabled")
|
||||
#endif
|
||||
|
||||
xmppPage :: Widget -> Handler Html
|
||||
xmppPage = page "Jabber" (Just Configuration)
|
|
@ -13,7 +13,6 @@ import Assistant.Common
|
|||
import Assistant.WebApp
|
||||
import Assistant.WebApp.Types
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Types.Buddies
|
||||
import Utility.NotificationBroadcaster
|
||||
import Utility.Yesod
|
||||
import Utility.WebApp
|
||||
|
@ -60,9 +59,6 @@ getNotifierTransfersR = notifierUrl TransfersR getTransferBroadcaster
|
|||
getNotifierSideBarR :: Handler RepPlain
|
||||
getNotifierSideBarR = notifierUrl SideBarR getAlertBroadcaster
|
||||
|
||||
getNotifierBuddyListR :: Handler RepPlain
|
||||
getNotifierBuddyListR = notifierUrl BuddyListR getBuddyListBroadcaster
|
||||
|
||||
getNotifierRepoListR :: RepoSelector -> Handler RepPlain
|
||||
getNotifierRepoListR reposelector = notifierUrl route getRepoListBroadcaster
|
||||
where
|
||||
|
@ -77,9 +73,6 @@ getTransferBroadcaster = transferNotifier <$> getDaemonStatus
|
|||
getAlertBroadcaster :: Assistant NotificationBroadcaster
|
||||
getAlertBroadcaster = alertNotifier <$> getDaemonStatus
|
||||
|
||||
getBuddyListBroadcaster :: Assistant NotificationBroadcaster
|
||||
getBuddyListBroadcaster = getBuddyBroadcaster <$> getAssistant buddyList
|
||||
|
||||
getRepoListBroadcaster :: Assistant NotificationBroadcaster
|
||||
getRepoListBroadcaster = syncRemotesNotifier <$> getDaemonStatus
|
||||
|
||||
|
|
|
@ -260,7 +260,7 @@ getSyncNowRepositoryR uuid = do
|
|||
if u == uuid
|
||||
then do
|
||||
thread <- liftAssistant $ asIO $
|
||||
reconnectRemotes True
|
||||
reconnectRemotes
|
||||
=<< (syncRemotes <$> getDaemonStatus)
|
||||
void $ liftIO $ forkIO thread
|
||||
else maybe noop (liftAssistant . syncRemote)
|
||||
|
|
|
@ -15,7 +15,6 @@ module Assistant.WebApp.Types where
|
|||
import Assistant.Common
|
||||
import Assistant.Ssh
|
||||
import Assistant.Pairing
|
||||
import Assistant.Types.Buddies
|
||||
import Utility.NotificationBroadcaster
|
||||
import Utility.WebApp
|
||||
import Utility.Yesod
|
||||
|
@ -162,14 +161,6 @@ instance PathPiece UUID where
|
|||
toPathPiece = pack . show
|
||||
fromPathPiece = readish . unpack
|
||||
|
||||
instance PathPiece BuddyKey where
|
||||
toPathPiece = pack . show
|
||||
fromPathPiece = readish . unpack
|
||||
|
||||
instance PathPiece PairKey where
|
||||
toPathPiece = pack . show
|
||||
fromPathPiece = readish . unpack
|
||||
|
||||
instance PathPiece RepoSelector where
|
||||
toPathPiece = pack . show
|
||||
fromPathPiece = readish . unpack
|
||||
|
|
|
@ -16,11 +16,6 @@
|
|||
|
||||
/config ConfigurationR GET
|
||||
/config/preferences PreferencesR GET POST
|
||||
/config/xmpp XMPPConfigR GET POST
|
||||
/config/xmpp/for/self XMPPConfigForPairSelfR GET POST
|
||||
/config/xmpp/for/frield XMPPConfigForPairFriendR GET POST
|
||||
/config/xmpp/needcloudrepo/#UUID NeedCloudRepoR GET
|
||||
/config/xmpp/disconnect DisconnectXMPPR GET
|
||||
/config/needconnection ConnectionNeededR GET
|
||||
/config/fsck ConfigFsckR GET POST
|
||||
/config/fsck/preferences ConfigFsckPreferencesR POST
|
||||
|
@ -67,14 +62,6 @@
|
|||
/config/repository/pair/local/running/#SecretReminder RunningLocalPairR GET
|
||||
/config/repository/pair/local/finish/#PairMsg FinishLocalPairR GET POST
|
||||
|
||||
/config/repository/pair/xmpp/self/start StartXMPPPairSelfR GET
|
||||
/config/repository/pair/xmpp/self/running RunningXMPPPairSelfR GET
|
||||
|
||||
/config/repository/pair/xmpp/friend/start StartXMPPPairFriendR GET
|
||||
/config/repository/pair/xmpp/friend/running/#BuddyKey RunningXMPPPairFriendR GET
|
||||
/config/repository/pair/xmpp/friend/accept/#PairKey ConfirmXMPPPairFriendR GET
|
||||
/config/repository/pair/xmpp/friend/finish/#PairKey FinishXMPPPairFriendR GET
|
||||
|
||||
/config/repository/enable/rsync/#UUID EnableRsyncR GET POST
|
||||
/config/repository/enable/gcrypt/#UUID EnableSshGCryptR GET POST
|
||||
/config/repository/enable/directory/#UUID EnableDirectoryR GET
|
||||
|
@ -103,9 +90,6 @@
|
|||
/sidebar/#NotificationId SideBarR GET
|
||||
/notifier/sidebar NotifierSideBarR GET
|
||||
|
||||
/buddylist/#NotificationId BuddyListR GET
|
||||
/notifier/buddylist NotifierBuddyListR GET
|
||||
|
||||
/repolist/#NotificationId/#RepoSelector RepoListR GET
|
||||
/notifier/repolist/#RepoSelector NotifierRepoListR GET
|
||||
|
||||
|
|
|
@ -1,275 +0,0 @@
|
|||
{- core xmpp support
|
||||
-
|
||||
- Copyright 2012-2013 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Assistant.XMPP where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.Pairing
|
||||
import Git.Sha (extractSha)
|
||||
import Git
|
||||
|
||||
import Network.Protocol.XMPP hiding (Node)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Map as M
|
||||
import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.XML.Types
|
||||
import qualified "sandi" Codec.Binary.Base64 as B64
|
||||
import Data.Bits.Utils
|
||||
|
||||
{- Name of the git-annex tag, in our own XML namespace.
|
||||
- (Not using a namespace URL to avoid unnecessary bloat.) -}
|
||||
gitAnnexTagName :: Name
|
||||
gitAnnexTagName = "{git-annex}git-annex"
|
||||
|
||||
{- Creates a git-annex tag containing a particular attribute and value. -}
|
||||
gitAnnexTag :: Name -> Text -> Element
|
||||
gitAnnexTag attr val = gitAnnexTagContent attr val []
|
||||
|
||||
{- Also with some content. -}
|
||||
gitAnnexTagContent :: Name -> Text -> [Node] -> Element
|
||||
gitAnnexTagContent attr val = Element gitAnnexTagName [(attr, [ContentText val])]
|
||||
|
||||
isGitAnnexTag :: Element -> Bool
|
||||
isGitAnnexTag t = elementName t == gitAnnexTagName
|
||||
|
||||
{- Things that a git-annex tag can inserted into. -}
|
||||
class GitAnnexTaggable a where
|
||||
insertGitAnnexTag :: a -> Element -> a
|
||||
|
||||
extractGitAnnexTag :: a -> Maybe Element
|
||||
|
||||
hasGitAnnexTag :: a -> Bool
|
||||
hasGitAnnexTag = isJust . extractGitAnnexTag
|
||||
|
||||
instance GitAnnexTaggable Message where
|
||||
insertGitAnnexTag m elt = m { messagePayloads = elt : messagePayloads m }
|
||||
extractGitAnnexTag = headMaybe . filter isGitAnnexTag . messagePayloads
|
||||
|
||||
instance GitAnnexTaggable Presence where
|
||||
-- always mark extended away and set presence priority to negative
|
||||
insertGitAnnexTag p elt = p
|
||||
{ presencePayloads = extendedAway : negativePriority : elt : presencePayloads p }
|
||||
extractGitAnnexTag = headMaybe . filter isGitAnnexTag . presencePayloads
|
||||
|
||||
data GitAnnexTagInfo = GitAnnexTagInfo
|
||||
{ tagAttr :: Name
|
||||
, tagValue :: Text
|
||||
, tagElement :: Element
|
||||
}
|
||||
|
||||
type Decoder = Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
|
||||
gitAnnexTagInfo :: GitAnnexTaggable a => a -> Maybe GitAnnexTagInfo
|
||||
gitAnnexTagInfo v = case extractGitAnnexTag v of
|
||||
{- Each git-annex tag has a single attribute. -}
|
||||
Just (tag@(Element _ [(attr, _)] _)) -> GitAnnexTagInfo
|
||||
<$> pure attr
|
||||
<*> attributeText attr tag
|
||||
<*> pure tag
|
||||
_ -> Nothing
|
||||
|
||||
{- A presence with a git-annex tag in it.
|
||||
- Also includes a status tag, which may be visible in XMPP clients. -}
|
||||
gitAnnexPresence :: Element -> Presence
|
||||
gitAnnexPresence = insertGitAnnexTag $ addStatusTag $ emptyPresence PresenceAvailable
|
||||
where
|
||||
addStatusTag p = p
|
||||
{ presencePayloads = status : presencePayloads p }
|
||||
status = Element "status" [] [statusMessage]
|
||||
statusMessage = NodeContent $ ContentText $ T.pack "git-annex"
|
||||
|
||||
{- A presence with an empty git-annex tag in it, used for letting other
|
||||
- clients know we're around and are a git-annex client. -}
|
||||
gitAnnexSignature :: Presence
|
||||
gitAnnexSignature = gitAnnexPresence $ Element gitAnnexTagName [] []
|
||||
|
||||
{- XMPP client to server ping -}
|
||||
xmppPing :: JID -> IQ
|
||||
xmppPing selfjid = (emptyIQ IQGet)
|
||||
{ iqID = Just "c2s1"
|
||||
, iqFrom = Just selfjid
|
||||
, iqTo = Just $ JID Nothing (jidDomain selfjid) Nothing
|
||||
, iqPayload = Just $ Element xmppPingTagName [] []
|
||||
}
|
||||
|
||||
xmppPingTagName :: Name
|
||||
xmppPingTagName = "{urn:xmpp}ping"
|
||||
|
||||
{- A message with a git-annex tag in it. -}
|
||||
gitAnnexMessage :: Element -> JID -> JID -> Message
|
||||
gitAnnexMessage elt tojid fromjid = (insertGitAnnexTag silentMessage elt)
|
||||
{ messageTo = Just tojid
|
||||
, messageFrom = Just fromjid
|
||||
}
|
||||
|
||||
{- A notification that we've pushed to some repositories, listing their
|
||||
- UUIDs. -}
|
||||
pushNotification :: [UUID] -> Presence
|
||||
pushNotification = gitAnnexPresence . gitAnnexTag pushAttr . encodePushNotification
|
||||
|
||||
encodePushNotification :: [UUID] -> Text
|
||||
encodePushNotification = T.intercalate uuidSep . map (T.pack . fromUUID)
|
||||
|
||||
decodePushNotification :: Text -> [UUID]
|
||||
decodePushNotification = map (toUUID . T.unpack) . T.splitOn uuidSep
|
||||
|
||||
uuidSep :: Text
|
||||
uuidSep = ","
|
||||
|
||||
{- A request for other git-annex clients to send presence. -}
|
||||
presenceQuery :: Presence
|
||||
presenceQuery = gitAnnexPresence $ gitAnnexTag queryAttr T.empty
|
||||
|
||||
{- A notification about a stage of pairing. -}
|
||||
pairingNotification :: PairStage -> UUID -> JID -> JID -> Message
|
||||
pairingNotification pairstage u = gitAnnexMessage $
|
||||
gitAnnexTag pairAttr $ encodePairingNotification pairstage u
|
||||
|
||||
encodePairingNotification :: PairStage -> UUID -> Text
|
||||
encodePairingNotification pairstage u = T.unwords $ map T.pack
|
||||
[ show pairstage
|
||||
, fromUUID u
|
||||
]
|
||||
|
||||
decodePairingNotification :: Decoder
|
||||
decodePairingNotification m = parse . words . T.unpack . tagValue
|
||||
where
|
||||
parse [stage, u] = PairingNotification
|
||||
<$> readish stage
|
||||
<*> (formatJID <$> messageFrom m)
|
||||
<*> pure (toUUID u)
|
||||
parse _ = Nothing
|
||||
|
||||
pushMessage :: PushStage -> JID -> JID -> Message
|
||||
pushMessage = gitAnnexMessage . encode
|
||||
where
|
||||
encode (CanPush u shas) =
|
||||
gitAnnexTag canPushAttr $ T.pack $ unwords $
|
||||
fromUUID u : map fromRef shas
|
||||
encode (PushRequest u) =
|
||||
gitAnnexTag pushRequestAttr $ T.pack $ fromUUID u
|
||||
encode (StartingPush u) =
|
||||
gitAnnexTag startingPushAttr $ T.pack $ fromUUID u
|
||||
encode (ReceivePackOutput n b) =
|
||||
gitAnnexTagContent receivePackAttr (val n) $ encodeTagContent b
|
||||
encode (SendPackOutput n b) =
|
||||
gitAnnexTagContent sendPackAttr (val n) $ encodeTagContent b
|
||||
encode (ReceivePackDone code) =
|
||||
gitAnnexTag receivePackDoneAttr $ val $ encodeExitCode code
|
||||
val = T.pack . show
|
||||
|
||||
decodeMessage :: Message -> Maybe NetMessage
|
||||
decodeMessage m = decode =<< gitAnnexTagInfo m
|
||||
where
|
||||
decode i = M.lookup (tagAttr i) decoders >>= rundecoder i
|
||||
rundecoder i d = d m i
|
||||
decoders = M.fromList $ zip
|
||||
[ pairAttr
|
||||
, canPushAttr
|
||||
, pushRequestAttr
|
||||
, startingPushAttr
|
||||
, receivePackAttr
|
||||
, sendPackAttr
|
||||
, receivePackDoneAttr
|
||||
]
|
||||
[ decodePairingNotification
|
||||
, pushdecoder $ shasgen CanPush
|
||||
, pushdecoder $ gen PushRequest
|
||||
, pushdecoder $ gen StartingPush
|
||||
, pushdecoder $ seqgen ReceivePackOutput
|
||||
, pushdecoder $ seqgen SendPackOutput
|
||||
, pushdecoder $
|
||||
fmap (ReceivePackDone . decodeExitCode) . readish .
|
||||
T.unpack . tagValue
|
||||
]
|
||||
pushdecoder a m' i = Pushing
|
||||
<$> (formatJID <$> messageFrom m')
|
||||
<*> a i
|
||||
gen c i = c . toUUID <$> headMaybe (words (T.unpack (tagValue i)))
|
||||
seqgen c i = do
|
||||
packet <- decodeTagContent $ tagElement i
|
||||
let seqnum = fromMaybe 0 $ readish $ T.unpack $ tagValue i
|
||||
return $ c seqnum packet
|
||||
shasgen c i = do
|
||||
let (u:shas) = words $ T.unpack $ tagValue i
|
||||
return $ c (toUUID u) (mapMaybe extractSha shas)
|
||||
|
||||
decodeExitCode :: Int -> ExitCode
|
||||
decodeExitCode 0 = ExitSuccess
|
||||
decodeExitCode n = ExitFailure n
|
||||
|
||||
encodeExitCode :: ExitCode -> Int
|
||||
encodeExitCode ExitSuccess = 0
|
||||
encodeExitCode (ExitFailure n) = n
|
||||
|
||||
{- Base 64 encoding a ByteString to use as the content of a tag. -}
|
||||
encodeTagContent :: ByteString -> [Node]
|
||||
encodeTagContent b = [NodeContent $ ContentText $ T.pack $ w82s $ B.unpack $ B64.encode b]
|
||||
|
||||
decodeTagContent :: Element -> Maybe ByteString
|
||||
decodeTagContent elt = either (const Nothing) Just (B64.decode $ B.pack $ s2w8 s)
|
||||
where
|
||||
s = T.unpack $ T.concat $ elementText elt
|
||||
|
||||
{- The JID without the client part. -}
|
||||
baseJID :: JID -> JID
|
||||
baseJID j = JID (jidNode j) (jidDomain j) Nothing
|
||||
|
||||
{- An XMPP chat message with an empty body. This should not be displayed
|
||||
- by clients, but can be used for communications. -}
|
||||
silentMessage :: Message
|
||||
silentMessage = (emptyMessage MessageChat)
|
||||
{ messagePayloads = [ emptybody ] }
|
||||
where
|
||||
emptybody = Element
|
||||
{ elementName = "body"
|
||||
, elementAttributes = []
|
||||
, elementNodes = []
|
||||
}
|
||||
|
||||
{- Add to a presence to mark its client as extended away. -}
|
||||
extendedAway :: Element
|
||||
extendedAway = Element "show" [] [NodeContent $ ContentText "xa"]
|
||||
|
||||
{- Add to a presence to give it a negative priority. -}
|
||||
negativePriority :: Element
|
||||
negativePriority = Element "priority" [] [NodeContent $ ContentText "-1"]
|
||||
|
||||
pushAttr :: Name
|
||||
pushAttr = "push"
|
||||
|
||||
queryAttr :: Name
|
||||
queryAttr = "query"
|
||||
|
||||
pairAttr :: Name
|
||||
pairAttr = "pair"
|
||||
|
||||
canPushAttr :: Name
|
||||
canPushAttr = "canpush"
|
||||
|
||||
pushRequestAttr :: Name
|
||||
pushRequestAttr = "pushrequest"
|
||||
|
||||
startingPushAttr :: Name
|
||||
startingPushAttr = "startingpush"
|
||||
|
||||
receivePackAttr :: Name
|
||||
receivePackAttr = "rp"
|
||||
|
||||
sendPackAttr :: Name
|
||||
sendPackAttr = "sp"
|
||||
|
||||
receivePackDoneAttr :: Name
|
||||
receivePackDoneAttr = "rpdone"
|
||||
|
||||
shasAttr :: Name
|
||||
shasAttr = "shas"
|
|
@ -1,87 +0,0 @@
|
|||
{- xmpp buddies
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.XMPP.Buddies where
|
||||
|
||||
import Assistant.XMPP
|
||||
import Annex.Common
|
||||
import Assistant.Types.Buddies
|
||||
|
||||
import Network.Protocol.XMPP
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
|
||||
genBuddyKey :: JID -> BuddyKey
|
||||
genBuddyKey j = BuddyKey $ formatJID $ baseJID j
|
||||
|
||||
buddyName :: JID -> Text
|
||||
buddyName j = maybe (T.pack "") strNode (jidNode j)
|
||||
|
||||
ucFirst :: Text -> Text
|
||||
ucFirst s = let (first, rest) = T.splitAt 1 s
|
||||
in T.concat [T.toUpper first, rest]
|
||||
|
||||
{- Summary of info about a buddy.
|
||||
-
|
||||
- If the buddy has no clients at all anymore, returns Nothing. -}
|
||||
buddySummary :: [JID] -> Buddy -> Maybe (Text, Bool, Bool, Bool, BuddyKey)
|
||||
buddySummary pairedwith b = case clients of
|
||||
((Client j):_) -> Just (buddyName j, away, canpair, alreadypaired j, genBuddyKey j)
|
||||
[] -> Nothing
|
||||
where
|
||||
away = S.null (buddyPresent b) && S.null (buddyAssistants b)
|
||||
canpair = not $ S.null (buddyAssistants b)
|
||||
clients = S.toList $ buddyPresent b `S.union` buddyAway b `S.union` buddyAssistants b
|
||||
alreadypaired j = baseJID j `elem` pairedwith
|
||||
|
||||
{- Updates the buddies with XMPP presence info. -}
|
||||
updateBuddies :: Presence -> Buddies -> Buddies
|
||||
updateBuddies p@(Presence { presenceFrom = Just jid }) = M.alter update key
|
||||
where
|
||||
key = genBuddyKey jid
|
||||
update (Just b) = Just $ applyPresence p b
|
||||
update Nothing = newBuddy p
|
||||
updateBuddies _ = id
|
||||
|
||||
{- Creates a new buddy based on XMPP presence info. -}
|
||||
newBuddy :: Presence -> Maybe Buddy
|
||||
newBuddy p
|
||||
| presenceType p == PresenceAvailable = go
|
||||
| presenceType p == PresenceUnavailable = go
|
||||
| otherwise = Nothing
|
||||
where
|
||||
go = make <$> presenceFrom p
|
||||
make _jid = applyPresence p $ Buddy
|
||||
{ buddyPresent = S.empty
|
||||
, buddyAway = S.empty
|
||||
, buddyAssistants = S.empty
|
||||
, buddyPairing = False
|
||||
}
|
||||
|
||||
applyPresence :: Presence -> Buddy -> Buddy
|
||||
applyPresence p b = fromMaybe b $! go <$> presenceFrom p
|
||||
where
|
||||
go jid
|
||||
| presenceType p == PresenceUnavailable = b
|
||||
{ buddyAway = addto $ buddyAway b
|
||||
, buddyPresent = removefrom $ buddyPresent b
|
||||
, buddyAssistants = removefrom $ buddyAssistants b
|
||||
}
|
||||
| hasGitAnnexTag p = b
|
||||
{ buddyAssistants = addto $ buddyAssistants b
|
||||
, buddyAway = removefrom $ buddyAway b }
|
||||
| presenceType p == PresenceAvailable = b
|
||||
{ buddyPresent = addto $ buddyPresent b
|
||||
, buddyAway = removefrom $ buddyAway b
|
||||
}
|
||||
| otherwise = b
|
||||
where
|
||||
client = Client jid
|
||||
removefrom = S.filter (/= client)
|
||||
addto = S.insert client
|
|
@ -1,83 +0,0 @@
|
|||
{- xmpp client support
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.XMPP.Client where
|
||||
|
||||
import Assistant.Common
|
||||
import Utility.SRV
|
||||
import Creds
|
||||
|
||||
import Network.Protocol.XMPP
|
||||
import Network
|
||||
import Control.Concurrent
|
||||
import qualified Data.Text as T
|
||||
|
||||
{- Everything we need to know to connect to an XMPP server. -}
|
||||
data XMPPCreds = XMPPCreds
|
||||
{ xmppUsername :: T.Text
|
||||
, xmppPassword :: T.Text
|
||||
, xmppHostname :: HostName
|
||||
, xmppPort :: Int
|
||||
, xmppJID :: T.Text
|
||||
}
|
||||
deriving (Read, Show)
|
||||
|
||||
connectXMPP :: XMPPCreds -> (JID -> XMPP a) -> IO [(HostPort, Either SomeException ())]
|
||||
connectXMPP c a = case parseJID (xmppJID c) of
|
||||
Nothing -> error "bad JID"
|
||||
Just jid -> connectXMPP' jid c a
|
||||
|
||||
{- Do a SRV lookup, but if it fails, fall back to the cached xmppHostname. -}
|
||||
connectXMPP' :: JID -> XMPPCreds -> (JID -> XMPP a) -> IO [(HostPort, Either SomeException ())]
|
||||
connectXMPP' jid c a = reverse <$> (handlesrv =<< lookupSRV srvrecord)
|
||||
where
|
||||
srvrecord = mkSRVTcp "xmpp-client" $
|
||||
T.unpack $ strDomain $ jidDomain jid
|
||||
serverjid = JID Nothing (jidDomain jid) Nothing
|
||||
|
||||
handlesrv [] = do
|
||||
let h = xmppHostname c
|
||||
let p = PortNumber $ fromIntegral $ xmppPort c
|
||||
r <- run h p $ a jid
|
||||
return [r]
|
||||
handlesrv srvs = go [] srvs
|
||||
|
||||
go l [] = return l
|
||||
go l ((h,p):rest) = do
|
||||
{- Try each SRV record in turn, until one connects,
|
||||
- at which point the MVar will be full. -}
|
||||
mv <- newEmptyMVar
|
||||
r <- run h p $ do
|
||||
liftIO $ putMVar mv ()
|
||||
a jid
|
||||
ifM (isEmptyMVar mv)
|
||||
( go (r : l) rest
|
||||
, return (r : l)
|
||||
)
|
||||
|
||||
{- Async exceptions are let through so the XMPP thread can
|
||||
- be killed. -}
|
||||
run h p a' = do
|
||||
r <- tryNonAsync $
|
||||
runClientError (Server serverjid h p) jid
|
||||
(xmppUsername c) (xmppPassword c) (void a')
|
||||
return ((h, p), r)
|
||||
|
||||
{- XMPP runClient, that throws errors rather than returning an Either -}
|
||||
runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a
|
||||
runClientError s j u p x = either (error . show) return =<< runClient s j u p x
|
||||
|
||||
getXMPPCreds :: Annex (Maybe XMPPCreds)
|
||||
getXMPPCreds = parse <$> readCacheCreds xmppCredsFile
|
||||
where
|
||||
parse s = readish =<< s
|
||||
|
||||
setXMPPCreds :: XMPPCreds -> Annex ()
|
||||
setXMPPCreds creds = writeCacheCreds (show creds) xmppCredsFile
|
||||
|
||||
xmppCredsFile :: FilePath
|
||||
xmppCredsFile = "xmpp"
|
|
@ -1,381 +0,0 @@
|
|||
{- git over XMPP
|
||||
-
|
||||
- Copyright 2012 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Assistant.XMPP.Git where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.XMPP
|
||||
import Assistant.XMPP.Buddies
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Alert
|
||||
import Assistant.MakeRemote
|
||||
import Assistant.Sync
|
||||
import qualified Command.Sync
|
||||
import qualified Annex.Branch
|
||||
import Annex.Path
|
||||
import Annex.UUID
|
||||
import Logs.UUID
|
||||
import Annex.TaggedPush
|
||||
import Annex.CatFile
|
||||
import Config
|
||||
import Git
|
||||
import qualified Types.Remote as Remote
|
||||
import qualified Remote as Remote
|
||||
import Remote.List
|
||||
import Utility.FileMode
|
||||
import Utility.Shell
|
||||
import Utility.Env
|
||||
|
||||
import Network.Protocol.XMPP
|
||||
import qualified Data.Text as T
|
||||
import System.Posix.Types
|
||||
import qualified System.Posix.IO
|
||||
import Control.Concurrent
|
||||
import System.Timeout
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.Map as M
|
||||
|
||||
{- Largest chunk of data to send in a single XMPP message. -}
|
||||
chunkSize :: Int
|
||||
chunkSize = 4096
|
||||
|
||||
{- How long to wait for an expected message before assuming the other side
|
||||
- has gone away and canceling a push.
|
||||
-
|
||||
- This needs to be long enough to allow a message of up to 2+ times
|
||||
- chunkSize to propigate up to a XMPP server, perhaps across to another
|
||||
- server, and back down to us. On the other hand, other XMPP pushes can be
|
||||
- delayed for running until the timeout is reached, so it should not be
|
||||
- excessive.
|
||||
-}
|
||||
xmppTimeout :: Int
|
||||
xmppTimeout = 120000000 -- 120 seconds
|
||||
|
||||
finishXMPPPairing :: JID -> UUID -> Assistant ()
|
||||
finishXMPPPairing jid u = void $ alertWhile alert $
|
||||
makeXMPPGitRemote buddy (baseJID jid) u
|
||||
where
|
||||
buddy = T.unpack $ buddyName jid
|
||||
alert = pairRequestAcknowledgedAlert buddy Nothing
|
||||
|
||||
gitXMPPLocation :: JID -> String
|
||||
gitXMPPLocation jid = "xmpp::" ++ T.unpack (formatJID $ baseJID jid)
|
||||
|
||||
makeXMPPGitRemote :: String -> JID -> UUID -> Assistant Bool
|
||||
makeXMPPGitRemote buddyname jid u = do
|
||||
remote <- liftAnnex $ addRemote $
|
||||
makeGitRemote buddyname $ gitXMPPLocation jid
|
||||
liftAnnex $ storeUUIDIn (remoteConfig (Remote.repo remote) "uuid") u
|
||||
liftAnnex $ void remoteListRefresh
|
||||
remote' <- liftAnnex $ fromMaybe (error "failed to add remote")
|
||||
<$> Remote.byName (Just buddyname)
|
||||
syncRemote remote'
|
||||
return True
|
||||
|
||||
{- Pushes over XMPP, communicating with a specific client.
|
||||
- Runs an arbitrary IO action to push, which should run git-push with
|
||||
- an xmpp:: url.
|
||||
-
|
||||
- To handle xmpp:: urls, git push will run git-remote-xmpp, which is
|
||||
- injected into its PATH, and in turn runs git-annex xmppgit. The
|
||||
- dataflow them becomes:
|
||||
-
|
||||
- git push <--> git-annex xmppgit <--> xmppPush <-------> xmpp
|
||||
- |
|
||||
- git receive-pack <--> xmppReceivePack <---------------> xmpp
|
||||
-
|
||||
- The pipe between git-annex xmppgit and us is set up and communicated
|
||||
- using two environment variables, relayIn and relayOut, that are set
|
||||
- to the file descriptors to use. Another, relayControl, is used to
|
||||
- propigate the exit status of git receive-pack.
|
||||
-
|
||||
- We listen at the other end of the pipe and relay to and from XMPP.
|
||||
-}
|
||||
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
|
||||
xmppPush cid gitpush = do
|
||||
u <- liftAnnex getUUID
|
||||
sendNetMessage $ Pushing cid (StartingPush u)
|
||||
|
||||
(Fd inf, writepush) <- liftIO System.Posix.IO.createPipe
|
||||
(readpush, Fd outf) <- liftIO System.Posix.IO.createPipe
|
||||
(Fd controlf, writecontrol) <- liftIO System.Posix.IO.createPipe
|
||||
|
||||
tmpdir <- gettmpdir
|
||||
installwrapper tmpdir
|
||||
|
||||
environ <- liftIO getEnvironment
|
||||
path <- liftIO getSearchPath
|
||||
let myenviron = addEntries
|
||||
[ ("PATH", intercalate [searchPathSeparator] $ tmpdir:path)
|
||||
, (relayIn, show inf)
|
||||
, (relayOut, show outf)
|
||||
, (relayControl, show controlf)
|
||||
]
|
||||
environ
|
||||
|
||||
inh <- liftIO $ fdToHandle readpush
|
||||
outh <- liftIO $ fdToHandle writepush
|
||||
controlh <- liftIO $ fdToHandle writecontrol
|
||||
|
||||
t1 <- forkIO <~> toxmpp 0 inh
|
||||
t2 <- forkIO <~> fromxmpp outh controlh
|
||||
|
||||
{- This can take a long time to run, so avoid running it in the
|
||||
- Annex monad. Also, override environment. -}
|
||||
g <- liftAnnex gitRepo
|
||||
r <- liftIO $ gitpush $ g { gitEnv = Just myenviron }
|
||||
|
||||
liftIO $ do
|
||||
mapM_ killThread [t1, t2]
|
||||
mapM_ hClose [inh, outh, controlh]
|
||||
mapM_ closeFd [Fd inf, Fd outf, Fd controlf]
|
||||
|
||||
return r
|
||||
where
|
||||
toxmpp seqnum inh = do
|
||||
b <- liftIO $ B.hGetSome inh chunkSize
|
||||
if B.null b
|
||||
then liftIO $ killThread =<< myThreadId
|
||||
else do
|
||||
let seqnum' = succ seqnum
|
||||
sendNetMessage $ Pushing cid $
|
||||
SendPackOutput seqnum' b
|
||||
toxmpp seqnum' inh
|
||||
|
||||
fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handlemsg
|
||||
where
|
||||
handlemsg (Just (Pushing _ (ReceivePackOutput _ b))) =
|
||||
liftIO $ writeChunk outh b
|
||||
handlemsg (Just (Pushing _ (ReceivePackDone exitcode))) =
|
||||
liftIO $ do
|
||||
hPrint controlh exitcode
|
||||
hFlush controlh
|
||||
handlemsg (Just _) = noop
|
||||
handlemsg Nothing = do
|
||||
debug ["timeout waiting for git receive-pack output via XMPP"]
|
||||
-- Send a synthetic exit code to git-annex
|
||||
-- xmppgit, which will exit and cause git push
|
||||
-- to die.
|
||||
liftIO $ do
|
||||
hPrint controlh (ExitFailure 1)
|
||||
hFlush controlh
|
||||
killThread =<< myThreadId
|
||||
|
||||
installwrapper tmpdir = liftIO $ do
|
||||
createDirectoryIfMissing True tmpdir
|
||||
let wrapper = tmpdir </> "git-remote-xmpp"
|
||||
program <- programPath
|
||||
writeFile wrapper $ unlines
|
||||
[ shebang_local
|
||||
, "exec " ++ program ++ " xmppgit"
|
||||
]
|
||||
modifyFileMode wrapper $ addModes executeModes
|
||||
|
||||
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
|
||||
- dir (ie, not on a crippled filesystem where we can't make
|
||||
- the wrapper executable). -}
|
||||
gettmpdir = do
|
||||
v <- liftIO $ getEnv "GIT_ANNEX_TMP_DIR"
|
||||
case v of
|
||||
Nothing -> do
|
||||
tmp <- liftAnnex $ fromRepo gitAnnexTmpMiscDir
|
||||
return $ tmp </> "xmppgit"
|
||||
Just d -> return $ d </> "xmppgit"
|
||||
|
||||
type EnvVar = String
|
||||
|
||||
envVar :: String -> EnvVar
|
||||
envVar s = "GIT_ANNEX_XMPPGIT_" ++ s
|
||||
|
||||
relayIn :: EnvVar
|
||||
relayIn = envVar "IN"
|
||||
|
||||
relayOut :: EnvVar
|
||||
relayOut = envVar "OUT"
|
||||
|
||||
relayControl :: EnvVar
|
||||
relayControl = envVar "CONTROL"
|
||||
|
||||
relayHandle :: EnvVar -> IO Handle
|
||||
relayHandle var = do
|
||||
v <- getEnv var
|
||||
case readish =<< v of
|
||||
Nothing -> error $ var ++ " not set"
|
||||
Just n -> fdToHandle $ Fd n
|
||||
|
||||
{- Called by git-annex xmppgit.
|
||||
-
|
||||
- git-push is talking to us on stdin
|
||||
- we're talking to git-push on stdout
|
||||
- git-receive-pack is talking to us on relayIn (via XMPP)
|
||||
- we're talking to git-receive-pack on relayOut (via XMPP)
|
||||
- git-receive-pack's exit code will be passed to us on relayControl
|
||||
-}
|
||||
xmppGitRelay :: IO ()
|
||||
xmppGitRelay = do
|
||||
flip relay stdout =<< relayHandle relayIn
|
||||
relay stdin =<< relayHandle relayOut
|
||||
code <- hGetLine =<< relayHandle relayControl
|
||||
exitWith $ fromMaybe (ExitFailure 1) $ readish code
|
||||
where
|
||||
{- Is it possible to set up pipes and not need to copy the data
|
||||
- ourselves? See splice(2) -}
|
||||
relay fromh toh = void $ forkIO $ forever $ do
|
||||
b <- B.hGetSome fromh chunkSize
|
||||
when (B.null b) $ do
|
||||
hClose fromh
|
||||
hClose toh
|
||||
killThread =<< myThreadId
|
||||
writeChunk toh b
|
||||
|
||||
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
|
||||
- its exit status to XMPP. -}
|
||||
xmppReceivePack :: ClientID -> Assistant Bool
|
||||
xmppReceivePack cid = do
|
||||
repodir <- liftAnnex $ fromRepo repoPath
|
||||
let p = (proc "git" ["receive-pack", repodir])
|
||||
{ std_in = CreatePipe
|
||||
, std_out = CreatePipe
|
||||
, std_err = Inherit
|
||||
}
|
||||
(Just inh, Just outh, _, pid) <- liftIO $ createProcess p
|
||||
readertid <- forkIO <~> relayfromxmpp inh
|
||||
relaytoxmpp 0 outh
|
||||
code <- liftIO $ waitForProcess pid
|
||||
void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
|
||||
liftIO $ do
|
||||
killThread readertid
|
||||
hClose inh
|
||||
hClose outh
|
||||
return $ code == ExitSuccess
|
||||
where
|
||||
relaytoxmpp seqnum outh = do
|
||||
b <- liftIO $ B.hGetSome outh chunkSize
|
||||
-- empty is EOF, so exit
|
||||
unless (B.null b) $ do
|
||||
let seqnum' = succ seqnum
|
||||
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
|
||||
relaytoxmpp seqnum' outh
|
||||
relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handlemsg
|
||||
where
|
||||
handlemsg (Just (Pushing _ (SendPackOutput _ b))) =
|
||||
liftIO $ writeChunk inh b
|
||||
handlemsg (Just _) = noop
|
||||
handlemsg Nothing = do
|
||||
debug ["timeout waiting for git send-pack output via XMPP"]
|
||||
-- closing the handle will make git receive-pack exit
|
||||
liftIO $ do
|
||||
hClose inh
|
||||
killThread =<< myThreadId
|
||||
|
||||
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
|
||||
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
|
||||
Nothing -> return []
|
||||
Just jid -> do
|
||||
let loc = gitXMPPLocation jid
|
||||
um <- liftAnnex uuidMap
|
||||
filter (matching loc . Remote.repo) . filter (knownuuid um) . syncGitRemotes
|
||||
<$> getDaemonStatus
|
||||
where
|
||||
matching loc r = repoIsUrl r && repoLocation r == loc
|
||||
knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um
|
||||
|
||||
{- Returns the ClientID that it pushed to. -}
|
||||
runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID)
|
||||
runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
|
||||
go =<< liftAnnex (join Command.Sync.getCurrBranch)
|
||||
where
|
||||
go (Just branch, _) = do
|
||||
rs <- xmppRemotes cid theiruuid
|
||||
liftAnnex $ Annex.Branch.commit "update"
|
||||
(g, u) <- liftAnnex $ (,)
|
||||
<$> gitRepo
|
||||
<*> getUUID
|
||||
liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) branch g
|
||||
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
|
||||
if null rs
|
||||
then return Nothing
|
||||
else do
|
||||
forM_ rs $ \r -> do
|
||||
void $ alertWhile (syncAlert [r]) $
|
||||
xmppPush cid (taggedPush u selfjid branch r)
|
||||
checkcloudrepos r
|
||||
return $ Just cid
|
||||
go _ = return Nothing
|
||||
runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
|
||||
rs <- xmppRemotes cid theiruuid
|
||||
if null rs
|
||||
then return Nothing
|
||||
else do
|
||||
void $ alertWhile (syncAlert rs) $
|
||||
xmppReceivePack cid
|
||||
mapM_ checkcloudrepos rs
|
||||
return $ Just cid
|
||||
runPush _ _ = return Nothing
|
||||
|
||||
{- Check if any of the shas that can be pushed are ones we do not
|
||||
- have.
|
||||
-
|
||||
- (Older clients send no shas, so when there are none, always
|
||||
- request a push.)
|
||||
-}
|
||||
handlePushNotice :: NetMessage -> Assistant ()
|
||||
handlePushNotice (Pushing cid (CanPush theiruuid shas)) =
|
||||
unlessM (null <$> xmppRemotes cid theiruuid) $
|
||||
if null shas
|
||||
then go
|
||||
else ifM (haveall shas)
|
||||
( debug ["ignoring CanPush with known shas"]
|
||||
, go
|
||||
)
|
||||
where
|
||||
go = do
|
||||
u <- liftAnnex getUUID
|
||||
sendNetMessage $ Pushing cid (PushRequest u)
|
||||
haveall l = liftAnnex $ not <$> anyM donthave l
|
||||
donthave sha = isNothing <$> catObjectDetails sha
|
||||
handlePushNotice _ = noop
|
||||
|
||||
writeChunk :: Handle -> B.ByteString -> IO ()
|
||||
writeChunk h b = do
|
||||
B.hPut h b
|
||||
hFlush h
|
||||
|
||||
{- Gets NetMessages for a PushSide, ensures they are in order,
|
||||
- and runs an action to handle each in turn. The action will be passed
|
||||
- Nothing on timeout.
|
||||
-
|
||||
- Does not currently reorder messages, but does ensure that any
|
||||
- duplicate messages, or messages not in the sequence, are discarded.
|
||||
-}
|
||||
withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
|
||||
withPushMessagesInSequence cid side a = loop 0
|
||||
where
|
||||
loop seqnum = do
|
||||
m <- timeout xmppTimeout <~> waitInbox cid side
|
||||
let go s = a m >> loop s
|
||||
let next = seqnum + 1
|
||||
case extractSequence =<< m of
|
||||
Just seqnum'
|
||||
| seqnum' == next -> go next
|
||||
| seqnum' == 0 -> go seqnum
|
||||
| seqnum' == seqnum -> do
|
||||
debug ["ignoring duplicate sequence number", show seqnum]
|
||||
loop seqnum
|
||||
| otherwise -> do
|
||||
debug ["ignoring out of order sequence number", show seqnum', "expected", show next]
|
||||
loop seqnum
|
||||
Nothing -> go seqnum
|
||||
|
||||
extractSequence :: NetMessage -> Maybe Int
|
||||
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
|
||||
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
|
||||
extractSequence _ = Nothing
|
Loading…
Add table
Add a link
Reference in a new issue