Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pg_lsp): use tokio, partially remove crossbeam channels #143

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
88 changes: 85 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/pg_lsp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pg_base_db.workspace = true
pg_schema_cache.workspace = true
pg_workspace.workspace = true
pg_diagnostics.workspace = true
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "sync"] }
tokio-util = "0.7.12"

[dev-dependencies]

Expand Down
8 changes: 8 additions & 0 deletions crates/pg_lsp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ impl LspClient {
Ok(())
}

/// This will ignore any errors that occur while sending the notification.
pub fn send_info_notification(&self, message: &str) {
let _ = self.send_notification::<ShowMessage>(ShowMessageParams {
message: message.into(),
typ: MessageType::INFO,
});
}

pub fn send_request<R>(&self, params: R::Params) -> Result<R::Result>
where
R: lsp_types::request::Request,
Expand Down
34 changes: 30 additions & 4 deletions crates/pg_lsp/src/client/client_flags.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,36 @@
use lsp_types::InitializeParams;

/// Contains information about the client's capabilities.
/// This is used to determine which features the server can use.
#[derive(Debug, Clone)]
pub struct ClientFlags {
/// If `true`, the server can pull the configuration from the client.
pub configuration_pull: bool,
/// If `true`, the server can pull configuration from the client.
pub has_configuration: bool,

/// If `true`, the client notifies the server when its configuration changes.
pub will_push_configuration: bool,
}

impl ClientFlags {
pub(crate) fn from_initialize_request_params(params: &InitializeParams) -> Self {
let has_configuration = params
.capabilities
.workspace
.as_ref()
.and_then(|w| w.configuration)
.unwrap_or(false);

let will_push_configuration = params
.capabilities
.workspace
.as_ref()
.and_then(|w| w.did_change_configuration)
.and_then(|c| c.dynamic_registration)
.unwrap_or(false);

/// If `true`, the client notifies the server when the configuration changes.
pub configuration_push: bool,
Self {
has_configuration,
will_push_configuration,
}
}
}
66 changes: 66 additions & 0 deletions crates/pg_lsp/src/db_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use pg_schema_cache::SchemaCache;
use sqlx::{postgres::PgListener, PgPool};
use tokio::task::JoinHandle;

#[derive(Debug)]
pub(crate) struct DbConnection {
pub pool: PgPool,
connection_string: String,
schema_update_handle: Option<JoinHandle<()>>,
}

impl DbConnection {
pub(crate) async fn new(connection_string: String) -> Result<Self, sqlx::Error> {
let pool = PgPool::connect(&connection_string).await?;
Ok(Self {
pool,
connection_string: connection_string,
schema_update_handle: None,
})
}

pub(crate) fn connected_to(&self, connection_string: &str) -> bool {
connection_string == self.connection_string
}

pub(crate) async fn close(self) {
if self.schema_update_handle.is_some() {
self.schema_update_handle.unwrap().abort();
}
self.pool.close().await;
}

pub(crate) async fn listen_for_schema_updates<F>(
&mut self,
on_schema_update: F,
) -> anyhow::Result<()>
where
F: Fn(SchemaCache) -> () + Send + 'static,
{
let mut listener = PgListener::connect_with(&self.pool).await?;
listener.listen_all(["postgres_lsp", "pgrst"]).await?;

let pool = self.pool.clone();

let handle: JoinHandle<()> = tokio::spawn(async move {
loop {
match listener.recv().await {
Ok(not) => {
if not.payload().to_string() == "reload schema" {
let schema_cache = SchemaCache::load(&pool).await;
on_schema_update(schema_cache);
};
}
Err(why) => {
eprintln!("Error receiving notification: {:?}", why);
break;
}
}
}
});

self.schema_update_handle = Some(handle);

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/pg_lsp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod client;
mod db_connection;
pub mod server;
mod utils;
7 changes: 5 additions & 2 deletions crates/pg_lsp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use lsp_server::Connection;
use pg_lsp::server::Server;

fn main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (connection, threads) = Connection::stdio();
Server::init(connection)?;

let server = Server::init(connection)?;
server.run().await?;
threads.join()?;

Ok(())
Expand Down
Loading
Loading