Add p-queue timeouts; fix search crash; improve promise rejection logging

This commit is contained in:
Scott Nonnenberg 2020-09-18 13:40:41 -07:00 committed by Josh Perez
parent 9f9ce91a9c
commit bc3b61db1d
17 changed files with 54 additions and 26 deletions

View file

@ -19,11 +19,11 @@ let singleQueue = null;
let multipleQueue = null; let multipleQueue = null;
function makeNewSingleQueue() { function makeNewSingleQueue() {
singleQueue = new Queue({ concurrency: 1 }); singleQueue = new Queue({ concurrency: 1, timeout: 1000 * 60 * 2 });
return singleQueue; return singleQueue;
} }
function makeNewMultipleQueue() { function makeNewMultipleQueue() {
multipleQueue = new Queue({ concurrency: 10 }); multipleQueue = new Queue({ concurrency: 10, timeout: 1000 * 60 * 2 });
return multipleQueue; return multipleQueue;
} }

View file

@ -14,9 +14,13 @@
// eslint-disable-next-line func-names // eslint-disable-next-line func-names
(async function() { (async function() {
const eventHandlerQueue = new window.PQueue({ concurrency: 1 }); const eventHandlerQueue = new window.PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
});
Whisper.deliveryReceiptQueue = new window.PQueue({ Whisper.deliveryReceiptQueue = new window.PQueue({
concurrency: 1, concurrency: 1,
timeout: 1000 * 60 * 2,
}); });
Whisper.deliveryReceiptQueue.pause(); Whisper.deliveryReceiptQueue.pause();
Whisper.deliveryReceiptBatcher = window.Signal.Util.createBatcher({ Whisper.deliveryReceiptBatcher = window.Signal.Util.createBatcher({

View file

@ -142,7 +142,8 @@ window.onerror = (message, script, line, col, error) => {
}; };
window.addEventListener('unhandledrejection', rejectionEvent => { window.addEventListener('unhandledrejection', rejectionEvent => {
window.log.error( const error = rejectionEvent.reason;
`Top-level unhandled promise rejection: ${rejectionEvent.reason}` const errorString =
); error && error.stack ? error.stack : JSON.stringify(error);
window.log.error(`Top-level unhandled promise rejection: ${errorString}`);
}); });

View file

@ -225,8 +225,9 @@
getPropsForSearchResult() { getPropsForSearchResult() {
const sourceId = this.getContactId(); const sourceId = this.getContactId();
const from = this.findAndFormatContact(sourceId); const from = this.findAndFormatContact(sourceId);
const convo = this.getConversation();
const to = this.findAndFormatContact(convo.get('id')); const conversationId = this.get('conversationId');
const to = this.findAndFormatContact(conversationId);
return { return {
from, from,
@ -235,7 +236,7 @@
isSelected: this.isSelected, isSelected: this.isSelected,
id: this.id, id: this.id,
conversationId: this.get('conversationId'), conversationId,
sentAt: this.get('sent_at'), sentAt: this.get('sent_at'),
snippet: this.get('snippet'), snippet: this.get('snippet'),
}; };

View file

@ -75,7 +75,7 @@ module.exports = {
let initialState = null; let initialState = null;
let packsToDownload = null; let packsToDownload = null;
const downloadQueue = new Queue({ concurrency: 1 }); const downloadQueue = new Queue({ concurrency: 1, timeout: 1000 * 60 * 2 });
async function load() { async function load() {
const [packs, recentStickers] = await Promise.all([ const [packs, recentStickers] = await Promise.all([
@ -336,6 +336,7 @@ async function removeEphemeralPack(packId) {
const paths = stickers.map(sticker => sticker.path); const paths = stickers.map(sticker => sticker.path);
await pMap(paths, Signal.Migrations.deleteTempFile, { await pMap(paths, Signal.Migrations.deleteTempFile, {
concurrency: 3, concurrency: 3,
timeout: 1000 * 60 * 2,
}); });
// Remove it from database in case it made it there // Remove it from database in case it made it there
@ -429,7 +430,10 @@ async function downloadEphemeralPack(packId, packKey) {
await downloadStickerJob(coverProto); await downloadStickerJob(coverProto);
// Then the rest // Then the rest
await pMap(nonCoverStickers, downloadStickerJob, { concurrency: 3 }); await pMap(nonCoverStickers, downloadStickerJob, {
concurrency: 3,
timeout: 1000 * 60 * 2,
});
} catch (error) { } catch (error) {
// Because the user could install this pack while we are still downloading this // Because the user could install this pack while we are still downloading this
// ephemeral pack, we don't want to go change its status unless we're still in // ephemeral pack, we don't want to go change its status unless we're still in
@ -610,7 +614,10 @@ async function doDownloadStickerPack(packId, packKey, options = {}) {
await downloadStickerJob(coverProto); await downloadStickerJob(coverProto);
// Then the rest // Then the rest
await pMap(nonCoverStickers, downloadStickerJob, { concurrency: 3 }); await pMap(nonCoverStickers, downloadStickerJob, {
concurrency: 3,
timeout: 1000 * 60 * 2,
});
// Allow for the user marking this pack as installed in the middle of our download; // Allow for the user marking this pack as installed in the middle of our download;
// don't overwrite that status. // don't overwrite that status.
@ -724,6 +731,7 @@ async function deletePackReference(messageId, packId) {
await pMap(paths, Signal.Migrations.deleteSticker, { await pMap(paths, Signal.Migrations.deleteSticker, {
concurrency: 3, concurrency: 3,
timeout: 1000 * 60 * 2,
}); });
} }
@ -743,5 +751,6 @@ async function deletePack(packId) {
await pMap(paths, Signal.Migrations.deleteSticker, { await pMap(paths, Signal.Migrations.deleteSticker, {
concurrency: 3, concurrency: 3,
timeout: 1000 * 60 * 2,
}); });
} }

View file

@ -25175,7 +25175,7 @@ var jobQueue = {};
Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJob) { Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJob) {
if (window.PQueue) { if (window.PQueue) {
jobQueue[number] = jobQueue[number] || new window.PQueue({ concurrency: 1 }); jobQueue[number] = jobQueue[number] || new window.PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
var queue = jobQueue[number]; var queue = jobQueue[number];
return queue.add(runJob); return queue.add(runJob);
} }

View file

@ -1433,7 +1433,7 @@ async function ensureFilePermissions(onlyFiles) {
console.log(`Ensuring file permissions for ${files.length} files`); console.log(`Ensuring file permissions for ${files.length} files`);
// Touch each file in a queue // Touch each file in a queue
const q = new PQueue({ concurrency: 5 }); const q = new PQueue({ concurrency: 5, timeout: 1000 * 60 * 2 });
q.addAll( q.addAll(
files.map(f => async () => { files.map(f => async () => {
const isDir = f.endsWith('/'); const isDir = f.endsWith('/');

View file

@ -83,7 +83,10 @@ async function main() {
dependencyNames, dependencyNames,
getMarkdownForDependency, getMarkdownForDependency,
// Without this, we may run into "too many open files" errors. // Without this, we may run into "too many open files" errors.
{ concurrency: 100 } {
concurrency: 100,
timeout: 1000 * 60 * 2,
}
); );
const unformattedOutput = [ const unformattedOutput = [

View file

@ -11,7 +11,7 @@ import { stickersDuck } from '../store';
import { DropZone, Props as DropZoneProps } from '../elements/DropZone'; import { DropZone, Props as DropZoneProps } from '../elements/DropZone';
import { convertToWebp } from '../util/preload'; import { convertToWebp } from '../util/preload';
const queue = new PQueue({ concurrency: 3 }); const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
const SmartStickerFrame = SortableElement( const SmartStickerFrame = SortableElement(
({ id, showGuide, mode }: StickerFrameProps) => { ({ id, showGuide, mode }: StickerFrameProps) => {

View file

@ -133,7 +133,10 @@ window.encryptAndUpload = async (
const encryptedStickers = await pMap( const encryptedStickers = await pMap(
uniqueStickers, uniqueStickers,
({ webp }) => encrypt(webp.buffer, encryptionKey, iv), ({ webp }) => encrypt(webp.buffer, encryptionKey, iv),
{ concurrency: 3 } {
concurrency: 3,
timeout: 1000 * 60 * 2,
}
); );
const packId = await server.putStickers( const packId = await server.putStickers(

View file

@ -92,7 +92,7 @@ const makeImagePath = (src: string) => {
return `${ROOT_PATH}node_modules/emoji-datasource-apple/img/apple/64/${src}`; return `${ROOT_PATH}node_modules/emoji-datasource-apple/img/apple/64/${src}`;
}; };
const imageQueue = new PQueue({ concurrency: 10 }); const imageQueue = new PQueue({ concurrency: 10, timeout: 1000 * 60 * 2 });
const images = new Set(); const images = new Set();
export const preloadImages = async (): Promise<void> => { export const preloadImages = async (): Promise<void> => {

View file

@ -1347,7 +1347,11 @@ async function updateToSchemaVersion20(
await instance.run('BEGIN TRANSACTION;'); await instance.run('BEGIN TRANSACTION;');
try { try {
const migrationJobQueue = new PQueue({ concurrency: 10 }); const migrationJobQueue = new PQueue({
concurrency: 10,
timeout: 1000 * 60 * 5,
throwOnTimeout: true,
});
// The triggers on the messages table slow down this migration // The triggers on the messages table slow down this migration
// significantly, so we drop them and recreate them later. // significantly, so we drop them and recreate them later.
// Drop triggers // Drop triggers

View file

@ -147,9 +147,9 @@ class MessageReceiverInner extends EventTarget {
10 10
); );
this.incomingQueue = new PQueue({ concurrency: 1 }); this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.pendingQueue = new PQueue({ concurrency: 1 }); this.pendingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.appQueue = new PQueue({ concurrency: 1 }); this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.cacheAddBatcher = createBatcher<CacheAddItemType>({ this.cacheAddBatcher = createBatcher<CacheAddItemType>({
wait: 200, wait: 200,

View file

@ -1677,7 +1677,7 @@ export function initialize({
}); });
// Upload stickers // Upload stickers
const queue = new PQueue({ concurrency: 3 }); const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
await Promise.all( await Promise.all(
stickers.map(async (sticker: ServerAttachmentType, index: number) => { stickers.map(async (sticker: ServerAttachmentType, index: number) => {
const stickerParams = makePutParams( const stickerParams = makePutParams(

View file

@ -39,7 +39,7 @@ export function createBatcher<ItemType>(
let batcher: BatcherType<ItemType>; let batcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null; let timeout: NodeJS.Timeout | null;
let items: Array<ItemType> = []; let items: Array<ItemType> = [];
const queue = new PQueue({ concurrency: 1 }); const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
function _kickBatchOff() { function _kickBatchOff() {
const itemsRef = items; const itemsRef = items;

View file

@ -1,7 +1,10 @@
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import { Sound } from './Sound'; import { Sound } from './Sound';
const ringtoneEventQueue = new PQueue({ concurrency: 1 }); const ringtoneEventQueue = new PQueue({
concurrency: 1,
timeout: 1000 * 60 * 2,
});
class CallingTones { class CallingTones {
private ringtone?: Sound; private ringtone?: Sound;

View file

@ -50,7 +50,7 @@ export function createWaitBatcher<ItemType>(
let waitBatcher: BatcherType<ItemType>; let waitBatcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null; let timeout: NodeJS.Timeout | null;
let items: Array<ItemHolderType<ItemType>> = []; let items: Array<ItemHolderType<ItemType>> = [];
const queue = new PQueue({ concurrency: 1 }); const queue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
function _kickBatchOff() { function _kickBatchOff() {
const itemsRef = items; const itemsRef = items;