diff --git a/Remote/P2P.hs b/Remote/P2P.hs index e0428eeeb5..f97d76e71c 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -11,14 +11,23 @@ module Remote.P2P ( ) where import Annex.Common +import qualified Annex +import qualified P2P.Protocol as P2P import P2P.Address +import P2P.Annex import Types.Remote import Types.GitConfig import qualified Git import Config import Config.Cost import Remote.Helper.Git -import Remote.Helper.Special +import Remote.Helper.Tor +import Utility.Tor +import Utility.Metered +import Types.NumCopies + +import Control.Concurrent +import Control.Concurrent.STM remote :: RemoteType remote = RemoteType { @@ -32,18 +41,18 @@ remote = RemoteType { chainGen :: P2PAddress -> Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote) chainGen addr r u c gc = do - workerpool <- mkWorkerPool addr + connpool <- mkConnectionPool cst <- remoteCost gc expensiveRemoteCost let this = Remote { uuid = u , cost = cst , name = Git.repoDescribe r - , storeKey = storeKeyDummy - , retrieveKeyFile = retreiveKeyFileDummy + , storeKey = store addr connpool + , retrieveKeyFile = retrieve addr connpool , retrieveKeyFileCheap = \_ _ _ -> return False - , removeKey = removeKeyDummy - , lockContent = Nothing -- TODO use p2p protocol locking - , checkPresent = checkPresentDummy + , removeKey = remove addr connpool + , lockContent = Just (lock u addr connpool) + , checkPresent = checkpresent addr connpool , checkPresentCheap = False , whereisKey = Nothing , remoteFsck = Nothing @@ -60,26 +69,109 @@ chainGen addr r u c gc = do , claimUrl = Nothing , checkUrl = Nothing } - return $ Just $ specialRemote' (specialRemoteCfg c) c - (simplyPrepare $ store this workerpool) - (simplyPrepare $ retrieve this workerpool) - (simplyPrepare $ remove this workerpool) - (simplyPrepare $ checkKey this workerpool) - this + return (Just this) -data WorkerPool = WorkerPool +-- TODO update progress +store :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool +store addr connpool k af p = fromMaybe False + <$> runProto addr connpool (P2P.put k af) -mkWorkerPool :: P2PAddress -> Annex WorkerPool -mkWorkerPool addr = undefined +retrieve :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification) +retrieve addr connpool k af dest _p = unVerified $ fromMaybe False + <$> runProto addr connpool (P2P.get dest k af) -store :: Remote -> WorkerPool -> Storer -store r workerpool = undefined +remove :: P2PAddress -> ConnectionPool -> Key -> Annex Bool +remove addr connpool k = fromMaybe False + <$> runProto addr connpool (P2P.remove k) -retrieve :: Remote -> WorkerPool -> Retriever -retrieve r workerpool = undefined +checkpresent :: P2PAddress -> ConnectionPool -> Key -> Annex Bool +checkpresent addr connpool k = maybe unavail return + =<< runProto addr connpool (P2P.checkPresent k) + where + unavail = giveup "can't connect to peer" -remove :: Remote -> WorkerPool -> Remover -remove r workerpool k = undefined +lock :: UUID -> P2PAddress -> ConnectionPool -> Key -> (VerifiedCopy -> Annex r) -> Annex r +lock theiruuid addr connpool k callback = + withConnection addr connpool $ \conn -> do + connv <- liftIO $ newMVar conn + let runproto d p = do + c <- liftIO $ takeMVar connv + (c', mr) <- runProto' p c + liftIO $ putMVar connv c' + return (fromMaybe d mr) + r <- P2P.lockContentWhile runproto k go + conn' <- liftIO $ takeMVar connv + return (conn', r) + where + go False = giveup "can't lock content" + go True = withVerifiedCopy LockedCopy theiruuid (return True) callback -checkKey :: Remote -> WorkerPool -> CheckPresent -checkKey r workerpool k = undefined +-- | A connection to the peer. +data Connection + = TorAnnexConnection RunEnv + | ClosedConnection + +type ConnectionPool = TVar [Connection] + +mkConnectionPool :: Annex ConnectionPool +mkConnectionPool = liftIO $ newTVarIO [] + +-- Runs the Proto action. +runProto :: P2PAddress -> ConnectionPool -> P2P.Proto a -> Annex (Maybe a) +runProto addr connpool a = withConnection addr connpool (runProto' a) + +runProto' :: P2P.Proto a -> Connection -> Annex (Connection, Maybe a) +runProto' _ ClosedConnection = return (ClosedConnection, Nothing) +runProto' a conn@(TorAnnexConnection runenv) = do + r <- runFullProto Client runenv a + -- When runFullProto fails, the connection is no longer usable, + -- so close it. + if isJust r + then return (conn, r) + else do + liftIO $ hClose (runIhdl runenv) + return (ClosedConnection, r) + +-- Uses an open connection if one is available in the ConnectionPool; +-- otherwise opens a new connection. +-- +-- Once the action is done, the connection is added back to the +-- ConnectionPool, unless it's no longer open. +withConnection :: P2PAddress -> ConnectionPool -> (Connection -> Annex (Connection, a)) -> Annex a +withConnection addr connpool a = bracketOnError get cache go + where + get = do + mc <- liftIO $ atomically $ do + l <- readTVar connpool + case l of + [] -> do + writeTVar connpool [] + return Nothing + (c:cs) -> do + writeTVar connpool cs + return (Just c) + maybe (openConnection addr) return mc + + cache ClosedConnection = return () + cache conn = liftIO $ atomically $ modifyTVar' connpool (conn:) + + go conn = do + (conn', r) <- a conn + cache conn' + return r + +openConnection :: P2PAddress -> Annex Connection +openConnection (TorAnnex onionaddress onionport) = do + v <- liftIO $ tryNonAsync $ + torHandle =<< connectHiddenService onionaddress onionport + case v of + Right h -> do + g <- Annex.gitRepo + let runenv = RunEnv + { runRepo = g + , runCheckAuth = const False + , runIhdl = h + , runOhdl = h + } + return (TorAnnexConnection runenv) + Left _e -> return ClosedConnection