Async refresh call links on CallLinkUpdate sync
This commit is contained in:
parent
cd2bb537fa
commit
32485abf06
5 changed files with 45 additions and 56 deletions
|
@ -24,6 +24,8 @@ const DEFAULT_SLEEP_TIME = 20 * SECOND;
|
||||||
|
|
||||||
const callLinkRefreshJobDataSchema = z.object({
|
const callLinkRefreshJobDataSchema = z.object({
|
||||||
roomId: z.string(),
|
roomId: z.string(),
|
||||||
|
deleteLocallyIfMissingOnCallingServer: z.boolean(),
|
||||||
|
source: z.string(),
|
||||||
});
|
});
|
||||||
|
|
||||||
export type CallLinkRefreshJobData = z.infer<
|
export type CallLinkRefreshJobData = z.infer<
|
||||||
|
@ -54,8 +56,8 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
||||||
}: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>,
|
}: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>,
|
||||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||||
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
|
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
|
||||||
const { roomId } = data;
|
const { roomId, deleteLocallyIfMissingOnCallingServer, source } = data;
|
||||||
const logId = `callLinkRefreshJobQueue(${roomId}).run`;
|
const logId = `callLinkRefreshJobQueue(${roomId}, source=${source}).run`;
|
||||||
log.info(`${logId}: Starting`);
|
log.info(`${logId}: Starting`);
|
||||||
|
|
||||||
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
|
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
|
||||||
|
@ -89,9 +91,9 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
||||||
};
|
};
|
||||||
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
||||||
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
||||||
} else {
|
} else if (deleteLocallyIfMissingOnCallingServer) {
|
||||||
log.info(
|
log.info(
|
||||||
`${logId}: Call link not found on server, deleting local call link`
|
`${logId}: Call link not found on server and deleteLocallyIfMissingOnCallingServer; deleting local call link`
|
||||||
);
|
);
|
||||||
// This will leave a storage service record, and it's up to primary to delete it
|
// This will leave a storage service record, and it's up to primary to delete it
|
||||||
await DataWriter.beginDeleteCallLink(roomId, {
|
await DataWriter.beginDeleteCallLink(roomId, {
|
||||||
|
@ -99,6 +101,8 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
||||||
});
|
});
|
||||||
await DataWriter.finalizeDeleteCallLink(roomId);
|
await DataWriter.finalizeDeleteCallLink(roomId);
|
||||||
window.reduxActions.calling.handleCallLinkDelete({ roomId });
|
window.reduxActions.calling.handleCallLinkDelete({ roomId });
|
||||||
|
} else {
|
||||||
|
log.info(`${logId}: Call link not found on server, ignoring`);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
error = err;
|
error = err;
|
||||||
|
|
|
@ -2025,7 +2025,15 @@ export async function mergeCallLinkRecord(
|
||||||
DataWriter.saveCallHistory(callHistory),
|
DataWriter.saveCallHistory(callHistory),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
drop(callLinkRefreshJobQueue.add({ roomId: callLink.roomId }));
|
// The local DB record is a placeholder until confirmed refreshed. If it's gone from
|
||||||
|
// the calling server then delete the local record.
|
||||||
|
drop(
|
||||||
|
callLinkRefreshJobQueue.add({
|
||||||
|
roomId: callLink.roomId,
|
||||||
|
deleteLocallyIfMissingOnCallingServer: true,
|
||||||
|
source: 'storage.mergeCallLinkRecord',
|
||||||
|
})
|
||||||
|
);
|
||||||
window.reduxActions.callHistory.addCallHistory(callHistory);
|
window.reduxActions.callHistory.addCallHistory(callHistory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -107,6 +107,7 @@ import type { StartCallData } from '../../components/ConfirmLeaveCallModal';
|
||||||
import { getCallLinksByRoomId } from '../selectors/calling';
|
import { getCallLinksByRoomId } from '../selectors/calling';
|
||||||
import { storageServiceUploadJob } from '../../services/storage';
|
import { storageServiceUploadJob } from '../../services/storage';
|
||||||
import { CallLinkDeleteManager } from '../../jobs/CallLinkDeleteManager';
|
import { CallLinkDeleteManager } from '../../jobs/CallLinkDeleteManager';
|
||||||
|
import { callLinkRefreshJobQueue } from '../../jobs/callLinkRefreshJobQueue';
|
||||||
|
|
||||||
// State
|
// State
|
||||||
|
|
||||||
|
@ -1508,33 +1509,12 @@ function handleCallLinkUpdate(
|
||||||
const roomId = getRoomIdFromRootKey(callLinkRootKey);
|
const roomId = getRoomIdFromRootKey(callLinkRootKey);
|
||||||
const logId = `handleCallLinkUpdate(${roomId})`;
|
const logId = `handleCallLinkUpdate(${roomId})`;
|
||||||
|
|
||||||
const freshCallLinkState = await calling.readCallLink(callLinkRootKey);
|
|
||||||
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
||||||
|
|
||||||
// Only give up when server confirms the call link is gone. If we fail to fetch
|
|
||||||
// state due to unexpected errors, continue to save rootKey and adminKey.
|
|
||||||
if (freshCallLinkState == null) {
|
|
||||||
log.info(`${logId}: Call link not found on server`);
|
|
||||||
if (!existingCallLink) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the call link is gone remotely (for example if it expired on the server),
|
|
||||||
// then delete local call link.
|
|
||||||
log.info(`${logId}: Deleting existing call link`);
|
|
||||||
await DataWriter.beginDeleteCallLink(roomId, {
|
|
||||||
storageNeedsSync: true,
|
|
||||||
});
|
|
||||||
storageServiceUploadJob();
|
|
||||||
handleCallLinkDelete({ roomId });
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const callLink: CallLinkType = {
|
const callLink: CallLinkType = {
|
||||||
...CALL_LINK_DEFAULT_STATE,
|
...CALL_LINK_DEFAULT_STATE,
|
||||||
storageNeedsSync: false,
|
storageNeedsSync: false,
|
||||||
...existingCallLink,
|
...existingCallLink,
|
||||||
...freshCallLinkState,
|
|
||||||
roomId,
|
roomId,
|
||||||
rootKey,
|
rootKey,
|
||||||
adminKey,
|
adminKey,
|
||||||
|
@ -1544,21 +1524,16 @@ function handleCallLinkUpdate(
|
||||||
|
|
||||||
if (existingCallLink) {
|
if (existingCallLink) {
|
||||||
if (adminKey && adminKey !== existingCallLink.adminKey) {
|
if (adminKey && adminKey !== existingCallLink.adminKey) {
|
||||||
|
log.info(`${logId}: Updating existing call link with new adminKey`);
|
||||||
await DataWriter.updateCallLinkAdminKeyByRoomId(roomId, adminKey);
|
await DataWriter.updateCallLinkAdminKeyByRoomId(roomId, adminKey);
|
||||||
log.info(`${logId}: Updated existing call link with new adminKey`);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (freshCallLinkState) {
|
|
||||||
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
|
||||||
log.info(`${logId}: Updated existing call link state`);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
log.info(`${logId}: Saving new call link`);
|
||||||
await DataWriter.insertCallLink(callLink);
|
await DataWriter.insertCallLink(callLink);
|
||||||
if (adminKey != null) {
|
if (adminKey != null) {
|
||||||
callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
||||||
await DataWriter.saveCallHistory(callHistory);
|
await DataWriter.saveCallHistory(callHistory);
|
||||||
}
|
}
|
||||||
log.info(`${logId}: Saved new call link`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatch({
|
dispatch({
|
||||||
|
@ -1569,6 +1544,16 @@ function handleCallLinkUpdate(
|
||||||
if (callHistory != null) {
|
if (callHistory != null) {
|
||||||
dispatch(addCallHistory(callHistory));
|
dispatch(addCallHistory(callHistory));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Schedule async refresh. It's possible to get a big batch of sync messages.
|
||||||
|
// This job will throttle requests to the calling server.
|
||||||
|
drop(
|
||||||
|
callLinkRefreshJobQueue.add({
|
||||||
|
roomId: callLink.roomId,
|
||||||
|
deleteLocallyIfMissingOnCallingServer: false,
|
||||||
|
source: 'handleCallLinkUpdate',
|
||||||
|
})
|
||||||
|
);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,8 @@ import {
|
||||||
getCallLinkState,
|
getCallLinkState,
|
||||||
} from '../../../test-both/helpers/fakeCallLink';
|
} from '../../../test-both/helpers/fakeCallLink';
|
||||||
import { strictAssert } from '../../../util/assert';
|
import { strictAssert } from '../../../util/assert';
|
||||||
|
import { callLinkRefreshJobQueue } from '../../../jobs/callLinkRefreshJobQueue';
|
||||||
|
import { CALL_LINK_DEFAULT_STATE } from '../../../util/callLinks';
|
||||||
|
|
||||||
const ACI_1 = generateAci();
|
const ACI_1 = generateAci();
|
||||||
const NOW = new Date('2020-01-23T04:56:00.000');
|
const NOW = new Date('2020-01-23T04:56:00.000');
|
||||||
|
@ -1427,20 +1429,13 @@ describe('calling duck', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('handleCallLinkUpdate', () => {
|
describe('handleCallLinkUpdate', () => {
|
||||||
const {
|
const { roomId, rootKey, adminKey } = FAKE_CALL_LINK;
|
||||||
roomId,
|
|
||||||
name,
|
|
||||||
restrictions,
|
|
||||||
expiration,
|
|
||||||
revoked,
|
|
||||||
rootKey,
|
|
||||||
adminKey,
|
|
||||||
} = FAKE_CALL_LINK;
|
|
||||||
|
|
||||||
beforeEach(function (this: Mocha.Context) {
|
beforeEach(function (this: Mocha.Context) {
|
||||||
this.callingServiceReadCallLink = this.sandbox
|
this.callLinkRefreshJobQueueAdd = this.sandbox.stub(
|
||||||
.stub(callingService, 'readCallLink')
|
callLinkRefreshJobQueue,
|
||||||
.resolves(getCallLinkState(FAKE_CALL_LINK));
|
'add'
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
const doAction = async (
|
const doAction = async (
|
||||||
|
@ -1452,10 +1447,10 @@ describe('calling duck', () => {
|
||||||
return { dispatch };
|
return { dispatch };
|
||||||
};
|
};
|
||||||
|
|
||||||
it('reads the call link from calling service', async function (this: Mocha.Context) {
|
it('queues call link refresh', async function (this: Mocha.Context) {
|
||||||
await doAction({ rootKey, adminKey: null });
|
await doAction({ rootKey, adminKey: null });
|
||||||
|
|
||||||
sinon.assert.calledOnce(this.callingServiceReadCallLink);
|
sinon.assert.calledOnce(this.callLinkRefreshJobQueueAdd);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('dispatches HANDLE_CALL_LINK_UPDATE', async () => {
|
it('dispatches HANDLE_CALL_LINK_UPDATE', async () => {
|
||||||
|
@ -1466,10 +1461,7 @@ describe('calling duck', () => {
|
||||||
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
||||||
payload: {
|
payload: {
|
||||||
callLink: {
|
callLink: {
|
||||||
name,
|
...CALL_LINK_DEFAULT_STATE,
|
||||||
restrictions,
|
|
||||||
expiration,
|
|
||||||
revoked,
|
|
||||||
roomId,
|
roomId,
|
||||||
rootKey,
|
rootKey,
|
||||||
adminKey,
|
adminKey,
|
||||||
|
@ -1490,10 +1482,7 @@ describe('calling duck', () => {
|
||||||
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
||||||
payload: {
|
payload: {
|
||||||
callLink: {
|
callLink: {
|
||||||
name,
|
...CALL_LINK_DEFAULT_STATE,
|
||||||
restrictions,
|
|
||||||
expiration,
|
|
||||||
revoked,
|
|
||||||
roomId,
|
roomId,
|
||||||
rootKey,
|
rootKey,
|
||||||
adminKey: 'banana',
|
adminKey: 'banana',
|
||||||
|
|
|
@ -17,7 +17,10 @@ import {
|
||||||
} from '../types/CallDisposition';
|
} from '../types/CallDisposition';
|
||||||
import { DAY } from './durations';
|
import { DAY } from './durations';
|
||||||
|
|
||||||
export const CALL_LINK_DEFAULT_STATE: Partial<CallLinkType> = {
|
export const CALL_LINK_DEFAULT_STATE: Pick<
|
||||||
|
CallLinkType,
|
||||||
|
'name' | 'restrictions' | 'revoked' | 'expiration' | 'storageNeedsSync'
|
||||||
|
> = {
|
||||||
name: '',
|
name: '',
|
||||||
restrictions: CallLinkRestrictions.Unknown,
|
restrictions: CallLinkRestrictions.Unknown,
|
||||||
revoked: false,
|
revoked: false,
|
||||||
|
|
Loading…
Add table
Reference in a new issue