Skip to content

Commit

Permalink
fix: allow reusing call instances after leaving (#1433)
Browse files Browse the repository at this point in the history
  • Loading branch information
myandrienko authored Jul 24, 2024
1 parent f4967a8 commit 61e05af
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 80 deletions.
178 changes: 100 additions & 78 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ import {
SpeakerManager,
} from './devices';
import { getSdkSignature } from './stats/utils';
import { withoutConcurrency } from './helpers/concurrency';

/**
* An object representation of a `Call`.
Expand Down Expand Up @@ -224,6 +225,8 @@ export class Call {
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private isLeaving = false;
private initialized = false;
private readonly joinLeaveConcurrencyTag = Symbol('joinLeaveConcurrencyTag');

/**
* A list hooks/functions to invoke when the call is left.
Expand Down Expand Up @@ -276,35 +279,51 @@ export class Call {
ringing ? CallingState.RINGING : CallingState.IDLE,
);

this.on('all', (event) => {
// update state with the latest event data
this.state.updateFromEvent(event);
});

this.leaveCallHooks.add(
registerEventHandlers(this, this.state, this.dispatcher),
);
this.registerEffects();

this.leaveCallHooks.add(
createSubscription(
this.trackSubscriptionsSubject.pipe(
debounce((v) => timer(v.type)),
map((v) => v.data),
),
(subscriptions) =>
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => {
this.logger('debug', `Failed to update track subscriptions`, err);
}),
),
);

this.camera = new CameraManager(this);
this.microphone = new MicrophoneManager(this);
this.speaker = new SpeakerManager(this);
this.screenShare = new ScreenShareManager(this);
}

private async setup() {
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => {
if (this.initialized) {
return;
}

this.leaveCallHooks.add(
this.on('all', (event) => {
// update state with the latest event data
this.state.updateFromEvent(event);
}),
);

this.leaveCallHooks.add(
registerEventHandlers(this, this.state, this.dispatcher),
);
this.registerEffects();

this.leaveCallHooks.add(
createSubscription(
this.trackSubscriptionsSubject.pipe(
debounce((v) => timer(v.type)),
map((v) => v.data),
),
(subscriptions) =>
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => {
this.logger('debug', `Failed to update track subscriptions`, err);
}),
),
);

if (this.state.callingState === CallingState.LEFT) {
this.state.setCallingState(CallingState.IDLE);
}

this.initialized = true;
});
}

private registerEffects() {
this.leaveCallHooks.add(
// handles updating the permissions context when the settings change.
Expand Down Expand Up @@ -483,74 +502,74 @@ export class Call {
reject = false,
reason = 'user is leaving the call',
}: CallLeaveOptions = {}) => {
const callingState = this.state.callingState;
if (callingState === CallingState.LEFT) {
throw new Error('Cannot leave call that has already been left.');
}

if (callingState === CallingState.JOINING) {
await this.assertCallJoined();
}

this.isLeaving = true;
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => {
const callingState = this.state.callingState;
if (callingState === CallingState.LEFT) {
throw new Error('Cannot leave call that has already been left.');
}

if (this.ringing) {
// I'm the one who started the call, so I should cancel it.
const hasOtherParticipants = this.state.remoteParticipants.length > 0;
if (
this.isCreatedByMe &&
!hasOtherParticipants &&
callingState === CallingState.RINGING
) {
// Signals other users that I have cancelled my call to them
// before they accepted it.
await this.reject();
} else if (reject && callingState === CallingState.RINGING) {
// Signals other users that I have rejected the incoming call.
await this.reject();
if (callingState === CallingState.JOINING) {
await this.assertCallJoined();
}
}

this.statsReporter?.stop();
this.statsReporter = undefined;
this.isLeaving = true;

this.sfuStatsReporter?.stop();
this.sfuStatsReporter = undefined;
if (this.ringing) {
// I'm the one who started the call, so I should cancel it.
const hasOtherParticipants = this.state.remoteParticipants.length > 0;
if (
this.isCreatedByMe &&
!hasOtherParticipants &&
callingState === CallingState.RINGING
) {
// Signals other users that I have cancelled my call to them
// before they accepted it.
await this.reject();
} else if (reject && callingState === CallingState.RINGING) {
// Signals other users that I have rejected the incoming call.
await this.reject();
}
}

this.subscriber?.close();
this.subscriber = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;

this.publisher?.close();
this.publisher = undefined;
this.sfuStatsReporter?.stop();
this.sfuStatsReporter = undefined;

this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason);
this.sfuClient = undefined;
this.subscriber?.close();
this.subscriber = undefined;

this.dispatcher.offAll();
this.publisher?.close();
this.publisher = undefined;

this.state.setCallingState(CallingState.LEFT);
this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason);
this.sfuClient = undefined;

// Call all leave call hooks, e.g. to clean up global event handlers
this.leaveCallHooks.forEach((hook) => hook());
this.state.setCallingState(CallingState.LEFT);

this.clientStore.unregisterCall(this);
// Call all leave call hooks, e.g. to clean up global event handlers
this.leaveCallHooks.forEach((hook) => hook());
this.initialized = false;
this.clientStore.unregisterCall(this);

this.camera.dispose();
this.microphone.dispose();
this.screenShare.dispose();
this.speaker.dispose();
this.camera.dispose();
this.microphone.dispose();
this.screenShare.dispose();
this.speaker.dispose();

const stopOnLeavePromises: Promise<void>[] = [];
if (this.camera.stopOnLeave) {
stopOnLeavePromises.push(this.camera.disable(true));
}
if (this.microphone.stopOnLeave) {
stopOnLeavePromises.push(this.microphone.disable(true));
}
if (this.screenShare.stopOnLeave) {
stopOnLeavePromises.push(this.screenShare.disable(true));
}
await Promise.all(stopOnLeavePromises);
const stopOnLeavePromises: Promise<void>[] = [];
if (this.camera.stopOnLeave) {
stopOnLeavePromises.push(this.camera.disable(true));
}
if (this.microphone.stopOnLeave) {
stopOnLeavePromises.push(this.microphone.disable(true));
}
if (this.screenShare.stopOnLeave) {
stopOnLeavePromises.push(this.screenShare.disable(true));
}
await Promise.all(stopOnLeavePromises);
});
};

/**
Expand Down Expand Up @@ -586,6 +605,7 @@ export class Call {
notify?: boolean;
members_limit?: number;
}) => {
await this.setup();
const response = await this.streamClient.get<GetCallResponse>(
this.streamClientBasePath,
params,
Expand Down Expand Up @@ -615,6 +635,7 @@ export class Call {
* @param data the data to create the call with.
*/
getOrCreate = async (data?: GetOrCreateCallRequest) => {
await this.setup();
const response = await this.streamClient.post<
GetOrCreateCallResponse,
GetOrCreateCallRequest
Expand Down Expand Up @@ -698,6 +719,7 @@ export class Call {
* @returns a promise which resolves once the call join-flow has finished.
*/
join = async (data?: JoinCallData): Promise<void> => {
await this.setup();
const callingState = this.state.callingState;
if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) {
this.logger(
Expand Down
114 changes: 114 additions & 0 deletions packages/client/src/__tests__/Call.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { afterEach, beforeEach, expect, it, vi } from 'vitest';
import { StreamVideoClient } from '../StreamVideoClient';
import 'dotenv/config';
import { StreamClient } from '@stream-io/node-sdk';
import { generateUUIDv4 } from '../coordinator/connection/utils';
import { CallingState } from '../store';
import { Dispatcher } from '../rtc';

const apiKey = process.env.STREAM_API_KEY!;
const secret = process.env.STREAM_SECRET!;
const serverClient = new StreamClient(apiKey, secret);
const userId = 'jane';
const tokenProvider = async () =>
serverClient.createToken(userId, undefined, Date.now() / 1000 - 10);

let client: StreamVideoClient;

beforeEach(() => {
client = new StreamVideoClient({
apiKey,
options: { browser: true },
tokenProvider,
user: { id: 'jane' },
});
});

it('can get a call', async () => {
const call = client.call('default', generateUUIDv4());
expect(call.watching).toBeFalsy();
await call.getOrCreate();
expect(call.watching).toBeTruthy();
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
});

it('can reuse call instance', async () => {
const call = client.call('default', generateUUIDv4());
await call.getOrCreate();
expect(call.state.callingState).toBe(CallingState.IDLE);
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
await call.get();
expect(call.state.callingState).toBe(CallingState.IDLE);
await call.leave();
expect(call.state.callingState).toBe(CallingState.LEFT);
});

it('stops reacting to events when not watching', async () => {
const call = client.call('default', generateUUIDv4());
await call.getOrCreate();
expect(call.state.transcribing).toBeFalsy();
call.streamClient.dispatchEvent({
type: 'call.transcription_started',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(call.state.transcribing).toBeTruthy();
await call.leave();
call.streamClient.dispatchEvent({
type: 'call.transcription_stopped',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(call.state.transcribing).toBeTruthy();
});

it('keeps user handlers for SFU and coordinator events', async () => {
const call = client.call('default', generateUUIDv4());
const sfuEventHandler = vi.fn();
const coordinatorEventHandler = vi.fn();
call.on('participantJoined', sfuEventHandler);
call.on('call.transcription_started', coordinatorEventHandler);
await call.getOrCreate();
await call.leave();
(call as unknown as { dispatcher: Dispatcher }).dispatcher.dispatch({
eventPayload: {
oneofKind: 'participantJoined',
participantJoined: {
callCid: call.cid,
},
},
});
call.streamClient.dispatchEvent({
type: 'call.transcription_started',
call_cid: call.cid,
created_at: new Date().toISOString(),
});
expect(sfuEventHandler).toBeCalled();
expect(coordinatorEventHandler).toBeCalled();
});

it("doesn't break when joining and leaving the same instance in quick succession", async () => {
const call = client.call('default', generateUUIDv4());
let states: CallingState[] = [];
call.state.callingState$.subscribe((state) => states.push(state));
call.getOrCreate();
call.leave();
call.getOrCreate();
call.leave();
call.getOrCreate();
await call.leave();
expect(states).toMatchObject([
'idle',
'left',
'idle',
'left',
'idle',
'left',
]);
});

afterEach(() => {
client.disconnectUser();
});
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ useEffect(() => {
}, [callType, callId]);
```

Once the call has been disposed of, it can no longer be used. However, you can get a new call instance with the same type and id. In practice, this is rarely an issue when you're using the pattern above.
To join the same call again, you can reuse the same call instance, or create a new one using `client.call(type, id)`.

## Calling State

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ useEffect(() => {
}, [callType, callId]);
```

Once the call has been disposed of, it can no longer be used. However, you can get a new call instance with the same type and id. In practice, this is rarely an issue when you're using the pattern above.
To join the same call again, you can reuse the same call instance, or create a new one using `client.call(type, id)`.

## Calling State

Expand Down

0 comments on commit 61e05af

Please sign in to comment.