Clean up old messages, better handle errors from sending

This commit is contained in:
Scott Nonnenberg 2018-08-07 12:33:56 -07:00
parent bf63c7cc13
commit 727925a266
10 changed files with 232 additions and 66 deletions

View file

@ -25,6 +25,7 @@ module.exports = {
getAllMessageIds,
getMessagesBySentAt,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
getMessagesByConversation,
@ -122,21 +123,21 @@ async function updateToSchemaVersion1(currentVersion, instance) {
await instance.run(
`CREATE TABLE messages(
id STRING PRIMARY KEY ASC,
json TEXT,
id STRING PRIMARY KEY ASC,
json TEXT,
unread INTEGER,
expires_at INTEGER,
sent_at INTEGER,
schemaVersion INTEGER,
conversationId STRING,
received_at INTEGER,
source STRING,
sourceDevice STRING,
hasAttachments INTEGER,
hasFileAttachments INTEGER,
hasVisualMediaAttachments INTEGER
);`
unread INTEGER,
expires_at INTEGER,
sent_at INTEGER,
schemaVersion INTEGER,
conversationId STRING,
received_at INTEGER,
source STRING,
sourceDevice STRING,
hasAttachments INTEGER,
hasFileAttachments INTEGER,
hasVisualMediaAttachments INTEGER
);`
);
await instance.run(`CREATE INDEX messages_unread ON messages (
@ -196,7 +197,51 @@ async function updateToSchemaVersion1(currentVersion, instance) {
console.log('updateToSchemaVersion1: success!');
}
const SCHEMA_VERSIONS = [updateToSchemaVersion1];
async function updateToSchemaVersion2(currentVersion, instance) {
if (currentVersion >= 2) {
return;
}
console.log('updateToSchemaVersion2: starting...');
await instance.run('BEGIN TRANSACTION;');
await instance.run(
`ALTER TABLE messages
ADD COLUMN expireTimer INTEGER;`
);
await instance.run(
`ALTER TABLE messages
ADD COLUMN expirationStartTimestamp INTEGER;`
);
await instance.run(
`ALTER TABLE messages
ADD COLUMN type STRING;`
);
await instance.run(`CREATE INDEX messages_expiring ON messages (
expireTimer,
expirationStartTimestamp,
expires_at
);`);
await instance.run(
`UPDATE messages SET
expirationStartTimestamp = json_extract(json, '$.expirationStartTimestamp'),
expireTimer = json_extract(json, '$.expireTimer'),
type = json_extract(json, '$.type');`
);
await instance.run('PRAGMA schema_version = 2;');
await instance.run('COMMIT TRANSACTION;');
console.log('updateToSchemaVersion2: success!');
}
// const SCHEMA_VERSIONS = [updateToSchemaVersion1];
const SCHEMA_VERSIONS = [updateToSchemaVersion1, updateToSchemaVersion2];
async function updateSchema(instance) {
const sqliteVersion = await getSQLiteVersion(instance);
@ -289,15 +334,40 @@ async function saveMessage(data, { forceSave } = {}) {
sent_at,
source,
sourceDevice,
type,
unread,
expireTimer,
expirationStartTimestamp,
} = data;
const payload = {
$id: id,
$json: objectToJSON(data),
$conversationId: conversationId,
$expirationStartTimestamp: expirationStartTimestamp,
$expires_at: expires_at,
$expireTimer: expireTimer,
$hasAttachments: hasAttachments,
$hasFileAttachments: hasFileAttachments,
$hasVisualMediaAttachments: hasVisualMediaAttachments,
$received_at: received_at,
$schemaVersion: schemaVersion,
$sent_at: sent_at,
$source: source,
$sourceDevice: sourceDevice,
$type: type,
$unread: unread,
};
if (id && !forceSave) {
await db.run(
`UPDATE messages SET
json = $json,
conversationId = $conversationId,
expirationStartTimestamp = $expirationStartTimestamp,
expires_at = $expires_at,
expireTimer = $expireTimer,
hasAttachments = $hasAttachments,
hasFileAttachments = $hasFileAttachments,
hasVisualMediaAttachments = $hasVisualMediaAttachments,
@ -307,24 +377,10 @@ async function saveMessage(data, { forceSave } = {}) {
sent_at = $sent_at,
source = $source,
sourceDevice = $sourceDevice,
type = $type,
unread = $unread
WHERE id = $id;`,
{
$id: id,
$json: objectToJSON(data),
$conversationId: conversationId,
$expires_at: expires_at,
$hasAttachments: hasAttachments,
$hasFileAttachments: hasFileAttachments,
$hasVisualMediaAttachments: hasVisualMediaAttachments,
$received_at: received_at,
$schemaVersion: schemaVersion,
$sent_at: sent_at,
$source: source,
$sourceDevice: sourceDevice,
$unread: unread,
}
payload
);
return id;
@ -341,7 +397,9 @@ async function saveMessage(data, { forceSave } = {}) {
json,
conversationId,
expirationStartTimestamp,
expires_at,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
@ -350,13 +408,16 @@ async function saveMessage(data, { forceSave } = {}) {
sent_at,
source,
sourceDevice,
type,
unread
) values (
$id,
$json,
$conversationId,
$expirationStartTimestamp,
$expires_at,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
@ -365,23 +426,13 @@ async function saveMessage(data, { forceSave } = {}) {
$sent_at,
$source,
$sourceDevice,
$type,
$unread
);`,
{
...payload,
$id: toCreate.id,
$json: objectToJSON(toCreate),
$conversationId: conversationId,
$expires_at: expires_at,
$hasAttachments: hasAttachments,
$hasFileAttachments: hasFileAttachments,
$hasVisualMediaAttachments: hasVisualMediaAttachments,
$received_at: received_at,
$schemaVersion: schemaVersion,
$sent_at: sent_at,
$source: source,
$sourceDevice: sourceDevice,
$unread: unread,
}
);
@ -533,6 +584,23 @@ async function getExpiredMessages() {
return map(rows, row => jsonToObject(row.json));
}
async function getOutgoingWithoutExpiresAt() {
const rows = await db.all(`
SELECT json FROM messages
WHERE
(expireTimer IS NOT NULL AND expireTimer IS NOT 0) AND
type IS 'outgoing' AND
(expirationStartTimestamp IS NULL OR expires_at IS NULL)
ORDER BY expires_at ASC;
`);
if (!rows) {
return null;
}
return map(rows, row => jsonToObject(row.json));
}
async function getNextExpiringMessage() {
const rows = await db.all(`
SELECT json FROM messages

View file

@ -346,6 +346,49 @@
});
}
Views.Initialization.setMessage(window.i18n('optimizingApplication'));
window.log.info('Cleanup: starting...');
const messagesForCleanup = await window.Signal.Data.getOutgoingWithoutExpiresAt(
{
MessageCollection: Whisper.MessageCollection,
}
);
window.log.info(
`Cleanup: Found ${messagesForCleanup.length} messages for cleanup`
);
await Promise.all(
messagesForCleanup.map(async message => {
const delivered = message.get('delivered');
const sentAt = message.get('sent_at');
const expirationStartTimestamp = message.get(
'expirationStartTimestamp'
);
if (message.hasErrors()) {
return;
}
if (delivered) {
window.log.info(
`Cleanup: Starting timer for delivered message ${sentAt}`
);
message.set(
'expirationStartTimestamp',
expirationStartTimestamp || sentAt
);
await message.setToExpire();
return;
}
window.log.info(`Cleanup: Deleting unsent message ${sentAt}`);
await window.Signal.Data.removeMessage(message.id, {
Message: Whisper.Message,
});
})
);
window.log.info('Cleanup: complete');
Views.Initialization.setMessage(window.i18n('loading'));
// Note: We are not invoking the second set of IndexedDB migrations because it is

View file

@ -73,15 +73,25 @@
const deliveries = message.get('delivered') || 0;
const deliveredTo = message.get('delivered_to') || [];
const expirationStartTimestamp = message.get(
'expirationStartTimestamp'
);
message.set({
delivered_to: _.union(deliveredTo, [receipt.get('source')]),
delivered: deliveries + 1,
expirationStartTimestamp: expirationStartTimestamp || Date.now(),
sent: true,
});
await window.Signal.Data.saveMessage(message.attributes, {
Message: Whisper.Message,
});
if (message.isExpiring() && !expirationStartTimestamp) {
// This will save the message for us while starting the timer
await message.setToExpire();
} else {
await window.Signal.Data.saveMessage(message.attributes, {
Message: Whisper.Message,
});
}
// notify frontend listeners
const conversation = ConversationController.get(
message.get('conversationId')
@ -91,9 +101,6 @@
}
this.remove(receipt);
// TODO: consider keeping a list of numbers we've
// successfully delivered to?
} catch (error) {
window.log.error(
'DeliveryReceipts.onReceipt error:',

View file

@ -12,6 +12,7 @@
async function destroyExpiredMessages() {
try {
window.log.info('destroyExpiredMessages: Loading messages...');
const messages = await window.Signal.Data.getExpiredMessages({
MessageCollection: Whisper.MessageCollection,
});
@ -41,6 +42,7 @@
);
}
window.log.info('destroyExpiredMessages: complete');
checkExpiringMessages();
}

View file

@ -797,6 +797,18 @@
});
message.set({ id });
// We're offline!
if (!textsecure.messaging) {
const errors = this.contactCollection.map(contact => {
const error = new Error('Network is not available');
error.name = 'SendMessageNetworkError';
error.number = contact.id;
return error;
});
await message.saveErrors(errors);
return;
}
const conversationType = this.get('type');
const sendFunction = (() => {
switch (conversationType) {

View file

@ -36,15 +36,23 @@
return window.AccountJobs[number];
}
// eslint-disable-next-line more/no-then
const job = textsecure.messaging
.getProfile(number)
.then(() => {
window.AccountCache[number] = true;
})
.catch(() => {
let job;
if (textsecure.messaging) {
// eslint-disable-next-line more/no-then
job = textsecure.messaging
.getProfile(number)
.then(() => {
window.AccountCache[number] = true;
})
.catch(() => {
window.AccountCache[number] = false;
});
} else {
// We're offline!
job = Promise.resolve().then(() => {
window.AccountCache[number] = false;
});
}
window.AccountJobs[number] = job;
@ -661,6 +669,11 @@
// One caller today: event handler for the 'Retry Send' entry in triple-dot menu
async retrySend() {
if (!textsecure.messaging) {
window.log.error('retrySend: Cannot retry since we are offline!');
return null;
}
const [retries, errors] = _.partition(
this.get('errors'),
this.isReplayableError.bind(this)
@ -1140,7 +1153,10 @@
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
// a message we already know about.
Whisper.ReadSyncs.notifyConversation(message);
const c = message.getConversation();
if (c) {
c.onReadMessage(message);
}
} else {
conversation.set(
'unreadCount',

View file

@ -49,6 +49,7 @@ module.exports = {
getAllMessageIds,
getMessagesBySentAt,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
getMessagesByConversation,
@ -319,11 +320,15 @@ async function getMessagesBySentAt(sentAt, { MessageCollection }) {
}
async function getExpiredMessages({ MessageCollection }) {
window.log.info('Load expired messages');
const messages = await channels.getExpiredMessages();
return new MessageCollection(messages);
}
async function getOutgoingWithoutExpiresAt({ MessageCollection }) {
const messages = await channels.getOutgoingWithoutExpiresAt();
return new MessageCollection(messages);
}
async function getNextExpiringMessage({ MessageCollection }) {
const messages = await channels.getNextExpiringMessage();
return new MessageCollection(messages);

View file

@ -74,14 +74,26 @@
}
const readBy = message.get('read_by') || [];
const expirationStartTimestamp = message.get(
'expirationStartTimestamp'
);
readBy.push(receipt.get('reader'));
message.set({ read_by: readBy });
await window.Signal.Data.saveMessage(message.attributes, {
Message: Whisper.Message,
message.set({
read_by: readBy,
expirationStartTimestamp: expirationStartTimestamp || Date.now(),
sent: true,
});
if (message.isExpiring() && !expirationStartTimestamp) {
// This will save the message for us while starting the timer
await message.setToExpire();
} else {
await window.Signal.Data.saveMessage(message.attributes, {
Message: Whisper.Message,
});
}
// notify frontend listeners
const conversation = ConversationController.get(
message.get('conversationId')

View file

@ -913,6 +913,7 @@ textsecure.MessageSender = function MessageSenderWrapper(
sender
);
this.sendMessageToNumber = sender.sendMessageToNumber.bind(sender);
this.sendMessage = sender.sendMessage.bind(sender);
this.resetSession = sender.resetSession.bind(sender);
this.sendMessageToGroup = sender.sendMessageToGroup.bind(sender);
this.createGroup = sender.createGroup.bind(sender);

View file

@ -161,8 +161,8 @@ export class MessageDetail extends React.Component<Props> {
</div>
<table className="module-message-detail__info">
<tbody>
{(errors || []).map(error => (
<tr>
{(errors || []).map((error, index) => (
<tr key={index}>
<td className="module-message-detail__label">
{i18n('error')}
</td>