Time-series data ingestion from Rust WebAssembly application, leveraging GreptimeDB and WasmEdge
What is WebAssembly
WebAssembly is a new instruction format that offers cross-platform compatibility and execution speeds close to native machine code. By compiling C/C++ or Rust code into WebAssembly, program performance can be enhanced within browsers.
Additionally, in environments outside of browsers, particularly at the edge of CDNs or IoT, WebAssembly can be used to implement advanced functionalities like sandboxing and dynamic loading of plugins.
What is WasmEdge
WasmEdge is a sandbox project of the Cloud Native Computing Foundation (CNCF), offering the sandboxing capabilities mentioned earlier. It allows developers to extend the resources and interfaces they can access on top of the standard WebAssembly. For instance, WasmEdge provides additional capabilities such as TLS, networking, and AI abilities for Wasm, significantly enriching its range of applications.
WasmEdge GitHub address: https://github.com/WasmEdge/WasmEdge.
Installing GreptimeDB and WasmEdge
If you have already installed GreptimeDB, you can skip this step.
Download and run GreptimeDB
curl -L https://github.com/GreptimeTeam/greptimedb/raw/develop/scripts/install.sh | sh
./greptime standalone start
Install WasmEdge
curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | bash -s
Writing a GreptimeDB WASM Application
In WasmEdge, we can use the MySQL protocol to connect Rust-written applications to GreptimeDB.
First, create a new Rust project using cargo new
. Our compilation target will be wasm32-wasi
. You can create a .cargo/config.toml
file in the project root directory to specify the default compilation target. This way, you won't need to specify --target
after every cargo build
command.
# .cargo/config.toml
[build]
target = "wasm32-wasi"
Edit the Cargo.toml
to add dependencies. The application of mysql_async
requires the tokio
runtime and WasmEdge maintains the modified versions of these two libraries, enabling them to be compiled into WebAssembly code and run in the WasmEdge environment.
[package]
name = "greptimedb"
version = "0.1.0"
edition = "2021"
[dependencies]
mysql_async_wasi = "0.31"
time = "0.3"
tokio_wasi = { version = "1", features = [ "io-util", "fs", "net", "time", "rt", "macros"] }
To further edit the src/
main.rs
file and incorporate database access logic, you should follow these steps. The following code will demonstrate:
Reading the database address from environment variables and creating a connection pool.
Executing SQL statements to create a data table.
Inserting data.
Querying data.
Define data structure
#[derive(Debug)]
struct CpuMetric {
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
}
impl CpuMetric {
fn new(
hostname: String,
environment: String,
usage_user: f64,
usage_system: f64,
usage_idle: f64,
ts: i64,
) -> Self {
Self {
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
}
}
}
Initializing a database connection pool
use mysql_async::{
prelude::*, Opts, OptsBuilder, Pool, PoolConstraints, PoolOpts, Result,
};
use time::PrimitiveDateTime;
fn get_url() -> String {
if let Ok(url) = std::env::var("DATABASE_URL") {
let opts = Opts::from_url(&url).expect("DATABASE_URL invalid");
if opts
.db_name()
.expect("a database name is required")
.is_empty()
{
panic!("database name is empty");
}
url
} else {
"mysql://root:pass@127.0.0.1:3306/mysql".into()
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
// Alternative: The "easy" way with a default connection pool
// let pool = Pool::new(Opts::from_url(&*get_url()).unwrap());
// let mut conn = pool.get_conn().await.unwrap();
// Below we create a customized connection pool
let opts = Opts::from_url(&*get_url()).unwrap();
let builder = OptsBuilder::from_opts(opts);
// The connection pool will have a min of 1 and max of 2 connections.
let constraints = PoolConstraints::new(1, 2).unwrap();
let pool_opts = PoolOpts::default().with_constraints(constraints);
let pool = Pool::new(builder.pool_opts(pool_opts));
let mut conn = pool.get_conn().await.unwrap();
Ok(())
}
Creating a data table
// Create table if not exists
r"CREATE TABLE IF NOT EXISTS wasmedge_example_cpu_metrics (
hostname STRING,
environment STRING,
usage_user DOUBLE,
usage_system DOUBLE,
usage_idle DOUBLE,
ts TIMESTAMP,
TIME INDEX(ts),
PRIMARY KEY(hostname, environment)
);"
.ignore(&mut conn)
.await?;
Inserting data
let metrics = vec![
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307200050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307200050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307260050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307260050,
),
CpuMetric::new(
"host0".into(),
"test".into(),
32f64,
3f64,
4f64,
1680307320050,
),
CpuMetric::new(
"host1".into(),
"test".into(),
29f64,
32f64,
50f64,
1680307320050,
),
];
r"INSERT INTO wasmedge_example_cpu_metrics (hostname, environment, usage_user, usage_system, usage_idle, ts)
VALUES (:hostname, :environment, :usage_user, :usage_system, :usage_idle, :ts)"
.with(metrics.iter().map(|metric| {
params! {
"hostname" => &metric.hostname,
"environment" => &metric.environment,
"usage_user" => metric.usage_user,
"usage_system" => metric.usage_system,
"usage_idle" => metric.usage_idle,
"ts" => metric.ts,
}
}))
.batch(&mut conn)
.await?;
Quering data
let loaded_metrics = "SELECT * FROM wasmedge_example_cpu_metrics"
.with(())
.map(
&mut conn,
|(hostname, environment, usage_user, usage_system, usage_idle, raw_ts): (
String,
String,
f64,
f64,
f64,
PrimitiveDateTime,
)| {
let ts = raw_ts.assume_utc().unix_timestamp() * 1000;
CpuMetric::new(
hostname,
environment,
usage_user,
usage_system,
usage_idle,
ts,
)
},
)
.await?;
println!("{:?}", loaded_metrics);
The tokio
and mysql_async
libraries provided by the WasmEdge team are fully compatible with the original version's programming interface, allowing for a seamless transition of Rust applications to the WebAssembly platform.
By compiling this project, we can get the greptimedb.wasm file.
cargo build
ls -lh target/wasm32-wasi/debug/greptimedb.wasm
Run our application through WasmEdge:
wasmedge --env "DATABASE_URL=mysql://localhost:4002/public" target/wasm32-wasi/debug/greptimedb.wasm
The above sample program has been added to the WasmEdge database usage demonstration. You can find the full source code in the GitHub repository at https://github.com/WasmEdge/wasmedge-db-examples/tree/main/greptimedb.
Conclusion
WasmEdge offers expanded capabilities for WebAssembly applications. If you deploy your application in a WebAssembly environment, you can also use the OpenTelemetry SDK in the future to collect metric data and store it directly in GreptimeDB. Download GreptimeDB now or activate a GreptimeCloud instance to run the above example and experience the convenience of time-series data ingestion for Rust WebAssembly applications.