Skip to content

Commit

Permalink
iterator: rename RowIteratorWorker to PagerWorker
Browse files Browse the repository at this point in the history
Not to use legacy naming, (SingleConnection)RowIteratorWorker is renamed
to (SingleConnection)PagerWorker.
  • Loading branch information
wprzytula committed Nov 12, 2024
1 parent 11c8406 commit 4971667
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ use checked_channel_sender::{ProvingSender, SendAttemptedProof};

type PageSendAttemptedProof = SendAttemptedProof<Result<ReceivedPage, QueryError>>;

// RowIteratorWorker works in the background to fetch pages
// RowIterator receives them through a channel
struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> {
// PagerWorker works in the background to fetch pages
// QueryPager receives them through a channel
struct PagerWorker<'a, QueryFunc, SpanCreatorFunc> {
sender: ProvingSender<Result<ReceivedPage, QueryError>>,

// Closure used to perform a single page query
Expand All @@ -153,7 +153,7 @@ struct RowIteratorWorker<'a, QueryFunc, SpanCreatorFunc> {
span_creator: SpanCreatorFunc,
}

impl<QueryFunc, QueryFut, SpanCreator> RowIteratorWorker<'_, QueryFunc, SpanCreator>
impl<QueryFunc, QueryFut, SpanCreator> PagerWorker<'_, QueryFunc, SpanCreator>
where
QueryFunc: Fn(Arc<Connection>, Consistency, PagingState) -> QueryFut,
QueryFut: Future<Output = Result<QueryResponse, UserRequestError>>,
Expand Down Expand Up @@ -260,7 +260,7 @@ where
}
}

// Send last_error to RowIterator - query failed fully
// Send last_error to QueryPager - query failed fully
self.log_query_error(&last_error);
let (proof, _) = self.sender.send(Err(last_error)).await;
proof
Expand Down Expand Up @@ -333,10 +333,10 @@ where

let received_page = ReceivedPage { rows, tracing_id };

// Send next page to RowIterator
// Send next page to QueryPager
let (proof, res) = self.sender.send(Ok(received_page)).await;
if res.is_err() {
// channel was closed, RowIterator was dropped - should shutdown
// channel was closed, QueryPager was dropped - should shutdown
return Ok(ControlFlow::Break(proof));
}

Expand Down Expand Up @@ -469,15 +469,15 @@ where
}
}

/// A massively simplified version of the RowIteratorWorker. It does not have
/// A massively simplified version of the PagerWorker. It does not have
/// any complicated logic related to retries, it just fetches pages from
/// a single connection.
struct SingleConnectionRowIteratorWorker<Fetcher> {
struct SingleConnectionPagerWorker<Fetcher> {
sender: ProvingSender<Result<ReceivedPage, QueryError>>,
fetcher: Fetcher,
}

impl<Fetcher, FetchFut> SingleConnectionRowIteratorWorker<Fetcher>
impl<Fetcher, FetchFut> SingleConnectionPagerWorker<Fetcher>
where
Fetcher: Fn(PagingState) -> FetchFut + Send + Sync,
FetchFut: Future<Output = Result<QueryResponse, UserRequestError>> + Send,
Expand Down Expand Up @@ -508,7 +508,7 @@ where
.await;

if send_result.is_err() {
// channel was closed, RowIterator was dropped - should shutdown
// channel was closed, QueryPager was dropped - should shutdown
return Ok(proof);
}

Expand Down Expand Up @@ -742,7 +742,7 @@ impl QueryPager {
span
};

let worker = RowIteratorWorker {
let worker = PagerWorker {
sender: sender.into(),
page_query,
statement_info: routing_info,
Expand Down Expand Up @@ -860,7 +860,7 @@ impl QueryPager {
span
};

let worker = RowIteratorWorker {
let worker = PagerWorker {
sender: sender.into(),
page_query,
statement_info,
Expand Down Expand Up @@ -894,7 +894,7 @@ impl QueryPager {
let page_size = query.get_validated_page_size();

let worker_task = async move {
let worker = SingleConnectionRowIteratorWorker {
let worker = SingleConnectionPagerWorker {
sender: sender.into(),
fetcher: |paging_state| {
connection.query_raw_with_consistency(
Expand Down Expand Up @@ -924,7 +924,7 @@ impl QueryPager {
let page_size = prepared.get_validated_page_size();

let worker_task = async move {
let worker = SingleConnectionRowIteratorWorker {
let worker = SingleConnectionPagerWorker {
sender: sender.into(),
fetcher: |paging_state| {
connection.execute_raw_with_consistency(
Expand Down

0 comments on commit 4971667

Please sign in to comment.