implemented servePutOffset and clientPutOffset
But, it's buggy: the server hangs without processing the VALIDITY, and I can't seem to work out why. As far as I can see, storefile is getting as far as running the validitycheck, which is supposed to read that, but never does. This is especially strange because what seems like the same protocol doesn't hang when servePut runs it. This made me think that it needed to use inAnnexWorker to be more like servePut, but that didn't help. Another small problem with this is that it does create an empty .git/annex/tmp/ file for the key. Since this will usually be used in combination with servePut, that doesn't seem worth worrying about much.
This commit is contained in:
parent
c36b9cd1fc
commit
3069e28dd8
6 changed files with 110 additions and 31 deletions
|
@ -73,7 +73,7 @@ seek o = getAnnexWorkerPool $ \workerpool -> do
|
||||||
-- XXX remove this
|
-- XXX remove this
|
||||||
when (isNothing (portOption o)) $ do
|
when (isNothing (portOption o)) $ do
|
||||||
liftIO $ putStrLn "test begins"
|
liftIO $ putStrLn "test begins"
|
||||||
testPut
|
testPutOffset
|
||||||
giveup "TEST DONE"
|
giveup "TEST DONE"
|
||||||
withLocalP2PConnections workerpool $ \acquireconn -> liftIO $ do
|
withLocalP2PConnections workerpool $ \acquireconn -> liftIO $ do
|
||||||
authenv <- getAuthEnv
|
authenv <- getAuthEnv
|
||||||
|
@ -182,11 +182,23 @@ testPut = do
|
||||||
(B64UUID (toUUID ("cu" :: String)))
|
(B64UUID (toUUID ("cu" :: String)))
|
||||||
[]
|
[]
|
||||||
Nothing
|
Nothing
|
||||||
(Just (Offset 584754208))
|
Nothing
|
||||||
(AssociatedFile (Just "foo"))
|
(AssociatedFile (Just "foo"))
|
||||||
"bigfile3content"
|
"emptyfile"
|
||||||
1048576000
|
0
|
||||||
(liftIO (print "validity check") >> return True)
|
(liftIO (print "validity check") >> return False)
|
||||||
|
liftIO $ print res
|
||||||
|
|
||||||
|
testPutOffset = do
|
||||||
|
mgr <- httpManager <$> getUrlOptions
|
||||||
|
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
|
||||||
|
res <- liftIO $ clientPutOffset (mkClientEnv mgr burl)
|
||||||
|
(P2P.ProtocolVersion 3)
|
||||||
|
(B64Key (fromJust $ deserializeKey ("SHA256E-s1048576000--b460ca923520db561d01b99483e9e2fe65ff9dfbdd52c17acba6ac4e874e27d5")))
|
||||||
|
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
|
||||||
|
(B64UUID (toUUID ("cu" :: String)))
|
||||||
|
[]
|
||||||
|
Nothing
|
||||||
liftIO $ print res
|
liftIO $ print res
|
||||||
|
|
||||||
testRemove = do
|
testRemove = do
|
||||||
|
|
41
P2P/Http.hs
41
P2P/Http.hs
|
@ -720,6 +720,8 @@ type PutOffsetAPI result
|
||||||
= KeyParam
|
= KeyParam
|
||||||
:> CU Required
|
:> CU Required
|
||||||
:> BypassUUIDs
|
:> BypassUUIDs
|
||||||
|
:> IsSecure
|
||||||
|
:> AuthHeader
|
||||||
:> Post '[JSON] result
|
:> Post '[JSON] result
|
||||||
|
|
||||||
servePutOffset
|
servePutOffset
|
||||||
|
@ -731,29 +733,50 @@ servePutOffset
|
||||||
-> B64Key
|
-> B64Key
|
||||||
-> B64UUID ClientSide
|
-> B64UUID ClientSide
|
||||||
-> [B64UUID Bypass]
|
-> [B64UUID Bypass]
|
||||||
|
-> IsSecure
|
||||||
|
-> Maybe Auth
|
||||||
-> Handler t
|
-> Handler t
|
||||||
servePutOffset st resultmangle su apiver (B64Key k) cu bypass = undefined
|
servePutOffset st resultmangle su apiver (B64Key k) cu bypass sec auth = do
|
||||||
|
res <- withP2PConnection apiver st cu su bypass sec auth WriteAction
|
||||||
|
(\cst -> cst { connectionWaitVar = False }) $ \conn ->
|
||||||
|
liftIO $ proxyClientNetProto conn $ getPutOffset k af
|
||||||
|
case res of
|
||||||
|
Right offset -> return $ resultmangle $
|
||||||
|
PutOffsetResultPlus (Offset offset)
|
||||||
|
Left plusuuids -> return $ resultmangle $
|
||||||
|
PutOffsetResultAlreadyHavePlus (map B64UUID plusuuids)
|
||||||
|
where
|
||||||
|
af = AssociatedFile Nothing
|
||||||
|
|
||||||
clientPutOffset
|
clientPutOffset
|
||||||
:: B64UUID ServerSide
|
:: ClientEnv
|
||||||
-> ProtocolVersion
|
-> ProtocolVersion
|
||||||
-> B64Key
|
-> B64Key
|
||||||
|
-> B64UUID ServerSide
|
||||||
-> B64UUID ClientSide
|
-> B64UUID ClientSide
|
||||||
-> [B64UUID Bypass]
|
-> [B64UUID Bypass]
|
||||||
-> ClientM PutOffsetResultPlus
|
-> Maybe Auth
|
||||||
clientPutOffset su (ProtocolVersion ver) = case ver of
|
-> IO PutOffsetResultPlus
|
||||||
3 -> v3 su V3
|
clientPutOffset clientenv (ProtocolVersion ver) k su cu bypass auth
|
||||||
2 -> v2 su V2
|
| ver == 0 = return (PutOffsetResultPlus (Offset 0))
|
||||||
_ -> error "unsupported protocol version"
|
| otherwise =
|
||||||
|
withClientM cli clientenv $ \case
|
||||||
|
Left err -> throwM err
|
||||||
|
Right res -> return res
|
||||||
where
|
where
|
||||||
|
cli = case ver of
|
||||||
|
3 -> v3 su V3 k cu bypass auth
|
||||||
|
2 -> v2 su V2 k cu bypass auth
|
||||||
|
1 -> plus <$> v1 su V1 k cu bypass auth
|
||||||
|
_ -> error "unsupported protocol version"
|
||||||
|
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
_ :<|>
|
_ :<|>
|
||||||
_ :<|>
|
_ :<|>
|
||||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||||
v3 :<|> v2 :<|> _ = client p2pHttpAPI
|
v3 :<|> v2 :<|> v1 :<|> _ = client p2pHttpAPI
|
||||||
|
|
||||||
type LockContentAPI
|
type LockContentAPI
|
||||||
= KeyParam
|
= KeyParam
|
||||||
|
|
|
@ -107,10 +107,14 @@ newtype PutResult = PutResult Bool
|
||||||
data PutResultPlus = PutResultPlus Bool [B64UUID Plus]
|
data PutResultPlus = PutResultPlus Bool [B64UUID Plus]
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
newtype PutOffsetResult = PutOffsetResult Offset
|
data PutOffsetResult
|
||||||
|
= PutOffsetResult Offset
|
||||||
|
| PutOffsetResultAlreadyHave
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
data PutOffsetResultPlus = PutOffsetResultPlus Offset [B64UUID Plus]
|
data PutOffsetResultPlus
|
||||||
|
= PutOffsetResultPlus Offset
|
||||||
|
| PutOffsetResultAlreadyHavePlus [B64UUID Plus]
|
||||||
deriving (Show, Generic, NFData)
|
deriving (Show, Generic, NFData)
|
||||||
|
|
||||||
newtype Offset = Offset P2P.Offset
|
newtype Offset = Offset P2P.Offset
|
||||||
|
@ -296,23 +300,37 @@ instance FromJSON GetTimestampResult where
|
||||||
instance ToJSON PutOffsetResult where
|
instance ToJSON PutOffsetResult where
|
||||||
toJSON (PutOffsetResult (Offset (P2P.Offset o))) = object
|
toJSON (PutOffsetResult (Offset (P2P.Offset o))) = object
|
||||||
["offset" .= o]
|
["offset" .= o]
|
||||||
|
toJSON PutOffsetResultAlreadyHave = object
|
||||||
|
["alreadyhave" .= True]
|
||||||
|
|
||||||
instance FromJSON PutOffsetResult where
|
instance FromJSON PutOffsetResult where
|
||||||
parseJSON = withObject "PutOffsetResult" $ \v ->
|
parseJSON = withObject "PutOffsetResult" $ \v ->
|
||||||
PutOffsetResult
|
(PutOffsetResult
|
||||||
<$> (Offset . P2P.Offset <$> v .: "offset")
|
<$> (Offset . P2P.Offset <$> v .: "offset"))
|
||||||
|
<|> (mkalreadyhave
|
||||||
|
<$> (v .: "alreadyhave"))
|
||||||
|
where
|
||||||
|
mkalreadyhave :: Bool -> PutOffsetResult
|
||||||
|
mkalreadyhave _ = PutOffsetResultAlreadyHave
|
||||||
|
|
||||||
instance ToJSON PutOffsetResultPlus where
|
instance ToJSON PutOffsetResultPlus where
|
||||||
toJSON (PutOffsetResultPlus (Offset (P2P.Offset o)) us) = object
|
toJSON (PutOffsetResultPlus (Offset (P2P.Offset o))) = object
|
||||||
[ "offset" .= o
|
[ "offset" .= o ]
|
||||||
|
toJSON (PutOffsetResultAlreadyHavePlus us) = object
|
||||||
|
[ "alreadyhave" .= True
|
||||||
, "plusuuids" .= plusList us
|
, "plusuuids" .= plusList us
|
||||||
]
|
]
|
||||||
|
|
||||||
instance FromJSON PutOffsetResultPlus where
|
instance FromJSON PutOffsetResultPlus where
|
||||||
parseJSON = withObject "PutOffsetResultPlus" $ \v ->
|
parseJSON = withObject "PutOffsetResultPlus" $ \v ->
|
||||||
PutOffsetResultPlus
|
(PutOffsetResultPlus
|
||||||
<$> (Offset . P2P.Offset <$> v .: "offset")
|
<$> (Offset . P2P.Offset <$> v .: "offset"))
|
||||||
<*> v .: "plusuuids"
|
<|> (mkalreadyhave
|
||||||
|
<$> (v .: "alreadyhave")
|
||||||
|
<*> (v .: "plusuuids"))
|
||||||
|
where
|
||||||
|
mkalreadyhave :: Bool -> [B64UUID Plus] -> PutOffsetResultPlus
|
||||||
|
mkalreadyhave _ us = PutOffsetResultAlreadyHavePlus us
|
||||||
|
|
||||||
instance FromJSON (B64UUID t) where
|
instance FromJSON (B64UUID t) where
|
||||||
parseJSON (String t) = case decodeB64Text t of
|
parseJSON (String t) = case decodeB64Text t of
|
||||||
|
@ -358,5 +376,8 @@ instance PlusClass PutResultPlus PutResult where
|
||||||
plus (PutResult b) = PutResultPlus b mempty
|
plus (PutResult b) = PutResultPlus b mempty
|
||||||
|
|
||||||
instance PlusClass PutOffsetResultPlus PutOffsetResult where
|
instance PlusClass PutOffsetResultPlus PutOffsetResult where
|
||||||
dePlus (PutOffsetResultPlus o _) = PutOffsetResult o
|
dePlus (PutOffsetResultPlus o) = PutOffsetResult o
|
||||||
plus (PutOffsetResult o) = PutOffsetResultPlus o mempty
|
dePlus (PutOffsetResultAlreadyHavePlus _) = PutOffsetResultAlreadyHave
|
||||||
|
plus (PutOffsetResult o) = PutOffsetResultPlus o
|
||||||
|
plus PutOffsetResultAlreadyHave = PutOffsetResultAlreadyHavePlus []
|
||||||
|
|
||||||
|
|
|
@ -482,6 +482,24 @@ put' key af sender = do
|
||||||
net $ sendMessage (ERROR "expected PUT_FROM or ALREADY_HAVE")
|
net $ sendMessage (ERROR "expected PUT_FROM or ALREADY_HAVE")
|
||||||
return Nothing
|
return Nothing
|
||||||
|
|
||||||
|
-- The protocol does not have a way to get the PUT offset
|
||||||
|
-- without sending DATA, so send an empty bytestring and indicate
|
||||||
|
-- it is not valid.
|
||||||
|
getPutOffset :: Key -> AssociatedFile -> Proto (Either [UUID] Offset)
|
||||||
|
getPutOffset key af = do
|
||||||
|
net $ sendMessage (PUT (ProtoAssociatedFile af) key)
|
||||||
|
r <- net receiveMessage
|
||||||
|
case r of
|
||||||
|
Just (PUT_FROM offset) -> do
|
||||||
|
void $ sendContent' nullMeterUpdate (Len 0) L.empty $
|
||||||
|
return Invalid
|
||||||
|
return (Right offset)
|
||||||
|
Just ALREADY_HAVE -> return (Left [])
|
||||||
|
Just (ALREADY_HAVE_PLUS uuids) -> return (Left uuids)
|
||||||
|
_ -> do
|
||||||
|
net $ sendMessage (ERROR "expected PUT_FROM or ALREADY_HAVE")
|
||||||
|
return (Left [])
|
||||||
|
|
||||||
data ServerHandler a
|
data ServerHandler a
|
||||||
= ServerGot a
|
= ServerGot a
|
||||||
| ServerContinue
|
| ServerContinue
|
||||||
|
@ -686,7 +704,7 @@ sendContent key af o offset@(Offset n) p = go =<< local (contentSize key)
|
||||||
else local $ readContent key af o offset $
|
else local $ readContent key af o offset $
|
||||||
sender (Len len)
|
sender (Len len)
|
||||||
-- Content not available to send. Indicate this by sending
|
-- Content not available to send. Indicate this by sending
|
||||||
-- empty data and indlicate it's invalid.
|
-- empty data and indicate it's invalid.
|
||||||
go Nothing = sender (Len 0) L.empty (return Invalid)
|
go Nothing = sender (Len 0) L.empty (return Invalid)
|
||||||
|
|
||||||
sender = sendContent' p'
|
sender = sendContent' p'
|
||||||
|
|
|
@ -276,7 +276,7 @@ Same as v3, except the JSON will not include "plusuuids".
|
||||||
|
|
||||||
### POST /git-annex/$uuid/v0/remove
|
### POST /git-annex/$uuid/v0/remove
|
||||||
|
|
||||||
Identival to v1.
|
Identical to v1.
|
||||||
|
|
||||||
## POST /git-annex/$uuid/v3/remove-before
|
## POST /git-annex/$uuid/v3/remove-before
|
||||||
|
|
||||||
|
@ -389,8 +389,8 @@ The body of the request is empty.
|
||||||
The server responds with a JSON object with an "offset" field that
|
The server responds with a JSON object with an "offset" field that
|
||||||
is the largest allowable offset.
|
is the largest allowable offset.
|
||||||
|
|
||||||
If the server already has the content of the key, it will respond with a
|
If the server already has the content of the key, it will respond instead
|
||||||
JSON object with an "alreadyhave" field that is set to true. This JSON
|
with a JSON object with an "alreadyhave" field that is set to true. This JSON
|
||||||
object may also have a field "plusuuids" that lists
|
object may also have a field "plusuuids" that lists
|
||||||
the UUIDs of other repositories where the content is stored, in addition to
|
the UUIDs of other repositories where the content is stored, in addition to
|
||||||
the serveruuid.
|
the serveruuid.
|
||||||
|
|
|
@ -28,14 +28,17 @@ Planned schedule of work:
|
||||||
|
|
||||||
## work notes
|
## work notes
|
||||||
|
|
||||||
* Implement: servePutOffset, serveLockContent
|
* servePutOffset crashes with "TMVar left full VALIDITY Invalid".
|
||||||
|
Seems nothing consumes the INVALID sent by the client, but why?
|
||||||
|
|
||||||
|
* Implement serveLockContent
|
||||||
|
|
||||||
* A Locker should expire the lock on its own after 10 minutes initially.
|
* A Locker should expire the lock on its own after 10 minutes initially.
|
||||||
|
|
||||||
* Since each held lock needs a connection to a proxy, the Locker
|
* Since each held lock needs a connection to a proxy, the Locker
|
||||||
could reference count, and avoid holding more than one lock per key.
|
could reference count, and avoid holding more than one lock per key.
|
||||||
|
|
||||||
* Make Remote.Git use http client when annex.url is configured.
|
* Make Remote.Git use http client when remote.name.annex-url is configured.
|
||||||
|
|
||||||
* Make http server support proxies and clusters.
|
* Make http server support proxies and clusters.
|
||||||
|
|
||||||
|
@ -51,6 +54,8 @@ Planned schedule of work:
|
||||||
|
|
||||||
* implemented server and client for HTTP P2P protocol
|
* implemented server and client for HTTP P2P protocol
|
||||||
|
|
||||||
|
* added git-annex p2phttp command to serve HTTP P2P protocol
|
||||||
|
|
||||||
## items deferred until later for [[design/passthrough_proxy]]
|
## items deferred until later for [[design/passthrough_proxy]]
|
||||||
|
|
||||||
* Check annex.diskreserve when proxying for special remotes
|
* Check annex.diskreserve when proxying for special remotes
|
||||||
|
|
Loading…
Reference in a new issue