implement remove-before

The reason to use removeBeforeRemoteEndTime is twofold.

First, removeBefore sends two protocol commands. Currently, the HTTP
protocol runner only supports sending a single command per invocation.

Secondly, the http server gets a monotonic timestamp from the client. So
translating back to a POSIXTime would be annoying.

The timestamp flow with a proxy will be:

- client gets timestamp, which gets the monotonic timestamp from the
  proxied remote via the proxy. The timestamp is currently not
  proxied when there is a single proxy.
- client calls remove-before
- http server calls removeBeforeRemoteEndTime which sends REMOVE-BEFORE
  to the proxied remote.
This commit is contained in:
Joey Hess 2024-07-10 10:03:26 -04:00
parent e9cba0a580
commit 7c588a5791
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 65 additions and 38 deletions

View file

@ -16,6 +16,8 @@ import P2P.Http
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import Annex.Url import Annex.Url
import Utility.Env import Utility.Env
import Utility.ThreadScheduler
import Utility.MonotonicClock
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
@ -71,7 +73,7 @@ seek o = startConcurrency commandStages $ do
-- XXX remove this -- XXX remove this
when (isNothing (portOption o)) $ do when (isNothing (portOption o)) $ do
liftIO $ putStrLn "test begins" liftIO $ putStrLn "test begins"
testRemove testRemoveBefore
giveup "TEST DONE" giveup "TEST DONE"
withLocalP2PConnections $ \acquireconn -> liftIO $ do withLocalP2PConnections $ \acquireconn -> liftIO $ do
authenv <- getAuthEnv authenv <- getAuthEnv
@ -149,7 +151,7 @@ testCheckPresent = do
burl <- liftIO $ parseBaseUrl "http://localhost:8080/" burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
res <- liftIO $ clientCheckPresent (mkClientEnv mgr burl) res <- liftIO $ clientCheckPresent (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3) (P2P.ProtocolVersion 3)
(B64Key (fromJust $ deserializeKey ("WORM-s30-m1720547401--foo" :: String))) (B64Key (fromJust $ deserializeKey ("WORM-s30-m1720617630--bar" :: String)))
(B64UUID (toUUID ("cu" :: String))) (B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String))) (B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
[] []
@ -168,3 +170,20 @@ testRemove = do
Nothing Nothing
liftIO $ print res liftIO $ print res
testRemoveBefore = do
mgr <- httpManager <$> getUrlOptions
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
MonotonicTimestamp t <- liftIO currentMonotonicTimestamp
--liftIO $ threadDelaySeconds (Seconds 10)
let ts = MonotonicTimestamp (t + 10)
liftIO $ print ("running with timestamp", ts)
res <- liftIO $ clientRemoveBefore (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3)
(B64Key (fromJust $ deserializeKey ("WORM-s30-m1720617630--bar" :: String)))
(B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
[]
(Timestamp ts)
Nothing
liftIO $ print res

View file

@ -111,7 +111,7 @@ serveP2pHttp st
type GetGenericAPI = StreamGet NoFraming OctetStream (SourceIO B.ByteString) type GetGenericAPI = StreamGet NoFraming OctetStream (SourceIO B.ByteString)
serveGetGeneric :: P2PHttpServerState -> B64Key -> Handler (S.SourceT IO B.ByteString) serveGetGeneric :: P2PHttpServerState -> B64Key -> Handler (S.SourceT IO B.ByteString)
serveGetGeneric = undefined serveGetGeneric = undefined -- TODO
type GetAPI type GetAPI
= ClientUUID Optional = ClientUUID Optional
@ -135,7 +135,7 @@ serveGet
-> Maybe Offset -> Maybe Offset
-> Maybe Auth -> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet = undefined serveGet = undefined -- TODO
clientGet clientGet
:: ProtocolVersion :: ProtocolVersion
@ -234,8 +234,8 @@ serveRemove st resultmangle apiver (B64Key k) cu su bypass sec auth = do
$ \runst conn -> $ \runst conn ->
liftIO $ runNetProto runst conn $ remove Nothing k liftIO $ runNetProto runst conn $ remove Nothing k
case res of case res of
(Right b, plus) -> return $ resultmangle $ (Right b, plusuuids) -> return $ resultmangle $
RemoveResultPlus b (map B64UUID (fromMaybe [] plus)) RemoveResultPlus b (map B64UUID (fromMaybe [] plusuuids))
(Left err, _) -> throwError $ (Left err, _) -> throwError $
err500 { errBody = encodeBL err } err500 { errBody = encodeBL err }
@ -270,7 +270,9 @@ type RemoveBeforeAPI
:> ServerUUID Required :> ServerUUID Required
:> BypassUUIDs :> BypassUUIDs
:> QueryParam' '[Required] "timestamp" Timestamp :> QueryParam' '[Required] "timestamp" Timestamp
:> Post '[JSON] RemoveResult :> IsSecure
:> AuthHeader
:> Post '[JSON] RemoveResultPlus
serveRemoveBefore serveRemoveBefore
:: APIVersion v :: APIVersion v
@ -281,27 +283,44 @@ serveRemoveBefore
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Timestamp -> Timestamp
-> Handler RemoveResult -> IsSecure
serveRemoveBefore = undefined -> Maybe Auth
-> Handler RemoveResultPlus
serveRemoveBefore st apiver (B64Key k) cu su bypass (Timestamp ts) sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction
$ \runst conn ->
liftIO $ runNetProto runst conn $
removeBeforeRemoteEndTime ts k
case res of
(Right b, plusuuids) -> return $
RemoveResultPlus b (map B64UUID (fromMaybe [] plusuuids))
(Left err, _) -> throwError $
err500 { errBody = encodeBL err }
clientRemoveBefore clientRemoveBefore
:: ProtocolVersion :: ClientEnv
-> ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Timestamp -> Timestamp
-> ClientM RemoveResult -> Maybe Auth
clientRemoveBefore (ProtocolVersion ver) = case ver of -> IO RemoveResultPlus
3 -> v3 V3 clientRemoveBefore clientenv (ProtocolVersion ver) key cu su bypass ts auth =
_ -> error "unsupported protocol version" withClientM (cli key cu su bypass ts auth) clientenv $ \case
Left err -> throwM err
Right res -> return res
where where
cli = case ver of
3 -> v3 V3
_ -> error "unsupported protocol version"
_ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
_ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
_ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
v3 :<|> _ = client p2pHttpAPI v3 :<|> _ = client p2pHttpAPI
type GetTimestampAPI type GetTimestampAPI
= ClientUUID Required = ClientUUID Required
:> ServerUUID Required :> ServerUUID Required
@ -316,7 +335,7 @@ serveGetTimestamp
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Handler GetTimestampResult -> Handler GetTimestampResult
serveGetTimestamp = undefined serveGetTimestamp = undefined -- TODO
clientGetTimestamp clientGetTimestamp
:: ProtocolVersion :: ProtocolVersion
@ -360,7 +379,7 @@ servePut
-> DataLength -> DataLength
-> S.SourceT IO B.ByteString -> S.SourceT IO B.ByteString
-> Handler t -> Handler t
servePut = undefined servePut = undefined -- TODO
clientPut clientPut
:: ProtocolVersion :: ProtocolVersion
@ -405,7 +424,7 @@ servePutOffset
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Handler t -> Handler t
servePutOffset = undefined servePutOffset = undefined -- TODO
clientPutOffset clientPutOffset
:: ProtocolVersion :: ProtocolVersion
@ -443,7 +462,7 @@ serveLockContent
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Handler LockResult -> Handler LockResult
serveLockContent = undefined serveLockContent = undefined -- TODO
clientLockContent clientLockContent
:: ProtocolVersion :: ProtocolVersion

View file

@ -437,15 +437,18 @@ removeBefore endtime key = do
let remoteendtime = remotetime + timeleft' let remoteendtime = remotetime + timeleft'
if timeleft <= 0 if timeleft <= 0
then return (Right False, Nothing) then return (Right False, Nothing)
else do else removeBeforeRemoteEndTime remoteendtime key
net $ sendMessage $
REMOVE_BEFORE remoteendtime key
checkSuccessFailurePlus
Just (ERROR err) -> return (Left err, Nothing) Just (ERROR err) -> return (Left err, Nothing)
_ -> do _ -> do
net $ sendMessage (ERROR "expected TIMESTAMP") net $ sendMessage (ERROR "expected TIMESTAMP")
return (Right False, Nothing) return (Right False, Nothing)
removeBeforeRemoteEndTime :: MonotonicTimestamp -> Key -> Proto (Either String Bool, Maybe [UUID])
removeBeforeRemoteEndTime remoteendtime key = do
net $ sendMessage $
REMOVE_BEFORE remoteendtime key
checkSuccessFailurePlus
get :: FilePath -> Key -> Maybe IncrementalVerifier -> AssociatedFile -> Meter -> MeterUpdate -> Proto (Bool, Verification) get :: FilePath -> Key -> Maybe IncrementalVerifier -> AssociatedFile -> Meter -> MeterUpdate -> Proto (Bool, Verification)
get dest key iv af m p = get dest key iv af m p =
receiveContent (Just m) p sizer storer $ \offset -> receiveContent (Just m) p sizer storer $ \offset ->

View file

@ -272,10 +272,6 @@ or false if the key was not able to be removed.
The JSON object can have an additional field "plusuuids" that is a list of The JSON object can have an additional field "plusuuids" that is a list of
UUIDs of other repositories that the content was removed from. UUIDs of other repositories that the content was removed from.
If the server does not allow removing the key due to a policy
(eg due to being read-only or append-only), it will respond with a JSON
object with an "error" field that has an error message as its value.
### POST /git-annex/v2/remove ### POST /git-annex/v2/remove
Identical to v3. Identical to v3.
@ -362,17 +358,11 @@ should not be used. This can happen when eg, the data was being sent from
an unlocked annexed file, which got modified while it was being sent. an unlocked annexed file, which got modified while it was being sent.
The server responds with a JSON object with a field "stored" The server responds with a JSON object with a field "stored"
that is true if it received the data and stored the that is true if it received the data and stored the content.
content.
The JSON object can have an additional field "plusuuids" that is a list of The JSON object can have an additional field "plusuuids" that is a list of
UUIDs of other repositories that the content was stored to. UUIDs of other repositories that the content was stored to.
If the server does not allow storing the key due eg to a policy
(eg due to being read-only or append-only), or due to the data being
invalid, or because it ran out of disk space, it will respond with a
JSON object with an "error" field that has an error message as its value.
### POST /git-annex/v2/put ### POST /git-annex/v2/put
Identical to v3. Identical to v3.
@ -412,10 +402,6 @@ object may also have a field "plusuuids" that lists
the UUIDs of other repositories where the content is stored, in addition to the UUIDs of other repositories where the content is stored, in addition to
the serveruuid. the serveruuid.
If the server does not allow storing the key due to a policy
(eg due to being read-only or append-only), it will respond with a JSON
object with an "error" field that has an error message as its value.
[Implementation note: This will be implemented by sending `PUT` and [Implementation note: This will be implemented by sending `PUT` and
returning the `PUT-FROM` offset. To avoid leaving the P2P protocol stuck returning the `PUT-FROM` offset. To avoid leaving the P2P protocol stuck
part way through a `PUT`, a synthetic empty `DATA` followed by `INVALID` part way through a `PUT`, a synthetic empty `DATA` followed by `INVALID`