From 7f155903ca7d9f1623b5ab355c9f251cf67212ed Mon Sep 17 00:00:00 2001 From: 4yn Date: Sat, 5 Feb 2022 00:40:50 +0800 Subject: [PATCH] better async --- src-tauri/Cargo.lock | 12 ++++++ src-tauri/Cargo.toml | 1 + src-tauri/src/bin/test_async.rs | 54 +++++++++++++++------------ src-tauri/src/bin/test_brokenithm.rs | 17 +++++++++ src-tauri/src/slider_io/brokenithm.rs | 12 +++++- src-tauri/src/slider_io/worker.rs | 23 +++++++----- 6 files changed, 85 insertions(+), 34 deletions(-) create mode 100644 src-tauri/src/bin/test_brokenithm.rs diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 0921480..281bbaf 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -105,6 +105,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "async-trait" +version = "0.1.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atk" version = "0.14.0" @@ -3041,6 +3052,7 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" name = "slidershim" version = "0.1.0" dependencies = [ + "async-trait", "directories", "env_logger", "futures", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index e70f76b..5100d3e 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -30,6 +30,7 @@ palette = "0.6.0" winapi = "0.3.9" futures = "0.3.19" +async-trait = "0.1.52" tokio = { version="1.16.1", features=["rt-multi-thread","macros"] } hyper = { version="0.14.16", features=["server","http1","http2","tcp"] } tungstenite = { version="0.16.0", default-features=false } diff --git a/src-tauri/src/bin/test_async.rs b/src-tauri/src/bin/test_async.rs index 0de4015..9361af2 100644 --- a/src-tauri/src/bin/test_async.rs +++ b/src-tauri/src/bin/test_async.rs @@ -1,39 +1,45 @@ extern crate slidershim; -use std::{io, time::Duration}; +use async_trait::async_trait; +use std::{future::Future, io, time::Duration}; -use tokio::time::sleep; +use tokio::{select, time::sleep}; -// use slidershim::slider_io::worker::{AsyncJob, AsyncJobFut, AsyncJobRecvStop, AsyncWorker}; +use slidershim::slider_io::worker::{AsyncJob, AsyncWorker}; -// struct CounterJob; +struct CounterJob; -// impl AsyncJob for CounterJob { -// fn job(self, mut recv_stop: AsyncJobRecvStop) -> AsyncJobFut { -// return Box::pin(async move { -// let mut x = 0; -// loop { -// x += 1; -// println!("{}", x); -// sleep(Duration::from_millis(500)).await; -// match recv_stop.try_recv() { -// Ok(_) => { -// println!("@@@"); -// break; -// } -// _ => {} -// } -// } -// }); -// } -// } +#[async_trait] +impl AsyncJob for CounterJob { + async fn do_work + Send>(self, stop_signal: F) { + let job_a = async { + println!("Start job A"); + let mut x = 0; + loop { + x += 1; + println!("{}", x); + sleep(Duration::from_millis(100)).await; + } + }; + let job_b = async move { + println!("Start job B"); + stop_signal.await; + println!("Stop signal hit at job B"); + }; + + select! { + _ = job_a => {}, + _ = job_b => {}, + } + } +} fn main() { env_logger::Builder::new() .filter_level(log::LevelFilter::Debug) .init(); - // let worker = AsyncWorker::new("counter", CounterJob); + let worker = AsyncWorker::new("counter", CounterJob); let mut input = String::new(); let string = io::stdin().read_line(&mut input).unwrap(); } diff --git a/src-tauri/src/bin/test_brokenithm.rs b/src-tauri/src/bin/test_brokenithm.rs new file mode 100644 index 0000000..212517a --- /dev/null +++ b/src-tauri/src/bin/test_brokenithm.rs @@ -0,0 +1,17 @@ +extern crate slidershim; + +use std::{io, time::Duration}; + +use tokio::time::sleep; + +// use slidershim::slider_io::{brokenithm::BrokenithmJob, worker::AsyncWorker}; + +fn main() { + env_logger::Builder::new() + .filter_level(log::LevelFilter::Debug) + .init(); + + // let worker = AsyncWorker::new("brokenithm", BrokenithmJob); + let mut input = String::new(); + let string = io::stdin().read_line(&mut input).unwrap(); +} diff --git a/src-tauri/src/slider_io/brokenithm.rs b/src-tauri/src/slider_io/brokenithm.rs index 7c6c792..c427c88 100644 --- a/src-tauri/src/slider_io/brokenithm.rs +++ b/src-tauri/src/slider_io/brokenithm.rs @@ -9,6 +9,8 @@ use hyper::{ Body, Request, Response, Server, }; +// use crate::slider_io::worker::{AsyncJob, AsyncJobFut, AsyncJobRecvStop}; + async fn handle_request( request: Request, remote_addr: SocketAddr, @@ -19,7 +21,7 @@ async fn handle_request( )))) } -pub async fn brokenithm_server() { +async fn brokenithm_server() { let addr = SocketAddr::from(([0, 0, 0, 0], 1666)); info!("Brokenithm opening on {:?}", addr); @@ -38,3 +40,11 @@ pub async fn brokenithm_server() { eprintln!("Server error: {}", e); } } + +// struct BrokenithmJob; + +// impl AsyncJob { +// fn job(self, mut recv_stop: AsyncJobRecvStop) -> AsyncJobFut { +// return Box::pin() +// } +// } diff --git a/src-tauri/src/slider_io/worker.rs b/src-tauri/src/slider_io/worker.rs index 41a58e1..a3131c8 100644 --- a/src-tauri/src/slider_io/worker.rs +++ b/src-tauri/src/slider_io/worker.rs @@ -1,3 +1,5 @@ +use async_trait::async_trait; +use log::info; use std::{ future::Future, pin::Pin, @@ -8,8 +10,6 @@ use std::{ thread, }; -use log::info; - use tokio::{ runtime::Runtime, sync::oneshot::{self, Receiver}, @@ -66,10 +66,9 @@ impl Drop for ThreadWorker { } } -pub type AsyncJobRecvStop = oneshot::Receiver<()>; -pub type AsyncJobFut = Pin + Send + 'static>>; -pub trait AsyncJob { - fn job(self, recv_stop: AsyncJobRecvStop) -> AsyncJobFut; +#[async_trait] +pub trait AsyncJob: Send + 'static { + async fn do_work + Send>(self, stop_signal: F); } pub struct AsyncWorker { @@ -80,7 +79,10 @@ pub struct AsyncWorker { } impl AsyncWorker { - pub fn new(name: &'static str, job: T) -> AsyncWorker { + pub fn new(name: &'static str, job: T) -> AsyncWorker + where + T: AsyncJob, + { info!("Async worker starting {}", name); let (send_stop, recv_stop) = oneshot::channel::<()>(); @@ -91,8 +93,11 @@ impl AsyncWorker { .unwrap(); let task = runtime.spawn(async move { - let fut = job.job(recv_stop); - fut.await; + job + .do_work(async move { + recv_stop.await; + }) + .await; }); AsyncWorker {