TL;DR: I created two functions for working with Apache AGE (accessed via SQLx) that return JSON results directly from the database.
The code's ugly and I'm unsure what the performance cost is, but it works well enough to be used in osintbuddy. We end up automatically casting the agtype results to JSON.
// External resources: https://age.apache.org/age-manual/master/intro/types.html#map
// Expected return must include: WITH <age_map> AS return_var
// E.g.
// "query... WITH <age_map> AS <return_var> RETURN <return_var>"
//
// "MATCH (v)-[e] WITH {id: id(e), label: label(e), start_id: start_id(e), end_id: end_id(e)} AS e, v
// WITH {id: id(v), label: label(v), properties: properties(v)} AS v, e
// RETURN e, v"
pub async fn with_cypher(
graph_name: &str,
tx: &mut PgConnection,
query: &str,
query_as: &str,
) -> Result<Vec<serde_json::Value>, AppError> {
    let query_params: Vec<&str> = query_as.split(',').collect();
    // Build "CAST(CAST(<col> AS VarChar) AS JSON), ..." for each returned column;
    // join() handles the separators so there's no trailing comma to strip
    let query_cast = query_params
        .iter()
        .map(|cypher_ret_value| {
            format!(
                "CAST(CAST({} AS VarChar) AS JSON)",
                cypher_ret_value.replace("agtype", "").trim()
            )
        })
        .collect::<Vec<_>>()
        .join(", ");
let objs: Vec<PgRow> = sqlx::raw_sql(&format!(
"SELECT {query_cast} FROM cypher('{graph_name}', $$ {query} $$) as ({query_as})"
))
.fetch_all(tx)
.await
.map_err(|err| {
error!("{err}");
AppError {
message: "We ran into an OpenCypher select transaction error!",
kind: ErrorKind::Critical,
}
})?;
    // Pull each returned column out of every row as serde_json::Value
    let mut age_objects: Vec<serde_json::Value> = Vec::new();
    for col in query_params
        .iter()
        .map(|a| a.replace("agtype", "").trim().to_owned())
    {
        age_objects.extend(
            objs.iter()
                .map(|row| row.get::<serde_json::Value, _>(col.as_str())),
        );
    }
Ok(age_objects)
}
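To sanity-check the cast-string construction in isolation, here's a standalone sketch of the same logic (the `build_cast_list` name is mine, not from osintbuddy):

```rust
// Rebuilds the SELECT column list from a cypher() column definition string,
// e.g. "e agtype, v agtype" -> two nested CASTs joined by ", "
fn build_cast_list(query_as: &str) -> String {
    query_as
        .split(',')
        .map(|p| {
            format!(
                "CAST(CAST({} AS VarChar) AS JSON)",
                p.replace("agtype", "").trim()
            )
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    assert_eq!(
        build_cast_list("e agtype, v agtype"),
        "CAST(CAST(e AS VarChar) AS JSON), CAST(CAST(v AS VarChar) AS JSON)"
    );
    println!("{}", build_cast_list("v agtype"));
}
```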
And setting up the transaction from the Pool<Postgres> (aka PgPool) ref:
pub type PgTx = Transaction<'static, Postgres>;
pub type AgeTx = Result<PgTx, AppError>;
pub async fn age_tx(pool: &PgPool) -> AgeTx {
    // Non-capturing closure, so it's Copy and can be reused for each map_err below
    let map_err = |err: sqlx::Error| {
        log::error!("{err}");
        AppError {
            message: "Error loading Apache Age extension.",
            kind: ErrorKind::Critical,
        }
    };
    let mut tx = pool.begin().await.map_err(map_err)?;
    // Surface failures from the setup statements instead of silently discarding them
    sqlx::query("CREATE EXTENSION IF NOT EXISTS age")
        .execute(&mut *tx)
        .await
        .map_err(map_err)?;
    sqlx::query("LOAD 'age'")
        .execute(&mut *tx)
        .await
        .map_err(map_err)?;
    sqlx::query("SET search_path = ag_catalog, \"$user\", public")
        .execute(&mut *tx)
        .await
        .map_err(map_err)?;
    Ok(tx)
}
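The AppError and ErrorKind types used throughout aren't shown in this post; a minimal stand-in (my own sketch, not osintbuddy's actual definitions) that matches how the snippets use them would be:

```rust
// Hypothetical minimal error types matching the usage in the snippets above
#[derive(Debug)]
pub enum ErrorKind {
    Critical,
}

#[derive(Debug)]
pub struct AppError {
    pub message: &'static str,
    pub kind: ErrorKind,
}

fn main() {
    let err = AppError {
        message: "We ran into an OpenCypher select transaction error!",
        kind: ErrorKind::Critical,
    };
    assert!(matches!(err.kind, ErrorKind::Critical));
    println!("{err:?}");
}
```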
Usage looks like:
// pool: actix_web::web::Data<PgPool>
let mut tx = age_tx(pool.as_ref()).await?;
let vertices = with_cypher(
&graph_name,
tx.as_mut(),
"MATCH (v) WITH {id: id(v)} AS v RETURN v",
"v agtype",
)
.await?;
let edges = with_cypher(
&graph_name,
tx.as_mut(),
"MATCH (v)-[e]->() WITH {id: id(e)} AS e RETURN e",
"e agtype",
)
.await?;
let second_degrees = with_cypher(
&graph_name,
tx.as_mut(),
"MATCH ()-[]->(a)-[]->(v) WITH {id: id(v)} AS v RETURN v",
"v agtype",
)
.await?;
tx.commit().await.map_err(|err| {
error!("{err}");
AppError {
message: "We ran into an error commiting the age transaction!",
kind: ErrorKind::Critical,
}
})?;
I’ll slowly refine this over time but it works well enough for now. This is much better than writing SQL like:
CREATE EXTENSION IF NOT EXISTS age;
LOAD 'age';
SET search_path = ag_catalog, "$user", public;
SELECT CAST(CAST(e AS VARCHAR) AS JSON), CAST(CAST(v AS VARCHAR) AS JSON) FROM cypher('g_efa4cbe16d83498196b445ac4e5e27da', $$
MATCH (v)-[e]->()
WITH {id: id(e), start_id: start_id(e), end_id: end_id(e), properties: properties(e)} AS e, v
WITH {id: id(v), label: label(v), properties: properties(v)} AS v, e RETURN e, v $$) as (e agtype,v agtype);
-- columns:
-- e | v
-- ------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------
-- {"id": 1970324836974598, "end_id": 281474976710658, "start_id": 281474976710657, "properties": {}} | {"id": 281474976710657, "label": "", "properties": {"name": "s"}}
-- {"id": 2251799813685250, "end_id": 281474976710660, "start_id": 281474976710659, "properties": {}} | {"id": 281474976710659, "label": "", "properties": {"name": "Andres"}}
-- {"id": 2251799813685249, "end_id": 281474976710660, "start_id": 281474976710661, "properties": {}} | {"id": 281474976710661, "label": "", "properties": {"name": "Michael"}}
-- {"id": 2251799813685251, "end_id": 281474976710663, "start_id": 281474976710662, "properties": {}} | {"id": 281474976710662, "label": "", "properties": {}}
Looking at the extension code and its issue tracker, it seems you're supposed to be able to cast agtype directly to JSON, but I can't get that to work. I run into errors such as:
ERROR: cannot cast type agtype to json
# or else I get errors like:
ERROR: type "agtype_as_json" does not exist
I'll try troubleshooting this further later; being able to return JSON from Apache Age with less casting would be really nice.