-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: allow reusing call instances after leaving #1433
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -133,6 +133,7 @@ import { | |
SpeakerManager, | ||
} from './devices'; | ||
import { getSdkSignature } from './stats/utils'; | ||
import { withoutConcurrency } from './helpers/concurrency'; | ||
|
||
/** | ||
* An object representation of a `Call`. | ||
|
@@ -224,6 +225,8 @@ export class Call { | |
private reconnectAttempts = 0; | ||
private maxReconnectAttempts = 10; | ||
private isLeaving = false; | ||
private initialized = false; | ||
private readonly joinLeaveConcurrencyTag = Symbol('joinLeaveConcurrencyTag'); | ||
|
||
/** | ||
* A list hooks/functions to invoke when the call is left. | ||
|
@@ -276,35 +279,51 @@ export class Call { | |
ringing ? CallingState.RINGING : CallingState.IDLE, | ||
); | ||
|
||
this.on('all', (event) => { | ||
// update state with the latest event data | ||
this.state.updateFromEvent(event); | ||
}); | ||
|
||
this.leaveCallHooks.add( | ||
registerEventHandlers(this, this.state, this.dispatcher), | ||
); | ||
this.registerEffects(); | ||
|
||
this.leaveCallHooks.add( | ||
createSubscription( | ||
this.trackSubscriptionsSubject.pipe( | ||
debounce((v) => timer(v.type)), | ||
map((v) => v.data), | ||
), | ||
(subscriptions) => | ||
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => { | ||
this.logger('debug', `Failed to update track subscriptions`, err); | ||
}), | ||
), | ||
); | ||
|
||
this.camera = new CameraManager(this); | ||
this.microphone = new MicrophoneManager(this); | ||
this.speaker = new SpeakerManager(this); | ||
this.screenShare = new ScreenShareManager(this); | ||
} | ||
|
||
private async setup() { | ||
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => { | ||
if (this.initialized) { | ||
return; | ||
} | ||
|
||
this.leaveCallHooks.add( | ||
this.on('all', (event) => { | ||
// update state with the latest event data | ||
this.state.updateFromEvent(event); | ||
}), | ||
); | ||
|
||
this.leaveCallHooks.add( | ||
registerEventHandlers(this, this.state, this.dispatcher), | ||
); | ||
this.registerEffects(); | ||
|
||
this.leaveCallHooks.add( | ||
createSubscription( | ||
this.trackSubscriptionsSubject.pipe( | ||
debounce((v) => timer(v.type)), | ||
map((v) => v.data), | ||
), | ||
(subscriptions) => | ||
this.sfuClient?.updateSubscriptions(subscriptions).catch((err) => { | ||
this.logger('debug', `Failed to update track subscriptions`, err); | ||
}), | ||
), | ||
); | ||
|
||
if (this.state.callingState === CallingState.LEFT) { | ||
this.state.setCallingState(CallingState.IDLE); | ||
} | ||
|
||
this.initialized = true; | ||
}); | ||
} | ||
|
||
private registerEffects() { | ||
this.leaveCallHooks.add( | ||
// handles updating the permissions context when the settings change. | ||
|
@@ -483,74 +502,74 @@ export class Call { | |
reject = false, | ||
reason = 'user is leaving the call', | ||
}: CallLeaveOptions = {}) => { | ||
const callingState = this.state.callingState; | ||
if (callingState === CallingState.LEFT) { | ||
throw new Error('Cannot leave call that has already been left.'); | ||
} | ||
|
||
if (callingState === CallingState.JOINING) { | ||
await this.assertCallJoined(); | ||
} | ||
|
||
this.isLeaving = true; | ||
await withoutConcurrency(this.joinLeaveConcurrencyTag, async () => { | ||
const callingState = this.state.callingState; | ||
if (callingState === CallingState.LEFT) { | ||
throw new Error('Cannot leave call that has already been left.'); | ||
} | ||
|
||
if (this.ringing) { | ||
// I'm the one who started the call, so I should cancel it. | ||
const hasOtherParticipants = this.state.remoteParticipants.length > 0; | ||
if ( | ||
this.isCreatedByMe && | ||
!hasOtherParticipants && | ||
callingState === CallingState.RINGING | ||
) { | ||
// Signals other users that I have cancelled my call to them | ||
// before they accepted it. | ||
await this.reject(); | ||
} else if (reject && callingState === CallingState.RINGING) { | ||
// Signals other users that I have rejected the incoming call. | ||
await this.reject(); | ||
if (callingState === CallingState.JOINING) { | ||
await this.assertCallJoined(); | ||
} | ||
} | ||
|
||
this.statsReporter?.stop(); | ||
this.statsReporter = undefined; | ||
this.isLeaving = true; | ||
|
||
this.sfuStatsReporter?.stop(); | ||
this.sfuStatsReporter = undefined; | ||
if (this.ringing) { | ||
// I'm the one who started the call, so I should cancel it. | ||
const hasOtherParticipants = this.state.remoteParticipants.length > 0; | ||
if ( | ||
this.isCreatedByMe && | ||
!hasOtherParticipants && | ||
callingState === CallingState.RINGING | ||
) { | ||
// Signals other users that I have cancelled my call to them | ||
// before they accepted it. | ||
await this.reject(); | ||
} else if (reject && callingState === CallingState.RINGING) { | ||
// Signals other users that I have rejected the incoming call. | ||
await this.reject(); | ||
} | ||
} | ||
|
||
this.subscriber?.close(); | ||
this.subscriber = undefined; | ||
this.statsReporter?.stop(); | ||
this.statsReporter = undefined; | ||
|
||
this.publisher?.close(); | ||
this.publisher = undefined; | ||
this.sfuStatsReporter?.stop(); | ||
this.sfuStatsReporter = undefined; | ||
|
||
this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason); | ||
this.sfuClient = undefined; | ||
this.subscriber?.close(); | ||
this.subscriber = undefined; | ||
|
||
this.dispatcher.offAll(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🍓 Not removing listeners from dispatcher. That seems to be safe, since every object keeping a reference to a dispatcher (except for the call itself) is nullified on leave. So the call instance will be GC-ed properly even without |
||
this.publisher?.close(); | ||
this.publisher = undefined; | ||
|
||
this.state.setCallingState(CallingState.LEFT); | ||
this.sfuClient?.close(StreamSfuClient.NORMAL_CLOSURE, reason); | ||
this.sfuClient = undefined; | ||
|
||
// Call all leave call hooks, e.g. to clean up global event handlers | ||
this.leaveCallHooks.forEach((hook) => hook()); | ||
this.state.setCallingState(CallingState.LEFT); | ||
|
||
this.clientStore.unregisterCall(this); | ||
// Call all leave call hooks, e.g. to clean up global event handlers | ||
this.leaveCallHooks.forEach((hook) => hook()); | ||
this.initialized = false; | ||
this.clientStore.unregisterCall(this); | ||
|
||
this.camera.dispose(); | ||
this.microphone.dispose(); | ||
this.screenShare.dispose(); | ||
this.speaker.dispose(); | ||
this.camera.dispose(); | ||
this.microphone.dispose(); | ||
this.screenShare.dispose(); | ||
this.speaker.dispose(); | ||
|
||
const stopOnLeavePromises: Promise<void>[] = []; | ||
if (this.camera.stopOnLeave) { | ||
stopOnLeavePromises.push(this.camera.disable(true)); | ||
} | ||
if (this.microphone.stopOnLeave) { | ||
stopOnLeavePromises.push(this.microphone.disable(true)); | ||
} | ||
if (this.screenShare.stopOnLeave) { | ||
stopOnLeavePromises.push(this.screenShare.disable(true)); | ||
} | ||
await Promise.all(stopOnLeavePromises); | ||
const stopOnLeavePromises: Promise<void>[] = []; | ||
if (this.camera.stopOnLeave) { | ||
stopOnLeavePromises.push(this.camera.disable(true)); | ||
} | ||
if (this.microphone.stopOnLeave) { | ||
stopOnLeavePromises.push(this.microphone.disable(true)); | ||
} | ||
if (this.screenShare.stopOnLeave) { | ||
stopOnLeavePromises.push(this.screenShare.disable(true)); | ||
} | ||
await Promise.all(stopOnLeavePromises); | ||
}); | ||
}; | ||
|
||
/** | ||
|
@@ -586,6 +605,7 @@ export class Call { | |
notify?: boolean; | ||
members_limit?: number; | ||
}) => { | ||
await this.setup(); | ||
const response = await this.streamClient.get<GetCallResponse>( | ||
this.streamClientBasePath, | ||
params, | ||
|
@@ -615,6 +635,7 @@ export class Call { | |
* @param data the data to create the call with. | ||
*/ | ||
getOrCreate = async (data?: GetOrCreateCallRequest) => { | ||
await this.setup(); | ||
const response = await this.streamClient.post< | ||
GetOrCreateCallResponse, | ||
GetOrCreateCallRequest | ||
|
@@ -698,6 +719,7 @@ export class Call { | |
* @returns a promise which resolves once the call join-flow has finished. | ||
*/ | ||
join = async (data?: JoinCallData): Promise<void> => { | ||
await this.setup(); | ||
const callingState = this.state.callingState; | ||
if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) { | ||
this.logger( | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import { afterEach, beforeEach, expect, it, vi } from 'vitest'; | ||
import { StreamVideoClient } from '../StreamVideoClient'; | ||
import 'dotenv/config'; | ||
import { StreamClient } from '@stream-io/node-sdk'; | ||
import { generateUUIDv4 } from '../coordinator/connection/utils'; | ||
import { CallingState } from '../store'; | ||
import { Dispatcher } from '../rtc'; | ||
|
||
const apiKey = process.env.STREAM_API_KEY!; | ||
const secret = process.env.STREAM_SECRET!; | ||
const serverClient = new StreamClient(apiKey, secret); | ||
const userId = 'jane'; | ||
const tokenProvider = async () => | ||
serverClient.createToken(userId, undefined, Date.now() / 1000 - 10); | ||
|
||
let client: StreamVideoClient; | ||
|
||
beforeEach(() => { | ||
client = new StreamVideoClient({ | ||
apiKey, | ||
options: { browser: true }, | ||
tokenProvider, | ||
user: { id: 'jane' }, | ||
}); | ||
}); | ||
|
||
it('can get a call', async () => { | ||
const call = client.call('default', generateUUIDv4()); | ||
expect(call.watching).toBeFalsy(); | ||
await call.getOrCreate(); | ||
expect(call.watching).toBeTruthy(); | ||
await call.leave(); | ||
expect(call.state.callingState).toBe(CallingState.LEFT); | ||
}); | ||
|
||
it('can reuse call instance', async () => { | ||
const call = client.call('default', generateUUIDv4()); | ||
await call.getOrCreate(); | ||
expect(call.state.callingState).toBe(CallingState.IDLE); | ||
await call.leave(); | ||
expect(call.state.callingState).toBe(CallingState.LEFT); | ||
await call.get(); | ||
expect(call.state.callingState).toBe(CallingState.IDLE); | ||
await call.leave(); | ||
expect(call.state.callingState).toBe(CallingState.LEFT); | ||
}); | ||
|
||
it('stops reacting to events when not watching', async () => { | ||
const call = client.call('default', generateUUIDv4()); | ||
await call.getOrCreate(); | ||
expect(call.state.transcribing).toBeFalsy(); | ||
call.streamClient.dispatchEvent({ | ||
type: 'call.transcription_started', | ||
call_cid: call.cid, | ||
created_at: new Date().toISOString(), | ||
}); | ||
expect(call.state.transcribing).toBeTruthy(); | ||
await call.leave(); | ||
call.streamClient.dispatchEvent({ | ||
type: 'call.transcription_stopped', | ||
call_cid: call.cid, | ||
created_at: new Date().toISOString(), | ||
}); | ||
expect(call.state.transcribing).toBeTruthy(); | ||
}); | ||
|
||
it('keeps user handlers for SFU and coordinator events', async () => { | ||
const call = client.call('default', generateUUIDv4()); | ||
const sfuEventHandler = vi.fn(); | ||
const coordinatorEventHandler = vi.fn(); | ||
call.on('participantJoined', sfuEventHandler); | ||
call.on('call.transcription_started', coordinatorEventHandler); | ||
await call.getOrCreate(); | ||
await call.leave(); | ||
(call as unknown as { dispatcher: Dispatcher }).dispatcher.dispatch({ | ||
eventPayload: { | ||
oneofKind: 'participantJoined', | ||
participantJoined: { | ||
callCid: call.cid, | ||
}, | ||
}, | ||
}); | ||
call.streamClient.dispatchEvent({ | ||
type: 'call.transcription_started', | ||
call_cid: call.cid, | ||
created_at: new Date().toISOString(), | ||
}); | ||
expect(sfuEventHandler).toBeCalled(); | ||
expect(coordinatorEventHandler).toBeCalled(); | ||
}); | ||
|
||
it("doesn't break when joining and leaving the same instance in quick succession", async () => { | ||
const call = client.call('default', generateUUIDv4()); | ||
let states: CallingState[] = []; | ||
call.state.callingState$.subscribe((state) => states.push(state)); | ||
call.getOrCreate(); | ||
call.leave(); | ||
call.getOrCreate(); | ||
call.leave(); | ||
call.getOrCreate(); | ||
await call.leave(); | ||
expect(states).toMatchObject([ | ||
'idle', | ||
'left', | ||
'idle', | ||
'left', | ||
'idle', | ||
'left', | ||
]); | ||
}); | ||
|
||
afterEach(() => { | ||
client.disconnectUser(); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevents a race condition with
leave
. It's important to wait forleave
to finish before we set everything back up again. See the test case here: https://github.com/GetStream/stream-video-js/pull/1433/files#diff-912706ed8d680e55d746f9765aa5090d64771196a9894262bed3e0b73d9a8c97R92-R110