diff --git a/Cargo.lock b/Cargo.lock index 5598365..7befb27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,53 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.5.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.66" @@ -542,6 +589,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.8.0" @@ -679,6 +732,7 @@ version = "0.1.0" dependencies = [ "async-compression", "async-trait", + "axum", "chrono", "deadpool-postgres", "env_logger", @@ -721,6 +775,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "md-5" version = "0.10.4" @@ -953,6 +1013,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -1407,6 +1487,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tempfile" version = "3.3.0" @@ -1583,6 +1669,47 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.2" @@ -1596,6 +1723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index f9ef878..681a827 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ env_logger = "0.9.0" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.85" tokio-cron-scheduler = "0.8.1" +axum = "0.5.16" diff --git a/src/config.rs b/src/config.rs index 32095f6..d5003d0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,6 +17,8 @@ pub struct Webhook { } pub struct Config { + pub api_key: String, + pub sentry_dsn: String, pub postgres_db_name: String, @@ -37,6 +39,8 @@ fn get_env(env: &'static str) -> String { impl Config { pub fn load() -> Config { Config { + api_key: get_env("API_KEY"), + sentry_dsn: get_env("SENTRY_DSN"), postgres_db_name: get_env("POSTGRES_DB_NAME"), diff --git a/src/main.rs b/src/main.rs index 5a252bd..83b3a92 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,522 +4,55 @@ extern crate lazy_static; pub mod config; pub mod types; pub mod utils; +pub mod updater; -use std::{ - fmt::Debug, - sync::{Arc, Mutex}, str::FromStr -}; +use std::net::SocketAddr; +use axum::{Router, routing::post, http::HeaderMap}; -use config::Webhook; -use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime}; -use futures::{io::copy, TryStreamExt}; -use reqwest::header::{HeaderMap, HeaderValue, HeaderName}; -use tokio::fs::{File, remove_file}; -use tokio_cron_scheduler::{JobScheduler, Job, JobSchedulerError}; -use tokio_postgres::NoTls; +use crate::updater::cron_jobs; -use async_compression::futures::bufread::GzipDecoder; +async fn update(headers: HeaderMap) -> &'static str { + let config_api_key = 
config::CONFIG.api_key.clone(); -use sql_parse::{ - parse_statement, InsertReplace, InsertReplaceType, ParseOptions, SQLArguments, SQLDialect, - Statement, -}; -use tokio_util::compat::TokioAsyncReadCompatExt; -use types::{ - Author, AuthorAnnotation, AuthorAnnotationPic, BookAnnotation, BookAnnotationPic, BookAuthor, - BookGenre, FromVecExpression, Genre, Sequence, SequenceInfo, Translator, Update, -}; -use utils::read_lines; - -use crate::types::Book; - -async fn download_file(filename_str: &str) -> Result<(), Box> { - log::info!("Download {filename_str}..."); - - let link = format!("{}/sql/{filename_str}.gz", &config::CONFIG.fl_base_url); - - let response = match reqwest::get(link).await { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), + let api_key = match headers.get("Authorization") { + Some(v) => v, + None => return "No api-key!", }; - let response = match response.error_for_status() { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), - }; - - match remove_file(filename_str).await { - Ok(_) => (), - Err(err) => log::debug!("Can't remove file: {:?}", err), - }; - - let mut file = match File::create(filename_str).await { - Ok(v) => v.compat(), - Err(err) => { - log::error!("Can't create {filename_str}: {:?}", err); - return Err(Box::new(err)) - }, - }; - - let data = response - .bytes_stream() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) - .into_async_read(); - - let decoder = GzipDecoder::new(data); - - match copy(decoder, &mut file).await { - Ok(_) => (), - Err(err) => { - log::error!("Can't write data {filename_str}: {}", err); - return Err(Box::new(err)) - }, - }; - - log::info!("{filename_str} downloaded!"); - - Ok(()) -} - -async fn process( - pool: Pool, - source_id: i16, - file_name: &str, - deps: Vec>>>, -) -> Result<(), Box> -where - T: Debug + FromVecExpression + Update, -{ - if deps.len() != 0 { - loop { - let mut some_failed = false; - let mut some_none = false; - - for dep in deps.iter() { - let status = 
dep.lock().unwrap(); - match &*status { - Some(status) => match status { - UpdateStatus::Success => (), - UpdateStatus::Fail => some_failed = true, - }, - None => some_none = true, - } - } - - if !some_failed && !some_none { - break; - } - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } + if config_api_key != api_key.to_str().unwrap() { + return "Wrong api-key!" } - match download_file(file_name).await { - Ok(_) => (), - Err(err) => return Err(err), - }; - - let parse_options = ParseOptions::new() - .dialect(SQLDialect::MariaDB) - .arguments(SQLArguments::QuestionMark) - .warn_unquoted_identifiers(true); - - let lines = read_lines(file_name); - - let lines = match lines { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), - }; - - match T::before_update(&pool.get().await.unwrap()).await { - Ok(_) => (), - Err(err) => return Err(err), - }; - - log::info!("Start update {file_name}..."); - - for line in lines.into_iter() { - let line = match line { - Ok(line) => line, - Err(err) => return Err(Box::new(err)), - }; - - let mut issues = Vec::new(); - let ast = parse_statement(&line, &mut issues, &parse_options); - - match ast { - Some(Statement::InsertReplace( - i @ InsertReplace { - type_: InsertReplaceType::Insert(_), - .. 
- }, - )) => { - for value in i.values.into_iter() { - for t_value in value.1.into_iter() { - let value = T::from_vec_expression(&t_value); - let client = pool.get().await.unwrap(); - - match value.update(&client, source_id).await { - Ok(_) => { - // log::info!("{:?}", value); - () - } - Err(err) => { - log::error!("Update error: {:?} : {:?}", value, err); - return Err(err) - }, - } - } - } - } - _ => (), + tokio::spawn(async { + match updater::update().await { + Ok(_) => log::info!("Updated!"), + Err(err) => log::info!("Updater err: {:?}", err), } - } - - log::info!("Updated {file_name}..."); - - Ok(()) -} - -async fn get_postgres_pool() -> Result { - let mut config = Config::new(); - - config.host = Some(config::CONFIG.postgres_host.clone()); - config.port = Some(config::CONFIG.postgres_port); - config.dbname = Some(config::CONFIG.postgres_db_name.clone()); - config.user = Some(config::CONFIG.postgres_user.clone()); - config.password = Some(config::CONFIG.postgres_password.clone()); - config.connect_timeout = Some(std::time::Duration::from_secs(5)); - config.manager = Some(ManagerConfig { - recycling_method: RecyclingMethod::Verified, }); - match config.create_pool(Some(Runtime::Tokio1), NoTls) { - Ok(pool) => Ok(pool), - Err(err) => Err(err), - } + "Update started" } -async fn get_source(pool: Pool) -> Result> { - let client = pool.get().await.unwrap(); +async fn start_app() { + let app = Router::new() + .route("/update", post(update)); - let row = match client - .query_one("SELECT id FROM sources WHERE name = 'flibusta';", &[]) + let addr = SocketAddr::from(([0, 0, 0, 0], 8080)); + + axum::Server::bind(&addr) + .serve(app.into_make_service()) .await - { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), - }; - - let id = row.get(0); - - Ok(id) -} - -enum UpdateStatus { - Success, - Fail, -} - -async fn send_webhooks() -> Result<(), Box> { - for webhook in config::CONFIG.webhooks.clone().into_iter() { - let Webhook { method, url, headers } = webhook; - - let 
client = reqwest::Client::new(); - - let builder = match method { - config::Method::Get => { - client.get(url) - }, - config::Method::Post => { - client.post(url) - }, - }; - - let t_headers: Vec<(HeaderName, HeaderValue)> = headers.into_iter().map(|(key, val)| { - let value = match val { - serde_json::Value::String(v) => v, - _ => panic!("Header value not string!") - }; - - ( - HeaderName::from_str(key.as_ref()).unwrap(), - HeaderValue::from_str(&value).unwrap() - ) - }).collect(); - - let headers = HeaderMap::from_iter(t_headers.into_iter()); - - let response = builder.headers(headers).send().await; - - let response = match response { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), - }; - - match response.error_for_status() { - Ok(_) => (), - Err(err) => return Err(Box::new(err)), - }; - }; - - Ok(()) -} - -async fn update() -> Result<(), Box> { - log::info!("Start update..."); - - let pool = match get_postgres_pool().await { - Ok(pool) => pool, - Err(err) => panic!("{:?}", err), - }; - - let source_id = match get_source(pool.clone()).await { - Ok(v) => Arc::new(v), - Err(err) => panic!("{:?}", err), - }; - - let author_status: Arc>> = Arc::new(Mutex::new(None)); - let book_status: Arc>> = Arc::new(Mutex::new(None)); - let sequence_status: Arc>> = Arc::new(Mutex::new(None)); - let book_annotation_status: Arc>> = Arc::new(Mutex::new(None)); - let author_annotation_status: Arc>> = Arc::new(Mutex::new(None)); - let genre_status: Arc>> = Arc::new(Mutex::new(None)); - - let pool_clone = pool.clone(); - let author_status_clone = author_status.clone(); - let source_id_clone = source_id.clone(); - let author_process = tokio::spawn(async move { - match process::(pool_clone, *source_id_clone, "lib.libavtorname.sql", vec![]).await - { - Ok(_) => { - let mut status = author_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = author_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); 
- Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let book_status_clone = book_status.clone(); - let source_id_clone = source_id.clone(); - let book_process = tokio::spawn(async move { - match process::(pool_clone, *source_id_clone, "lib.libbook.sql", vec![]).await { - Ok(_) => { - let mut status = book_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = book_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Fail); - Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let deps = vec![author_status.clone(), book_status.clone()]; - let source_id_clone = source_id.clone(); - let book_author_process = tokio::spawn(async move { - process::(pool_clone, *source_id_clone, "lib.libavtor.sql", deps).await - }); - - let pool_clone = pool.clone(); - let deps = vec![author_status.clone(), book_status.clone()]; - let source_id_clone = source_id.clone(); - let translator_process = tokio::spawn(async move { - process::(pool_clone, *source_id_clone, "lib.libtranslator.sql", deps).await - }); - - let pool_clone = pool.clone(); - let sequence_status_clone = sequence_status.clone(); - let source_id_clone = source_id.clone(); - let sequence_process = tokio::spawn(async move { - match process::(pool_clone, *source_id_clone, "lib.libseqname.sql", vec![]).await - { - Ok(_) => { - let mut status = sequence_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = sequence_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Fail); - Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let deps = vec![book_status.clone(), sequence_status.clone()]; - let source_id_clone = source_id.clone(); - let sequence_info_process = tokio::spawn(async move { - process::(pool_clone, *source_id_clone, "lib.libseq.sql", deps).await - }); - - let pool_clone = pool.clone(); - let deps = vec![book_status.clone()]; - let 
book_annotation_status_clone = book_annotation_status.clone(); - let source_id_clone = source_id.clone(); - let book_annotation_process = tokio::spawn(async move { - match process::(pool_clone, *source_id_clone, "lib.b.annotations.sql", deps) - .await - { - Ok(_) => { - let mut status = book_annotation_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = book_annotation_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Fail); - Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let deps = vec![book_annotation_status.clone()]; - let source_id_clone = source_id.clone(); - let book_annotation_pics_process = tokio::spawn(async move { - process::( - pool_clone, - *source_id_clone, - "lib.b.annotations_pics.sql", - deps, - ) - .await - }); - - let pool_clone = pool.clone(); - let deps = vec![author_status.clone()]; - let author_annotation_status_clone = author_annotation_status.clone(); - let source_id_clone = source_id.clone(); - let author_annotation_process = tokio::spawn(async move { - match process::( - pool_clone, - *source_id_clone, - "lib.a.annotations.sql", - deps, - ) - .await - { - Ok(_) => { - let mut status = author_annotation_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = author_annotation_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Fail); - Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let deps = vec![author_annotation_status.clone()]; - let source_id_clone = source_id.clone(); - let author_annotation_pics_process = tokio::spawn(async move { - process::( - pool_clone, - *source_id_clone, - "lib.a.annotations_pics.sql", - deps, - ) - .await - }); - - let pool_clone = pool.clone(); - let genre_status_clone = genre_status.clone(); - let source_id_clone = source_id.clone(); - let genre_annotation_process = tokio::spawn(async move { - match process::(pool_clone, 
*source_id_clone, "lib.libgenrelist.sql", vec![]).await { - Ok(_) => { - let mut status = genre_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Success); - Ok(()) - } - Err(err) => { - let mut status = genre_status_clone.lock().unwrap(); - *status = Some(UpdateStatus::Fail); - Err(err) - } - } - }); - - let pool_clone = pool.clone(); - let deps = vec![genre_status.clone(), book_status.clone()]; - let source_id_clone = source_id.clone(); - let book_genre_process = tokio::spawn(async move { - process::(pool_clone, *source_id_clone, "lib.libgenre.sql", deps).await - }); - - for process in [ - author_process, - book_process, - book_author_process, - translator_process, - sequence_process, - sequence_info_process, - book_annotation_process, - book_annotation_pics_process, - author_annotation_process, - author_annotation_pics_process, - genre_annotation_process, - book_genre_process - ] { - let process_result = match process.await { - Ok(v) => v, - Err(err) => return Err(Box::new(err)), - }; - - match process_result { - Ok(_) => (), - Err(err) => panic!("{:?}", err), - } - } - - match send_webhooks().await { - Ok(_) => { - log::info!("Webhooks sended!"); - }, - Err(err) => { - log::info!("Webhooks send failed : {err}"); - return Err(Box::new(err)) - }, - }; - - Ok(()) + .unwrap(); } #[tokio::main] -async fn main() -> Result<(), JobSchedulerError> { +async fn main() { let _guard = sentry::init(config::CONFIG.sentry_dsn.clone()); env_logger::init(); - let job_scheduler = JobScheduler::new().await.unwrap(); + tokio::spawn(async { + cron_jobs().await + }); - let update_job = match Job::new_async("* 0 5 * * *", |_uuid, _l| Box::pin(async { - match update().await { - Ok(_) => log::info!("Updated"), - Err(err) => log::info!("Update err: {:?}", err), - }; - })) { - Ok(v) => v, - Err(err) => panic!("{:?}", err), - }; - - job_scheduler.add(update_job).await.unwrap(); - - match job_scheduler.start().await { - Ok(_) => Ok(()), - Err(err) => Err(err), - } + 
start_app().await; } diff --git a/src/updater.rs b/src/updater.rs new file mode 100644 index 0000000..3d99149 --- /dev/null +++ b/src/updater.rs @@ -0,0 +1,514 @@ +use std::{ + fmt::Debug, + sync::{Arc, Mutex}, str::FromStr +}; + +use crate::config::{Webhook, self}; +use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, Pool, RecyclingMethod, Runtime}; +use futures::{io::copy, TryStreamExt}; +use reqwest::header::{HeaderMap, HeaderValue, HeaderName}; +use tokio::fs::{File, remove_file}; +use tokio_cron_scheduler::{JobScheduler, Job, JobSchedulerError}; +use tokio_postgres::NoTls; + +use async_compression::futures::bufread::GzipDecoder; + +use sql_parse::{ + parse_statement, InsertReplace, InsertReplaceType, ParseOptions, SQLArguments, SQLDialect, + Statement, +}; +use tokio_util::compat::TokioAsyncReadCompatExt; +use crate::types::{ + Author, AuthorAnnotation, AuthorAnnotationPic, BookAnnotation, BookAnnotationPic, BookAuthor, + BookGenre, FromVecExpression, Genre, Sequence, SequenceInfo, Translator, Update, +}; +use crate::utils::read_lines; + +use crate::types::Book; + +async fn download_file(filename_str: &str) -> Result<(), Box> { + log::info!("Download {filename_str}..."); + + let link = format!("{}/sql/{filename_str}.gz", &config::CONFIG.fl_base_url); + + let response = match reqwest::get(link).await { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + let response = match response.error_for_status() { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + match remove_file(filename_str).await { + Ok(_) => (), + Err(err) => log::debug!("Can't remove file: {:?}", err), + }; + + let mut file = match File::create(filename_str).await { + Ok(v) => v.compat(), + Err(err) => { + log::error!("Can't create {filename_str}: {:?}", err); + return Err(Box::new(err)) + }, + }; + + let data = response + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .into_async_read(); + + let decoder = 
GzipDecoder::new(data); + + match copy(decoder, &mut file).await { + Ok(_) => (), + Err(err) => { + log::error!("Can't write data {filename_str}: {}", err); + return Err(Box::new(err)) + }, + }; + + log::info!("{filename_str} downloaded!"); + + Ok(()) +} + +async fn process( + pool: Pool, + source_id: i16, + file_name: &str, + deps: Vec>>>, +) -> Result<(), Box> +where + T: Debug + FromVecExpression + Update, +{ + if deps.len() != 0 { + loop { + let mut some_failed = false; + let mut some_none = false; + + for dep in deps.iter() { + let status = dep.lock().unwrap(); + match &*status { + Some(status) => match status { + UpdateStatus::Success => (), + UpdateStatus::Fail => some_failed = true, + }, + None => some_none = true, + } + } + + if !some_failed && !some_none { + break; + } + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + match download_file(file_name).await { + Ok(_) => (), + Err(err) => return Err(err), + }; + + let parse_options = ParseOptions::new() + .dialect(SQLDialect::MariaDB) + .arguments(SQLArguments::QuestionMark) + .warn_unquoted_identifiers(true); + + let lines = read_lines(file_name); + + let lines = match lines { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + match T::before_update(&pool.get().await.unwrap()).await { + Ok(_) => (), + Err(err) => return Err(err), + }; + + log::info!("Start update {file_name}..."); + + for line in lines.into_iter() { + let line = match line { + Ok(line) => line, + Err(err) => return Err(Box::new(err)), + }; + + let mut issues = Vec::new(); + let ast = parse_statement(&line, &mut issues, &parse_options); + + match ast { + Some(Statement::InsertReplace( + i @ InsertReplace { + type_: InsertReplaceType::Insert(_), + .. 
+ }, + )) => { + for value in i.values.into_iter() { + for t_value in value.1.into_iter() { + let value = T::from_vec_expression(&t_value); + let client = pool.get().await.unwrap(); + + match value.update(&client, source_id).await { + Ok(_) => { + // log::info!("{:?}", value); + () + } + Err(err) => { + log::error!("Update error: {:?} : {:?}", value, err); + return Err(err) + }, + } + } + } + } + _ => (), + } + } + + log::info!("Updated {file_name}..."); + + Ok(()) +} + +async fn get_postgres_pool() -> Result { + let mut config = Config::new(); + + config.host = Some(config::CONFIG.postgres_host.clone()); + config.port = Some(config::CONFIG.postgres_port); + config.dbname = Some(config::CONFIG.postgres_db_name.clone()); + config.user = Some(config::CONFIG.postgres_user.clone()); + config.password = Some(config::CONFIG.postgres_password.clone()); + config.connect_timeout = Some(std::time::Duration::from_secs(5)); + config.manager = Some(ManagerConfig { + recycling_method: RecyclingMethod::Verified, + }); + + match config.create_pool(Some(Runtime::Tokio1), NoTls) { + Ok(pool) => Ok(pool), + Err(err) => Err(err), + } +} + +async fn get_source(pool: Pool) -> Result> { + let client = pool.get().await.unwrap(); + + let row = match client + .query_one("SELECT id FROM sources WHERE name = 'flibusta';", &[]) + .await + { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + let id = row.get(0); + + Ok(id) +} + +enum UpdateStatus { + Success, + Fail, +} + +async fn send_webhooks() -> Result<(), Box> { + for webhook in config::CONFIG.webhooks.clone().into_iter() { + let Webhook { method, url, headers } = webhook; + + let client = reqwest::Client::new(); + + let builder = match method { + config::Method::Get => { + client.get(url) + }, + config::Method::Post => { + client.post(url) + }, + }; + + let t_headers: Vec<(HeaderName, HeaderValue)> = headers.into_iter().map(|(key, val)| { + let value = match val { + serde_json::Value::String(v) => v, + _ => panic!("Header 
value not string!") + }; + + ( + HeaderName::from_str(key.as_ref()).unwrap(), + HeaderValue::from_str(&value).unwrap() + ) + }).collect(); + + let headers = HeaderMap::from_iter(t_headers.into_iter()); + + let response = builder.headers(headers).send().await; + + let response = match response { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + match response.error_for_status() { + Ok(_) => (), + Err(err) => return Err(Box::new(err)), + }; + }; + + Ok(()) +} + +pub async fn update() -> Result<(), Box> { + log::info!("Start update..."); + + let pool = match get_postgres_pool().await { + Ok(pool) => pool, + Err(err) => panic!("{:?}", err), + }; + + let source_id = match get_source(pool.clone()).await { + Ok(v) => Arc::new(v), + Err(err) => panic!("{:?}", err), + }; + + let author_status: Arc>> = Arc::new(Mutex::new(None)); + let book_status: Arc>> = Arc::new(Mutex::new(None)); + let sequence_status: Arc>> = Arc::new(Mutex::new(None)); + let book_annotation_status: Arc>> = Arc::new(Mutex::new(None)); + let author_annotation_status: Arc>> = Arc::new(Mutex::new(None)); + let genre_status: Arc>> = Arc::new(Mutex::new(None)); + + let pool_clone = pool.clone(); + let author_status_clone = author_status.clone(); + let source_id_clone = source_id.clone(); + let author_process = tokio::spawn(async move { + match process::(pool_clone, *source_id_clone, "lib.libavtorname.sql", vec![]).await + { + Ok(_) => { + let mut status = author_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = author_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + } + } + }); + + let pool_clone = pool.clone(); + let book_status_clone = book_status.clone(); + let source_id_clone = source_id.clone(); + let book_process = tokio::spawn(async move { + match process::(pool_clone, *source_id_clone, "lib.libbook.sql", vec![]).await { + Ok(_) => { + let mut status = 
book_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = book_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + } + } + }); + + let pool_clone = pool.clone(); + let deps = vec![author_status.clone(), book_status.clone()]; + let source_id_clone = source_id.clone(); + let book_author_process = tokio::spawn(async move { + process::(pool_clone, *source_id_clone, "lib.libavtor.sql", deps).await + }); + + let pool_clone = pool.clone(); + let deps = vec![author_status.clone(), book_status.clone()]; + let source_id_clone = source_id.clone(); + let translator_process = tokio::spawn(async move { + process::(pool_clone, *source_id_clone, "lib.libtranslator.sql", deps).await + }); + + let pool_clone = pool.clone(); + let sequence_status_clone = sequence_status.clone(); + let source_id_clone = source_id.clone(); + let sequence_process = tokio::spawn(async move { + match process::(pool_clone, *source_id_clone, "lib.libseqname.sql", vec![]).await + { + Ok(_) => { + let mut status = sequence_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = sequence_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + } + } + }); + + let pool_clone = pool.clone(); + let deps = vec![book_status.clone(), sequence_status.clone()]; + let source_id_clone = source_id.clone(); + let sequence_info_process = tokio::spawn(async move { + process::(pool_clone, *source_id_clone, "lib.libseq.sql", deps).await + }); + + let pool_clone = pool.clone(); + let deps = vec![book_status.clone()]; + let book_annotation_status_clone = book_annotation_status.clone(); + let source_id_clone = source_id.clone(); + let book_annotation_process = tokio::spawn(async move { + match process::(pool_clone, *source_id_clone, "lib.b.annotations.sql", deps) + .await + { + Ok(_) => { + let mut status = 
book_annotation_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = book_annotation_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + } + } + }); + + let pool_clone = pool.clone(); + let deps = vec![book_annotation_status.clone()]; + let source_id_clone = source_id.clone(); + let book_annotation_pics_process = tokio::spawn(async move { + process::( + pool_clone, + *source_id_clone, + "lib.b.annotations_pics.sql", + deps, + ) + .await + }); + + let pool_clone = pool.clone(); + let deps = vec![author_status.clone()]; + let author_annotation_status_clone = author_annotation_status.clone(); + let source_id_clone = source_id.clone(); + let author_annotation_process = tokio::spawn(async move { + match process::( + pool_clone, + *source_id_clone, + "lib.a.annotations.sql", + deps, + ) + .await + { + Ok(_) => { + let mut status = author_annotation_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = author_annotation_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + } + } + }); + + let pool_clone = pool.clone(); + let deps = vec![author_annotation_status.clone()]; + let source_id_clone = source_id.clone(); + let author_annotation_pics_process = tokio::spawn(async move { + process::( + pool_clone, + *source_id_clone, + "lib.a.annotations_pics.sql", + deps, + ) + .await + }); + + let pool_clone = pool.clone(); + let genre_status_clone = genre_status.clone(); + let source_id_clone = source_id.clone(); + let genre_annotation_process = tokio::spawn(async move { + match process::(pool_clone, *source_id_clone, "lib.libgenrelist.sql", vec![]).await { + Ok(_) => { + let mut status = genre_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Success); + Ok(()) + } + Err(err) => { + let mut status = genre_status_clone.lock().unwrap(); + *status = Some(UpdateStatus::Fail); + Err(err) + 
} + } + }); + + let pool_clone = pool.clone(); + let deps = vec![genre_status.clone(), book_status.clone()]; + let source_id_clone = source_id.clone(); + let book_genre_process = tokio::spawn(async move { + process::(pool_clone, *source_id_clone, "lib.libgenre.sql", deps).await + }); + + for process in [ + author_process, + book_process, + book_author_process, + translator_process, + sequence_process, + sequence_info_process, + book_annotation_process, + book_annotation_pics_process, + author_annotation_process, + author_annotation_pics_process, + genre_annotation_process, + book_genre_process + ] { + let process_result = match process.await { + Ok(v) => v, + Err(err) => return Err(Box::new(err)), + }; + + match process_result { + Ok(_) => (), + Err(err) => panic!("{:?}", err), + } + } + + match send_webhooks().await { + Ok(_) => { + log::info!("Webhooks sent!"); + }, + Err(err) => { + log::info!("Webhooks send failed : {err}"); + return Err(Box::new(err)) + }, + }; + + Ok(()) +} + +pub async fn cron_jobs() -> Result<(), JobSchedulerError> { + let job_scheduler = JobScheduler::new().await.unwrap(); + + let update_job = match Job::new_async("0 0 5 * * *", |_uuid, _l| Box::pin(async { + match update().await { + Ok(_) => log::info!("Updated"), + Err(err) => log::info!("Update err: {:?}", err), + }; + })) { + Ok(v) => v, + Err(err) => panic!("{:?}", err), + }; + + job_scheduler.add(update_job).await.unwrap(); + + match job_scheduler.start().await { + Ok(_) => Ok(()), + Err(err) => Err(err), + } +} \ No newline at end of file