diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 31d7746e33..1209f35b54 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -133,6 +133,7 @@ import { SpeakerManager, } from './devices'; import { getSdkSignature } from './stats/utils'; +import { withoutConcurrency } from './helpers/concurrency'; /** * An object representation of a `Call`. @@ -224,6 +225,8 @@ export class Call { private reconnectAttempts = 0; private maxReconnectAttempts = 10; private isLeaving = false; + private initialized = false; + private readonly joinLeaveConcurrencyTag = Symbol('joinLeaveConcurrencyTag'); /** * A list hooks/functions to invoke when the call is left. @@ -276,35 +279,51 @@ export class Call { ringing ? CallingState.RINGING : CallingState.IDLE, ); - this.on('all', (event) => { - // update state with the latest event data - this.state.updateFromEvent(event); - }); - - this.leaveCallHooks.add( - registerEventHandlers(this, this.state, this.dispatcher), - ); - this.registerEffects(); - - this.leaveCallHooks.add( - createSubscription( - this.trackSubscriptionsSubject.pipe( - debounce((v) => timer(v.type)), - map((v) => v.data), - ), - (subscriptions) => - this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => { - this.logger('debug', `Failed to update track subscriptions`, err); - }), - ), - ); - this.camera = new CameraManager(this); this.microphone = new MicrophoneManager(this); this.speaker = new SpeakerManager(this); this.screenShare = new ScreenShareManager(this); } + private async setup() { + await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => { + if (this.initialized) { + return; + } + + this.leaveCallHooks.add( + this.on('all', (event) => { + // update state with the latest event data + this.state.updateFromEvent(event); + }), + ); + + this.leaveCallHooks.add( + registerEventHandlers(this, this.state, this.dispatcher), + ); + this.registerEffects(); + + this.leaveCallHooks.add( + createSubscription( + this.trackSubscriptionsSubject.pipe( + debounce((v) => timer(v.type)), + map((v) => v.data), + ), + (subscriptions) => + this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => { + this.logger('debug', `Failed to update track subscriptions`, err); + }), + ), + ); + + if (this.state.callingState === CallingState.LEFT) { + this.state.setCallingState(CallingState.IDLE); + } + + this.initialized = true; + }); + } + private registerEffects() { this.leaveCallHooks.add( // handles updating the permissions context when the settings change. @@ -483,74 +502,74 @@ export class Call { reject = false, reason = 'user is leaving the call', }: CallLeaveOptions = {}) => { - const callingState = this.state.callingState; - if (callingState === CallingState.LEFT) { - throw new Error('Cannot leave call that has already been left.'); - } - - if (callingState === CallingState.JOINING) { - await this.assertCallJoined(); - } - - this.isLeaving = true; + await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => { + const callingState = this.state.callingState; + if (callingState === CallingState.LEFT) { + throw new Error('Cannot leave call that has already been left.'); + } - if (this.ringing) { - // I'm the one who started the call, so I should cancel it. - const hasOtherParticipants = this.state.remoteParticipants.length > 0; - if ( - this.isCreatedByMe && - !hasOtherParticipants && - callingState === CallingState.RINGING - ) { - // Signals other users that I have cancelled my call to them - // before they accepted it. - await this.reject(); - } else if (reject && callingState === CallingState.RINGING) { - // Signals other users that I have rejected the incoming call. - await this.reject(); + if (callingState === CallingState.JOINING) { + await this.assertCallJoined(); } - } - this.statsReporter?.stop(); - this.statsReporter = undefined; + this.isLeaving = true; - this.sfuStatsReporter?.stop(); - this.sfuStatsReporter = undefined; + if (this.ringing) { + // I'm the one who started the call, so I should cancel it. + const hasOtherParticipants = this.state.remoteParticipants.length > 0; + if ( + this.isCreatedByMe && + !hasOtherParticipants && + callingState === CallingState.RINGING + ) { + // Signals other users that I have cancelled my call to them + // before they accepted it. + await this.reject(); + } else if (reject && callingState === CallingState.RINGING) { + // Signals other users that I have rejected the incoming call. + await this.reject(); + } + } - this.subscriber?.close(); - this.subscriber = undefined; + this.statsReporter?.stop(); + this.statsReporter = undefined; - this.publisher?.close(); - this.publisher = undefined; + this.sfuStatsReporter?.stop(); + this.sfuStatsReporter = undefined; - this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason); - this.sfuClient = undefined; + this.subscriber?.close(); + this.subscriber = undefined; - this.dispatcher.offAll(); + this.publisher?.close(); + this.publisher = undefined; - this.state.setCallingState(CallingState.LEFT); + this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason); + this.sfuClient = undefined; - // Call all leave call hooks, e.g. to clean up global event handlers - this.leaveCallHooks.forEach((hook) => hook()); + this.state.setCallingState(CallingState.LEFT); - this.clientStore.unregisterCall(this); + // Call all leave call hooks, e.g. to clean up global event handlers + this.leaveCallHooks.forEach((hook) => hook()); + this.initialized = false; + this.clientStore.unregisterCall(this); - this.camera.dispose(); - this.microphone.dispose(); - this.screenShare.dispose(); - this.speaker.dispose(); + this.camera.dispose(); + this.microphone.dispose(); + this.screenShare.dispose(); + this.speaker.dispose(); - const stopOnLeavePromises: Promise[] = []; - if (this.camera.stopOnLeave) { - stopOnLeavePromises.push(this.camera.disable(true)); - } - if (this.microphone.stopOnLeave) { - stopOnLeavePromises.push(this.microphone.disable(true)); - } - if (this.screenShare.stopOnLeave) { - stopOnLeavePromises.push(this.screenShare.disable(true)); - } - await Promise.all(stopOnLeavePromises); + const stopOnLeavePromises: Promise[] = []; + if (this.camera.stopOnLeave) { + stopOnLeavePromises.push(this.camera.disable(true)); + } + if (this.microphone.stopOnLeave) { + stopOnLeavePromises.push(this.microphone.disable(true)); + } + if (this.screenShare.stopOnLeave) { + stopOnLeavePromises.push(this.screenShare.disable(true)); + } + await Promise.all(stopOnLeavePromises); + }); }; /** @@ -586,6 +605,7 @@ export class Call { notify?: boolean; members_limit?: number; }) => { + await this.setup(); const response = await this.streamClient.get( this.streamClientBasePath, params, @@ -615,6 +635,7 @@ export class Call { * @param data the data to create the call with. */ getOrCreate = async (data?: GetOrCreateCallRequest) => { + await this.setup(); const response = await this.streamClient.post< GetOrCreateCallResponse, GetOrCreateCallRequest @@ -698,6 +719,7 @@ export class Call { * @returns a promise which resolves once the call join-flow has finished. */ join = async (data?: JoinCallData): Promise => { + await this.setup(); const callingState = this.state.callingState; if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) { this.logger( diff --git a/packages/client/src/__tests__/Call.test.ts b/packages/client/src/__tests__/Call.test.ts new file mode 100644 index 0000000000..3ad1f8a5d0 --- /dev/null +++ b/packages/client/src/__tests__/Call.test.ts @@ -0,0 +1,114 @@ +import { afterEach, beforeEach, expect, it, vi } from 'vitest'; +import { StreamVideoClient } from '../StreamVideoClient'; +import 'dotenv/config'; +import { StreamClient } from '@stream-io/node-sdk'; +import { generateUUIDv4 } from '../coordinator/connection/utils'; +import { CallingState } from '../store'; +import { Dispatcher } from '../rtc'; + +const apiKey = process.env.STREAM_API_KEY!; +const secret = process.env.STREAM_SECRET!; +const serverClient = new StreamClient(apiKey, secret); +const userId = 'jane'; +const tokenProvider = async () => + serverClient.createToken(userId, undefined, Date.now() / 1000 - 10); + +let client: StreamVideoClient; + +beforeEach(() => { + client = new StreamVideoClient({ + apiKey, + options: { browser: true }, + tokenProvider, + user: { id: 'jane' }, + }); +}); + +it('can get a call', async () => { + const call = client.call('default', generateUUIDv4()); + expect(call.watching).toBeFalsy(); + await call.getOrCreate(); + expect(call.watching).toBeTruthy(); + await call.leave(); + expect(call.state.callingState).toBe(CallingState.LEFT); +}); + +it('can reuse call instance', async () => { + const call = client.call('default', generateUUIDv4()); + await call.getOrCreate(); + expect(call.state.callingState).toBe(CallingState.IDLE); + await call.leave(); + expect(call.state.callingState).toBe(CallingState.LEFT); + await call.get(); + expect(call.state.callingState).toBe(CallingState.IDLE); + await call.leave(); + expect(call.state.callingState).toBe(CallingState.LEFT); +}); + +it('stops reacting to events when not watching', async () => { + const call = client.call('default', generateUUIDv4()); + await call.getOrCreate(); + expect(call.state.transcribing).toBeFalsy(); + call.streamClient.dispatchEvent({ + type: 'call.transcription_started', + call_cid: call.cid, + created_at: new Date().toISOString(), + }); + expect(call.state.transcribing).toBeTruthy(); + await call.leave(); + call.streamClient.dispatchEvent({ + type: 'call.transcription_stopped', + call_cid: call.cid, + created_at: new Date().toISOString(), + }); + expect(call.state.transcribing).toBeTruthy(); +}); + +it('keeps user handlers for SFU and coordinator events', async () => { + const call = client.call('default', generateUUIDv4()); + const sfuEventHandler = vi.fn(); + const coordinatorEventHandler = vi.fn(); + call.on('participantJoined', sfuEventHandler); + call.on('call.transcription_started', coordinatorEventHandler); + await call.getOrCreate(); + await call.leave(); + (call as unknown as { dispatcher: Dispatcher }).dispatcher.dispatch({ + eventPayload: { + oneofKind: 'participantJoined', + participantJoined: { + callCid: call.cid, + }, + }, + }); + call.streamClient.dispatchEvent({ + type: 'call.transcription_started', + call_cid: call.cid, + created_at: new Date().toISOString(), + }); + expect(sfuEventHandler).toBeCalled(); + expect(coordinatorEventHandler).toBeCalled(); +}); + +it("doesn't break when joining and leaving the same instance in quick succession", async () => { + const call = client.call('default', generateUUIDv4()); + let states: CallingState[] = []; + call.state.callingState$.subscribe((state) => states.push(state)); + call.getOrCreate(); + call.leave(); + call.getOrCreate(); + call.leave(); + call.getOrCreate(); + await call.leave(); + expect(states).toMatchObject([ + 'idle', + 'left', + 'idle', + 'left', + 'idle', + 'left', + ]); +}); + +afterEach(() => { + client.disconnectUser(); +}); diff --git a/packages/react-native-sdk/docusaurus/docs/reactnative/03-core/03-calling-state-and-lifecycle.mdx b/packages/react-native-sdk/docusaurus/docs/reactnative/03-core/03-calling-state-and-lifecycle.mdx index 6d3e616e82..d350ec8fc0 100644 --- a/packages/react-native-sdk/docusaurus/docs/reactnative/03-core/03-calling-state-and-lifecycle.mdx +++ b/packages/react-native-sdk/docusaurus/docs/reactnative/03-core/03-calling-state-and-lifecycle.mdx @@ -70,7 +70,7 @@ useEffect(() => { }, [callType, callId]); ``` -Once the call has been disposed of, it can no longer be used. However, you can get a new call instance with the same type and id. In practice, this is rarely an issue when you're using the pattern above. +To join the same call again, you can reuse the same call instance, or create a new one using `client.call(type, id)`. ## Calling State diff --git a/packages/react-sdk/docusaurus/docs/React/02-guides/03-calling-state-and-lifecycle.mdx b/packages/react-sdk/docusaurus/docs/React/02-guides/03-calling-state-and-lifecycle.mdx index 7e4e0ba5d2..7b9200b433 100644 --- a/packages/react-sdk/docusaurus/docs/React/02-guides/03-calling-state-and-lifecycle.mdx +++ b/packages/react-sdk/docusaurus/docs/React/02-guides/03-calling-state-and-lifecycle.mdx @@ -70,7 +70,7 @@ useEffect(() => { }, [callType, callId]); ``` -Once the call has been disposed of, it can no longer be used. However, you can get a new call instance with the same type and id. In practice, this is rarely an issue when you're using the pattern above. +To join the same call again, you can reuse the same call instance, or create a new one using `client.call(type, id)`. ## Calling State