Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow reusing call instances after leaving #1433

Merged
merged 4 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 100 additions & 78 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ import {
SpeakerManager,
} from './devices';
import { getSdkSignature } from './stats/utils';
import { withoutConcurrency } from './helpers/concurrency';

/**
* An object representation of a `Call`.
Expand Down Expand Up @@ -224,6 +225,8 @@ export class Call {
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private isLeaving = false;
private initialized = false;
private readonly joinLeaveConcurrencyTag = Symbol('joinLeaveConcurrencyTag');

/**
* A list hooks/functions to invoke when the call is left.
Expand Down Expand Up @@ -276,35 +279,51 @@ export class Call {
ringing ? CallingState.RINGING : CallingState.IDLE,
);

this.on('all', (event) => {
// update state with the latest event data
this.state.updateFromEvent(event);
});

this.leaveCallHooks.add(
registerEventHandlers(this, this.state, this.dispatcher),
);
this.registerEffects();

this.leaveCallHooks.add(
createSubscription(
this.trackSubscriptionsSubject.pipe(
debounce((v) => timer(v.type)),
map((v) => v.data),
),
(subscriptions) =>
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => {
this.logger('debug', `Failed to update track subscriptions`, err);
}),
),
);

this.camera = new CameraManager(this);
this.microphone = new MicrophoneManager(this);
this.speaker = new SpeakerManager(this);
this.screenShare = new ScreenShareManager(this);
}

private async setup() {
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prevents a race condition with leave. It's important to wait for leave to finish before we set everything back up again. See the test case here: https://github.com/GetStream/stream-video-js/pull/1433/files#diff-912706ed8d680e55d746f9765aa5090d64771196a9894262bed3e0b73d9a8c97R92-R110

if (this.initialized) {
return;
}

this.leaveCallHooks.add(
this.on('all', (event) => {
// update state with the latest event data
this.state.updateFromEvent(event);
}),
);

this.leaveCallHooks.add(
registerEventHandlers(this, this.state, this.dispatcher),
);
this.registerEffects();

this.leaveCallHooks.add(
createSubscription(
this.trackSubscriptionsSubject.pipe(
debounce((v) => timer(v.type)),
map((v) => v.data),
),
(subscriptions) =>
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => {
this.logger('debug', `Failed to update track subscriptions`, err);
}),
),
);

if (this.state.callingState === CallingState.LEFT) {
this.state.setCallingState(CallingState.IDLE);
}

this.initialized = true;
});
}

private registerEffects() {
this.leaveCallHooks.add(
// handles updating the permissions context when the settings change.
Expand Down Expand Up @@ -483,74 +502,74 @@ export class Call {
reject = false,
reason = 'user is leaving the call',
}: CallLeaveOptions = {}) => {
const callingState = this.state.callingState;
if (callingState === CallingState.LEFT) {
throw new Error('Cannot leave call that has already been left.');
}

if (callingState === CallingState.JOINING) {
await this.assertCallJoined();
}

this.isLeaving = true;
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => {
const callingState = this.state.callingState;
if (callingState === CallingState.LEFT) {
throw new Error('Cannot leave call that has already been left.');
}

if (this.ringing) {
// I'm the one who started the call, so I should cancel it.
const hasOtherParticipants = this.state.remoteParticipants.length > 0;
if (
this.isCreatedByMe &&
!hasOtherParticipants &&
callingState === CallingState.RINGING
) {
// Signals other users that I have cancelled my call to them
// before they accepted it.
await this.reject();
} else if (reject && callingState === CallingState.RINGING) {
// Signals other users that I have rejected the incoming call.
await this.reject();
if (callingState === CallingState.JOINING) {
await this.assertCallJoined();
}
}

this.statsReporter?.stop();
this.statsReporter = undefined;
this.isLeaving = true;

this.sfuStatsReporter?.stop();
this.sfuStatsReporter = undefined;
if (this.ringing) {
// I'm the one who started the call, so I should cancel it.
const hasOtherParticipants = this.state.remoteParticipants.length > 0;
if (
this.isCreatedByMe &&
!hasOtherParticipants &&
callingState === CallingState.RINGING
) {
// Signals other users that I have cancelled my call to them
// before they accepted it.
await this.reject();
} else if (reject && callingState === CallingState.RINGING) {
// Signals other users that I have rejected the incoming call.
await this.reject();
}
}

this.subscriber?.close();
this.subscriber = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;

this.publisher?.close();
this.publisher = undefined;
this.sfuStatsReporter?.stop();
this.sfuStatsReporter = undefined;

this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason);
this.sfuClient = undefined;
this.subscriber?.close();
this.subscriber = undefined;

this.dispatcher.offAll();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🍓 Not removing listeners from dispatcher.

That seems to be safe, since every object keeping a reference to a dispatcher (except for the call itself) is nullified on leave. So the call instance will be GC-ed properly even without offAll.

this.publisher?.close();
this.publisher = undefined;

this.state.setCallingState(CallingState.LEFT);
this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason);
this.sfuClient = undefined;

// Call all leave call hooks, e.g. to clean up global event handlers
this.leaveCallHooks.forEach((hook) => hook());
this.state.setCallingState(CallingState.LEFT);

this.clientStore.unregisterCall(this);
// Call all leave call hooks, e.g. to clean up global event handlers
this.leaveCallHooks.forEach((hook) => hook());
this.initialized = false;
this.clientStore.unregisterCall(this);

this.camera.dispose();
this.microphone.dispose();
this.screenShare.dispose();
this.speaker.dispose();
this.camera.dispose();
this.microphone.dispose();
this.screenShare.dispose();
this.speaker.dispose();

const stopOnLeavePromises: Promise<void>[] = [];
if (this.camera.stopOnLeave) {
stopOnLeavePromises.push(this.camera.disable(true));
}
if (this.microphone.stopOnLeave) {
stopOnLeavePromises.push(this.microphone.disable(true));
}
if (this.screenShare.stopOnLeave) {
stopOnLeavePromises.push(this.screenShare.disable(true));
}
await Promise.all(stopOnLeavePromises);
const stopOnLeavePromises: Promise<void>[] = [];
if (this.camera.stopOnLeave) {
stopOnLeavePromises.push(this.camera.disable(true));
}
if (this.microphone.stopOnLeave) {
stopOnLeavePromises.push(this.microphone.disable(true));
}
if (this.screenShare.stopOnLeave) {
stopOnLeavePromises.push(this.screenShare.disable(true));
}
await Promise.all(stopOnLeavePromises);
});
};

/**
Expand Down Expand Up @@ -586,6 +605,7 @@ export class Call {
notify?: boolean;
members_limit?: number;
}) => {
await this.setup();
const response = await this.streamClient.get<GetCallResponse>(
this.streamClientBasePath,
params,
Expand Down Expand Up @@ -615,6 +635,7 @@ export class Call {
* @param data the data to create the call with.
*/
getOrCreate = async (data?: GetOrCreateCallRequest) => {
await this.setup();
const response = await this.streamClient.post<
GetOrCreateCallResponse,
GetOrCreateCallRequest
Expand Down Expand Up @@ -698,6 +719,7 @@ export class Call {
* @returns a promise which resolves once the call join-flow has finished.
*/
join = async (data?: JoinCallData): Promise<void> => {
await this.setup();
const callingState = this.state.callingState;
if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) {
this.logger(
Expand Down
114 changes: 114 additions & 0 deletions packages/client/src/__tests__/Call.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { afterEach, beforeEach, expect, it, vi } from 'vitest';
import { StreamVideoClient } from '../StreamVideoClient';
import 'dotenv/config';
import { StreamClient } from '@stream-io/node-sdk';
import { generateUUIDv4 } from '../coordinator/connection/utils';
import { CallingState } from '../store';
import { Dispatcher } from '../rtc';

const apiKey = process.env.STREAM_API_KEY!;
const secret = process.env.STREAM_SECRET!;
const serverClient = new StreamClient(apiKey, secret);
const userId = 'jane';
const tokenProvider = async () =>
serverClient.createToken(userId, undefined, Date.now() / 1000 - 10);

let client: StreamVideoClient;

beforeEach(() => {
client = new StreamVideoClient({
apiKey,
options: { browser: true },
tokenProvider,
user: { id: 'jane' },
});
});

it('can get a call', async () => {
const call = client.call('default', generateUUIDv4());
expect(call.watching).toBeFalsy();
await call.getOrCreate();
expect(call.watching).toBeTruthy();
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
});

it('can reuse call instance', async () => {
const call = client.call('default', generateUUIDv4());
await call.getOrCreate();
expect(call.state.callingState).toBe(CallingState.IDLE);
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
await call.get();
expect(call.state.callingState).toBe(CallingState.IDLE);
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
});

it('stops reacting to events when not watching', async () => {
const call = client.call('default', generateUUIDv4());
await call.getOrCreate();
expect(call.state.transcribing).toBeFalsy();
call.streamClient.dispatchEvent({
type: 'call.transcription_started',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(call.state.transcribing).toBeTruthy();
await call.leave();
call.streamClient.dispatchEvent({
type: 'call.transcription_stopped',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(call.state.transcribing).toBeTruthy();
});

it('keeps user handlers for SFU and coordinator events', async () => {
const call = client.call('default', generateUUIDv4());
const sfuEventHandler = vi.fn();
const coordinatorEventHandler = vi.fn();
call.on('participantJoined', sfuEventHandler);
call.on('call.transcription_started', coordinatorEventHandler);
await call.getOrCreate();
await call.leave();
(call as unknown as { dispatcher: Dispatcher }).dispatcher.dispatch({
eventPayload: {
oneofKind: 'participantJoined',
participantJoined: {
callCid: call.cid,
},
},
});
call.streamClient.dispatchEvent({
type: 'call.transcription_started',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(sfuEventHandler).toBeCalled();
expect(coordinatorEventHandler).toBeCalled();
});

it("doesn't break when joining and leaving the same instance in quick succession", async () => {
const call = client.call('default', generateUUIDv4());
let states: CallingState[] = [];
call.state.callingState$.subscribe((state) => states.push(state));
call.getOrCreate();
call.leave();
call.getOrCreate();
call.leave();
call.getOrCreate();
await call.leave();
expect(states).toMatchObject([
'idle',
'left',
'idle',
'left',
'idle',
'left',
]);
});

afterEach(() => {
client.disconnectUser();
});
Loading