version the P2P protocol
Unfortunately ReceiveMessage didn't handle unknown messages the way it was documented to; client sending VERSION would cause the server to return an ERROR and hang up. Fixed that, but old releases of git-annex use the P2P protocol for tor and will still have that behavior. So, version is not negotiated for Remote.P2P connections, only for Remote.Git connections, which will support VERSION from their first release. There will need to be a later flag day to change Remote.P2P; left a commented out line that is the only thing that will need to be changed then. Version 1 of the P2P protocol is not implemented yet, but updated the docs for the DATA change that will be allowed by that version. This commit was sponsored by Jeff Goeke-Smith on Patreon.
This commit is contained in:
parent
5ae103e09a
commit
c81768d425
11 changed files with 201 additions and 88 deletions
45
P2P/Annex.hs
45
P2P/Annex.hs
|
@ -1,6 +1,6 @@
|
|||
{- P2P protocol, Annex implementation
|
||||
-
|
||||
- Copyright 2016 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2016-2018 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
@ -8,7 +8,8 @@
|
|||
{-# LANGUAGE RankNTypes, FlexibleContexts #-}
|
||||
|
||||
module P2P.Annex
|
||||
( RunMode(..)
|
||||
( RunState(..)
|
||||
, mkRunState
|
||||
, P2PConnection(..)
|
||||
, runFullProto
|
||||
) where
|
||||
|
@ -24,22 +25,28 @@ import Types.NumCopies
|
|||
import Utility.Metered
|
||||
|
||||
import Control.Monad.Free
|
||||
import Control.Concurrent.STM
|
||||
|
||||
data RunMode
|
||||
= Serving UUID (Maybe ChangedRefsHandle)
|
||||
| Client
|
||||
data RunState
|
||||
= Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
|
||||
| Client (TVar ProtocolVersion)
|
||||
|
||||
mkRunState :: (TVar ProtocolVersion -> RunState) -> IO RunState
|
||||
mkRunState mk = do
|
||||
tvar <- newTVarIO defaultProtocolVersion
|
||||
return (mk tvar)
|
||||
|
||||
-- Full interpreter for Proto, that can receive and send objects.
|
||||
runFullProto :: RunMode -> P2PConnection -> Proto a -> Annex (Either String a)
|
||||
runFullProto runmode conn = go
|
||||
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either String a)
|
||||
runFullProto runst conn = go
|
||||
where
|
||||
go :: RunProto Annex
|
||||
go (Pure v) = return (Right v)
|
||||
go (Free (Net n)) = runNet conn go n
|
||||
go (Free (Local l)) = runLocal runmode go l
|
||||
go (Free (Local l)) = runLocal runst go l
|
||||
|
||||
runLocal :: RunMode -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
|
||||
runLocal runmode runner a = case a of
|
||||
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
|
||||
runLocal runst runner a = case a of
|
||||
TmpContentSize k next -> do
|
||||
tmp <- fromRepo $ gitAnnexTmpObjectLocation k
|
||||
size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
|
||||
|
@ -113,21 +120,26 @@ runLocal runmode runner a = case a of
|
|||
protoaction False
|
||||
next
|
||||
Right _ -> runner next
|
||||
WaitRefChange next -> case runmode of
|
||||
Serving _ (Just h) -> do
|
||||
WaitRefChange next -> case runst of
|
||||
Serving _ (Just h) _ -> do
|
||||
v <- tryNonAsync $ liftIO $ waitChangedRefs h
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Right changedrefs -> runner (next changedrefs)
|
||||
_ -> return $ Left "change notification not available"
|
||||
SetProtocolVersion v next -> do
|
||||
liftIO $ atomically $ writeTVar versiontvar v
|
||||
runner next
|
||||
GetProtocolVersion next ->
|
||||
liftIO (readTVarIO versiontvar) >>= runner . next
|
||||
where
|
||||
transfer mk k af ta = case runmode of
|
||||
transfer mk k af ta = case runst of
|
||||
-- Update transfer logs when serving.
|
||||
Serving theiruuid _ ->
|
||||
Serving theiruuid _ _ ->
|
||||
mk theiruuid k af noRetry ta noNotification
|
||||
-- Transfer logs are updated higher in the stack when
|
||||
-- a client.
|
||||
Client -> ta nullMeterUpdate
|
||||
Client _ -> ta nullMeterUpdate
|
||||
|
||||
storefile dest (Offset o) (Len l) getb p = do
|
||||
let p' = offsetMeterUpdate p (toBytesProcessed o)
|
||||
|
@ -152,3 +164,6 @@ runLocal runmode runner a = case a of
|
|||
liftIO $ hSeek h AbsoluteSeek o
|
||||
b <- liftIO $ hGetContentsMetered h p'
|
||||
runner (sender b)
|
||||
versiontvar = case runst of
|
||||
Serving _ _ tv -> tv
|
||||
Client tv -> tv
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue