mirror of
https://github.com/flibusta-apps/telegram_files_cache_server.git
synced 2025-12-06 06:35:38 +01:00
Move to sqlx
This commit is contained in:
BIN
src/.DS_Store
vendored
BIN
src/.DS_Store
vendored
Binary file not shown.
12
src/db.rs
12
src/db.rs
@@ -1,6 +1,8 @@
|
||||
use crate::{config::CONFIG, prisma::PrismaClient};
|
||||
use crate::config::CONFIG;
|
||||
|
||||
pub async fn get_prisma_client() -> PrismaClient {
|
||||
use sqlx::{postgres::PgPoolOptions, PgPool};
|
||||
|
||||
pub async fn get_pg_pool() -> PgPool {
|
||||
let database_url: String = format!(
|
||||
"postgresql://{}:{}@{}:{}/{}?connection_limit=10&pool_timeout=300",
|
||||
CONFIG.postgres_user,
|
||||
@@ -10,9 +12,9 @@ pub async fn get_prisma_client() -> PrismaClient {
|
||||
CONFIG.postgres_db
|
||||
);
|
||||
|
||||
PrismaClient::_builder()
|
||||
.with_url(database_url)
|
||||
.build()
|
||||
PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&database_url)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
pub mod config;
|
||||
pub mod db;
|
||||
pub mod prisma;
|
||||
pub mod repository;
|
||||
pub mod serializers;
|
||||
pub mod services;
|
||||
pub mod views;
|
||||
|
||||
|
||||
1351
src/prisma.rs
1351
src/prisma.rs
File diff suppressed because one or more lines are too long
@@ -1,6 +1,4 @@
|
||||
use prisma_client_rust::QueryError;
|
||||
|
||||
use crate::{prisma::cached_file, views::Database};
|
||||
use crate::{serializers::CachedFile, views::Database};
|
||||
|
||||
pub struct CachedFileRepository {
|
||||
db: Database,
|
||||
@@ -15,11 +13,18 @@ impl CachedFileRepository {
|
||||
&self,
|
||||
object_id: i32,
|
||||
object_type: String,
|
||||
) -> Result<cached_file::Data, QueryError> {
|
||||
self.db
|
||||
.cached_file()
|
||||
.delete(cached_file::object_id_object_type(object_id, object_type))
|
||||
.exec()
|
||||
.await
|
||||
) -> Result<CachedFile, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
CachedFile,
|
||||
r#"
|
||||
DELETE FROM cached_files
|
||||
WHERE object_id = $1 AND object_type = $2
|
||||
RETURNING *
|
||||
"#,
|
||||
object_id,
|
||||
object_type
|
||||
)
|
||||
.fetch_one(&self.db)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
8
src/serializers.rs
Normal file
8
src/serializers.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
#[derive(sqlx::FromRow, serde::Serialize)]
|
||||
pub struct CachedFile {
|
||||
pub id: i32,
|
||||
pub object_id: i32,
|
||||
pub object_type: String,
|
||||
pub message_id: i64,
|
||||
pub chat_id: i64,
|
||||
}
|
||||
@@ -14,7 +14,7 @@ use teloxide::{
|
||||
};
|
||||
use tracing::log;
|
||||
|
||||
use crate::{config, prisma::cached_file, repository::CachedFileRepository, views::Database};
|
||||
use crate::{config, repository::CachedFileRepository, serializers::CachedFile, views::Database};
|
||||
|
||||
use self::{
|
||||
book_library::{get_book, get_books, types::BaseBook},
|
||||
@@ -55,16 +55,18 @@ pub async fn get_cached_file_or_cache(
|
||||
object_id: i32,
|
||||
object_type: String,
|
||||
db: Database,
|
||||
) -> Option<cached_file::Data> {
|
||||
let cached_file = db
|
||||
.cached_file()
|
||||
.find_unique(cached_file::object_id_object_type(
|
||||
object_id,
|
||||
object_type.clone(),
|
||||
))
|
||||
.exec()
|
||||
.await
|
||||
.unwrap();
|
||||
) -> Option<CachedFile> {
|
||||
let cached_file = sqlx::query_as!(
|
||||
CachedFile,
|
||||
r#"
|
||||
SELECT * FROM cached_files
|
||||
WHERE object_id = $1 AND object_type = $2"#,
|
||||
object_id,
|
||||
object_type
|
||||
)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match cached_file {
|
||||
Some(cached_file) => Some(cached_file),
|
||||
@@ -72,7 +74,7 @@ pub async fn get_cached_file_or_cache(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_cached_file_copy(original: cached_file::Data, db: Database) -> CacheData {
|
||||
pub async fn get_cached_file_copy(original: CachedFile, db: Database) -> CacheData {
|
||||
let bot = ROUND_ROBIN_BOT.get_bot();
|
||||
|
||||
let message_id = match bot
|
||||
@@ -85,11 +87,16 @@ pub async fn get_cached_file_copy(original: cached_file::Data, db: Database) ->
|
||||
{
|
||||
Ok(v) => v,
|
||||
Err(_) => {
|
||||
let _ = db
|
||||
.cached_file()
|
||||
.delete(cached_file::id::equals(original.id))
|
||||
.exec()
|
||||
.await;
|
||||
sqlx::query!(
|
||||
r#"
|
||||
DELETE FROM cached_files
|
||||
WHERE id = $1
|
||||
"#,
|
||||
original.id
|
||||
)
|
||||
.execute(&db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_original =
|
||||
get_cached_file_or_cache(original.object_id, original.object_type.clone(), db)
|
||||
@@ -117,11 +124,7 @@ pub async fn get_cached_file_copy(original: cached_file::Data, db: Database) ->
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cache_file(
|
||||
object_id: i32,
|
||||
object_type: String,
|
||||
db: Database,
|
||||
) -> Option<cached_file::Data> {
|
||||
pub async fn cache_file(object_id: i32, object_type: String, db: Database) -> Option<CachedFile> {
|
||||
let book = match get_book(object_id).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
@@ -154,18 +157,23 @@ pub async fn cache_file(
|
||||
};
|
||||
|
||||
Some(
|
||||
db.cached_file()
|
||||
.create(object_id, object_type, message_id, chat_id, vec![])
|
||||
.exec()
|
||||
.await
|
||||
.unwrap(),
|
||||
sqlx::query_as!(
|
||||
CachedFile,
|
||||
r#"INSERT INTO cached_files (object_id, object_type, message_id, chat_id)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING *"#,
|
||||
object_id,
|
||||
object_type,
|
||||
message_id,
|
||||
chat_id
|
||||
)
|
||||
.fetch_one(&db)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn download_from_cache(
|
||||
cached_data: cached_file::Data,
|
||||
db: Database,
|
||||
) -> Option<DownloadResult> {
|
||||
pub async fn download_from_cache(cached_data: CachedFile, db: Database) -> Option<DownloadResult> {
|
||||
let response_task = tokio::task::spawn(download_from_telegram_files(
|
||||
cached_data.message_id,
|
||||
cached_data.chat_id,
|
||||
@@ -300,14 +308,14 @@ pub async fn start_update_cache(db: Database) {
|
||||
|
||||
for book in books {
|
||||
'types: for available_type in book.available_types {
|
||||
let cached_file = match db
|
||||
.cached_file()
|
||||
.find_unique(cached_file::object_id_object_type(
|
||||
book.id,
|
||||
available_type.clone(),
|
||||
))
|
||||
.exec()
|
||||
.await
|
||||
let cached_file = match sqlx::query_as!(
|
||||
CachedFile,
|
||||
r#"SELECT * FROM cached_files WHERE object_id = $1 AND object_type = $2"#,
|
||||
book.id,
|
||||
available_type.clone()
|
||||
)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
{
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
|
||||
45
src/views.rs
45
src/views.rs
@@ -9,27 +9,26 @@ use axum::{
|
||||
};
|
||||
use axum_prometheus::PrometheusMetricLayer;
|
||||
use base64::{engine::general_purpose, Engine};
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
use sqlx::PgPool;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tower_http::trace::{self, TraceLayer};
|
||||
use tracing::Level;
|
||||
|
||||
use crate::{
|
||||
config::CONFIG,
|
||||
db::get_prisma_client,
|
||||
prisma::{cached_file, PrismaClient},
|
||||
db::get_pg_pool,
|
||||
serializers::CachedFile,
|
||||
services::{
|
||||
download_from_cache, download_utils::get_response_async_read, get_cached_file_copy,
|
||||
get_cached_file_or_cache, start_update_cache, CacheData,
|
||||
},
|
||||
};
|
||||
|
||||
pub type Database = Arc<PrismaClient>;
|
||||
pub type Database = PgPool;
|
||||
|
||||
//
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct GetCachedFileQuery {
|
||||
pub copy: bool,
|
||||
}
|
||||
@@ -111,26 +110,20 @@ async fn delete_cached_file(
|
||||
Path((object_id, object_type)): Path<(i32, String)>,
|
||||
Extension(Ext { db, .. }): Extension<Ext>,
|
||||
) -> impl IntoResponse {
|
||||
let cached_file = db
|
||||
.cached_file()
|
||||
.find_unique(cached_file::object_id_object_type(
|
||||
object_id,
|
||||
object_type.clone(),
|
||||
))
|
||||
.exec()
|
||||
.await
|
||||
.unwrap();
|
||||
let cached_file: Option<CachedFile> = sqlx::query_as!(
|
||||
CachedFile,
|
||||
r#"DELETE FROM cached_files
|
||||
WHERE object_id = $1 AND object_type = $2
|
||||
RETURNING *"#,
|
||||
object_id,
|
||||
object_type
|
||||
)
|
||||
.fetch_optional(&db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
match cached_file {
|
||||
Some(v) => {
|
||||
db.cached_file()
|
||||
.delete(cached_file::object_id_object_type(object_id, object_type))
|
||||
.exec()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Json(v).into_response()
|
||||
}
|
||||
Some(v) => Json::<CachedFile>(v).into_response(),
|
||||
None => StatusCode::NO_CONTENT.into_response(),
|
||||
}
|
||||
}
|
||||
@@ -164,11 +157,11 @@ async fn auth(req: Request<axum::body::Body>, next: Next) -> Result<Response, St
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Ext {
|
||||
pub db: Arc<PrismaClient>,
|
||||
pub db: PgPool,
|
||||
}
|
||||
|
||||
pub async fn get_router() -> Router {
|
||||
let db = Arc::new(get_prisma_client().await);
|
||||
let db = get_pg_pool().await;
|
||||
|
||||
let ext = Ext { db };
|
||||
|
||||
|
||||
Reference in New Issue
Block a user