From f016f14efe731bd7239dd1313c7ccacf662c9f7e Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 20 Apr 2020 19:56:50 -0500 Subject: [PATCH] Use newest background-jobs --- Cargo.lock | 16 ++++++++-------- Cargo.toml | 4 ++-- src/jobs/apub/announce.rs | 15 +++------------ src/jobs/apub/follow.rs | 15 +++------------ src/jobs/apub/forward.rs | 15 +++------------ src/jobs/apub/mod.rs | 8 +------- src/jobs/apub/reject.rs | 15 +++------------ src/jobs/apub/undo.rs | 15 +++------------ src/jobs/deliver.rs | 17 ++++------------- src/jobs/deliver_many.rs | 15 +++------------ src/jobs/instance.rs | 15 +++------------ src/jobs/mod.rs | 31 ++++++++++++------------------- src/jobs/nodeinfo.rs | 15 +++------------ src/jobs/process_listeners.rs | 15 +++------------ 14 files changed, 54 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fb318b..c8d53d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,8 +470,8 @@ dependencies = [ [[package]] name = "background-jobs" -version = "0.8.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" +version = "0.8.0-alpha.1" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -479,8 +479,8 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.7.0-alpha.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" +version = "0.8.0-alpha.0" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" dependencies = [ "actix", "actix-rt", @@ -500,8 +500,8 @@ dependencies = [ [[package]] name = "background-jobs-core" -version = "0.7.0" -source = "git+https://git.asonix.dog/Aardwolf/background-jobs#ca1c07366692c44fce6ae351a4a6762089660a20" +version = "0.8.0-alpha.0" +source = "git+https://git.asonix.dog/Aardwolf/background-jobs#f8fa1bb5ef71724053c8fe68b75d900a51cb4f45" dependencies = [ "actix", "anyhow", @@ -2312,9 +2312,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "standback" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d747fd6d33f130039c9518cffa45a83d53986642ca4f872497df668e3f2b6b4d" +checksum = "6389164ce46e8a68e1b373787efcca3b6b6620bb50b12d4e8d14380838db316f" [[package]] name = "static_assertions" diff --git a/Cargo.toml b/Cargo.toml index fdd066f..980600f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,8 @@ actix-webfinger = "0.3.0-alpha.3" activitystreams = "0.5.0" ammonia = "3.1.0" async-trait = "0.1.24" -background-jobs = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } -background-jobs-core = { version = "0.7.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" } +background-jobs = { version = "0.8.0-alpha.1", git = "https://git.asonix.dog/Aardwolf/background-jobs", default-features = false, features = ["background-jobs-actix"] } +background-jobs-core = { version = "0.8.0-alpha.0", git = "https://git.asonix.dog/Aardwolf/background-jobs" } bytes = "0.5.4" base64 = "0.12" bb8-postgres = { version = "0.4.0", features = ["with-serde_json-1", "with-uuid-0_8", "with-chrono-0_4"] } diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index e85b567..059139d 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -8,7 +8,7 @@ use crate::{ }, }; use activitystreams::primitives::XsdAnyUri; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -17,9 +17,6 @@ pub struct Announce { actor: Actor, } -#[derive(Clone, Debug)] -pub struct AnnounceProcessor; - impl Announce { pub fn new(object_id: XsdAnyUri, actor: Actor) -> Self { Announce { object_id, actor } @@ -60,18 +57,12 @@ fn generate_announce( } impl ActixJob for Announce { - type Processor = AnnounceProcessor; type State = JobState; type Future = Pin>>>; + const NAME: &'static str = "AnnounceProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for AnnounceProcessor { - type Job = Announce; - - const NAME: &'static str = "AnnounceProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs index 86d7364..ea8156b 100644 --- a/src/jobs/apub/follow.rs +++ b/src/jobs/apub/follow.rs @@ -6,7 +6,7 @@ use crate::{ jobs::{apub::prepare_activity, Deliver, JobState}, }; use activitystreams::primitives::XsdAnyUri; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -16,9 +16,6 @@ pub struct Follow { actor: Actor, } -#[derive(Clone, Debug)] -pub struct FollowProcessor; - impl Follow { pub fn new(is_listener: bool, input: AcceptedObjects, actor: Actor) -> Self { Follow { @@ -105,18 +102,12 @@ fn generate_accept_follow( } impl ActixJob for Follow { - type Processor = FollowProcessor; type State = JobState; type Future = Pin>>>; + const NAME: &'static str = "FollowProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for FollowProcessor { - type Job = Follow; - - const NAME: &'static str = "FollowProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs index 63e9a52..a358610 100644 --- a/src/jobs/apub/forward.rs +++ b/src/jobs/apub/forward.rs @@ -3,7 +3,7 @@ use crate::{ data::Actor, jobs::{apub::get_inboxes, DeliverMany, JobState}, }; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -12,9 +12,6 @@ pub struct Forward { actor: Actor, } -#[derive(Clone, Debug)] -pub struct ForwardProcessor; - impl Forward { pub fn new(input: AcceptedObjects, actor: Actor) -> Self { Forward { input, actor } @@ -34,18 +31,12 @@ impl Forward { } impl ActixJob for Forward { - type Processor = ForwardProcessor; type State = JobState; type Future = Pin>>>; + const NAME: &'static str = "ForwardProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for ForwardProcessor { - type Job = Forward; - - const NAME: &'static str = "ForwardProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/apub/mod.rs b/src/jobs/apub/mod.rs index 6bab73c..7d1bd85 100644 --- a/src/jobs/apub/mod.rs +++ b/src/jobs/apub/mod.rs @@ -14,13 +14,7 @@ mod forward; mod reject; mod undo; -pub use self::{ - announce::{Announce, AnnounceProcessor}, - follow::{Follow, FollowProcessor}, - forward::{Forward, ForwardProcessor}, - reject::{Reject, RejectProcessor}, - undo::{Undo, UndoProcessor}, -}; +pub use self::{announce::Announce, follow::Follow, forward::Forward, reject::Reject, undo::Undo}; async fn get_inboxes( state: &State, diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs index 68b25cc..401fbc3 100644 --- a/src/jobs/apub/reject.rs +++ b/src/jobs/apub/reject.rs @@ -4,15 +4,12 @@ use crate::{ jobs::{apub::generate_undo_follow, Deliver, JobState}, }; use activitystreams::primitives::XsdAnyUri; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Reject(pub Actor); -#[derive(Clone, Debug)] -pub struct RejectProcessor; - impl Reject { async fn perform(self, state: JobState) -> Result<(), anyhow::Error> { if let Some(_) = state.actors.unfollower(&self.0).await? { @@ -29,18 +26,12 @@ impl Reject { } impl ActixJob for Reject { - type Processor = RejectProcessor; type State = JobState; type Future = Pin>>>; + const NAME: &'static str = "RejectProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for RejectProcessor { - type Job = Reject; - - const NAME: &'static str = "RejectProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs index 713f898..5c6e3bc 100644 --- a/src/jobs/apub/undo.rs +++ b/src/jobs/apub/undo.rs @@ -5,7 +5,7 @@ use crate::{ jobs::{apub::generate_undo_follow, Deliver, JobState}, }; use activitystreams::primitives::XsdAnyUri; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -14,9 +14,6 @@ pub struct Undo { actor: Actor, } -#[derive(Clone, Debug)] -pub struct UndoProcessor; - impl Undo { pub fn new(input: AcceptedObjects, actor: Actor) -> Self { Undo { input, actor } @@ -42,18 +39,12 @@ impl Undo { } impl ActixJob for Undo { - type Processor = UndoProcessor; type State = JobState; type Future = Pin>>>; + const NAME: &'static str = "UndoProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for UndoProcessor { - type Job = Undo; - - const NAME: &'static str = "UndoProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 2483f90..8ad8bc4 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -1,7 +1,7 @@ use crate::{error::MyError, jobs::JobState}; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{ActixJob, Backoff, Processor}; +use background_jobs::{ActixJob, Backoff}; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -22,14 +22,13 @@ impl Deliver { } } -#[derive(Clone, Debug)] -pub struct DeliverProcessor; - impl ActixJob for Deliver { type State = JobState; - type Processor = DeliverProcessor; type Future = Pin>>>; + const NAME: &'static str = "DeliverProcessor"; + const BACKOFF: Backoff = Backoff::Exponential(8); + fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { state.requests.deliver(self.to, &self.data).await?; @@ -38,11 +37,3 @@ impl ActixJob for Deliver { }) } } - -impl Processor for DeliverProcessor { - type Job = Deliver; - - const NAME: &'static str = "DeliverProcessor"; - const QUEUE: &'static str = "default"; - const BACKOFF_STRATEGY: Backoff = Backoff::Exponential(8); -} diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index af21e61..0957f5c 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -4,7 +4,7 @@ use crate::{ }; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use futures::future::{ready, Ready}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -35,22 +35,13 @@ impl DeliverMany { } } -#[derive(Clone, Debug)] -pub struct DeliverManyProcessor; - impl ActixJob for DeliverMany { type State = JobState; - type Processor = DeliverManyProcessor; type Future = Ready>; + const NAME: &'static str = "DeliverManyProcessor"; + fn run(self, state: Self::State) -> Self::Future { ready(self.perform(state)) } } - -impl Processor for DeliverManyProcessor { - type Job = DeliverMany; - - const NAME: &'static str = "DeliverManyProcessor"; - const QUEUE: &'static str = "default"; -} diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 00bf1ee..362114a 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -1,7 +1,7 @@ use crate::{config::UrlKind, jobs::JobState}; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use futures::join; use std::{future::Future, pin::Pin}; @@ -81,26 +81,17 @@ impl QueryInstance { } } -#[derive(Clone, Debug)] -pub struct InstanceProcessor; - impl ActixJob for QueryInstance { type State = JobState; - type Processor = InstanceProcessor; type Future = Pin>>>; + const NAME: &'static str = "InstanceProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } -impl Processor for InstanceProcessor { - type Job = QueryInstance; - - const NAME: &'static str = "InstanceProcessor"; - const QUEUE: &'static str = "default"; -} - #[derive(serde::Deserialize)] struct Instance { title: String, diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index 846acfd..57660e6 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -15,14 +15,7 @@ use crate::{ data::{ActorCache, Media, NodeCache, State}, db::Db, error::MyError, - jobs::{ - deliver::DeliverProcessor, - deliver_many::DeliverManyProcessor, - instance::InstanceProcessor, - nodeinfo::NodeinfoProcessor, - process_listeners::{Listeners, ListenersProcessor}, - storage::Storage, - }, + jobs::{process_listeners::Listeners, storage::Storage}, requests::Requests, }; use background_jobs::{Job, QueueHandle, WorkerConfig}; @@ -56,17 +49,17 @@ pub fn create_workers( config.clone(), ) }) - .register(DeliverProcessor) - .register(DeliverManyProcessor) - .register(NodeinfoProcessor) - .register(InstanceProcessor) - .register(ListenersProcessor) - .register(apub::AnnounceProcessor) - .register(apub::FollowProcessor) - .register(apub::ForwardProcessor) - .register(apub::RejectProcessor) - .register(apub::UndoProcessor) - .set_processor_count("default", 4) + .register::() + .register::() + .register::() + .register::() + .register::() + .register::() + .register::() + .register::() + .register::() + .register::() + .set_worker_count("default", 4) .start(remote_handle); } diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index f862a6d..70878d2 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -1,7 +1,7 @@ use crate::jobs::JobState; use activitystreams::primitives::XsdAnyUri; use anyhow::Error; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -52,26 +52,17 @@ impl QueryNodeinfo { } } -#[derive(Clone, Debug)] -pub struct NodeinfoProcessor; - impl ActixJob for QueryNodeinfo { type State = JobState; - type Processor = NodeinfoProcessor; type Future = Pin>>>; + const NAME: &'static str = "NodeinfoProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } -impl Processor for NodeinfoProcessor { - type Job = QueryNodeinfo; - - const NAME: &'static str = "NodeinfoProcessor"; - const QUEUE: &'static str = "default"; -} - #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] struct Nodeinfo { diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs index 0a6c951..9ed8c79 100644 --- a/src/jobs/process_listeners.rs +++ b/src/jobs/process_listeners.rs @@ -1,14 +1,11 @@ use crate::jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}; use anyhow::Error; -use background_jobs::{ActixJob, Processor}; +use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Listeners; -#[derive(Clone, Debug)] -pub struct ListenersProcessor; - impl Listeners { async fn perform(self, state: JobState) -> Result<(), Error> { for listener in state.state.listeners().await { @@ -24,17 +21,11 @@ impl Listeners { impl ActixJob for Listeners { type State = JobState; - type Processor = ListenersProcessor; type Future = Pin>>>; + const NAME: &'static str = "ProcessListenersProcessor"; + fn run(self, state: Self::State) -> Self::Future { Box::pin(self.perform(state)) } } - -impl Processor for ListenersProcessor { - type Job = Listeners; - - const NAME: &'static str = "ProcessListenersProcessor"; - const QUEUE: &'static str = "default"; -}