servePut and clientPut implementation

Made the data-length header required even for v0. This simplifies the
implementation, and doesn't preclude extra verification being done for
v0.

The connectionWaitVar is an ugly hack. In servePut, nothing waits
on the waitvar, and I could not find a good way to make anything wait on
it.
This commit is contained in:
Joey Hess 2024-07-22 10:20:18 -04:00
parent eb4fb388bd
commit 4826a3745d
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
10 changed files with 222 additions and 185 deletions

View file

@ -27,6 +27,7 @@ import P2P.Protocol hiding (Offset, Bypass, auth)
import P2P.IO
import P2P.Annex
import Annex.WorkerPool
import Annex.Concurrent
import Types.WorkerPool
import Types.Direction
import Utility.Metered
@ -37,6 +38,7 @@ import qualified Servant.Types.SourceT as S
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Lazy.Internal as LI
import Data.Char
import Control.Concurrent.STM
import Control.Concurrent.Async
import Control.Concurrent
@ -57,14 +59,10 @@ type P2PHttpAPI
:<|> "git-annex" :> SU :> PV0 :> "remove" :> RemoveAPI RemoveResult
:<|> "git-annex" :> SU :> PV3 :> "remove-before" :> RemoveBeforeAPI
:<|> "git-annex" :> SU :> PV3 :> "gettimestamp" :> GetTimestampAPI
:<|> "git-annex" :> SU :> PV3 :> "put" :> DataLengthHeader
:> PutAPI PutResultPlus
:<|> "git-annex" :> SU :> PV2 :> "put" :> DataLengthHeader
:> PutAPI PutResultPlus
:<|> "git-annex" :> SU :> PV1 :> "put" :> DataLengthHeader
:> PutAPI PutResult
:<|> "git-annex" :> SU :> PV0 :> "put"
:> PutAPI PutResult
:<|> "git-annex" :> SU :> PV3 :> "put" :> PutAPI PutResultPlus
:<|> "git-annex" :> SU :> PV2 :> "put" :> PutAPI PutResultPlus
:<|> "git-annex" :> SU :> PV1 :> "put" :> PutAPI PutResult
:<|> "git-annex" :> SU :> PV0 :> "put" :> PutAPI PutResult
:<|> "git-annex" :> SU :> PV3 :> "putoffset"
:> PutOffsetAPI PutOffsetResultPlus
:<|> "git-annex" :> SU :> PV2 :> "putoffset"
@ -106,7 +104,7 @@ serveP2pHttp st
:<|> servePut st id
:<|> servePut st id
:<|> servePut st dePlus
:<|> (\su v -> servePut st dePlus su v Nothing)
:<|> servePut st dePlus
:<|> servePutOffset st id
:<|> servePutOffset st id
:<|> servePutOffset st dePlus
@ -136,7 +134,7 @@ serveGetGeneric
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGetGeneric st su@(B64UUID u) k =
-- Use V0 because it does not alter the returned data to indicate
-- InValid content.
-- Invalid content.
serveGet st su V0 k cu [] Nothing Nothing
where
-- Reuse server UUID as client UUID.
@ -167,7 +165,7 @@ serveGet
-> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
conn <- getP2PConnection apiver st cu su bypass sec auth ReadAction
conn <- getP2PConnection apiver st cu su bypass sec auth ReadAction id
bsv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO
validityv <- liftIO newEmptyTMVarIO
@ -181,11 +179,10 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
return $ \v -> do
liftIO $ atomically $ putTMVar validityv v
return True
v <- enteringStage (TransferStage Upload) $
enteringStage (TransferStage Upload) $
runFullProto (clientRunState conn) (clientP2PConnection conn) $
void $ receiveContent Nothing nullMeterUpdate
sizer storer getreq
return v
void $ liftIO $ forkIO $ waitfinal endv finalv conn annexworker
(Len len, bs) <- liftIO $ atomically $ takeTMVar bsv
bv <- liftIO $ newMVar (L.toChunks bs)
@ -240,7 +237,7 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
-- Make sure the annexworker is not left blocked on endv
-- if the client disconnected early.
void $ liftIO $ atomically $ tryPutTMVar endv ()
void $ void $ tryNonAsync $ wait annexworker
void $ tryNonAsync $ wait annexworker
void $ tryNonAsync $ releaseP2PConnection conn
sizer = pure $ Len $ case startat of
@ -505,7 +502,8 @@ clientGetTimestamp clientenv (ProtocolVersion ver) cu su bypass auth =
v3 :<|> _ = client p2pHttpAPI
type PutAPI result
= KeyParam
= DataLengthHeaderRequired
:> KeyParam
:> CU Required
:> BypassUUIDs
:> AssociatedFileParam
@ -521,7 +519,7 @@ servePut
-> (PutResultPlus -> t)
-> B64UUID ServerSide
-> v
-> Maybe DataLength
-> DataLength
-> B64Key
-> B64UUID ClientSide
-> [B64UUID Bypass]
@ -531,35 +529,154 @@ servePut
-> IsSecure
-> Maybe Auth
-> Handler t
servePut st resultmangle su apiver datalen k cu bypass af offset stream sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth WriteAction
$ \conn ->
liftIO $ proxyClientNetProto conn undefined
servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf moffset stream sec auth = do
validityv <- liftIO newEmptyTMVarIO
let validitycheck = local $ runValidityCheck $
liftIO $ atomically $ readTMVar validityv
content <- liftIO $ S.unSourceT stream (gather validityv)
conn <- getP2PConnection apiver st cu su bypass sec auth WriteAction $
\st -> st { connectionWaitVar = False }
res <- liftIO $ inAnnexWorker st $
enteringStage (TransferStage Download) $
runFullProto (clientRunState conn) (clientP2PConnection conn) $
protoaction content validitycheck
case res of
Right (stored, plusuuids) -> return $ resultmangle $
PutResultPlus stored plusuuids
Right (Right (Just plusuuids)) -> return $ resultmangle $
PutResultPlus True (map B64UUID plusuuids)
Right (Right Nothing) -> return $ resultmangle $
PutResultPlus False []
Right (Left protofail) -> throwError $
err500 { errBody = encodeBL (describeProtoFailure protofail) }
Left err -> throwError $
err500 { errBody = encodeBL err }
err500 { errBody = encodeBL (show err) }
where
protoaction content validitycheck = put' k af $ \offset' ->
let offsetdelta = offset' - offset
in case compare offset' offset of
EQ -> sendContent' nullMeterUpdate (Len len)
content validitycheck
GT -> sendContent' nullMeterUpdate
(Len (len - fromIntegral offsetdelta))
(L.drop (fromIntegral offsetdelta) content)
validitycheck
LT -> sendContent' nullMeterUpdate
(Len 0)
mempty
(return Invalid)
offset = case moffset of
Just (Offset o) -> o
Nothing -> 0
af = AssociatedFile $ case baf of
Just (B64FilePath f) -> Just f
Nothing -> Nothing
-- Streams the ByteString from the client. Avoids returning a longer
-- or shorter than expected ByteString by truncating or padding;
-- in such cases the data is not Valid.
gather validityv = unsafeInterleaveIO . go 0
where
go n S.Stop
| n == len = do
atomically $ writeTMVar validityv Valid
return LI.Empty
| otherwise = do
atomically $ writeTMVar validityv Invalid
padout n
go n (S.Error _err) = do
atomically $ writeTMVar validityv Invalid
padout n
go n (S.Skip s) = go n s
go n (S.Effect ms) = ms >>= go n
go n (S.Yield v s) =
let !n' = n + fromIntegral (B.length v)
in if n' > len
then do
atomically $ writeTMVar validityv Invalid
return $ LI.Chunk
(B.take (fromIntegral (len - n')) v)
LI.Empty
else LI.Chunk v <$> unsafeInterleaveIO (go n' s)
padout n =return $ LI.Chunk
(B.replicate (fromIntegral (len-n))
(fromIntegral (ord 'X')))
LI.Empty
clientPut
:: ProtocolVersion
-> DataLength
:: ClientEnv
-> ProtocolVersion
-> B64Key
-> B64UUID ClientSide
-> B64UUID ServerSide
-> B64UUID ClientSide
-> [B64UUID Bypass]
-> Maybe B64FilePath
-> Maybe Offset
-> S.SourceT IO B.ByteString
-> Maybe Auth
-> ClientM PutResultPlus
clientPut (ProtocolVersion ver) sz k cu su bypass af o src auth = case ver of
3 -> v3 su V3 (Just sz) k cu bypass af o src auth
2 -> v2 su V2 (Just sz) k cu bypass af o src auth
1 -> plus <$> v1 su V1 (Just sz) k cu bypass af o src auth
0 -> plus <$> v0 su V0 k cu bypass af o src auth
_ -> error "unsupported protocol version"
-> Maybe Offset
-> AssociatedFile
-> FilePath
-> FileSize
-> Annex Bool
-> Annex PutResultPlus
clientPut clientenv (ProtocolVersion ver) k su cu bypass auth moffset af contentfile contentfilesize validitycheck = do
checkv <- liftIO newEmptyTMVarIO
checkresultv <- liftIO newEmptyTMVarIO
let checker = do
liftIO $ atomically $ takeTMVar checkv
validitycheck >>= liftIO . atomically . putTMVar checkresultv
checkerthread <- liftIO . async =<< forkState checker
liftIO (withClientM (cli (stream checkv checkresultv)) clientenv return) >>= \case
Left err -> do
void $ liftIO $ atomically $ tryPutTMVar checkv ()
join $ liftIO (wait checkerthread)
throwM err
Right res -> do
join $ liftIO (wait checkerthread)
return res
where
stream checkv checkresultv = S.SourceT $ \a -> do
bl <- L.readFile contentfile
v <- newMVar (0, L.toChunks bl)
a (go v)
where
go v = S.fromActionStep B.null $ do
res <- modifyMVar v $ pure . \case
(n, []) -> ((n, []), (n, Nothing))
(n, (b:bs)) ->
let !n' = n + B.length b
in ((n', bs), (n, Just b))
case res of
(_, Just b) -> return b
(n, Nothing) -> do
void $ liftIO $ atomically $
tryPutTMVar checkv ()
valid <- liftIO $ atomically $
readTMVar checkresultv
if not valid
then if n == fromIntegral contentfilesize
then do
modifyMVar_ v $ \(_n, l) ->
pure (n+1, l)
return "X"
else return B.empty
else return B.empty
baf = case af of
AssociatedFile Nothing -> Nothing
AssociatedFile (Just f) -> Just (B64FilePath f)
len = DataLength $ case moffset of
Nothing -> contentfilesize
Just (Offset o) -> contentfilesize - fromIntegral o
cli src = case ver of
3 -> v3 su V3 len k cu bypass baf moffset src auth
2 -> v2 su V2 len k cu bypass baf moffset src auth
1 -> plus <$> v1 su V1 len k cu bypass baf moffset src auth
0 -> plus <$> v0 su V0 len k cu bypass baf moffset src auth
_ -> error "unsupported protocol version"
_ :<|> _ :<|> _ :<|> _ :<|>
_ :<|> _ :<|> _ :<|> _ :<|>
_ :<|> _ :<|> _ :<|> _ :<|>
@ -757,6 +874,8 @@ type OffsetParam = QueryParam "offset" Offset
type DataLengthHeader = Header DataLengthHeader' DataLength
type DataLengthHeaderRequired = Header' '[Required] DataLengthHeader' DataLength
type DataLengthHeader' = "X-git-annex-data-length"
type LockIDParam = QueryParam' '[Required] "lockid" LockID