finish implementation of Remote.P2P (untested)
Not tested at all, but it just might work. Only known problem is that progress is not updated when storing to a P2P remote. This commit was sponsored by Nick Daly on Patreon.
This commit is contained in:
parent
2bd2e0880c
commit
26a53fb4a5
1 changed files with 116 additions and 24 deletions
140
Remote/P2P.hs
140
Remote/P2P.hs
|
@ -11,14 +11,23 @@ module Remote.P2P (
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
import qualified Annex
|
||||||
|
import qualified P2P.Protocol as P2P
|
||||||
import P2P.Address
|
import P2P.Address
|
||||||
|
import P2P.Annex
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.GitConfig
|
import Types.GitConfig
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
import Remote.Helper.Git
|
import Remote.Helper.Git
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Tor
|
||||||
|
import Utility.Tor
|
||||||
|
import Utility.Metered
|
||||||
|
import Types.NumCopies
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
|
||||||
remote :: RemoteType
|
remote :: RemoteType
|
||||||
remote = RemoteType {
|
remote = RemoteType {
|
||||||
|
@ -32,18 +41,18 @@ remote = RemoteType {
|
||||||
|
|
||||||
chainGen :: P2PAddress -> Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
|
chainGen :: P2PAddress -> Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
|
||||||
chainGen addr r u c gc = do
|
chainGen addr r u c gc = do
|
||||||
workerpool <- mkWorkerPool addr
|
connpool <- mkConnectionPool
|
||||||
cst <- remoteCost gc expensiveRemoteCost
|
cst <- remoteCost gc expensiveRemoteCost
|
||||||
let this = Remote
|
let this = Remote
|
||||||
{ uuid = u
|
{ uuid = u
|
||||||
, cost = cst
|
, cost = cst
|
||||||
, name = Git.repoDescribe r
|
, name = Git.repoDescribe r
|
||||||
, storeKey = storeKeyDummy
|
, storeKey = store addr connpool
|
||||||
, retrieveKeyFile = retreiveKeyFileDummy
|
, retrieveKeyFile = retrieve addr connpool
|
||||||
, retrieveKeyFileCheap = \_ _ _ -> return False
|
, retrieveKeyFileCheap = \_ _ _ -> return False
|
||||||
, removeKey = removeKeyDummy
|
, removeKey = remove addr connpool
|
||||||
, lockContent = Nothing -- TODO use p2p protocol locking
|
, lockContent = Just (lock u addr connpool)
|
||||||
, checkPresent = checkPresentDummy
|
, checkPresent = checkpresent addr connpool
|
||||||
, checkPresentCheap = False
|
, checkPresentCheap = False
|
||||||
, whereisKey = Nothing
|
, whereisKey = Nothing
|
||||||
, remoteFsck = Nothing
|
, remoteFsck = Nothing
|
||||||
|
@ -60,26 +69,109 @@ chainGen addr r u c gc = do
|
||||||
, claimUrl = Nothing
|
, claimUrl = Nothing
|
||||||
, checkUrl = Nothing
|
, checkUrl = Nothing
|
||||||
}
|
}
|
||||||
return $ Just $ specialRemote' (specialRemoteCfg c) c
|
return (Just this)
|
||||||
(simplyPrepare $ store this workerpool)
|
|
||||||
(simplyPrepare $ retrieve this workerpool)
|
|
||||||
(simplyPrepare $ remove this workerpool)
|
|
||||||
(simplyPrepare $ checkKey this workerpool)
|
|
||||||
this
|
|
||||||
|
|
||||||
data WorkerPool = WorkerPool
|
-- TODO update progress
|
||||||
|
store :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
|
||||||
|
store addr connpool k af p = fromMaybe False
|
||||||
|
<$> runProto addr connpool (P2P.put k af)
|
||||||
|
|
||||||
mkWorkerPool :: P2PAddress -> Annex WorkerPool
|
retrieve :: P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
|
||||||
mkWorkerPool addr = undefined
|
retrieve addr connpool k af dest _p = unVerified $ fromMaybe False
|
||||||
|
<$> runProto addr connpool (P2P.get dest k af)
|
||||||
|
|
||||||
store :: Remote -> WorkerPool -> Storer
|
remove :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
|
||||||
store r workerpool = undefined
|
remove addr connpool k = fromMaybe False
|
||||||
|
<$> runProto addr connpool (P2P.remove k)
|
||||||
|
|
||||||
retrieve :: Remote -> WorkerPool -> Retriever
|
checkpresent :: P2PAddress -> ConnectionPool -> Key -> Annex Bool
|
||||||
retrieve r workerpool = undefined
|
checkpresent addr connpool k = maybe unavail return
|
||||||
|
=<< runProto addr connpool (P2P.checkPresent k)
|
||||||
|
where
|
||||||
|
unavail = giveup "can't connect to peer"
|
||||||
|
|
||||||
remove :: Remote -> WorkerPool -> Remover
|
lock :: UUID -> P2PAddress -> ConnectionPool -> Key -> (VerifiedCopy -> Annex r) -> Annex r
|
||||||
remove r workerpool k = undefined
|
lock theiruuid addr connpool k callback =
|
||||||
|
withConnection addr connpool $ \conn -> do
|
||||||
|
connv <- liftIO $ newMVar conn
|
||||||
|
let runproto d p = do
|
||||||
|
c <- liftIO $ takeMVar connv
|
||||||
|
(c', mr) <- runProto' p c
|
||||||
|
liftIO $ putMVar connv c'
|
||||||
|
return (fromMaybe d mr)
|
||||||
|
r <- P2P.lockContentWhile runproto k go
|
||||||
|
conn' <- liftIO $ takeMVar connv
|
||||||
|
return (conn', r)
|
||||||
|
where
|
||||||
|
go False = giveup "can't lock content"
|
||||||
|
go True = withVerifiedCopy LockedCopy theiruuid (return True) callback
|
||||||
|
|
||||||
checkKey :: Remote -> WorkerPool -> CheckPresent
|
-- | A connection to the peer.
|
||||||
checkKey r workerpool k = undefined
|
data Connection
|
||||||
|
= TorAnnexConnection RunEnv
|
||||||
|
| ClosedConnection
|
||||||
|
|
||||||
|
type ConnectionPool = TVar [Connection]
|
||||||
|
|
||||||
|
mkConnectionPool :: Annex ConnectionPool
|
||||||
|
mkConnectionPool = liftIO $ newTVarIO []
|
||||||
|
|
||||||
|
-- Runs the Proto action.
|
||||||
|
runProto :: P2PAddress -> ConnectionPool -> P2P.Proto a -> Annex (Maybe a)
|
||||||
|
runProto addr connpool a = withConnection addr connpool (runProto' a)
|
||||||
|
|
||||||
|
runProto' :: P2P.Proto a -> Connection -> Annex (Connection, Maybe a)
|
||||||
|
runProto' _ ClosedConnection = return (ClosedConnection, Nothing)
|
||||||
|
runProto' a conn@(TorAnnexConnection runenv) = do
|
||||||
|
r <- runFullProto Client runenv a
|
||||||
|
-- When runFullProto fails, the connection is no longer usable,
|
||||||
|
-- so close it.
|
||||||
|
if isJust r
|
||||||
|
then return (conn, r)
|
||||||
|
else do
|
||||||
|
liftIO $ hClose (runIhdl runenv)
|
||||||
|
return (ClosedConnection, r)
|
||||||
|
|
||||||
|
-- Uses an open connection if one is available in the ConnectionPool;
|
||||||
|
-- otherwise opens a new connection.
|
||||||
|
--
|
||||||
|
-- Once the action is done, the connection is added back to the
|
||||||
|
-- ConnectionPool, unless it's no longer open.
|
||||||
|
withConnection :: P2PAddress -> ConnectionPool -> (Connection -> Annex (Connection, a)) -> Annex a
|
||||||
|
withConnection addr connpool a = bracketOnError get cache go
|
||||||
|
where
|
||||||
|
get = do
|
||||||
|
mc <- liftIO $ atomically $ do
|
||||||
|
l <- readTVar connpool
|
||||||
|
case l of
|
||||||
|
[] -> do
|
||||||
|
writeTVar connpool []
|
||||||
|
return Nothing
|
||||||
|
(c:cs) -> do
|
||||||
|
writeTVar connpool cs
|
||||||
|
return (Just c)
|
||||||
|
maybe (openConnection addr) return mc
|
||||||
|
|
||||||
|
cache ClosedConnection = return ()
|
||||||
|
cache conn = liftIO $ atomically $ modifyTVar' connpool (conn:)
|
||||||
|
|
||||||
|
go conn = do
|
||||||
|
(conn', r) <- a conn
|
||||||
|
cache conn'
|
||||||
|
return r
|
||||||
|
|
||||||
|
openConnection :: P2PAddress -> Annex Connection
|
||||||
|
openConnection (TorAnnex onionaddress onionport) = do
|
||||||
|
v <- liftIO $ tryNonAsync $
|
||||||
|
torHandle =<< connectHiddenService onionaddress onionport
|
||||||
|
case v of
|
||||||
|
Right h -> do
|
||||||
|
g <- Annex.gitRepo
|
||||||
|
let runenv = RunEnv
|
||||||
|
{ runRepo = g
|
||||||
|
, runCheckAuth = const False
|
||||||
|
, runIhdl = h
|
||||||
|
, runOhdl = h
|
||||||
|
}
|
||||||
|
return (TorAnnexConnection runenv)
|
||||||
|
Left _e -> return ClosedConnection
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue