Skip to content

Commit

Permalink
refactor(torii-grpc): event subscription with multiple clauses (#2555)
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo authored Oct 22, 2024
1 parent 467888b commit 33fd66c
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 166 deletions.
4 changes: 2 additions & 2 deletions crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse};
use torii_grpc::types::schema::Entity;
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, KeysClause, Query};
use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query};
use torii_relay::client::EventLoop;
use torii_relay::types::Message;

Expand Down Expand Up @@ -159,7 +159,7 @@ impl Client {
/// A direct stream to grpc subscribe starknet events
pub async fn on_starknet_event(
&self,
keys: Option<KeysClause>,
keys: Vec<EntityKeysClause>,
) -> Result<EventUpdateStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream = grpc_client.subscribe_events(keys).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ message RetrieveEventsResponse {
}

message SubscribeEventsRequest {
types.KeysClause keys = 1;
repeated types.EntityKeysClause keys = 1;
}

message SubscribeEventsResponse {
Expand Down
8 changes: 3 additions & 5 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use crate::proto::world::{
UpdateEntitiesSubscriptionRequest, WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, KeysClause, ModelKeysClause, Query,
};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -211,9 +209,9 @@ impl WorldClient {
/// Subscribe to the events of a World.
pub async fn subscribe_events(
&mut self,
keys: Option<KeysClause>,
keys: Vec<EntityKeysClause>,
) -> Result<EventUpdateStreaming, Error> {
let keys = keys.map(|c| c.into());
let keys = keys.into_iter().map(|c| c.into()).collect();

let stream = self
.inner
Expand Down
9 changes: 5 additions & 4 deletions crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,9 +857,11 @@ impl DojoWorld {

async fn subscribe_events(
&self,
clause: proto::types::KeysClause,
clause: Vec<proto::types::EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
self.event_manager.add_subscriber(clause.into()).await
self.event_manager
.add_subscriber(clause.into_iter().map(|keys| keys.into()).collect())
.await
}
}

Expand Down Expand Up @@ -1260,8 +1262,7 @@ impl proto::world::world_server::World for DojoWorld {
&self,
request: Request<proto::world::SubscribeEventsRequest>,
) -> ServiceResult<Self::SubscribeEventsStream> {
let keys = request.into_inner().keys.unwrap_or_default();

let keys = request.into_inner().keys;
let rx = self.subscribe_events(keys).await.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeEventsStream))
Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use torii_core::sql::FELT_DELIMITER;
use torii_core::types::OptimisticEntity;
use tracing::{error, trace};

use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::entity";

Expand Down Expand Up @@ -128,67 +129,7 @@ impl Service {

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
continue;
}

Expand Down
35 changes: 5 additions & 30 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ use torii_core::sql::FELT_DELIMITER;
use torii_core::types::Event;
use tracing::{error, trace};

use super::match_keys;
use crate::proto;
use crate::proto::world::SubscribeEventsResponse;
use crate::types::{KeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event";

#[derive(Debug)]
pub struct EventSubscriber {
/// Event keys that the subscriber is interested in
keys: KeysClause,
keys: Vec<EntityKeysClause>,
/// The channel to send the response back to the subscriber.
sender: Sender<Result<proto::world::SubscribeEventsResponse, tonic::Status>>,
}
Expand All @@ -41,7 +42,7 @@ pub struct EventManager {
impl EventManager {
pub async fn add_subscriber(
&self,
keys: KeysClause,
keys: Vec<EntityKeysClause>,
) -> Result<Receiver<Result<proto::world::SubscribeEventsResponse, tonic::Status>>, Error> {
let id = rand::thread_rng().gen::<usize>();
let (sender, receiver) = channel(1);
Expand Down Expand Up @@ -108,33 +109,7 @@ impl Service {
.map_err(ParseError::from)?;

for (idx, sub) in subs.subscribers.read().await.iter() {
// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if sub.keys.pattern_matching == PatternMatching::FixedLen
&& keys.len() != sub.keys.keys.len()
{
continue;
}

if !keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber key pattern
// in this case we might want to list all events with the same
// key selector so we can match them all
let sub_key = sub.keys.keys.get(idx);

// if we have a key in the subscriber, it must match the key in the event
// unless its empty, which is a wildcard
match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
}) {
if !match_keys(&keys, &sub.keys) {
continue;
}

Expand Down
65 changes: 3 additions & 62 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use torii_core::types::OptimisticEventMessage;
use tracing::{error, trace};

use super::entity::EntitiesSubscriber;
use super::match_entity_keys;
use crate::proto;
use crate::proto::world::SubscribeEntityResponse;
use crate::types::{EntityKeysClause, PatternMatching};
use crate::types::EntityKeysClause;

pub(crate) const LOG_TARGET: &str = "torii::grpc::server::subscriptions::event_message";

Expand Down Expand Up @@ -120,67 +121,7 @@ impl Service {

// If we have a clause of keys, then check that the key pattern of the entity
// matches the key pattern of the subscriber.
if !sub.clauses.is_empty()
&& !sub.clauses.iter().any(|clause| match clause {
EntityKeysClause::HashedKeys(hashed_keys) => {
hashed_keys.is_empty() || hashed_keys.contains(&hashed)
}
EntityKeysClause::Keys(clause) => {
// if we have a model clause, then we need to check that the entity
// has an updated model and that the model name matches the clause
if let Some(updated_model) = &entity.updated_model {
let name = updated_model.name();
let (namespace, name) = name.split_once('-').unwrap();

if !clause.models.is_empty()
&& !clause.models.iter().any(|clause_model| {
let (clause_namespace, clause_model) =
clause_model.split_once('-').unwrap();
// if both namespace and model are empty, we should match all.
// if namespace is specified and model is empty or * we should
// match all models in the
// namespace if namespace
// and model are specified, we should match the
// specific model
(clause_namespace.is_empty()
|| clause_namespace == namespace
|| clause_namespace == "*")
&& (clause_model.is_empty()
|| clause_model == name
|| clause_model == "*")
})
{
return false;
}
}

// if the key pattern doesnt match our subscribers key pattern, skip
// ["", "0x0"] would match with keys ["0x...", "0x0", ...]
if clause.pattern_matching == PatternMatching::FixedLen
&& keys.len() != clause.keys.len()
{
return false;
}

return keys.iter().enumerate().all(|(idx, key)| {
// this is going to be None if our key pattern overflows the subscriber
// key pattern in this case we should skip
let sub_key = clause.keys.get(idx);

match sub_key {
// the key in the subscriber must match the key of the entity
// athis index
Some(Some(sub_key)) => key == sub_key,
// otherwise, if we have no key we should automatically match.
// or.. we overflowed the subscriber key pattern
// but we're in VariableLen pattern matching
// so we should match all next keys
_ => true,
}
});
}
})
{
if !match_entity_keys(hashed, &keys, &entity.updated_model, &sub.clauses) {
continue;
}

Expand Down
Loading

0 comments on commit 33fd66c

Please sign in to comment.