252 lines
7.2 KiB
Rust
252 lines
7.2 KiB
Rust
|
|
use std::{str};
|
|
use dashmap::DashMap;
|
|
use tracing::debug;
|
|
|
|
/// Default maximum queue length that `httpmq_read_metadata` reports for a queue.
pub static MAX_QUEUE : i32 = 8192;
|
|
//pub static DEFAULT_MAX_QUEUE_CELL : Vec<i32> = vec!(MAX_QUEUE, 0,0);
|
|
|
|
|
|
// httpmq read metadata api
|
|
// retrieve from leveldb
|
|
// name.maxqueue - maxqueue
|
|
// name.putpos - putpos
|
|
// name.getpos - getpos
|
|
pub fn httpmq_read_metadata(db : &DashMap<&String, &str>, name: &String) -> Option<Vec<i32>> {
|
|
|
|
let max_queue_key = format!("{}.{}", &name, "maxqueue");
|
|
let max_queue_key_check = max_queue_key.to_string();
|
|
let put_pos_key = format!("{}.{}", &name, "putpos");
|
|
let get_pos_key = format!("{}.{}", &name, "getpos");
|
|
|
|
if !unsafe { db.contains_key(&max_queue_key_check.to_string()) } {
|
|
|
|
|
|
let _ = unsafe { db.insert(&max_queue_key.to_string(), "8192") };
|
|
let _ = unsafe { db.insert(&put_pos_key.to_string(), "0") };
|
|
let _ = unsafe { db.insert(&get_pos_key.to_string(), "0") };
|
|
|
|
return Some(vec!(MAX_QUEUE,0,0))
|
|
}
|
|
else {
|
|
let _thing = unsafe { db.get(&max_queue_key).unwrap() };
|
|
let _thing1 = unsafe { db.get(&put_pos_key).unwrap() };
|
|
let _thing2 = unsafe { db.get(&get_pos_key).unwrap() };
|
|
|
|
let mut result: Vec<i32> = vec!(
|
|
0,0,0
|
|
);
|
|
|
|
debug!("result {:?}", result);
|
|
if result[0] == 0 {
|
|
result[0] = MAX_QUEUE;
|
|
}
|
|
Some(result)
|
|
}
|
|
}
|
|
|
|
/*
|
|
fn httpmq_now_getpos(name: &String) -> Option<i32> {
|
|
let metadata = httpmq_read_metadata(name);
|
|
let maxqueue = metadata.as_ref()?[0];
|
|
let putpos = metadata.as_ref()?[1];
|
|
let mut getpos = metadata.as_ref()?[2];
|
|
|
|
if getpos == 0 && putpos > 0 {
|
|
getpos = 1 // first get operation, set getpos 1
|
|
} else if getpos < putpos {
|
|
getpos += 1 // 1st lap, increase getpos
|
|
} else if getpos > putpos && getpos < maxqueue {
|
|
getpos += 1 // 2nd lap
|
|
} else if getpos > putpos && getpos == maxqueue {
|
|
getpos = 1 // 2nd first operation, set getpos 1
|
|
} else {
|
|
return Some(0); // all data in the queue has already been read
|
|
}
|
|
|
|
debug!("getpos {} {:?}", getpos, metadata);
|
|
|
|
db.put(name.to_string() + ".getpos", getpos.to_string())
|
|
.ok()?;
|
|
Some(getpos)
|
|
}
|
|
|
|
fn httpmq_now_putpos(name: &String) -> Option<i32> {
|
|
let metadata = httpmq_read_metadata(name);
|
|
let maxqueue = metadata.as_ref()?[0];
|
|
let mut putpos = metadata.as_ref()?[1];
|
|
let getpos = metadata.as_ref()?[2];
|
|
|
|
let newpos;
|
|
|
|
putpos += 1; // increase put queue pos
|
|
if putpos == getpos {
|
|
// queue is full
|
|
return Some(0); // return 0 to reject put operation
|
|
} else if getpos <= 1 && putpos > maxqueue {
|
|
// get operation less than 1
|
|
return Some(0); // and queue is full, just reject it
|
|
} else if putpos > maxqueue {
|
|
// 2nd lap
|
|
newpos = 1 // reset putpos as 1 and write to leveldb
|
|
} else {
|
|
// 1st lap, convert int to string and write to leveldb
|
|
newpos = putpos;
|
|
}
|
|
|
|
debug!("newpos {} {:?}", newpos, metadata);
|
|
|
|
Some(newpos)
|
|
}
|
|
|
|
pub fn init(matches: ArgMatches) {
|
|
|
|
}
|
|
|
|
async fn kv_get(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let getpos = httpmq_now_getpos(&args.name).unwrap_or_default();
|
|
|
|
debug!("{} {:?}", getpos, args);
|
|
|
|
if getpos == 0 {
|
|
Ok(String::from("HTTPMQ_GET_END"))
|
|
} else {
|
|
let queue_name = args.name.to_string() + &getpos.to_string();
|
|
let val = match db.get(&queue_name) {
|
|
Some(obj) => obj,
|
|
None => "HTTPMQ_GET_NONE",
|
|
};
|
|
|
|
Ok(val.to_string())
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize, Debug)]
|
|
pub struct KVSet {
|
|
opt: String,
|
|
name: String,
|
|
data: Option<String>,
|
|
// pos: Option<i32>,
|
|
num: Option<i32>,
|
|
}
|
|
|
|
async fn kv_maxqueue(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let num = args.num.unwrap_or(0);
|
|
if num > 0 && num <= MAX_QUEUE {
|
|
let insert = db.insert(&format!("{}.{}", args.name, "maxqueue"), &format!("{}",num));
|
|
debug!("{:?}", insert);
|
|
Ok(String::from("HTTPMQ_MAXQUEUE_OK"))
|
|
} else {
|
|
Ok(String::from("HTTPMQ_MAXQUEUE_CANCEL"))
|
|
}
|
|
}
|
|
|
|
async fn kv_set(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let db = DATABASE.get().unwrap();
|
|
|
|
let putpos = httpmq_now_putpos(&db, &args.name).unwrap_or_default();
|
|
|
|
debug!("{} {:?}", putpos, args);
|
|
|
|
if putpos > 0 {
|
|
let queue_name = args.name.to_string() + &putpos.to_string();
|
|
|
|
let data = args.data.unwrap_or("".to_string());
|
|
if data.len() > 0 {
|
|
let mut batch = WriteBatch::default();
|
|
batch.put(args.name.to_string() + ".putpos", putpos.to_string());
|
|
batch.put(queue_name, data);
|
|
db.write(batch).unwrap();
|
|
return Ok(String::from("HTTPMQ_PUT_OK"));
|
|
}
|
|
Ok(String::from("HTTPMQ_PUT_NO_DATA"))
|
|
} else {
|
|
Ok(String::from("HTTPMQ_PUT_END"))
|
|
}
|
|
}
|
|
|
|
async fn kv_status(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let db = DATABASE.get().unwrap();
|
|
let metadata = httpmq_read_metadata(db, &args.name).unwrap_or(vec![0, 0, 0]);
|
|
let maxqueue = metadata[0];
|
|
let putpos = metadata[1];
|
|
let getpos = metadata[2];
|
|
|
|
let mut ungetnum = 0;
|
|
let mut put_times = "";
|
|
let mut get_times = "";
|
|
if putpos >= getpos {
|
|
ungetnum = (putpos - getpos).abs();
|
|
put_times = "1st lap";
|
|
get_times = "1st lap";
|
|
} else if putpos < getpos {
|
|
ungetnum = (maxqueue + putpos - getpos).abs();
|
|
put_times = "2st lap";
|
|
get_times = "1st lap";
|
|
}
|
|
|
|
let buf = format!(
|
|
"HTTP Simple Queue Service
|
|
------------------------------
|
|
Queue Name: {}
|
|
Maximum number of queues: {}
|
|
Put position of queue ({}): {}
|
|
Get position of queue ({}): {}
|
|
Number of unread queue: {}
|
|
",
|
|
args.name.to_string(),
|
|
maxqueue,
|
|
put_times,
|
|
putpos,
|
|
get_times,
|
|
getpos,
|
|
ungetnum
|
|
);
|
|
|
|
Ok(buf)
|
|
}
|
|
|
|
async fn kv_reset(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let db = DATABASE.get().unwrap();
|
|
db.put(
|
|
args.name.to_string() + ".maxqueue",
|
|
DEFAULT_MAX_QUEUE_CELL.get().unwrap().to_string(),
|
|
)
|
|
.unwrap();
|
|
db.put(args.name.to_string() + ".putpos", "0").unwrap();
|
|
db.put(args.name.to_string() + ".getpos", "0").unwrap();
|
|
|
|
Ok(String::from("HTTPMQ_RESET_OK"))
|
|
}
|
|
|
|
pub async fn process(Query(args): Query<KVSet>) -> Result<String, StatusCode> {
|
|
let res = match &args.opt[..] {
|
|
"get" => kv_get(Query(args)).await,
|
|
"put" => kv_set(Query(args)).await,
|
|
"status" => kv_status(Query(args)).await,
|
|
"reset" => kv_reset(Query(args)).await,
|
|
"maxqueue" => kv_maxqueue(Query(args)).await,
|
|
_ => Ok(String::from("invalid opt")),
|
|
};
|
|
|
|
return res;
|
|
}
|
|
|
|
pub async fn handle_error(error: BoxError) -> impl IntoResponse {
|
|
if error.is::<tower::timeout::error::Elapsed>() {
|
|
return (StatusCode::REQUEST_TIMEOUT, Cow::from("request timed out"));
|
|
}
|
|
|
|
if error.is::<tower::load_shed::error::Overloaded>() {
|
|
return (
|
|
StatusCode::SERVICE_UNAVAILABLE,
|
|
Cow::from("service is overloaded, try again later"),
|
|
);
|
|
}
|
|
|
|
(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Cow::from(format!("Unhandled internal error: {}", error)),
|
|
)
|
|
}
|
|
*/ |