implement put data-present parameter in http servant
Changed the protocol docs because servant parses "true" and "false" for booleans in query parameters, not "1" and "0". clientPut with datapresent=True is not used by git-annex, and I don't anticipate it being used in git-annex, except for testing. I've tested this by making clientPut be called with datapresent=True and git-annex copy to a remote succeeds once the object file is first manually copied to the remote. That would be a good test for the test suite, but running the http client means exposing it to at least localhost, and would fail if a real http client was already running on that port.
This commit is contained in:
parent
57e27adb55
commit
a4e9057486
5 changed files with 106 additions and 60 deletions
|
@ -44,7 +44,9 @@ type P2PHttpAPI
|
|||
:<|> "git-annex" :> SU :> PV3 :> "remove-before" :> RemoveBeforeAPI
|
||||
:<|> "git-annex" :> SU :> PV4 :> "gettimestamp" :> GetTimestampAPI
|
||||
:<|> "git-annex" :> SU :> PV3 :> "gettimestamp" :> GetTimestampAPI
|
||||
:<|> "git-annex" :> SU :> PV4 :> "put" :> PutAPI PutResultPlus
|
||||
:<|> "git-annex" :> SU :> PV4 :> "put"
|
||||
:> QueryParam "data-present" Bool
|
||||
:> PutAPI PutResultPlus
|
||||
:<|> "git-annex" :> SU :> PV3 :> "put" :> PutAPI PutResultPlus
|
||||
:<|> "git-annex" :> SU :> PV2 :> "put" :> PutAPI PutResultPlus
|
||||
:<|> "git-annex" :> SU :> PV1 :> "put" :> PutAPI PutResult
|
||||
|
|
|
@ -329,27 +329,32 @@ clientPut
|
|||
-> FileSize
|
||||
-> Annex Bool
|
||||
-- ^ Called after sending the file to check if it's valid.
|
||||
-> Bool
|
||||
-- ^ Set data-present parameter and do not actually send data
|
||||
-- (v4+ only)
|
||||
-> ClientAction PutResultPlus
|
||||
#ifdef WITH_SERVANT
|
||||
clientPut meterupdate k moffset af contentfile contentfilesize validitycheck clientenv (ProtocolVersion ver) su cu bypass auth = do
|
||||
checkv <- liftIO newEmptyTMVarIO
|
||||
checkresultv <- liftIO newEmptyTMVarIO
|
||||
let checker = do
|
||||
liftIO $ atomically $ takeTMVar checkv
|
||||
validitycheck >>= liftIO . atomically . putTMVar checkresultv
|
||||
checkerthread <- liftIO . async =<< forkState checker
|
||||
v <- liftIO $ withBinaryFile contentfile ReadMode $ \h -> do
|
||||
when (offset /= 0) $
|
||||
hSeek h AbsoluteSeek offset
|
||||
withClientM (cli (stream h checkv checkresultv)) clientenv return
|
||||
case v of
|
||||
Left err -> do
|
||||
void $ liftIO $ atomically $ tryPutTMVar checkv ()
|
||||
join $ liftIO (wait checkerthread)
|
||||
return (Left err)
|
||||
Right res -> do
|
||||
join $ liftIO (wait checkerthread)
|
||||
return (Right res)
|
||||
clientPut meterupdate k moffset af contentfile contentfilesize validitycheck datapresent clientenv (ProtocolVersion ver) su cu bypass auth
|
||||
| datapresent = liftIO $ withClientM (cli mempty) clientenv return
|
||||
| otherwise = do
|
||||
checkv <- liftIO newEmptyTMVarIO
|
||||
checkresultv <- liftIO newEmptyTMVarIO
|
||||
let checker = do
|
||||
liftIO $ atomically $ takeTMVar checkv
|
||||
validitycheck >>= liftIO . atomically . putTMVar checkresultv
|
||||
checkerthread <- liftIO . async =<< forkState checker
|
||||
v <- liftIO $ withBinaryFile contentfile ReadMode $ \h -> do
|
||||
when (offset /= 0) $
|
||||
hSeek h AbsoluteSeek offset
|
||||
withClientM (cli (stream h checkv checkresultv)) clientenv return
|
||||
case v of
|
||||
Left err -> do
|
||||
void $ liftIO $ atomically $ tryPutTMVar checkv ()
|
||||
join $ liftIO (wait checkerthread)
|
||||
return (Left err)
|
||||
Right res -> do
|
||||
join $ liftIO (wait checkerthread)
|
||||
return (Right res)
|
||||
where
|
||||
stream h checkv checkresultv = S.SourceT $ \a -> do
|
||||
bl <- hGetContentsMetered h meterupdate
|
||||
|
@ -401,7 +406,7 @@ clientPut meterupdate k moffset af contentfile contentfilesize validitycheck cli
|
|||
bk = B64Key k
|
||||
|
||||
cli src = case ver of
|
||||
4 -> v4 su V4 len bk cu bypass baf moffset src auth
|
||||
4 -> v4 su V4 (if datapresent then Just True else Nothing) len bk cu bypass baf moffset src auth
|
||||
3 -> v3 su V3 len bk cu bypass baf moffset src auth
|
||||
2 -> v2 su V2 len bk cu bypass baf moffset src auth
|
||||
1 -> plus <$> v1 su V1 len bk cu bypass baf moffset src auth
|
||||
|
|
|
@ -26,6 +26,7 @@ import P2P.Http
|
|||
import P2P.Http.Types
|
||||
import P2P.Http.State
|
||||
import P2P.Protocol hiding (Offset, Bypass, auth)
|
||||
import qualified P2P.Protocol
|
||||
import P2P.IO
|
||||
import P2P.Annex
|
||||
import Annex.WorkerPool
|
||||
|
@ -69,10 +70,10 @@ serveP2pHttp st
|
|||
:<|> serveGetTimestamp st
|
||||
:<|> serveGetTimestamp st
|
||||
:<|> servePut st id
|
||||
:<|> servePut st id
|
||||
:<|> servePut st id
|
||||
:<|> servePut st dePlus
|
||||
:<|> servePut st dePlus
|
||||
:<|> servePut' st id
|
||||
:<|> servePut' st id
|
||||
:<|> servePut' st dePlus
|
||||
:<|> servePut' st dePlus
|
||||
:<|> servePutOffset st id
|
||||
:<|> servePutOffset st id
|
||||
:<|> servePutOffset st id
|
||||
|
@ -307,6 +308,7 @@ servePut
|
|||
-> (PutResultPlus -> t)
|
||||
-> B64UUID ServerSide
|
||||
-> v
|
||||
-> Maybe Bool
|
||||
-> DataLength
|
||||
-> B64Key
|
||||
-> B64UUID ClientSide
|
||||
|
@ -317,7 +319,15 @@ servePut
|
|||
-> IsSecure
|
||||
-> Maybe Auth
|
||||
-> Handler t
|
||||
servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf moffset stream sec auth = do
|
||||
servePut st resultmangle su apiver (Just True) _ k cu bypass baf _ _ sec auth = do
|
||||
res <- withP2PConnection' apiver st cu su bypass sec auth WriteAction
|
||||
(\cst -> cst { connectionWaitVar = False }) (liftIO . protoaction)
|
||||
servePutResult resultmangle res
|
||||
where
|
||||
protoaction conn = servePutAction st conn k baf $ \_offset -> do
|
||||
net $ sendMessage DATA_PRESENT
|
||||
checkSuccessPlus
|
||||
servePut st resultmangle su apiver _datapresent (DataLength len) k cu bypass baf moffset stream sec auth = do
|
||||
validityv <- liftIO newEmptyTMVarIO
|
||||
let validitycheck = local $ runValidityCheck $
|
||||
liftIO $ atomically $ readTMVar validityv
|
||||
|
@ -327,41 +337,27 @@ servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf mof
|
|||
(\cst -> cst { connectionWaitVar = False }) $ \conn -> do
|
||||
liftIO $ void $ async $ checktooshort conn tooshortv
|
||||
liftIO (protoaction conn content validitycheck)
|
||||
case res of
|
||||
Right (Right (Just plusuuids)) -> return $ resultmangle $
|
||||
PutResultPlus True (map B64UUID plusuuids)
|
||||
Right (Right Nothing) -> return $ resultmangle $
|
||||
PutResultPlus False []
|
||||
Right (Left protofail) -> throwError $
|
||||
err500 { errBody = encodeBL (describeProtoFailure protofail) }
|
||||
Left err -> throwError $
|
||||
err500 { errBody = encodeBL (show err) }
|
||||
servePutResult resultmangle res
|
||||
where
|
||||
protoaction conn content validitycheck = inAnnexWorker st $
|
||||
enteringStage (TransferStage Download) $
|
||||
runFullProto (clientRunState conn) (clientP2PConnection conn) $
|
||||
protoaction' content validitycheck
|
||||
|
||||
protoaction' content validitycheck = put' k af $ \offset' ->
|
||||
let offsetdelta = offset' - offset
|
||||
in case compare offset' offset of
|
||||
EQ -> sendContent' nullMeterUpdate (Len len)
|
||||
content validitycheck
|
||||
GT -> sendContent' nullMeterUpdate
|
||||
(Len (len - fromIntegral offsetdelta))
|
||||
(L.drop (fromIntegral offsetdelta) content)
|
||||
validitycheck
|
||||
LT -> sendContent' nullMeterUpdate
|
||||
(Len len)
|
||||
content
|
||||
(validitycheck >>= \_ -> return Invalid)
|
||||
protoaction conn content validitycheck =
|
||||
servePutAction st conn k baf $ \offset' ->
|
||||
let offsetdelta = offset' - offset
|
||||
in case compare offset' offset of
|
||||
EQ -> sendContent' nullMeterUpdate (Len len)
|
||||
content validitycheck
|
||||
GT -> sendContent' nullMeterUpdate
|
||||
(Len (len - fromIntegral offsetdelta))
|
||||
(L.drop (fromIntegral offsetdelta) content)
|
||||
validitycheck
|
||||
LT -> sendContent' nullMeterUpdate
|
||||
(Len len)
|
||||
content
|
||||
(validitycheck >>= \_ -> return Invalid)
|
||||
|
||||
offset = case moffset of
|
||||
Just (Offset o) -> o
|
||||
Nothing -> 0
|
||||
|
||||
af = b64FilePathToAssociatedFile baf
|
||||
|
||||
-- Streams the ByteString from the client. Avoids returning a longer
|
||||
-- than expected ByteString by truncating to the expected length.
|
||||
-- Returns a shorter than expected ByteString when the data is not
|
||||
|
@ -399,6 +395,49 @@ servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf mof
|
|||
liftIO $ whenM (atomically $ takeTMVar tooshortv) $
|
||||
closeP2PConnection conn
|
||||
|
||||
servePutAction
|
||||
:: P2PHttpServerState
|
||||
-> P2PConnectionPair
|
||||
-> B64Key
|
||||
-> Maybe B64FilePath
|
||||
-> (P2P.Protocol.Offset -> Proto (Maybe [UUID]))
|
||||
-> IO (Either SomeException (Either ProtoFailure (Maybe [UUID])))
|
||||
servePutAction st conn (B64Key k) baf a = inAnnexWorker st $
|
||||
enteringStage (TransferStage Download) $
|
||||
runFullProto (clientRunState conn) (clientP2PConnection conn) $
|
||||
put' k af a
|
||||
where
|
||||
af = b64FilePathToAssociatedFile baf
|
||||
|
||||
servePutResult :: (PutResultPlus -> t) -> Either SomeException (Either ProtoFailure (Maybe [UUID])) -> Handler t
|
||||
servePutResult resultmangle res = case res of
|
||||
Right (Right (Just plusuuids)) -> return $ resultmangle $
|
||||
PutResultPlus True (map B64UUID plusuuids)
|
||||
Right (Right Nothing) -> return $ resultmangle $
|
||||
PutResultPlus False []
|
||||
Right (Left protofail) -> throwError $
|
||||
err500 { errBody = encodeBL (describeProtoFailure protofail) }
|
||||
Left err -> throwError $
|
||||
err500 { errBody = encodeBL (show err) }
|
||||
|
||||
servePut'
|
||||
:: APIVersion v
|
||||
=> P2PHttpServerState
|
||||
-> (PutResultPlus -> t)
|
||||
-> B64UUID ServerSide
|
||||
-> v
|
||||
-> DataLength
|
||||
-> B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> [B64UUID Bypass]
|
||||
-> Maybe B64FilePath
|
||||
-> Maybe Offset
|
||||
-> S.SourceT IO B.ByteString
|
||||
-> IsSecure
|
||||
-> Maybe Auth
|
||||
-> Handler t
|
||||
servePut' st resultmangle su v = servePut st resultmangle su v Nothing
|
||||
|
||||
servePutOffset
|
||||
:: APIVersion v
|
||||
=> P2PHttpServerState
|
||||
|
|
|
@ -687,7 +687,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key af o meterupdate
|
|||
metered (Just meterupdate) key bwlimit $ \_ p -> do
|
||||
let p' = offsetMeterUpdate p (BytesProcessed n)
|
||||
res <- p2pHttpClient r giveup $
|
||||
clientPut p' key (Just offset) af object sz check'
|
||||
clientPut p' key (Just offset) af object sz check' False
|
||||
case res of
|
||||
PutResultPlus False fanoutuuids -> do
|
||||
storefanout fanoutuuids
|
||||
|
|
|
@ -388,10 +388,10 @@ There are are also these optional parameters:
|
|||
|
||||
* `data-present`
|
||||
|
||||
When set to 1, this indicates that the data has been sent to the repository
|
||||
in some other way. The body of the request will be empty. The server will
|
||||
verify that the data is present in the repository and will proceed the
|
||||
same as if the data was sent in the request.
|
||||
When set to "true", this indicates that the data has been sent to the
|
||||
repository in some other way. The body of the request will be empty.
|
||||
The server will verify that the data is present in the repository and
|
||||
will proceed the same as if the data was sent in the request.
|
||||
|
||||
The `Content-Type` header should be `application/octet-stream`.
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue