Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): add reusable rxSwr operator #7562

Merged
merged 3 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions packages/sanity/src/core/util/__tests__/rxSwr.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {describe, expect, it} from '@jest/globals'
import {lastValueFrom, timer} from 'rxjs'
import {map, toArray} from 'rxjs/operators'

import {createSWR} from '../rxSwr'

describe('rxSwr', () => {
it('should cache the last known value and emit sync', async () => {
const swr = createSWR({maxSize: 1})

const observable = timer(100).pipe(
map(() => 'value!'),
swr('someKey'),
toArray(),
)

expect(await lastValueFrom(observable)).toEqual([{fromCache: false, value: 'value!'}])

// Second subscription, now with warm cache
expect(await lastValueFrom(observable)).toEqual([
{fromCache: true, value: 'value!'},
{fromCache: false, value: 'value!'},
])
})

it('should discard old cache keys when exceeding maxSize', async () => {
const swr = createSWR({maxSize: 1})

const observable1 = timer(100).pipe(
map(() => 'observable1!'),
swr('key1'),
toArray(),
)

expect(await lastValueFrom(observable1)).toEqual([{fromCache: false, value: 'observable1!'}])

// Second subscription, now with warm cache
expect(await lastValueFrom(observable1)).toEqual([
{fromCache: true, value: 'observable1!'},
{fromCache: false, value: 'observable1!'},
])

const observable2 = timer(100).pipe(
map(() => 'observable2!'),
swr('key2'),
toArray(),
)

// Subscribing to observable2 should purge the key of observable1
expect(await lastValueFrom(observable2)).toEqual([{fromCache: false, value: 'observable2!'}])

// re-subscribing to the first should now not have a cache
expect(await lastValueFrom(observable1)).toEqual([{fromCache: false, value: 'observable1!'}])
})
})
1 change: 1 addition & 0 deletions packages/sanity/src/core/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './isString'
export * from './isTruthy'
export * from './PartialExcept'
export * from './resizeObserver'
export * from './rxSwr'
export * from './schemaUtils'
export * from './searchUtils'
export * from './supportsTouch'
Expand Down
70 changes: 70 additions & 0 deletions packages/sanity/src/core/util/rxSwr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import QuickLRU from 'quick-lru'
import {concat, defer, EMPTY, map, type Observable, of, type OperatorFunction} from 'rxjs'
import {tap} from 'rxjs/operators'

/**
* The interface that any caching layer must implement
* @internal
*/
interface SWRCache<T> {
/**
* Note: This will throw if key does not exist. Always check for existence with `has` before calling
*/
get(key: string): T
has(key: string): boolean
set(key: string, value: T): void
delete(key: string): void
}

const createSWRCache = createLRUCache

/**
*
* Create an SWR (Stale While Revalidate) rxjs operator that will store the latest value in a cache and emit the last know value upon observable subscription
* @param options - Options
* @internal
*/
export function createSWR<T>(options: {maxSize: number}) {
const cache = createSWRCache<T>(options)
return function rxSwr(key: string): OperatorFunction<T, {fromCache: boolean; value: T}> {
return (input$: Observable<T>) => {
return concat(
defer(() => (cache.has(key) ? of({fromCache: true, value: cache.get(key)}) : EMPTY)),
input$.pipe(
tap((result) => cache.set(key, result)),
map((value) => ({
fromCache: false,
value: value,
})),
),
)
}
}
}

/**
* For now, the only cache layer implemented is an in-memory LRU.
* @param options - LRU options
* @internal
*/
function createLRUCache<T>(options: {maxSize: number}): SWRCache<T> {
const lru = new QuickLRU<string, {value: T}>(options)
return {
get(key: string) {
const entry = lru.get(key)
if (!entry) {
throw new Error(`Key not found in LRU cache: ${key}`)
}
return entry.value
},
set(key: string, value: T) {
lru.set(key, {value})
},
delete(key: string) {
lru.delete(key)
},
has(key: string) {
return lru.has(key)
},
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import {type SanityClient} from '@sanity/client'
import QuickLRU from 'quick-lru'
import {
asyncScheduler,
defer,
EMPTY,
map,
merge,
mergeMap,
type Observable,
of,
type OperatorFunction,
partition,
pipe,
share,
take,
throttleTime,
throwError,
timer,
} from 'rxjs'
import {tap} from 'rxjs/operators'
import {exhaustMapWithTrailing} from 'rxjs-exhaustmap-with-trailing'
import {createSearch, getSearchableTypes, type SanityDocumentLike, type Schema} from 'sanity'
import {
createSearch,
createSWR,
getSearchableTypes,
type SanityDocumentLike,
type Schema,
} from 'sanity'

import {getExtendedProjection} from '../../structureBuilder/util/getExtendedProjection'
import {ENABLE_LRU_MEMO} from './constants'
import {type SortOrder} from './types'

interface ListenQueryOptions {
Expand All @@ -44,6 +44,8 @@ export interface SearchQueryResult {
documents: SanityDocumentLike[]
}

const swr = createSWR<SanityDocumentLike[]>({maxSize: 100})

export function listenSearchQuery(options: ListenQueryOptions): Observable<SearchQueryResult> {
const {
client,
Expand Down Expand Up @@ -91,7 +93,7 @@ export function listenSearchQuery(options: ListenQueryOptions): Observable<Searc

const [welcome$, mutationAndReconnect$] = partition(events$, (ev) => ev.type === 'welcome')

const memoKey = JSON.stringify({filter, limit, params, searchQuery, sort, staticTypeNames})
const swrKey = JSON.stringify({filter, limit, params, searchQuery, sort, staticTypeNames})

return merge(
welcome$.pipe(take(1)),
Expand Down Expand Up @@ -157,37 +159,7 @@ export function listenSearchQuery(options: ListenQueryOptions): Observable<Searc
}),
)
}),
ENABLE_LRU_MEMO
? pipe(
memoLRU(memoKey, lru),
map((memo) => ({
fromCache: memo.type === 'memo',
documents: memo.value,
})),
)
: map((documents) => ({
fromCache: false,
documents,
})),
swr(swrKey),
map(({fromCache, value}) => ({fromCache, documents: value})),
)
}

const lru = new QuickLRU<string, SanityDocumentLike[]>({maxSize: 100})
function memoLRU<T>(
memoKey: string,
cache: QuickLRU<string, T>,
): OperatorFunction<T, {type: 'memo'; value: T} | {type: 'value'; value: T}> {
return (input$: Observable<T>) =>
merge(
defer(() =>
cache.has(memoKey) ? of({type: 'memo' as const, value: cache.get(memoKey)!}) : EMPTY,
),
input$.pipe(
tap((result) => cache.set(memoKey, result)),
map((value) => ({
type: 'value' as const,
value: value,
})),
),
)
}
Loading