Skip to content

Commit

Permalink
worker: Implement SyncAdmins background job
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Jan 3, 2024
1 parent 7247bae commit 85a9829
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/tests/worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
mod git;
mod sync_admins;
83 changes: 83 additions & 0 deletions src/tests/worker/sync_admins.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::util::TestApp;
use crates_io::schema::users;
use crates_io::team_repo::{Member, MockTeamRepo, Team};
use crates_io::worker::jobs::SyncAdmins;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel::{PgConnection, QueryResult, RunQueryDsl};

#[test]
fn test_sync_admins_job() {
let mock_response = mock_team(
"crates-io",
vec![
mock_member("existing-admin", 1),
mock_member("new-admin", 3),
],
);

let mut team_repo = MockTeamRepo::new();
team_repo
.expect_get_team()
.with(mockall::predicate::eq("crates-io-admins"))
.returning(move |_| Ok(mock_response.clone()));

let (app, _) = TestApp::full().with_team_repo(team_repo).empty();

app.db(|conn| create_user("existing-admin", 1, true, conn).unwrap());
app.db(|conn| create_user("obsolete-admin", 2, true, conn).unwrap());
app.db(|conn| create_user("new-admin", 3, false, conn).unwrap());
app.db(|conn| create_user("unrelated-user", 42, false, conn).unwrap());

let admins = app.db(|conn| get_admins(conn).unwrap());
let expected_admins = vec![("existing-admin".into(), 1), ("obsolete-admin".into(), 2)];
assert_eq!(admins, expected_admins);

app.db(|conn| SyncAdmins.enqueue(conn).unwrap());
app.run_pending_background_jobs();

let admins = app.db(|conn| get_admins(conn).unwrap());
let expected_admins = vec![("existing-admin".into(), 1), ("new-admin".into(), 3)];
assert_eq!(admins, expected_admins);
}

fn mock_team(name: impl Into<String>, members: Vec<Member>) -> Team {
Team {
name: name.into(),
kind: "marker-team".to_string(),
members,
}
}

fn mock_member(name: impl Into<String>, github_id: i32) -> Member {
let name = name.into();
let github = name.clone();
Member {
name,
github,
github_id,
is_lead: false,
}
}

fn create_user(name: &str, gh_id: i32, is_admin: bool, conn: &mut PgConnection) -> QueryResult<()> {
diesel::insert_into(users::table)
.values((
users::name.eq(name),
users::gh_login.eq(name),
users::gh_id.eq(gh_id),
users::gh_access_token.eq("some random token"),
users::is_admin.eq(is_admin),
))
.execute(conn)?;

Ok(())
}

fn get_admins(conn: &mut PgConnection) -> QueryResult<Vec<(String, i32)>> {
users::table
.select((users::gh_login, users::gh_id))
.filter(users::is_admin.eq(true))
.order(users::gh_id.asc())
.get_results(conn)
}
2 changes: 2 additions & 0 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ mod daily_db_maintenance;
pub mod dump_db;
mod git;
mod readmes;
mod sync_admins;
mod typosquat;
mod update_downloads;

pub use self::daily_db_maintenance::DailyDbMaintenance;
pub use self::dump_db::DumpDb;
pub use self::git::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
pub use self::readmes::RenderAndUploadReadme;
pub use self::sync_admins::SyncAdmins;
pub use self::typosquat::CheckTyposquat;
pub use self::update_downloads::UpdateDownloads;

Expand Down
93 changes: 93 additions & 0 deletions src/worker/jobs/sync_admins.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use crate::schema::users;
use crate::tasks::spawn_blocking;
use crate::worker::Environment;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel::RunQueryDsl;
use std::collections::HashSet;
use std::sync::Arc;

/// See <https://github.com/rust-lang/team/blob/master/teams/crates-io-admins.toml>.
const TEAM_NAME: &str = "crates-io-admins";

#[derive(Serialize, Deserialize)]
pub struct SyncAdmins;

impl BackgroundJob for SyncAdmins {
const JOB_NAME: &'static str = "sync_admins";

type Context = Arc<Environment>;

async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
info!("Syncing admins from rust-lang/team repo…");

let repo_admins = ctx.team_repo.get_team(TEAM_NAME).await?.members;
let repo_admin_ids = repo_admins
.iter()
.map(|m| m.github_id)
.collect::<HashSet<_>>();

spawn_blocking::<_, _, anyhow::Error>(move || {
let mut conn = ctx.connection_pool.get()?;

let database_admins = users::table
.select((users::gh_id, users::gh_login))
.filter(users::is_admin.eq(true))
.get_results::<(i32, String)>(&mut conn)?;

let database_admin_ids = database_admins
.iter()
.map(|(gh_id, _)| *gh_id)
.collect::<HashSet<_>>();

let new_admin_ids = repo_admin_ids
.difference(&database_admin_ids)
.collect::<HashSet<_>>();

if new_admin_ids.is_empty() {
debug!("No new admins to add");
} else {
let new_admins = repo_admins
.iter()
.filter(|m| new_admin_ids.contains(&&m.github_id))
.map(|m| format!("{} (github_id: {})", m.github, m.github_id))
.collect::<Vec<_>>()
.join(", ");

info!("Adding new admins: {}", new_admins);

diesel::update(users::table)
.filter(users::gh_id.eq_any(new_admin_ids))
.set(users::is_admin.eq(true))
.execute(&mut conn)?;
}

let obsolete_admin_ids = database_admin_ids
.difference(&repo_admin_ids)
.collect::<HashSet<_>>();

if obsolete_admin_ids.is_empty() {
debug!("No obsolete admins to remove");
} else {
let obsolete_admins = database_admins
.iter()
.filter(|(gh_id, _)| obsolete_admin_ids.contains(&gh_id))
.map(|(gh_id, login)| format!("{} (github_id: {})", login, gh_id))
.collect::<Vec<_>>()
.join(", ");

info!("Removing obsolete admins: {}", obsolete_admins);

diesel::update(users::table)
.filter(users::gh_id.eq_any(obsolete_admin_ids))
.set(users::is_admin.eq(false))
.execute(&mut conn)?;
}

Ok(())
})
.await?;

Ok(())
}
}
1 change: 1 addition & 0 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl RunnerExt for Runner<Arc<Environment>> {
.register_job_type::<jobs::NormalizeIndex>()
.register_job_type::<jobs::RenderAndUploadReadme>()
.register_job_type::<jobs::SquashIndex>()
.register_job_type::<jobs::SyncAdmins>()
.register_job_type::<jobs::SyncToGitIndex>()
.register_job_type::<jobs::SyncToSparseIndex>()
.register_job_type::<jobs::UpdateDownloads>()
Expand Down

0 comments on commit 85a9829

Please sign in to comment.