Skip to content

Commit

Permalink
feat!: Server-side participant pinning (#881)
Browse files Browse the repository at this point in the history
Adds support for Server-Side participant pinning.

Reference: GetStream/chat#5239,
GetStream/protocol#174
Specs:
https://www.notion.so/stream-wiki/Video-Pinning-2b348a962da742389f734b7fd9b1cf46

## Breaking changes
This PR brings a small breaking change in the participant pinning API.
Previously, we used to have `call.setParticipantPinnedAt` API which is
now replaced with `call.pin(sessionId)` and `call.unpin(sessionId)`
APIs.

Additionally, the `participant.pinnedAt` property has been replaced with
a new `pin` structure of this shape:
```
const participant = {
  ...,
  pin: {
    isLocal: boolean, // set to `true` when this participant is server-side pinned
    pinnedAt: number, // a timestamp, shows when "pinning" happened
  }
}
```

---------

Co-authored-by: Khushal Agarwal <[email protected]>
  • Loading branch information
oliverlaz and khushal87 authored Aug 7, 2023
1 parent 42bc54f commit 72829f1
Show file tree
Hide file tree
Showing 23 changed files with 825 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ The `StreamVideoParticipant` object contains the following information:
| `videoStream` | The published video `MediaStream`. |
| `screenShareStream` | The published screen share `MediaStream`. |
| `isLocalParticipant` | It's `true` if the participant is the local participant. |
| `pinnedAt` | The time the participant was pinned. |
| `pin` | Holds pinning information. |
| `reaction` | The last reaction this user has sent to this call. |

The `StreamVideoLocalParticipant` has these additional properties:
Expand Down
70 changes: 60 additions & 10 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import {
MuteUsersRequest,
MuteUsersResponse,
OwnCapability,
PinRequest,
PinResponse,
QueryMembersRequest,
QueryMembersResponse,
RejectCallResponse,
Expand All @@ -50,6 +52,8 @@ import {
StopRecordingResponse,
UnblockUserRequest,
UnblockUserResponse,
UnpinRequest,
UnpinResponse,
UpdateCallMembersRequest,
UpdateCallMembersResponse,
UpdateCallRequest,
Expand Down Expand Up @@ -879,14 +883,15 @@ export class Call {
const startedAt = callState?.startedAt
? Timestamp.toDate(callState.startedAt)
: new Date();
const pins = callState?.pins ?? [];
this.state.setParticipants(() => {
const participantLookup = this.state.getParticipantLookupBySessionId();
return currentParticipants.map((p) => {
const participant: StreamVideoParticipant = Object.assign(p, {
isLocalParticipant: p.sessionId === sfuClient.sessionId,
viewportVisibilityState: VisibilityState.UNKNOWN,
});
// We need to preserve some of the local state of the participant
// We need to preserve the local state of the participant
// (e.g. videoDimension, visibilityState, pinnedAt, etc.)
// as it doesn't exist on the server.
const existingParticipant = participantLookup[p.sessionId];
Expand All @@ -899,6 +904,7 @@ export class Call {
this.state.setParticipantCount(participantCount?.total || 0);
this.state.setAnonymousParticipantCount(participantCount?.anonymous || 0);
this.state.setStartedAt(startedAt);
this.state.setServerSidePins(pins);

this.reconnectAttempts = 0; // reset the reconnect attempts counter
this.state.setCallingState(CallingState.JOINED);
Expand Down Expand Up @@ -1176,7 +1182,8 @@ export class Call {
* @param deviceId the selected device, `undefined` means the user wants to use the system's default audio output
*/
setAudioOutputDevice = (deviceId?: string) => {
this.state.updateParticipant(this.sfuClient!.sessionId, {
if (!this.sfuClient) return;
this.state.updateParticipant(this.sfuClient.sessionId, {
audioOutputDeviceId: deviceId,
});
};
Expand All @@ -1190,7 +1197,8 @@ export class Call {
* @param deviceId the selected device, pass `undefined` to clear the device selection
*/
setAudioDevice = (deviceId?: string) => {
this.state.updateParticipant(this.sfuClient!.sessionId, {
if (!this.sfuClient) return;
this.state.updateParticipant(this.sfuClient.sessionId, {
audioDeviceId: deviceId,
});
};
Expand All @@ -1203,7 +1211,8 @@ export class Call {
* @param deviceId the selected device, pass `undefined` to clear the device selection
*/
setVideoDevice = (deviceId?: string) => {
this.state.updateParticipant(this.sfuClient!.sessionId, {
if (!this.sfuClient) return;
this.state.updateParticipant(this.sfuClient.sessionId, {
videoDeviceId: deviceId,
});
};
Expand Down Expand Up @@ -1520,17 +1529,58 @@ export class Call {
};

/**
* Sets the `participant.pinnedAt` value.
* @param sessionId the session id of the participant
* @param pinnedAt the value to set the participant.pinnedAt
* @returns
* Pins the given session to the top of the participants list.
*
* @param sessionId the sessionId to pin.
*/
setParticipantPinnedAt = (sessionId: string, pinnedAt?: number): void => {
pin = (sessionId: string) => {
this.state.updateParticipant(sessionId, {
pinnedAt,
pin: {
isLocalPin: true,
pinnedAt: Date.now(),
},
});
};

/**
* Unpins the given session from the top of the participants list.
*
* @param sessionId the sessionId to unpin.
*/
unpin = (sessionId: string) => {
this.state.updateParticipant(sessionId, {
pin: undefined,
});
};

/**
* Pins the given session to the top of the participants list for everyone
* in the call.
* You can execute this method only if you have the `pin-for-everyone` capability.
*
* @param request the request object.
*/
pinForEveryone = async (request: PinRequest) => {
return this.streamClient.post<PinResponse, PinRequest>(
`${this.streamClientBasePath}/pin`,
request,
);
};

/**
* Unpins the given session from the top of the participants list for everyone
* in the call.
* You can execute this method only if you have the `pin-for-everyone` capability.
*
* @param request the request object.
*/
unpinForEveryone = async (request: UnpinRequest) => {
return this.streamClient.post<UnpinResponse, UnpinRequest>(
`${this.streamClientBasePath}/unpin`,
request,
);
};

/**
* Query call members with filter query. The result won't be stored in call state.
* @param request
Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/events/callEventHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
watchParticipantCountChanged,
watchParticipantJoined,
watchParticipantLeft,
watchPinsUpdated,
watchSfuErrorReports,
watchTrackPublished,
watchTrackUnpublished,
Expand Down Expand Up @@ -109,6 +110,7 @@ export const registerEventHandlers = (
watchDominantSpeakerChanged(dispatcher, state),

call.on('callGrantsUpdated', watchCallGrantsUpdated(state)),
call.on('pinsUpdated', watchPinsUpdated(state)),
];

Object.keys(coordinatorEvents).forEach((event) => {
Expand Down
13 changes: 13 additions & 0 deletions packages/client/src/events/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Call } from '../Call';
import { CallState } from '../store';
import { StreamVideoParticipantPatches } from '../types';
import { getLogger } from '../logger';
import { SfuEvent } from '../gen/video/sfu/event/events';
import { ErrorCode } from '../gen/video/sfu/models/models';
import { OwnCapability } from '../gen/coordinator';

Expand Down Expand Up @@ -98,3 +99,15 @@ export const watchSfuErrorReports = (dispatcher: Dispatcher) => {
});
});
};

/**
* Watches for `pinsUpdated` events and updates the pinned state of participants
* in the call.
*/
export const watchPinsUpdated = (state: CallState) => {
return function onPinsUpdated(e: SfuEvent) {
if (e.eventPayload.oneofKind !== 'pinsUpdated') return;
const { pins } = e.eventPayload.pinsUpdated;
state.setServerSidePins(pins);
};
};
115 changes: 115 additions & 0 deletions packages/client/src/gen/coordinator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,25 @@ export interface BroadcastSettings {
*/
hls: HLSSettings;
}
/**
*
* @export
* @interface BroadcastSettingsRequest
*/
export interface BroadcastSettingsRequest {
/**
*
* @type {boolean}
* @memberof BroadcastSettingsRequest
*/
enabled?: boolean;
/**
*
* @type {HLSSettingsRequest}
* @memberof BroadcastSettingsRequest
*/
hls?: HLSSettingsRequest;
}
/**
* This event is sent when a user accepts a notification to join a call.
* @export
Expand Down Expand Up @@ -1340,6 +1359,12 @@ export interface CallSettingsRequest {
* @memberof CallSettingsRequest
*/
backstage?: BackstageSettingsRequest;
/**
*
* @type {BroadcastSettingsRequest}
* @memberof CallSettingsRequest
*/
broadcasting?: BroadcastSettingsRequest;
/**
*
* @type {GeofenceSettingsRequest}
Expand Down Expand Up @@ -2413,6 +2438,31 @@ export interface HLSSettings {
*/
quality_tracks: Array<string>;
}
/**
*
* @export
* @interface HLSSettingsRequest
*/
export interface HLSSettingsRequest {
/**
*
* @type {boolean}
* @memberof HLSSettingsRequest
*/
auto_on?: boolean;
/**
*
* @type {boolean}
* @memberof HLSSettingsRequest
*/
enabled?: boolean;
/**
*
* @type {Array<string>}
* @memberof HLSSettingsRequest
*/
quality_tracks?: Array<string>;
}
/**
*
* @export
Expand Down Expand Up @@ -2836,6 +2886,7 @@ export const OwnCapability = {
JOIN_CALL: 'join-call',
JOIN_ENDED_CALL: 'join-ended-call',
MUTE_USERS: 'mute-users',
PIN_FOR_EVERYONE: 'pin-for-everyone',
READ_CALL: 'read-call',
REMOVE_CALL_MEMBER: 'remove-call-member',
SCREENSHARE: 'screenshare',
Expand Down Expand Up @@ -2959,6 +3010,38 @@ export interface PermissionRequestEvent {
*/
user: UserResponse;
}
/**
*
* @export
* @interface PinRequest
*/
export interface PinRequest {
/**
*
* @type {string}
* @memberof PinRequest
*/
session_id: string;
/**
*
* @type {string}
* @memberof PinRequest
*/
user_id: string;
}
/**
*
* @export
* @interface PinResponse
*/
export interface PinResponse {
/**
* Duration of the request in human-readable format
* @type {string}
* @memberof PinResponse
*/
duration: string;
}
/**
*
* @export
Expand Down Expand Up @@ -3774,6 +3857,38 @@ export interface UnblockedUserEvent {
*/
user: UserResponse;
}
/**
*
* @export
* @interface UnpinRequest
*/
export interface UnpinRequest {
/**
*
* @type {string}
* @memberof UnpinRequest
*/
session_id: string;
/**
*
* @type {string}
* @memberof UnpinRequest
*/
user_id: string;
}
/**
*
* @export
* @interface UnpinResponse
*/
export interface UnpinResponse {
/**
* Duration of the request in human-readable format
* @type {string}
* @memberof UnpinResponse
*/
duration: string;
}
/**
*
* @export
Expand Down
Loading

0 comments on commit 72829f1

Please sign in to comment.