fix: race condition in NodeStreamLoader (#19811)
* fix: race condition in NodeStreamLoader * nit: add comments
This commit is contained in:
parent
b7defaaf6a
commit
3f49f984e6
2 changed files with 24 additions and 4 deletions
|
@ -71,12 +71,18 @@ void NodeStreamLoader::Start(network::ResourceResponseHead head) {
|
||||||
base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
|
base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak, net::OK));
|
||||||
On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
|
On("error", base::BindRepeating(&NodeStreamLoader::NotifyComplete, weak,
|
||||||
net::ERR_FAILED));
|
net::ERR_FAILED));
|
||||||
On("readable", base::BindRepeating(&NodeStreamLoader::ReadMore, weak));
|
On("readable", base::BindRepeating(&NodeStreamLoader::NotifyReadable, weak));
|
||||||
|
}
|
||||||
|
|
||||||
|
void NodeStreamLoader::NotifyReadable() {
|
||||||
|
if (!readable_)
|
||||||
|
ReadMore();
|
||||||
|
readable_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void NodeStreamLoader::NotifyComplete(int result) {
|
void NodeStreamLoader::NotifyComplete(int result) {
|
||||||
// Wait until write finishes or fails.
|
// Wait until write finishes or fails.
|
||||||
if (is_writing_) {
|
if (is_reading_ || is_writing_) {
|
||||||
ended_ = true;
|
ended_ = true;
|
||||||
result_ = result;
|
result_ = result;
|
||||||
return;
|
return;
|
||||||
|
@ -87,19 +93,24 @@ void NodeStreamLoader::NotifyComplete(int result) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void NodeStreamLoader::ReadMore() {
|
void NodeStreamLoader::ReadMore() {
|
||||||
|
is_reading_ = true;
|
||||||
// buffer = emitter.read()
|
// buffer = emitter.read()
|
||||||
v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
|
v8::MaybeLocal<v8::Value> ret = node::MakeCallback(
|
||||||
isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});
|
isolate_, emitter_.Get(isolate_), "read", 0, nullptr, {0, 0});
|
||||||
|
|
||||||
// If there is no buffer read, wait until |readable| is emitted again.
|
// If there is no buffer read, wait until |readable| is emitted again.
|
||||||
v8::Local<v8::Value> buffer;
|
v8::Local<v8::Value> buffer;
|
||||||
if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer))
|
if (!ret.ToLocal(&buffer) || !node::Buffer::HasInstance(buffer)) {
|
||||||
|
readable_ = false;
|
||||||
|
is_reading_ = false;
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Hold the buffer until the write is done.
|
// Hold the buffer until the write is done.
|
||||||
buffer_.Reset(isolate_, buffer);
|
buffer_.Reset(isolate_, buffer);
|
||||||
|
|
||||||
// Write buffer to mojo pipe asyncronously.
|
// Write buffer to mojo pipe asyncronously.
|
||||||
|
is_reading_ = false;
|
||||||
is_writing_ = true;
|
is_writing_ = true;
|
||||||
producer_->Write(
|
producer_->Write(
|
||||||
std::make_unique<mojo::StringDataSource>(
|
std::make_unique<mojo::StringDataSource>(
|
||||||
|
@ -118,7 +129,7 @@ void NodeStreamLoader::DidWrite(MojoResult result) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (result == MOJO_RESULT_OK)
|
if (result == MOJO_RESULT_OK && readable_)
|
||||||
ReadMore();
|
ReadMore();
|
||||||
else
|
else
|
||||||
NotifyComplete(net::ERR_FAILED);
|
NotifyComplete(net::ERR_FAILED);
|
||||||
|
|
|
@ -39,6 +39,7 @@ class NodeStreamLoader : public network::mojom::URLLoader {
|
||||||
using EventCallback = base::RepeatingCallback<void()>;
|
using EventCallback = base::RepeatingCallback<void()>;
|
||||||
|
|
||||||
void Start(network::ResourceResponseHead head);
|
void Start(network::ResourceResponseHead head);
|
||||||
|
void NotifyReadable();
|
||||||
void NotifyComplete(int result);
|
void NotifyComplete(int result);
|
||||||
void ReadMore();
|
void ReadMore();
|
||||||
void DidWrite(MojoResult result);
|
void DidWrite(MojoResult result);
|
||||||
|
@ -68,11 +69,19 @@ class NodeStreamLoader : public network::mojom::URLLoader {
|
||||||
// Whether we are in the middle of write.
|
// Whether we are in the middle of write.
|
||||||
bool is_writing_ = false;
|
bool is_writing_ = false;
|
||||||
|
|
||||||
|
// Whether we are in the middle of a stream.read().
|
||||||
|
bool is_reading_ = false;
|
||||||
|
|
||||||
// When NotifyComplete is called while writing, we will save the result and
|
// When NotifyComplete is called while writing, we will save the result and
|
||||||
// quit with it after the write is done.
|
// quit with it after the write is done.
|
||||||
bool ended_ = false;
|
bool ended_ = false;
|
||||||
int result_ = net::OK;
|
int result_ = net::OK;
|
||||||
|
|
||||||
|
// When the stream emits the readable event, we only want to start reading
|
||||||
|
// data if the stream was not readable before, so we store the state in a
|
||||||
|
// flag.
|
||||||
|
bool readable_ = false;
|
||||||
|
|
||||||
// Store the V8 callbacks to unsubscribe them later.
|
// Store the V8 callbacks to unsubscribe them later.
|
||||||
std::map<std::string, v8::Global<v8::Value>> handlers_;
|
std::map<std::string, v8::Global<v8::Value>> handlers_;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue