Skip to content

Commit

Permalink
add spec for Cleaner. Add ability to set "subscription_expiration_sec…
Browse files Browse the repository at this point in the history
…onds" as argument
  • Loading branch information
prog-supdex committed Sep 20, 2023
1 parent 1a1981e commit aab684c
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 19 deletions.
4 changes: 4 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ group :development, :test do
gem "rubocop"
gem "rubocop-rspec"
end

group :test do
gem "timecop"
end
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,20 @@ To avoid filling Redis storage with stale subscription data:
Heroku users should set up `use_redis_object_on_cleanup` setting to `false` due to [limitations in Heroku Redis](https://devcenter.heroku.com/articles/heroku-redis#connection-permissions).
You can also call specific commands `clean_channels` or `clean_subscriptions` with passed `subscription_expiration_seconds` as an argument.
For instance
```ruby
GraphQL::AnyCable::Cleaner.clean_channels(100)
# or
GraphQL::AnyCable::Cleaner.clean_subscriptions(100)
```
It will be helpful in cases when you have another value, `subscription_expiration_seconds` (or you don't have one) in `configuration`,
but it needs to remove `subscriptions` and `channels` earlier without changing `subscription_expiration_seconds` in `configuration`

You can't put a zero value for `clean_channels` or `clean_subscriptions` methods
### Recommendations
You should run `GraphQL::AnyCable::Cleaner` or `rake graphql:anycable:clean` periodically because it helps to avoid swelling of RAM consumption,
Expand Down
20 changes: 12 additions & 8 deletions lib/graphql/anycable/cleaner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@ def clean
clean_topic_fingerprints
end

def clean_channels
return unless config.subscription_expiration_seconds
def clean_channels(expiration_seconds = nil)
expiration_seconds ||= config.subscription_expiration_seconds

return if expiration_seconds.nil? || expiration_seconds.to_i.zero?
return unless config.use_redis_object_on_cleanup

store_name = redis_key(adapter::CHANNELS_STORAGE_TIME)

remove_old_objects(store_name)
remove_old_objects(store_name, expiration_seconds.to_i)
end

def clean_subscriptions
return unless config.subscription_expiration_seconds
def clean_subscriptions(expiration_seconds = nil)
expiration_seconds ||= config.subscription_expiration_seconds

return if expiration_seconds.nil? || expiration_seconds.to_i.zero?
return unless config.use_redis_object_on_cleanup

store_name = redis_key(adapter::SUBSCRIPTIONS_STORAGE_TIME)

remove_old_objects(store_name)
remove_old_objects(store_name, expiration_seconds.to_i)
end

# For cases, when we need to clear only `subscription time storage`
Expand Down Expand Up @@ -81,9 +85,9 @@ def redis_key(prefix)
"#{config.redis_prefix}-#{prefix}"
end

def remove_old_objects(store_name)
def remove_old_objects(store_name, expiration_seconds)
# Determine the time point before which the keys should be deleted
time_point = (Time.now - config.subscription_expiration_seconds).to_i
time_point = (Time.now - expiration_seconds).to_i

# iterating per 1000 records
loop do
Expand Down
8 changes: 4 additions & 4 deletions lib/graphql/anycable/tasks/clean_expired_subscriptions.rake
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ namespace :graphql do

namespace :clean do
# Clean up old channels
task :channels do
GraphQL::AnyCable::Cleaner.clean_channels
task :channels, [:expire_seconds] do |_, args|
GraphQL::AnyCable::Cleaner.clean_channels(args[:expire_seconds]&.to_i)
end

# Clean up old subscriptions (they should have expired by themselves)
task :subscriptions do
GraphQL::AnyCable::Cleaner.clean_subscriptions
task :subscriptions, [:expire_seconds] do |_, args|
GraphQL::AnyCable::Cleaner.clean_subscriptions(args[:expire_seconds]&.to_i)
end

# Clean up subscription_ids from event fingerprints for expired subscriptions
Expand Down
11 changes: 4 additions & 7 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,12 @@ def write_subscription(query, events)
pipeline.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
end

# add the records to the storages if subscription_expiration_seconds is nil
unless config.subscription_expiration_seconds
current_timestamp = Time.now.to_i
current_timestamp = Time.now.to_i

pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)
pipeline.zadd(redis_key(SUBSCRIPTIONS_STORAGE_TIME), current_timestamp, full_subscription_id)
pipeline.zadd(redis_key(CHANNELS_STORAGE_TIME), current_timestamp, full_channel_id)

next
end
next unless config.subscription_expiration_seconds

pipeline.expire(full_channel_id, config.subscription_expiration_seconds)
pipeline.expire(full_subscription_id, config.subscription_expiration_seconds)
Expand Down
180 changes: 180 additions & 0 deletions spec/graphql/cleaner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# frozen_string_literal: true

require "timecop"

RSpec.describe GraphQL::AnyCable::Cleaner do
let(:query) do
<<~GRAPHQL
subscription SomeSubscription { productUpdated { id } }
GRAPHQL
end

let(:channel) do
socket = double("Socket", istate: AnyCable::Socket::State.new({}))
connection = double("Connection", anycable_socket: socket)
double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection)
end

let(:subscription_id) do
"some-truly-random-number"
end

let(:redis) { GraphQL::AnyCable.redis }

before do
AnycableSchema.execute(
query: query,
context: { channel: channel, subscription_id: subscription_id },
variables: {},
operation_name: "SomeSubscription",
)
end

describe ".clean_subscriptions" do
context "when expired_seconds passed via argument" do
context "when subscriptions are expired" do
let(:lifetime_in_seconds) { 10 }

it "cleans subscriptions" do
expect(redis.keys("graphql-subscription:*").count).to be > 0

Timecop.freeze(Time.now + 10) do
described_class.clean_subscriptions(lifetime_in_seconds)
end

expect(redis.keys("graphql-subscription:*").count).to be 0
end
end

context "when subscriptions are not expired" do
let(:lifetime_in_seconds) { 100 }

it "not cleans subscriptions" do
described_class.clean_subscriptions(lifetime_in_seconds)

expect(redis.keys("graphql-subscription:*").count).to be > 0
end
end
end

context "when expired_seconds passed via config" do
context "when subscriptions are expired" do
around do |ex|
old_value = GraphQL::AnyCable.config.subscription_expiration_seconds
GraphQL::AnyCable.config.subscription_expiration_seconds = 10

ex.run

GraphQL::AnyCable.config.subscription_expiration_seconds = old_value
end

it "cleans subscriptions" do
expect(redis.keys("graphql-subscription:*").count).to be > 0

Timecop.freeze(Time.now + 10) do
described_class.clean_subscriptions
end

expect(redis.keys("graphql-subscription:*").count).to be 0
end
end

context "when config.subscription_expiration_seconds is nil" do
it "remains subscriptions" do
Timecop.freeze(Time.now + 10) do
described_class.clean_subscriptions
end

expect(redis.keys("graphql-subscription:*").count).to be > 0
end
end
end

context "when an expiration_seconds is not positive integer" do
it "does not clean subscriptions" do
expect(described_class).to_not receive(:remove_old_objects)

described_class.clean_subscriptions("")

expect(redis.keys("graphql-subscription:*").count).to be > 0
end
end
end

describe ".clean_channels" do
context "when expired_seconds passed via argument" do
context "when channels are expired" do
let(:lifetime_in_seconds) { 10 }

it "cleans subscriptions" do
expect(redis.keys("graphql-channel:*").count).to be > 0

Timecop.freeze(Time.now + 10) do
described_class.clean_channels(lifetime_in_seconds)
end

expect(redis.keys("graphql-channel:*").count).to be 0
end
end

context "when channels are not expired" do
let(:lifetime_in_seconds) { 100 }

it "does not clean channels" do
described_class.clean_channels(lifetime_in_seconds)

expect(redis.keys("graphql-channel:*").count).to be > 0
end
end
end

context "when an expiration_seconds is not positive integer" do
it "does not clean channels" do
expect(described_class).to_not receive(:remove_old_objects)

described_class.clean_channels("")

expect(redis.keys("graphql-channel:*").count).to be > 0
end
end
end

describe ".clean_fingerprint_subscriptions" do
context "when subscription is blank" do
subject do
AnycableSchema.subscriptions.delete_subscription(subscription_id)

described_class.clean_fingerprint_subscriptions
end

it "cleans graphql-subscriptions" do
subscriptions_key = redis.keys("graphql-subscriptions:*")[0]

expect(redis.smembers(subscriptions_key).empty?).to be false

subject

expect(redis.smembers(subscriptions_key).empty?).to be true
end
end
end

describe ".clean_topic_fingerprints" do
subject do
# Emulate situation, when subscriptions in fingerprints are orphan
redis.scan_each(match: "graphql-subscriptions:*").each do |k|
redis.del(k)
end

described_class.clean_topic_fingerprints
end

it "cleans fingerprints" do
expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be false

subject

expect(redis.zrange("graphql-fingerprints::productUpdated:", 0, -1).empty?).to be true
end
end
end

0 comments on commit aab684c

Please sign in to comment.