Move to sqlx

This commit is contained in:
2024-12-24 22:22:05 +01:00
parent afa304e3d4
commit ff7a2deaf4
29 changed files with 1812 additions and 11015 deletions

View File

@@ -1,8 +1,10 @@
use crate::{config::CONFIG, prisma::PrismaClient};
use crate::config::CONFIG;
pub async fn get_prisma_client() -> PrismaClient {
use sqlx::{postgres::PgPoolOptions, PgPool};
pub async fn get_prisma_client() -> PgPool {
let database_url: String = format!(
"postgresql://{}:{}@{}:{}/{}?connection_limit=10&pool_timeout=300",
"postgresql://{}:{}@{}:{}/{}",
CONFIG.postgres_user,
CONFIG.postgres_password,
CONFIG.postgres_host,
@@ -10,9 +12,10 @@ pub async fn get_prisma_client() -> PrismaClient {
CONFIG.postgres_db
);
PrismaClient::_builder()
.with_url(database_url)
.build()
PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(std::time::Duration::from_secs(300))
.connect(&database_url)
.await
.unwrap()
}

View File

@@ -1,6 +1,5 @@
pub mod config;
pub mod db;
pub mod prisma;
pub mod views;
use sentry::{integrations::debug_images::DebugImagesIntegration, types::Dsn, ClientOptions};

File diff suppressed because one or more lines are too long

View File

@@ -8,10 +8,13 @@ use axum::{
use chrono::Duration;
use serde::Deserialize;
use crate::prisma::chat_donate_notifications;
use super::Database;
#[derive(sqlx::FromRow)]
struct ChatDonateNotification {
pub sended: chrono::NaiveDateTime,
}
#[derive(Deserialize)]
struct IsNeedSendQuery {
is_private: bool,
@@ -25,12 +28,14 @@ async fn is_need_send(
const NOTIFICATION_DELTA_DAYS_PRIVATE: i64 = 60;
const NOTIFICATION_DELTA_DAYS: i64 = 7;
let notification = db
.chat_donate_notifications()
.find_unique(chat_donate_notifications::chat_id::equals(chat_id))
.exec()
.await
.unwrap();
let notification = sqlx::query_as!(
ChatDonateNotification,
r#"SELECT sended FROM chat_donate_notifications WHERE chat_id = $1"#,
chat_id
)
.fetch_optional(&db.0)
.await
.unwrap();
let delta_days = if query.is_private {
NOTIFICATION_DELTA_DAYS_PRIVATE
@@ -40,7 +45,7 @@ async fn is_need_send(
match notification {
Some(notification) => {
let result = notification.sended.naive_local() + Duration::days(delta_days)
let result = notification.sended + Duration::days(delta_days)
<= chrono::offset::Local::now().naive_local();
Json(result).into_response()
}
@@ -49,17 +54,17 @@ async fn is_need_send(
}
async fn mark_sent(Path(chat_id): Path<i64>, db: Database) -> impl IntoResponse {
let _ = db
.chat_donate_notifications()
.upsert(
chat_donate_notifications::chat_id::equals(chat_id),
chat_donate_notifications::create(chat_id, chrono::offset::Local::now().into(), vec![]),
vec![chat_donate_notifications::sended::set(
chrono::offset::Local::now().into(),
)],
)
.exec()
.await;
sqlx::query_as!(
ChatDonateNotification,
r#"INSERT INTO chat_donate_notifications (chat_id, sended) VALUES ($1, $2)
ON CONFLICT (chat_id) DO UPDATE SET sended = EXCLUDED.sended
RETURNING sended"#,
chat_id,
chrono::offset::Local::now().naive_local()
)
.fetch_one(&db.0)
.await
.unwrap();
StatusCode::OK
}

View File

@@ -2,49 +2,35 @@ use axum::{extract::Path, http::StatusCode, response::IntoResponse, routing::get
use serde::Serialize;
use super::Database;
use crate::prisma::language;
#[derive(Serialize)]
#[derive(sqlx::FromRow, Serialize)]
pub struct LanguageDetail {
pub id: i32,
pub label: String,
pub code: String,
}
impl From<language::Data> for LanguageDetail {
fn from(value: language::Data) -> Self {
let language::Data {
id, label, code, ..
} = value;
Self { id, label, code }
}
}
async fn get_languages(db: Database) -> impl IntoResponse {
let languages: Vec<LanguageDetail> = db
.language()
.find_many(vec![])
.exec()
let languages = sqlx::query_as!(LanguageDetail, "SELECT id, label, code FROM languages")
.fetch_all(&db.0)
.await
.unwrap()
.into_iter()
.map(|item| item.into())
.collect();
.unwrap();
Json(languages).into_response()
}
async fn get_language_by_code(Path(code): Path<String>, db: Database) -> impl IntoResponse {
let language = db
.language()
.find_unique(language::code::equals(code))
.exec()
.await
.unwrap();
let language = sqlx::query_as!(
LanguageDetail,
r#"SELECT id, label, code FROM languages WHERE code = $1"#,
code
)
.fetch_optional(&db.0)
.await
.unwrap();
match language {
Some(v) => Json::<LanguageDetail>(v.into()).into_response(),
Some(v) => Json(v).into_response(),
None => StatusCode::NOT_FOUND.into_response(),
}
}

View File

@@ -6,18 +6,19 @@ use axum::{
Extension, Router,
};
use axum_prometheus::PrometheusMetricLayer;
use sqlx::PgPool;
use std::sync::Arc;
use tower_http::trace::{self, TraceLayer};
use tracing::Level;
use crate::{config::CONFIG, db::get_prisma_client, prisma::PrismaClient};
use crate::{config::CONFIG, db::get_prisma_client};
pub mod donate_notifications;
pub mod languages;
pub mod pagination;
pub mod users;
pub type Database = Extension<Arc<PrismaClient>>;
pub type Database = Extension<PgPool>;
async fn auth(req: Request<axum::body::Body>, next: Next) -> Result<Response, StatusCode> {
let auth_header = req

View File

@@ -1,7 +1,6 @@
pub mod serializers;
pub mod utils;
use crate::prisma::{language_to_user, user_activity, user_settings};
use axum::{
extract::{Path, Query},
http::StatusCode,
@@ -9,9 +8,10 @@ use axum::{
routing::{get, post},
Json, Router,
};
use serializers::SimpleUser;
use self::{
serializers::{CreateOrUpdateUserData, UserDetail},
serializers::{CreateOrUpdateUserData, UserDetail, UserLanguage},
utils::update_languages,
};
@@ -23,112 +23,165 @@ use super::{
async fn get_users(pagination: Query<Pagination>, db: Database) -> impl IntoResponse {
let pagination: Pagination = pagination.0;
let users_count = db.user_settings().count(vec![]).exec().await.unwrap();
let users: Vec<UserDetail> = db
.user_settings()
.find_many(vec![])
.with(user_settings::languages::fetch(vec![]).with(language_to_user::language::fetch()))
.order_by(user_settings::id::order(prisma_client_rust::Direction::Asc))
.skip(pagination.skip())
.take(pagination.take())
.exec()
let users_count = sqlx::query_scalar(r#"SELECT COUNT(*) FROM user_settings"#)
.fetch_one(&db.0)
.await
.unwrap()
.into_iter()
.map(|item| item.into())
.collect();
.unwrap();
let users = sqlx::query_as!(
UserDetail,
r#"
SELECT
user_settings.id,
user_settings.user_id,
user_settings.last_name,
user_settings.first_name,
user_settings.username,
user_settings.source,
ARRAY_AGG((
languages.id,
languages.label,
languages.code
)) AS "allowed_langs: Vec<UserLanguage>"
FROM user_settings
LEFT JOIN users_languages ON user_settings.id = users_languages.user
LEFT JOIN languages ON users_languages.language = languages.id
GROUP BY user_settings.id
ORDER BY user_settings.id ASC
OFFSET $1
LIMIT $2
"#,
pagination.skip(),
pagination.take(),
)
.fetch_all(&db.0)
.await
.unwrap();
Json(Page::create(users, users_count, pagination)).into_response()
}
async fn get_user(Path(user_id): Path<i64>, db: Database) -> impl IntoResponse {
let user = db
.user_settings()
.find_unique(user_settings::user_id::equals(user_id))
.with(user_settings::languages::fetch(vec![]).with(language_to_user::language::fetch()))
.exec()
.await
.unwrap();
let user = sqlx::query_as!(
UserDetail,
r#"
SELECT
user_settings.id,
user_settings.user_id,
user_settings.last_name,
user_settings.first_name,
user_settings.username,
user_settings.source,
ARRAY_AGG((
languages.id,
languages.label,
languages.code
)) AS "allowed_langs: Vec<UserLanguage>"
FROM user_settings
LEFT JOIN users_languages ON user_settings.id = users_languages.user
LEFT JOIN languages ON users_languages.language = languages.id
WHERE user_settings.user_id = $1
GROUP BY user_settings.id
"#,
user_id,
)
.fetch_optional(&db.0)
.await
.unwrap();
if user.is_none() {
return StatusCode::NO_CONTENT.into_response();
}
Json::<UserDetail>(user.unwrap().into()).into_response()
Json::<UserDetail>(user.unwrap()).into_response()
}
async fn create_or_update_user(
db: Database,
Json(data): Json<CreateOrUpdateUserData>,
) -> impl IntoResponse {
let user = db
.user_settings()
.upsert(
user_settings::user_id::equals(data.user_id),
user_settings::create(
data.user_id,
data.last_name.clone(),
data.first_name.clone(),
data.username.clone(),
data.source.clone(),
vec![],
),
vec![
user_settings::last_name::set(data.last_name),
user_settings::first_name::set(data.first_name),
user_settings::username::set(data.username),
user_settings::source::set(data.source),
],
)
.with(user_settings::languages::fetch(vec![]).with(language_to_user::language::fetch()))
.exec()
.await
.unwrap();
let user = sqlx::query_as!(
SimpleUser,
r#"
INSERT INTO user_settings (user_id, last_name, first_name, username, source)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (user_id) DO UPDATE
SET last_name = $2, first_name = $3, username = $4, source = $5
RETURNING id, user_id, last_name, first_name, username, source
"#,
data.user_id,
data.last_name,
data.first_name,
data.username,
data.source,
)
.fetch_one(&db.0)
.await
.unwrap();
let user_id = user.id;
update_languages(user, data.allowed_langs, db.clone()).await;
update_languages(user.id, data.allowed_langs, db.clone()).await;
let user = db
.user_settings()
.find_unique(user_settings::id::equals(user_id))
.with(user_settings::languages::fetch(vec![]).with(language_to_user::language::fetch()))
.exec()
.await
.unwrap()
.unwrap();
let user = sqlx::query_as!(
UserDetail,
r#"
SELECT
user_settings.id,
user_settings.user_id,
user_settings.last_name,
user_settings.first_name,
user_settings.username,
user_settings.source,
ARRAY_AGG((
languages.id,
languages.label,
languages.code
)) AS "allowed_langs: Vec<UserLanguage>"
FROM user_settings
LEFT JOIN users_languages ON user_settings.id = users_languages.user
LEFT JOIN languages ON users_languages.language = languages.id
WHERE user_settings.id = $1
GROUP BY user_settings.id
"#,
user.id,
)
.fetch_one(&db.0)
.await
.unwrap();
Json::<UserDetail>(user.into()).into_response()
Json::<UserDetail>(user).into_response()
}
async fn update_activity(Path(user_id): Path<i64>, db: Database) -> impl IntoResponse {
let user = db
.user_settings()
.find_unique(user_settings::user_id::equals(user_id))
.exec()
.await
.unwrap();
let user = sqlx::query_as!(
SimpleUser,
r#"
SELECT id, user_id, last_name, first_name, username, source
FROM user_settings
WHERE user_id = $1
"#,
user_id,
)
.fetch_optional(&db.0)
.await
.unwrap();
let user = match user {
Some(v) => v,
None => return StatusCode::NOT_FOUND.into_response(),
};
let _ = db
.user_activity()
.upsert(
user_activity::user_id::equals(user.id),
user_activity::create(
chrono::offset::Local::now().into(),
user_settings::id::equals(user.id),
vec![],
),
vec![user_activity::updated::set(
chrono::offset::Local::now().into(),
)],
)
.exec()
.await;
sqlx::query!(
r#"
INSERT INTO user_activity ("user", updated)
VALUES ($1, NOW())
ON CONFLICT ("user") DO UPDATE
SET updated = NOW()
"#,
user.id,
)
.execute(&db.0)
.await
.unwrap();
StatusCode::OK.into_response()
}

View File

@@ -1,25 +1,23 @@
use serde::{Deserialize, Serialize};
use crate::prisma::{language, user_settings};
#[derive(Serialize)]
#[derive(sqlx::FromRow, sqlx::Type, Serialize)]
pub struct UserLanguage {
pub id: i32,
pub label: String,
pub code: String,
}
impl From<language::Data> for UserLanguage {
fn from(value: language::Data) -> Self {
Self {
id: value.id,
label: value.label,
code: value.code,
}
}
#[derive(sqlx::FromRow, Serialize)]
pub struct SimpleUser {
pub id: i32,
pub user_id: i64,
pub last_name: String,
pub first_name: String,
pub username: String,
pub source: String,
}
#[derive(Serialize)]
#[derive(sqlx::FromRow, Serialize)]
pub struct UserDetail {
pub id: i32,
pub user_id: i64,
@@ -27,29 +25,8 @@ pub struct UserDetail {
pub first_name: String,
pub username: String,
pub source: String,
pub allowed_langs: Vec<UserLanguage>,
}
impl From<user_settings::Data> for UserDetail {
fn from(value: user_settings::Data) -> Self {
let allowed_langs: Vec<UserLanguage> = value
.languages
.unwrap()
.into_iter()
.map(|item| *item.language.unwrap())
.map(|item| item.into())
.collect();
Self {
id: value.id,
user_id: value.user_id,
last_name: value.last_name,
first_name: value.first_name,
username: value.username,
source: value.source,
allowed_langs,
}
}
#[serde(default)]
pub allowed_langs: Option<Vec<UserLanguage>>,
}
#[derive(Deserialize)]

View File

@@ -1,68 +1,32 @@
use std::collections::HashMap;
use crate::views::Database;
use crate::{
prisma::{language, language_to_user, user_settings},
views::Database,
};
pub async fn update_languages(user: i32, new_langs: Vec<String>, db: Database) {
sqlx::query!(
r#"
DELETE FROM users_languages
WHERE "user" = $1 AND language NOT IN (
SELECT id FROM languages WHERE code = ANY($2)
)
"#,
user,
&new_langs
)
.execute(&db.0)
.await
.unwrap();
pub async fn update_languages(user: user_settings::Data, new_langs: Vec<String>, db: Database) {
// Delete
{
let need_delete: Vec<_> = user
.languages()
.unwrap()
.iter()
.map(|item| {
let language::Data { code, .. } = *item.clone().language.unwrap();
(item.id, code)
})
.filter(|(_, code)| !new_langs.contains(code))
.map(|(id, _)| id)
.collect();
let _ = db
.language_to_user()
.delete_many(vec![language_to_user::id::in_vec(need_delete)])
.exec()
.await;
}
// Create
{
let languages: HashMap<_, _> = db
.language()
.find_many(vec![])
.exec()
.await
.unwrap()
.into_iter()
.map(|l| (l.code, l.id))
.collect();
let current_langs: Vec<_> = user
.languages()
.unwrap()
.iter()
.map(|item| item.clone().language.unwrap().code)
.collect();
let need_create: Vec<i32> = new_langs
.into_iter()
.filter(|code| !current_langs.contains(code))
.map(|code| *languages.get(&code).unwrap())
.collect();
let _ = db
.language_to_user()
.create_many(
need_create
.iter()
.map(|language_id| {
language_to_user::create_unchecked(*language_id, user.id, vec![])
})
.collect(),
)
.exec()
.await;
}
sqlx::query!(
r#"
INSERT INTO users_languages ("user", language)
SELECT $1, id
FROM languages
WHERE code = ANY($2)
ON CONFLICT DO NOTHING
"#,
user,
&new_langs
)
.execute(&db.0)
.await
.unwrap();
}