2024-07-23 18:12:03 +00:00
|
|
|
{- P2P protocol over HTTP, server
|
|
|
|
-
|
|
|
|
- https://git-annex.branchable.com/design/p2p_protocol_over_http/
|
|
|
|
-
|
|
|
|
- Copyright 2024 Joey Hess <id@joeyh.name>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
|
|
|
{-# LANGUAGE DataKinds #-}
|
|
|
|
{-# LANGUAGE TypeOperators #-}
|
|
|
|
{-# LANGUAGE TypeFamilies #-}
|
|
|
|
{-# LANGUAGE TypeApplications #-}
|
|
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
|
|
{-# LANGUAGE BangPatterns #-}
|
|
|
|
|
|
|
|
module P2P.Http.Server (
|
|
|
|
module P2P.Http,
|
|
|
|
module P2P.Http.Server,
|
|
|
|
module P2P.Http.Types,
|
|
|
|
module P2P.Http.State,
|
|
|
|
) where
|
|
|
|
|
|
|
|
import Annex.Common
|
|
|
|
import P2P.Http
|
|
|
|
import P2P.Http.Types
|
|
|
|
import P2P.Http.State
|
|
|
|
import P2P.Protocol hiding (Offset, Bypass, auth)
|
|
|
|
import P2P.IO
|
|
|
|
import P2P.Annex
|
|
|
|
import Annex.WorkerPool
|
|
|
|
import Types.WorkerPool
|
|
|
|
import Types.Direction
|
|
|
|
import Utility.Metered
|
2024-10-18 00:55:31 +00:00
|
|
|
import Utility.STM
|
2024-07-23 18:12:03 +00:00
|
|
|
|
|
|
|
import Servant
|
|
|
|
import qualified Servant.Types.SourceT as S
|
|
|
|
import qualified Data.ByteString as B
|
|
|
|
import qualified Data.ByteString.Lazy as L
|
|
|
|
import qualified Data.ByteString.Lazy.Internal as LI
|
|
|
|
import Control.Concurrent.Async
|
|
|
|
import Control.Concurrent
|
|
|
|
import System.IO.Unsafe
|
2024-07-26 19:50:01 +00:00
|
|
|
import Data.Either
|
2024-07-23 18:12:03 +00:00
|
|
|
|
|
|
|
-- | Builds the WAI 'Application' that serves the P2P HTTP API, by handing
-- the handlers from 'serveP2pHttp' to servant's 'serve'.
p2pHttpApp :: P2PHttpServerState -> Application
p2pHttpApp st = serve p2pHttpAPI (serveP2pHttp st)
|
|
|
|
|
|
|
|
-- | The handlers for every route of 'P2PHttpAPI', combined with ':<|>'.
--
-- The order and number of handlers has to line up with the route list in
-- the 'P2PHttpAPI' type. Most handlers appear several times; presumably
-- once per protocol version that has the route (confirm against
-- 'P2PHttpAPI'). Where a handler takes a result-mangling function, some
-- entries pass 'id' and others pass 'dePlus'.
serveP2pHttp :: P2PHttpServerState -> Server P2PHttpAPI
serveP2pHttp st =
    -- Getting content.
    serveGet st :<|> serveGet st :<|> serveGet st
        :<|> serveGet st :<|> serveGet st
    -- Checking presence.
    :<|> serveCheckPresent st :<|> serveCheckPresent st
        :<|> serveCheckPresent st :<|> serveCheckPresent st
        :<|> serveCheckPresent st
    -- Removing content.
    :<|> serveRemove st id :<|> serveRemove st id :<|> serveRemove st id
        :<|> serveRemove st dePlus :<|> serveRemove st dePlus
    -- Removing content before a timestamp.
    :<|> serveRemoveBefore st :<|> serveRemoveBefore st
    -- Getting the timestamp.
    :<|> serveGetTimestamp st :<|> serveGetTimestamp st
    -- Storing content.
    :<|> servePut st id :<|> servePut st id :<|> servePut st id
        :<|> servePut st dePlus :<|> servePut st dePlus
    -- Asking what offset to store content from.
    :<|> servePutOffset st id :<|> servePutOffset st id
        :<|> servePutOffset st id :<|> servePutOffset st dePlus
    -- Locking content.
    :<|> serveLockContent st :<|> serveLockContent st
        :<|> serveLockContent st :<|> serveLockContent st
        :<|> serveLockContent st
    -- Keeping content locked.
    :<|> serveKeepLocked st :<|> serveKeepLocked st
        :<|> serveKeepLocked st :<|> serveKeepLocked st
        :<|> serveKeepLocked st
    -- Getting content without a protocol version.
    :<|> serveGetGeneric st
|
|
|
|
|
|
|
|
-- | Version-less variant of 'serveGet'.
--
-- Delegates to 'serveGet' at 'V0', with no associated file and no offset.
-- V0 is used because it does not alter the returned data to indicate
-- Invalid content.
serveGetGeneric
    :: P2PHttpServerState
    -> B64UUID ServerSide
    -> B64Key
    -> Maybe (B64UUID ClientSide)
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGetGeneric st su@(B64UUID u) k mcu bypass sec auth =
    serveGet st su V0 k cu bypass Nothing Nothing sec auth
  where
    -- When the client did not provide its UUID, the server UUID is
    -- reused as the client UUID.
    cu :: B64UUID ClientSide
    cu = case mcu of
        Just provided -> provided
        Nothing -> B64UUID u
|
2024-07-23 18:12:03 +00:00
|
|
|
|
|
|
|
-- | Handler that streams the content of a key to the client.
--
-- A worker thread runs the P2P protocol GET over the connection. Its
-- storer hands the lazy ByteString of content (and its length) to this
-- handler, which returns it as a streaming response with a
-- 'DataLengthHeader'. The worker then stays blocked until the response
-- has been streamed, since the ByteString is read lazily from the
-- connection.
--
-- Coordination between the threads:
--
-- * contentv: worker -> handler, the length and content.
-- * endv: filled once streaming is over (or given up on), unblocking
--   the worker.
-- * validityv: worker -> streamer, the validity reported for the content.
-- * finalv: streamer -> waitfinal, everything was transferred.
serveGet
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> Maybe B64FilePath
    -> Maybe Offset
    -> IsSecure
    -> Maybe Auth
    -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
    conn <- getP2PConnection apiver st cu su bypass sec auth ReadAction id
    contentv <- liftIO newEmptyTMVarIO
    endv <- liftIO newEmptyTMVarIO
    validityv <- liftIO newEmptyTMVarIO
    finalv <- liftIO newEmptyTMVarIO
    annexworker <- liftIO $ async $ inAnnexWorker st $ do
        let storer _offset len = sendContentWith $ \bs -> liftIO $ do
                -- Pass the content to the handler, and wait until
                -- it is done with it.
                atomically $ putTMVar contentv (len, bs)
                atomically $ takeTMVar endv
                signalFullyConsumedByteString $
                    connOhdl $ serverP2PConnection conn
                -- Called with the validity of the content.
                return $ \v -> do
                    liftIO $ atomically $ putTMVar validityv v
                    return True
        enteringStage (TransferStage Upload) $
            runFullProto (clientRunState conn) (clientP2PConnection conn) $
                void $ receiveContent Nothing nullMeterUpdate
                    sizer storer (const Nothing) getreq
    void $ liftIO $ forkIO $ waitfinal endv finalv conn annexworker
    (Len len, bs) <- liftIO $ atomically $ takeTMVar contentv
    -- Empty chunks are filtered out; the streamer stops at the first
    -- empty ByteString it is given (see B.null in stream).
    chunksv <- liftIO $ newMVar (filter (not . B.null) (L.toChunks bs))
    szv <- liftIO $ newMVar 0
    let body = S.SourceT
            (\consume -> consume
                (stream chunksv szv len endv validityv finalv))
    return $ addHeader (DataLength len) body
  where
    -- Each step of the response takes the next chunk from chunksv.
    stream chunksv szv len endv validityv finalv =
        S.fromActionStep B.null $
            modifyMVar chunksv $ nextchunk szv $
                checkvalidity szv len endv validityv finalv

    -- Given the remaining chunks, returns the chunks that remain after
    -- this step, paired with the ByteString to send in this step.
    nextchunk szv checkvalid chunks = case chunks of
        [] -> do
            -- Result ignored because 0 bytes of data are sent,
            -- so even if the key is invalid, if that's the
            -- amount of data that the DataLengthHeader indicates,
            -- we've successfully served an empty key.
            void checkvalid
            return ([], mempty)
        [b] -> do
            -- The size has to be updated before checkvalid runs,
            -- because checkvalid reads it.
            updateszv szv b
            ok <- checkvalid
            if ok
                then return ([], b)
                -- The key's content is invalid, but the amount of
                -- data is the same as the DataLengthHeader
                -- indicated. Truncate the response by one byte to
                -- indicate to the client that it's not valid.
                else return ([], B.take (B.length b - 1) b)
        (b:rest) -> do
            updateszv szv b
            return (rest, b)

    -- Adds the length of a chunk to the running total, strictly.
    updateszv szv b = modifyMVar_ szv $ \sz ->
        return $! sz + fromIntegral (B.length b)

    -- Returns False when the key's content is invalid, but the
    -- amount of data sent was the same as indicated by the
    -- DataLengthHeader.
    --
    -- Only does its work while endv is still empty. It leaves endv
    -- full, so a later call just returns True. It also takes szv
    -- without putting it back.
    checkvalidity szv len endv validityv finalv = do
        pending <- atomically $ isEmptyTMVar endv
        if pending
            then do
                -- Let the worker continue, so it can report the
                -- validity.
                atomically $ putTMVar endv ()
                validity <- atomically $ takeTMVar validityv
                sz <- takeMVar szv
                atomically $ putTMVar finalv ()
                atomically $ putTMVar endv ()
                return $ case validity of
                    Just Invalid -> sz /= len
                    Just Valid -> True
                    Nothing -> True
            else pure True

    -- Runs in its own thread, and cleans up at the end.
    waitfinal endv finalv conn annexworker = do
        -- Wait for everything to be transferred before
        -- stopping the annexworker. The finalv will usually
        -- be written to at the end. If the client disconnects
        -- early that does not happen, so catch STM exception.
        waited <- tryNonAsync (liftIO $ atomically $ takeTMVar finalv)
        let alltransferred = isRight waited
        -- Make sure the annexworker is not left blocked on endv
        -- if the client disconnected early.
        void $ liftIO $ atomically $ tryPutTMVar endv ()
        -- The connection is only released when all the data was
        -- transferred; otherwise it is closed.
        void $ tryNonAsync $
            if alltransferred
                then releaseP2PConnection conn
                else closeP2PConnection conn
        void $ tryNonAsync $ wait annexworker

    -- The size passed to receiveContent is the requested starting
    -- offset, or 0 when there is none.
    sizer = pure $ Len $ maybe 0 (\(Offset o) -> fromIntegral o) startat

    getreq offset = P2P.Protocol.GET offset
        (ProtoAssociatedFile (b64FilePathToAssociatedFile baf))
        k
|
2024-07-23 18:12:03 +00:00
|
|
|
|
|
|
|
-- | Handler that checks if the content of a key is present, by running
-- 'checkPresent' over a P2P connection.
--
-- An error result from the connection becomes a HTTP 500 response, with
-- the error as its body.
serveCheckPresent
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler CheckPresentResult
serveCheckPresent st su apiver (B64Key k) cu bypass sec auth =
    either
        (\err -> throwError $ err500 { errBody = encodeBL err })
        (\present -> return (CheckPresentResult present))
        =<< withP2PConnection apiver st cu su bypass sec auth ReadAction id
            (\conn -> liftIO $ proxyClientNetProto conn $ checkPresent k)
|
|
|
|
|
|
|
|
-- | Handler that removes the content of a key, by running 'remove'
-- over a P2P connection.
--
-- The result is a 'RemoveResultPlus', passed through resultmangle, which
-- lets the same handler produce different result types. An error result
-- from the connection becomes a HTTP 500 response, with the error as
-- its body.
serveRemove
    :: APIVersion v
    => P2PHttpServerState
    -> (RemoveResultPlus -> t)
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler t
serveRemove st resultmangle su apiver (B64Key k) cu bypass sec auth = do
    (outcome, plusuuids) <-
        withP2PConnection apiver st cu su bypass sec auth RemoveAction id
            (\conn -> liftIO $ proxyClientNetProto conn $ remove Nothing k)
    case outcome of
        Left err -> throwError $
            err500 { errBody = encodeBL err }
        -- No list of additional UUIDs is treated as an empty list.
        Right removed -> return $ resultmangle $
            RemoveResultPlus removed (maybe [] (map B64UUID) plusuuids)
|
|
|
|
|
|
|
|
-- | Handler that removes the content of a key, by running
-- 'removeBeforeRemoteEndTime' with the provided timestamp over a P2P
-- connection.
--
-- An error result from the connection becomes a HTTP 500 response, with
-- the error as its body.
serveRemoveBefore
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> Timestamp
    -> IsSecure
    -> Maybe Auth
    -> Handler RemoveResultPlus
serveRemoveBefore st su apiver (B64Key k) cu bypass (Timestamp ts) sec auth = do
    (outcome, plusuuids) <-
        withP2PConnection apiver st cu su bypass sec auth RemoveAction id
            (\conn -> liftIO $ proxyClientNetProto conn $
                removeBeforeRemoteEndTime ts k)
    case outcome of
        Left err -> throwError $
            err500 { errBody = encodeBL err }
        -- No list of additional UUIDs is treated as an empty list.
        Right removed -> return $
            RemoveResultPlus removed (maybe [] (map B64UUID) plusuuids)
|
|
|
|
|
|
|
|
-- | Handler that gets the current timestamp, by running 'getTimestamp'
-- over a P2P connection.
--
-- An error result from the connection becomes a HTTP 500 response, with
-- the error as its body.
serveGetTimestamp
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler GetTimestampResult
serveGetTimestamp st su apiver cu bypass sec auth =
    either
        (\err -> throwError $ err500 { errBody = encodeBL err })
        (\ts -> return $ GetTimestampResult (Timestamp ts))
        =<< withP2PConnection apiver st cu su bypass sec auth ReadAction id
            (\conn -> liftIO $ proxyClientNetProto conn getTimestamp)
|
|
|
|
|
|
|
|
-- | Handler that stores the content of a key, which the client streams
-- in the request body.
--
-- The request body is turned into a lazy ByteString by gather, and sent
-- over a P2P connection with 'put''. The result is a 'PutResultPlus',
-- passed through resultmangle, which lets the same handler produce
-- different result types. Failures become a HTTP 500 response.
servePut
    :: APIVersion v
    => P2PHttpServerState
    -> (PutResultPlus -> t)
    -> B64UUID ServerSide
    -> v
    -> DataLength
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> Maybe B64FilePath
    -> Maybe Offset
    -> S.SourceT IO B.ByteString
    -> IsSecure
    -> Maybe Auth
    -> Handler t
servePut st resultmangle su apiver (DataLength len) (B64Key k) cu bypass baf moffset stream sec auth = do
    validityv <- liftIO newEmptyTMVarIO
    -- The validity is only known once gather has reached the end of
    -- the data from the client, so this blocks until then.
    let validitycheck = local $ runValidityCheck $
            liftIO $ atomically $ readTMVar validityv
    tooshortv <- liftIO newEmptyTMVarIO
    content <- liftIO $ S.unSourceT stream (gather validityv tooshortv)
    outcome <- withP2PConnection' apiver st cu su bypass sec auth WriteAction
        nowait $ \conn -> liftIO $ do
            -- This runs in the background, rather than after
            -- protoaction, so that the connection gets closed as soon
            -- as gather has found the data to be too short. Otherwise
            -- protoaction can be left waiting for the rest of the
            -- data, with the connection remaining open.
            void $ async $ checktooshort conn tooshortv
            protoaction conn content validitycheck
    case outcome of
        Left err -> throwError $
            err500 { errBody = encodeBL (show err) }
        Right (Left protofail) -> throwError $
            err500 { errBody = encodeBL (describeProtoFailure protofail) }
        Right (Right Nothing) -> return $ resultmangle $
            PutResultPlus False []
        Right (Right (Just plusuuids)) -> return $ resultmangle $
            PutResultPlus True (map B64UUID plusuuids)
  where
    nowait cst = cst { connectionWaitVar = False }

    protoaction conn content validitycheck = inAnnexWorker st $
        enteringStage (TransferStage Download) $
            runFullProto (clientRunState conn) (clientP2PConnection conn) $
                protoaction' content validitycheck

    -- offset' is the offset passed in by put', while offset is the one
    -- the client provided (content starts at that offset).
    protoaction' content validitycheck = put' k af $ \offset' ->
        let offsetdelta = offset' - offset
        in case compare offset' offset of
            -- The start of the content is before the client's offset,
            -- so it is not available. Send what there is, always
            -- reporting it as Invalid.
            LT -> sendContent' nullMeterUpdate
                (Len len)
                content
                (validitycheck >> return Invalid)
            -- The start of the content is after the client's offset, so
            -- skip the part of the content before it.
            GT -> sendContent' nullMeterUpdate
                (Len (len - fromIntegral offsetdelta))
                (L.drop (fromIntegral offsetdelta) content)
                validitycheck
            EQ -> sendContent' nullMeterUpdate
                (Len len)
                content
                validitycheck

    offset = maybe 0 (\(Offset o) -> o) moffset

    af = b64FilePathToAssociatedFile baf

    -- Streams the ByteString from the client. Avoids returning a longer
    -- than expected ByteString by truncating to the expected length.
    -- Returns a shorter than expected ByteString when the data is not
    -- valid.
    --
    -- The ByteString is generated lazily, reading from the client as it
    -- is consumed. When the end is reached, validityv and tooshortv
    -- are both written to.
    gather validityv tooshortv steps = unsafeInterleaveIO (go 0 steps)
      where
        finish validity tooshort = atomically $ do
            writeTMVar validityv validity
            writeTMVar tooshortv tooshort

        -- n is the number of bytes received so far.
        go n (S.Yield v s)
            | n' > len = do
                -- More data than expected was received.
                finish Invalid True
                -- NOTE(review): since n' > len here, len - n' is
                -- negative, so this takes none of v. Confirm
                -- whether len - n was intended.
                return $ LI.Chunk
                    (B.take (fromIntegral (len - n')) v)
                    LI.Empty
            | otherwise = LI.Chunk v <$> unsafeInterleaveIO (go n' s)
          where
            !n' = n + fromIntegral (B.length v)
        go n (S.Effect ms) = ms >>= go n
        go n (S.Skip s) = go n s
        go n (S.Error _err) = do
            finish Invalid (n /= len)
            return LI.Empty
        go n S.Stop = do
            finish (if n == len then Valid else Invalid) (n /= len)
            return LI.Empty

    -- The connection can no longer be used when too short a DATA has
    -- been written to it.
    checktooshort conn tooshortv = liftIO $ do
        tooshort <- atomically $ takeTMVar tooshortv
        if tooshort
            then closeP2PConnection conn
            else return ()
|
|
|
|
|
|
|
|
-- | Handler that asks what offset the content of a key can be sent
-- from, by running 'getPutOffset' over a P2P connection.
--
-- The result is either an offset, or a list of UUIDs that already have
-- the content. Either way it is passed through resultmangle, which lets
-- the same handler produce different result types.
servePutOffset
    :: APIVersion v
    => P2PHttpServerState
    -> (PutOffsetResultPlus -> t)
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler t
servePutOffset st resultmangle su apiver (B64Key k) cu bypass sec auth =
    either alreadyhave canput
        =<< withP2PConnection apiver st cu su bypass sec auth WriteAction
            nowait
            (\conn -> liftIO $ proxyClientNetProto conn $
                getPutOffset k (AssociatedFile Nothing))
  where
    nowait cst = cst { connectionWaitVar = False }

    canput offset = return $ resultmangle $
        PutOffsetResultPlus (Offset offset)

    alreadyhave plusuuids = return $ resultmangle $
        PutOffsetResultAlreadyHavePlus (map B64UUID plusuuids)
|
|
|
|
|
|
|
|
-- | Handler that locks the content of a key.
--
-- A worker thread sends LOCKCONTENT over a P2P connection, and then
-- waits to be told to unlock, at which point it sends UNLOCKCONTENT.
-- When the lock is obtained, it is stored in the server state with
-- 'storeLock', and its lock id is returned to the client.
serveLockContent
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> B64Key
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Handler LockResult
serveLockContent st su apiver (B64Key k) cu bypass sec auth = do
    conn <- getP2PConnection apiver st cu su bypass sec auth LockAction id
    liftIO $ do
        locked <- mkLocker (lock conn) (unlock conn)
        case locked of
            Nothing -> return $ LockResult False Nothing
            Just (locker, lockid) -> do
                storeLock lockid locker st
                return $ LockResult True (Just lockid)
  where
    -- Starts the worker thread, and waits for it to report the result
    -- of locking. On success, returns what unlock needs.
    --
    -- NOTE(review): when locking fails, nothing visible here writes to
    -- unlockv or releases the connection. Confirm whether that is
    -- handled elsewhere.
    lock conn = do
        lockresv <- newEmptyTMVarIO
        unlockv <- newEmptyTMVarIO
        annexworker <- async $ inAnnexWorker st $ do
            lockres <- runFullProto (clientRunState conn) (clientP2PConnection conn) $ do
                net $ sendMessage (LOCKCONTENT k)
                checkSuccess
            liftIO $ atomically $ putTMVar lockresv lockres
            -- Keep it locked until unlock is run.
            liftIO $ atomically $ takeTMVar unlockv
            void $ runFullProto (clientRunState conn) (clientP2PConnection conn) $
                net $ sendMessage UNLOCKCONTENT
        lockres <- atomically (takeTMVar lockresv)
        case lockres of
            Right True -> return (Just (annexworker, unlockv))
            _ -> return Nothing

    -- Tells the worker thread to unlock, and waits for it to finish
    -- before releasing the connection.
    unlock conn (annexworker, unlockv) = do
        atomically $ putTMVar unlockv ()
        void $ wait annexworker
        releaseP2PConnection conn
|
|
|
|
|
|
|
|
-- | Handler that keeps a previously obtained lock held for as long as
-- the client keeps the request open.
--
-- The client streams 'UnlockRequest's in the request body. The lock is
-- dropped when an @UnlockRequest True@ is received, and also when the
-- stream ends or has an error. The response is always
-- @LockResult False Nothing@.
--
-- The server UUID, API version, client UUID, bypass list, and the two
-- keep-alive parameters are not used here.
serveKeepLocked
    :: APIVersion v
    => P2PHttpServerState
    -> B64UUID ServerSide
    -> v
    -> LockID
    -> Maybe (B64UUID ClientSide)
    -> [B64UUID Bypass]
    -> IsSecure
    -> Maybe Auth
    -> Maybe ConnectionKeepAlive
    -> Maybe KeepAlive
    -> S.SourceT IO UnlockRequest
    -> Handler LockResult
serveKeepLocked st _su _apiver lckid _cu _bypass sec auth _ _ unlockrequeststream =
    checkAuthActionClass st sec auth LockAction $ \_ -> do
        liftIO $ keepingLocked lckid st
        _ <- liftIO $ S.unSourceT unlockrequeststream go
        return (LockResult False Nothing)
  where
    release = dropLock lckid st

    -- Consumes the stream until the lock should be dropped.
    go step = case step of
        S.Skip rest -> go rest
        S.Effect getnext -> getnext >>= go
        S.Yield (UnlockRequest False) rest -> go rest
        S.Yield (UnlockRequest True) _ -> release
        S.Error _err -> release
        S.Stop -> release
|