feat: Add full-text search on windmill service logs #4576

Open · wants to merge 22 commits into base: main

backend/src/main.rs (48 changes: 42 additions & 6 deletions)
@@ -476,7 +476,7 @@ Windmill Community Edition {GIT_VERSION}

#[cfg(feature = "tantivy")]
let (index_reader, index_writer) = if should_index_jobs {
-let (r, w) = windmill_indexer::indexer_ee::init_index(&db).await?;
+let (r, w) = windmill_indexer::completed_runs_ee::init_index(&db).await?;
(Some(r), Some(w))
} else {
(None, None)
@@ -488,26 +488,61 @@ Windmill Community Edition {GIT_VERSION}
let index_writer2 = index_writer.clone();
async {
if let Some(index_writer) = index_writer2 {
-windmill_indexer::indexer_ee::run_indexer(db.clone(), index_writer, indexer_rx)
-.await;
+windmill_indexer::completed_runs_ee::run_indexer(
+db.clone(),
+index_writer,
+indexer_rx,
+)
+.await;
}
Ok(())
}
};

#[cfg(all(feature = "tantivy", feature = "parquet"))]
let (log_index_reader, log_index_writer) = if should_index_jobs {
let (r, w) = windmill_indexer::service_logs_ee::init_index(&db).await?;
(Some(r), Some(w))
} else {
(None, None)
};

#[cfg(all(feature = "tantivy", feature = "parquet"))]
let log_indexer_f = {
let log_indexer_rx = killpill_rx.resubscribe();
let log_index_writer2 = log_index_writer.clone();
async {
if let Some(log_index_writer) = log_index_writer2 {
windmill_indexer::service_logs_ee::run_indexer(
db.clone(),
log_index_writer,
log_indexer_rx,
)
.await;
}
Ok(())
}
};

#[cfg(not(feature = "tantivy"))]
-let (index_reader, index_writer) = (None, None);
+let index_reader = None;

#[cfg(not(feature = "tantivy"))]
let indexer_f = async { Ok(()) as anyhow::Result<()> };

#[cfg(not(all(feature = "tantivy", feature = "parquet")))]
let log_index_reader = None;

#[cfg(not(all(feature = "tantivy", feature = "parquet")))]
let log_indexer_f = async { Ok(()) as anyhow::Result<()> };

let server_f = async {
if !is_agent {
windmill_api::run_server(
db.clone(),
rsmq2,
index_reader,
-index_writer,
+log_index_reader,
addr,
server_killpill_rx,
base_internal_tx,
@@ -779,7 +814,8 @@ Windmill Community Edition {GIT_VERSION}
monitor_f,
server_f,
metrics_f,
-indexer_f
+indexer_f,
+log_indexer_f
)?;
} else {
tracing::info!("Nothing to do, exiting.");
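
The wiring above mirrors the existing job indexer: when `tantivy` (and, for service logs, `parquet`) is disabled, the reader falls back to `None` and the indexer future to a no-op `Ok(())`, so the final `join!` over all tasks compiles under every feature combination. Below is a minimal, self-contained sketch of that pattern, with illustrative names rather than the actual Windmill items.

```rust
// Illustrative sketch of the cfg-gated fallback pattern used in main.rs above.
// `indexer_task` stands in for the run_indexer wiring; names are placeholders.

#[cfg(feature = "tantivy")]
async fn indexer_task() -> anyhow::Result<()> {
    // With the feature on, this would drive the tantivy-backed index writer.
    Ok(())
}

#[cfg(not(feature = "tantivy"))]
async fn indexer_task() -> anyhow::Result<()> {
    // With the feature off, the task is a no-op so the join below still type-checks.
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let indexer_f = indexer_task();
    let server_f = async { Ok::<(), anyhow::Error>(()) };
    tokio::try_join!(indexer_f, server_f)?;
    Ok(())
}
```

The same shape is what lets `log_indexer_f` be added to the final `join!` unconditionally in this PR.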
backend/windmill-api/openapi.yaml (109 changes: 109 additions & 0 deletions)
@@ -9518,6 +9518,109 @@ paths:
items:
$ref: "#/components/schemas/JobSearchHit"

/srch/index/search/service_logs:
get:
summary: Search through service logs with a string query
operationId: searchLogsIndex
tags:
- indexSearch
parameters:
- name: search_query
in: query
required: true
schema:
type: string
- name: mode
in: query
required: true
schema:
type: string
- name: worker_group
in: query
required: false
schema:
type: string
- name: hostname
in: query
required: true
schema:
type: string
- name: min_ts
in: query
required: false
schema:
type: string
format: date-time
- name: max_ts
in: query
required: false
schema:
type: string
format: date-time
responses:
"200":
description: search results
content:
application/json:
schema:
type: object
properties:
query_parse_errors:
description: a list of the terms that couldn't be parsed (and thus ignored)
type: array
items:
type: string
hits:
description: log files that matched the query
type: array
items:
$ref: "#/components/schemas/LogSearchHit"

/srch/index/search/count_service_logs:
get:
summary: Search and count the log line hits on every provided host
operationId: countSearchLogsIndex
tags:
- indexSearch
parameters:
- name: search_query
in: query
required: true
schema:
type: string
- name: hosts
in: query
required: true
schema:
type: string
- name: min_ts
in: query
required: false
schema:
type: string
format: date-time
- name: max_ts
in: query
required: false
schema:
type: string
format: date-time
responses:
"200":
description: search results
content:
application/json:
schema:
type: object
properties:
query_parse_errors:
description: a list of the terms that couldn't be parsed (and thus ignored)
type: array
items:
type: string
count_per_host:
description: count of log lines that matched the query per hostname
type: object

components:
securitySchemes:
@@ -12469,3 +12572,9 @@ components:
properties:
dancer:
type: string

LogSearchHit:
type: object
properties:
dancer:
type: string
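
For reference, the new `searchLogsIndex` route can be exercised with any HTTP client. The sketch below is hypothetical: the base URL and token come from the environment, bearer authentication is assumed, and the `mode` and `hostname` values are placeholders; the response is printed raw rather than mapped onto the (placeholder) `LogSearchHit` schema.

```rust
// Hypothetical client for GET /srch/index/search/service_logs (see the spec above).
use anyhow::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Assumed environment, e.g. WINDMILL_BASE_URL=https://<instance>/api
    let base_url = std::env::var("WINDMILL_BASE_URL")?;
    let token = std::env::var("WINDMILL_TOKEN")?;

    let resp = reqwest::Client::new()
        .get(format!("{base_url}/srch/index/search/service_logs"))
        .bearer_auth(token)
        .query(&[
            ("search_query", "error"),          // required
            ("mode", "full"),                   // required; accepted values are not listed in the spec
            ("hostname", "worker-0"),           // required; placeholder host
            ("min_ts", "2024-01-01T00:00:00Z"), // optional lower time bound
        ])
        .send()
        .await?
        .error_for_status()?;

    // A 200 body carries `query_parse_errors` and `hits` per the schema above.
    println!("{}", resp.text().await?);
    Ok(())
}
```

The `countSearchLogsIndex` route follows the same pattern, taking a `hosts` string parameter instead of `hostname` and returning `count_per_host`.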
backend/windmill-api/src/indexer_ee.rs (4 changes: 4 additions & 0 deletions)
@@ -3,3 +3,7 @@ use axum::Router;
pub fn workspaced_service() -> Router {
Router::new()
}

pub fn global_service() -> Router {
Router::new()
}
backend/windmill-api/src/lib.rs (19 changes: 12 additions & 7 deletions)
@@ -152,18 +152,18 @@ pub async fn add_webhook_allowed_origin(
type IndexReader = ();

#[cfg(not(feature = "tantivy"))]
-type IndexWriter = ();
+type ServiceLogIndexReader = ();

#[cfg(feature = "tantivy")]
-type IndexReader = windmill_indexer::indexer_ee::IndexReader;
+type IndexReader = windmill_indexer::completed_runs_ee::IndexReader;
#[cfg(feature = "tantivy")]
-type IndexWriter = windmill_indexer::indexer_ee::IndexWriter;
+type ServiceLogIndexReader = windmill_indexer::service_logs_ee::ServiceLogIndexReader;

pub async fn run_server(
db: DB,
rsmq: Option<rsmq_async::MultiplexedRsmq>,
-index_reader: Option<IndexReader>,
-index_writer: Option<IndexWriter>,
+job_index_reader: Option<IndexReader>,
+log_index_reader: Option<ServiceLogIndexReader>,
addr: SocketAddr,
mut rx: tokio::sync::broadcast::Receiver<()>,
port_tx: tokio::sync::oneshot::Sender<String>,
@@ -203,8 +203,9 @@ pub async fn run_server(
.layer(Extension(rsmq.clone()))
.layer(Extension(user_db.clone()))
.layer(Extension(auth_cache.clone()))
-.layer(Extension(index_reader))
-.layer(Extension(index_writer))
+.layer(Extension(job_index_reader))
+.layer(Extension(log_index_reader))
+// .layer(Extension(index_writer))
.layer(CookieManagerLayer::new())
.layer(Extension(WebhookShared::new(rx.resubscribe(), db.clone())))
.layer(DefaultBodyLimit::max(
@@ -320,6 +321,10 @@ pub async fn run_server(
"/srch/w/:workspace_id/index",
indexer_ee::workspaced_service(),
)
.nest(
"/srch/index",
indexer_ee::global_service(),
)
.nest("/oidc", oidc_ee::global_service())
.nest(
"/saml",
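
On the API side, `run_server` now injects the optional service-log reader as an axum `Extension`, and the new routes are nested under `/srch/index` via `indexer_ee::global_service()` (a stub returning an empty `Router` in the community build, as shown in indexer_ee.rs above). A hypothetical handler behind that router could consume the extension roughly as follows; the reader type and the fake "results" are stand-ins, and only the `Extension<Option<…>>` wiring mirrors the diff.

```rust
// Hypothetical handler shape; ServiceLogIndexReader here is a placeholder for the
// EE type injected with .layer(Extension(log_index_reader)) in run_server.
use axum::{extract::Query, routing::get, Extension, Json, Router};
use serde::Deserialize;

#[derive(Clone)]
struct ServiceLogIndexReader;

#[derive(Deserialize)]
struct SearchParams {
    search_query: String,
    hostname: String,
}

async fn search_service_logs(
    Extension(reader): Extension<Option<ServiceLogIndexReader>>,
    Query(params): Query<SearchParams>,
) -> Json<Vec<String>> {
    // A real handler would run params.search_query against the tantivy index;
    // this sketch only shows how the optional reader is consumed.
    match reader {
        Some(_reader) => Json(vec![format!(
            "hits for '{}' on {}",
            params.search_query, params.hostname
        )]),
        None => Json(vec!["service log indexing is disabled".to_string()]),
    }
}

fn global_service() -> Router {
    Router::new().route("/search/service_logs", get(search_service_logs))
}
```

In the actual PR the concrete routes and handlers live in the enterprise indexer code; the community-edition `global_service()` simply keeps the route namespace compiling.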