refactor: use oncecell for better performance

main
大可 2022-01-03 22:58:41 +08:00
parent 66241f6316
commit f1ba659bfe
2 changed files with 35 additions and 66 deletions

View File

@ -1,11 +1,10 @@
use axum::{error_handling::HandleErrorLayer, routing::get, Router};
use clap::{App, Arg};
use std::{net::SocketAddr, sync::RwLock, time::Duration};
use std::{net::SocketAddr, time::Duration};
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use httpmq_rs::service::{handle_error, process, SharedState, State, DEFAULT_MAX_QUEUE_CELL};
use httpmq_rs::service::{handle_error, init, process};
#[tokio::main]
async fn main() {
@ -15,8 +14,6 @@ async fn main() {
}
tracing_subscriber::fmt::init();
let state = SharedState::new(RwLock::new(State::new()));
let matches = App::new("httpmq-rs")
.bin_name("httpmq-rs")
.arg(
@ -26,16 +23,7 @@ async fn main() {
)
.get_matches();
DEFAULT_MAX_QUEUE_CELL
.set(
matches
.value_of("maxqueue")
.unwrap()
.parse::<i32>()
.unwrap(),
)
.unwrap();
init(matches);
// Build our application by composing routes
let app = Router::new()
.route("/", get(process))
@ -48,7 +36,7 @@ async fn main() {
.concurrency_limit(1024)
.timeout(Duration::from_secs(10))
// .layer(TraceLayer::new_for_http())
.layer(AddExtensionLayer::new(state))
// .layer(AddExtensionLayer::new(state))
.into_inner(),
);

View File

@ -1,16 +1,14 @@
use axum::{extract::Extension, extract::Query, http::StatusCode, response::IntoResponse};
use axum::{extract::Query, http::StatusCode, response::IntoResponse};
use clap::ArgMatches;
use once_cell::sync::OnceCell;
use rocksdb::{WriteBatch, DB};
use serde::Deserialize;
use std::{
borrow::Cow,
str,
sync::{Arc, RwLock},
};
use std::{borrow::Cow, str};
use tower::BoxError;
use tracing::debug;
pub static DEFAULT_MAX_QUEUE_CELL: OnceCell<i32> = OnceCell::new();
pub static DATABASE: OnceCell<DB> = OnceCell::new();
// httpmq read metadata api
// retrieve from leveldb
@ -91,24 +89,22 @@ fn httpmq_now_putpos(db: &rocksdb::DB, name: &String) -> Option<i32> {
Some(newpos)
}
pub type SharedState = Arc<RwLock<State>>;
pub fn init(matches: ArgMatches) {
DEFAULT_MAX_QUEUE_CELL
.set(
matches
.value_of("maxqueue")
.unwrap()
.parse::<i32>()
.unwrap(),
)
.unwrap();
pub struct State {
database: rocksdb::DB,
DATABASE.set(DB::open_default("path").unwrap()).unwrap();
}
impl State {
pub fn new() -> State {
let db = DB::open_default("path").unwrap();
State { database: db }
}
}
async fn kv_get(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
let db = &state.read().unwrap().database;
async fn kv_get(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let db = DATABASE.get().unwrap();
let getpos = httpmq_now_getpos(&db, &args.name).unwrap_or_default();
debug!("{} {:?}", getpos, args);
@ -136,13 +132,10 @@ pub struct KVSet {
num: Option<i32>,
}
async fn kv_maxqueue(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
async fn kv_maxqueue(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let num = args.num.unwrap_or(0);
if num > 0 && num <= *DEFAULT_MAX_QUEUE_CELL.get().unwrap() {
let db = &state.read().unwrap().database;
let db = DATABASE.get().unwrap();
db.put(args.name.to_string() + ".maxqueue", num.to_string())
.unwrap();
Ok(String::from("HTTPMQ_MAXQUEUE_OK"))
@ -151,11 +144,8 @@ async fn kv_maxqueue(
}
}
async fn kv_set(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
let db = &state.read().unwrap().database;
async fn kv_set(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let db = DATABASE.get().unwrap();
let putpos = httpmq_now_putpos(&db, &args.name).unwrap_or_default();
@ -178,11 +168,8 @@ async fn kv_set(
}
}
async fn kv_status(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
let db = &state.read().unwrap().database;
async fn kv_status(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let db = DATABASE.get().unwrap();
let metadata = httpmq_read_metadata(db, &args.name).unwrap_or(vec![0, 0, 0]);
let maxqueue = metadata[0];
let putpos = metadata[1];
@ -222,11 +209,8 @@ Number of unread queue: {}
Ok(buf)
}
async fn kv_reset(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
let db = &state.read().unwrap().database;
async fn kv_reset(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let db = DATABASE.get().unwrap();
db.put(
args.name.to_string() + ".maxqueue",
DEFAULT_MAX_QUEUE_CELL.get().unwrap().to_string(),
@ -238,16 +222,13 @@ async fn kv_reset(
Ok(String::from("HTTPMQ_RESET_OK"))
}
pub async fn process(
Query(args): Query<KVSet>,
Extension(state): Extension<SharedState>,
) -> Result<String, StatusCode> {
pub async fn process(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
let res = match &args.opt[..] {
"get" => kv_get(Query(args), Extension(state)).await,
"put" => kv_set(Query(args), Extension(state)).await,
"status" => kv_status(Query(args), Extension(state)).await,
"reset" => kv_reset(Query(args), Extension(state)).await,
"maxqueue" => kv_maxqueue(Query(args), Extension(state)).await,
"get" => kv_get(Query(args)).await,
"put" => kv_set(Query(args)).await,
"status" => kv_status(Query(args)).await,
"reset" => kv_reset(Query(args)).await,
"maxqueue" => kv_maxqueue(Query(args)).await,
_ => Ok(String::from("invalid opt")),
};