382 lines
12 KiB
Haskell
382 lines
12 KiB
Haskell
{- git over XMPP
|
|
-
|
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
module Assistant.XMPP.Git where
|
|
|
|
import Assistant.Common
|
|
import Assistant.NetMessager
|
|
import Assistant.Types.NetMessager
|
|
import Assistant.XMPP
|
|
import Assistant.XMPP.Buddies
|
|
import Assistant.DaemonStatus
|
|
import Assistant.Alert
|
|
import Assistant.MakeRemote
|
|
import Assistant.Sync
|
|
import qualified Command.Sync
|
|
import qualified Annex.Branch
|
|
import Annex.UUID
|
|
import Logs.UUID
|
|
import Annex.TaggedPush
|
|
import Annex.CatFile
|
|
import Config
|
|
import Git
|
|
import qualified Git.Branch
|
|
import Config.Files
|
|
import qualified Types.Remote as Remote
|
|
import qualified Remote as Remote
|
|
import Remote.List
|
|
import Utility.FileMode
|
|
import Utility.Shell
|
|
import Utility.Env
|
|
|
|
import Network.Protocol.XMPP
|
|
import qualified Data.Text as T
|
|
import System.Posix.Types
|
|
import System.Process (std_in, std_out, std_err)
|
|
import Control.Concurrent
|
|
import System.Timeout
|
|
import qualified Data.ByteString as B
|
|
import qualified Data.Map as M
|
|
|
|
{- Largest chunk of data to send in a single XMPP message. -}
|
|
chunkSize :: Int
|
|
chunkSize = 4096
|
|
|
|
{- How long to wait for an expected message before assuming the other side
|
|
- has gone away and canceling a push.
|
|
-
|
|
- This needs to be long enough to allow a message of up to 2+ times
|
|
- chunkSize to propigate up to a XMPP server, perhaps across to another
|
|
- server, and back down to us. On the other hand, other XMPP pushes can be
|
|
- delayed for running until the timeout is reached, so it should not be
|
|
- excessive.
|
|
-}
|
|
xmppTimeout :: Int
|
|
xmppTimeout = 120000000 -- 120 seconds
|
|
|
|
finishXMPPPairing :: JID -> UUID -> Assistant ()
|
|
finishXMPPPairing jid u = void $ alertWhile alert $
|
|
makeXMPPGitRemote buddy (baseJID jid) u
|
|
where
|
|
buddy = T.unpack $ buddyName jid
|
|
alert = pairRequestAcknowledgedAlert buddy Nothing
|
|
|
|
gitXMPPLocation :: JID -> String
|
|
gitXMPPLocation jid = "xmpp::" ++ T.unpack (formatJID $ baseJID jid)
|
|
|
|
makeXMPPGitRemote :: String -> JID -> UUID -> Assistant Bool
|
|
makeXMPPGitRemote buddyname jid u = do
|
|
remote <- liftAnnex $ addRemote $
|
|
makeGitRemote buddyname $ gitXMPPLocation jid
|
|
liftAnnex $ storeUUID (remoteConfig (Remote.repo remote) "uuid") u
|
|
liftAnnex $ void remoteListRefresh
|
|
remote' <- liftAnnex $ fromMaybe (error "failed to add remote")
|
|
<$> Remote.byName (Just buddyname)
|
|
syncRemote remote'
|
|
return True
|
|
|
|
{- Pushes over XMPP, communicating with a specific client.
|
|
- Runs an arbitrary IO action to push, which should run git-push with
|
|
- an xmpp:: url.
|
|
-
|
|
- To handle xmpp:: urls, git push will run git-remote-xmpp, which is
|
|
- injected into its PATH, and in turn runs git-annex xmppgit. The
|
|
- dataflow them becomes:
|
|
-
|
|
- git push <--> git-annex xmppgit <--> xmppPush <-------> xmpp
|
|
- |
|
|
- git receive-pack <--> xmppReceivePack <---------------> xmpp
|
|
-
|
|
- The pipe between git-annex xmppgit and us is set up and communicated
|
|
- using two environment variables, relayIn and relayOut, that are set
|
|
- to the file descriptors to use. Another, relayControl, is used to
|
|
- propigate the exit status of git receive-pack.
|
|
-
|
|
- We listen at the other end of the pipe and relay to and from XMPP.
|
|
-}
|
|
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
|
|
xmppPush cid gitpush = do
|
|
u <- liftAnnex getUUID
|
|
sendNetMessage $ Pushing cid (StartingPush u)
|
|
|
|
(Fd inf, writepush) <- liftIO createPipe
|
|
(readpush, Fd outf) <- liftIO createPipe
|
|
(Fd controlf, writecontrol) <- liftIO createPipe
|
|
|
|
tmpdir <- gettmpdir
|
|
installwrapper tmpdir
|
|
|
|
env <- liftIO getEnvironment
|
|
path <- liftIO getSearchPath
|
|
let myenv = M.fromList
|
|
[ ("PATH", intercalate [searchPathSeparator] $ tmpdir:path)
|
|
, (relayIn, show inf)
|
|
, (relayOut, show outf)
|
|
, (relayControl, show controlf)
|
|
]
|
|
`M.union` M.fromList env
|
|
|
|
inh <- liftIO $ fdToHandle readpush
|
|
outh <- liftIO $ fdToHandle writepush
|
|
controlh <- liftIO $ fdToHandle writecontrol
|
|
|
|
t1 <- forkIO <~> toxmpp 0 inh
|
|
t2 <- forkIO <~> fromxmpp outh controlh
|
|
|
|
{- This can take a long time to run, so avoid running it in the
|
|
- Annex monad. Also, override environment. -}
|
|
g <- liftAnnex gitRepo
|
|
r <- liftIO $ gitpush $ g { gitEnv = Just $ M.toList myenv }
|
|
|
|
liftIO $ do
|
|
mapM_ killThread [t1, t2]
|
|
mapM_ hClose [inh, outh, controlh]
|
|
mapM_ closeFd [Fd inf, Fd outf, Fd controlf]
|
|
|
|
return r
|
|
where
|
|
toxmpp seqnum inh = do
|
|
b <- liftIO $ B.hGetSome inh chunkSize
|
|
if B.null b
|
|
then liftIO $ killThread =<< myThreadId
|
|
else do
|
|
let seqnum' = succ seqnum
|
|
sendNetMessage $ Pushing cid $
|
|
SendPackOutput seqnum' b
|
|
toxmpp seqnum' inh
|
|
|
|
fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
|
|
where
|
|
handle (Just (Pushing _ (ReceivePackOutput _ b))) =
|
|
liftIO $ writeChunk outh b
|
|
handle (Just (Pushing _ (ReceivePackDone exitcode))) =
|
|
liftIO $ do
|
|
hPrint controlh exitcode
|
|
hFlush controlh
|
|
handle (Just _) = noop
|
|
handle Nothing = do
|
|
debug ["timeout waiting for git receive-pack output via XMPP"]
|
|
-- Send a synthetic exit code to git-annex
|
|
-- xmppgit, which will exit and cause git push
|
|
-- to die.
|
|
liftIO $ do
|
|
hPrint controlh (ExitFailure 1)
|
|
hFlush controlh
|
|
killThread =<< myThreadId
|
|
|
|
installwrapper tmpdir = liftIO $ do
|
|
createDirectoryIfMissing True tmpdir
|
|
let wrapper = tmpdir </> "git-remote-xmpp"
|
|
program <- readProgramFile
|
|
writeFile wrapper $ unlines
|
|
[ shebang_local
|
|
, "exec " ++ program ++ " xmppgit"
|
|
]
|
|
modifyFileMode wrapper $ addModes executeModes
|
|
|
|
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
|
|
- dir (ie, not on a crippled filesystem where we can't make
|
|
- the wrapper executable). -}
|
|
gettmpdir = do
|
|
v <- liftIO $ getEnv "GIT_ANNEX_TMP_DIR"
|
|
case v of
|
|
Nothing -> do
|
|
tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
|
|
return $ tmp </> "xmppgit"
|
|
Just d -> return $ d </> "xmppgit"
|
|
|
|
type EnvVar = String
|
|
|
|
envVar :: String -> EnvVar
|
|
envVar s = "GIT_ANNEX_XMPPGIT_" ++ s
|
|
|
|
relayIn :: EnvVar
|
|
relayIn = envVar "IN"
|
|
|
|
relayOut :: EnvVar
|
|
relayOut = envVar "OUT"
|
|
|
|
relayControl :: EnvVar
|
|
relayControl = envVar "CONTROL"
|
|
|
|
relayHandle :: EnvVar -> IO Handle
|
|
relayHandle var = do
|
|
v <- getEnv var
|
|
case readish =<< v of
|
|
Nothing -> error $ var ++ " not set"
|
|
Just n -> fdToHandle $ Fd n
|
|
|
|
{- Called by git-annex xmppgit.
|
|
-
|
|
- git-push is talking to us on stdin
|
|
- we're talking to git-push on stdout
|
|
- git-receive-pack is talking to us on relayIn (via XMPP)
|
|
- we're talking to git-receive-pack on relayOut (via XMPP)
|
|
- git-receive-pack's exit code will be passed to us on relayControl
|
|
-}
|
|
xmppGitRelay :: IO ()
|
|
xmppGitRelay = do
|
|
flip relay stdout =<< relayHandle relayIn
|
|
relay stdin =<< relayHandle relayOut
|
|
code <- hGetLine =<< relayHandle relayControl
|
|
exitWith $ fromMaybe (ExitFailure 1) $ readish code
|
|
where
|
|
{- Is it possible to set up pipes and not need to copy the data
|
|
- ourselves? See splice(2) -}
|
|
relay fromh toh = void $ forkIO $ forever $ do
|
|
b <- B.hGetSome fromh chunkSize
|
|
when (B.null b) $ do
|
|
hClose fromh
|
|
hClose toh
|
|
killThread =<< myThreadId
|
|
writeChunk toh b
|
|
|
|
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
|
|
- its exit status to XMPP. -}
|
|
xmppReceivePack :: ClientID -> Assistant Bool
|
|
xmppReceivePack cid = do
|
|
repodir <- liftAnnex $ fromRepo repoPath
|
|
let p = (proc "git" ["receive-pack", repodir])
|
|
{ std_in = CreatePipe
|
|
, std_out = CreatePipe
|
|
, std_err = Inherit
|
|
}
|
|
(Just inh, Just outh, _, pid) <- liftIO $ createProcess p
|
|
readertid <- forkIO <~> relayfromxmpp inh
|
|
relaytoxmpp 0 outh
|
|
code <- liftIO $ waitForProcess pid
|
|
void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
|
|
liftIO $ do
|
|
killThread readertid
|
|
hClose inh
|
|
hClose outh
|
|
return $ code == ExitSuccess
|
|
where
|
|
relaytoxmpp seqnum outh = do
|
|
b <- liftIO $ B.hGetSome outh chunkSize
|
|
-- empty is EOF, so exit
|
|
unless (B.null b) $ do
|
|
let seqnum' = succ seqnum
|
|
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
|
|
relaytoxmpp seqnum' outh
|
|
relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
|
|
where
|
|
handle (Just (Pushing _ (SendPackOutput _ b))) =
|
|
liftIO $ writeChunk inh b
|
|
handle (Just _) = noop
|
|
handle Nothing = do
|
|
debug ["timeout waiting for git send-pack output via XMPP"]
|
|
-- closing the handle will make git receive-pack exit
|
|
liftIO $ do
|
|
hClose inh
|
|
killThread =<< myThreadId
|
|
|
|
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
|
|
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
|
|
Nothing -> return []
|
|
Just jid -> do
|
|
let loc = gitXMPPLocation jid
|
|
um <- liftAnnex uuidMap
|
|
filter (matching loc . Remote.repo) . filter (knownuuid um) . syncGitRemotes
|
|
<$> getDaemonStatus
|
|
where
|
|
matching loc r = repoIsUrl r && repoLocation r == loc
|
|
knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um
|
|
|
|
{- Returns the ClientID that it pushed to. -}
|
|
runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID)
|
|
runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
|
|
go =<< liftAnnex (inRepo Git.Branch.current)
|
|
where
|
|
go Nothing = return Nothing
|
|
go (Just branch) = do
|
|
rs <- xmppRemotes cid theiruuid
|
|
liftAnnex $ Annex.Branch.commit "update"
|
|
(g, u) <- liftAnnex $ (,)
|
|
<$> gitRepo
|
|
<*> getUUID
|
|
liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
|
|
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
|
|
if null rs
|
|
then return Nothing
|
|
else do
|
|
forM_ rs $ \r -> do
|
|
void $ alertWhile (syncAlert [r]) $
|
|
xmppPush cid (taggedPush u selfjid branch r)
|
|
checkcloudrepos r
|
|
return $ Just cid
|
|
runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
|
|
rs <- xmppRemotes cid theiruuid
|
|
if null rs
|
|
then return Nothing
|
|
else do
|
|
void $ alertWhile (syncAlert rs) $
|
|
xmppReceivePack cid
|
|
mapM_ checkcloudrepos rs
|
|
return $ Just cid
|
|
runPush _ _ = return Nothing
|
|
|
|
{- Check if any of the shas that can be pushed are ones we do not
|
|
- have.
|
|
-
|
|
- (Older clients send no shas, so when there are none, always
|
|
- request a push.)
|
|
-}
|
|
handlePushNotice :: NetMessage -> Assistant ()
|
|
handlePushNotice (Pushing cid (CanPush theiruuid shas)) =
|
|
unlessM (null <$> xmppRemotes cid theiruuid) $
|
|
if null shas
|
|
then go
|
|
else ifM (haveall shas)
|
|
( debug ["ignoring CanPush with known shas"]
|
|
, go
|
|
)
|
|
where
|
|
go = do
|
|
u <- liftAnnex getUUID
|
|
sendNetMessage $ Pushing cid (PushRequest u)
|
|
haveall l = liftAnnex $ not <$> anyM donthave l
|
|
donthave sha = isNothing <$> catObjectDetails sha
|
|
handlePushNotice _ = noop
|
|
|
|
writeChunk :: Handle -> B.ByteString -> IO ()
|
|
writeChunk h b = do
|
|
B.hPut h b
|
|
hFlush h
|
|
|
|
{- Gets NetMessages for a PushSide, ensures they are in order,
|
|
- and runs an action to handle each in turn. The action will be passed
|
|
- Nothing on timeout.
|
|
-
|
|
- Does not currently reorder messages, but does ensure that any
|
|
- duplicate messages, or messages not in the sequence, are discarded.
|
|
-}
|
|
withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
|
|
withPushMessagesInSequence cid side a = loop 0
|
|
where
|
|
loop seqnum = do
|
|
m <- timeout xmppTimeout <~> waitInbox cid side
|
|
let go s = a m >> loop s
|
|
let next = seqnum + 1
|
|
case extractSequence =<< m of
|
|
Just seqnum'
|
|
| seqnum' == next -> go next
|
|
| seqnum' == 0 -> go seqnum
|
|
| seqnum' == seqnum -> do
|
|
debug ["ignoring duplicate sequence number", show seqnum]
|
|
loop seqnum
|
|
| otherwise -> do
|
|
debug ["ignoring out of order sequence number", show seqnum', "expected", show next]
|
|
loop seqnum
|
|
Nothing -> go seqnum
|
|
|
|
extractSequence :: NetMessage -> Maybe Int
|
|
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
|
|
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
|
|
extractSequence _ = Nothing
|