feat: add smart auto aggregate selector
This commit is contained in:
parent
c962aa2cdf
commit
1013060529
1 changed files with 127 additions and 0 deletions
127
lib/timeseries/hoarder/sql/aggregate_selectors.sql
Normal file
127
lib/timeseries/hoarder/sql/aggregate_selectors.sql
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
CREATE OR REPLACE FUNCTION get_hypertable_caggs(target_table_name TEXT, target_table_schema TEXT DEFAULT 'public')
|
||||||
|
RETURNS TABLE (table_name TEXT, table_schema TEXT)
|
||||||
|
LANGUAGE SQL
|
||||||
|
STABLE
|
||||||
|
AS $BODY$
|
||||||
|
WITH RECURSIVE cagg_tables as (
|
||||||
|
select vcu1.view_schema, vcu1.view_name
|
||||||
|
from information_schema.view_column_usage vcu1
|
||||||
|
where vcu1.table_schema = target_table_schema
|
||||||
|
and vcu1.table_name = target_table_name
|
||||||
|
|
||||||
|
union all
|
||||||
|
|
||||||
|
select vcu2.view_schema, vcu2.view_name
|
||||||
|
from information_schema.view_column_usage vcu2, cagg_tables t
|
||||||
|
where t.view_schema = vcu2.table_schema
|
||||||
|
and t.view_name = vcu2.table_name
|
||||||
|
)
|
||||||
|
SELECT DISTINCT view_name AS table_name,
|
||||||
|
view_schema AS table_schema
|
||||||
|
FROM cagg_tables
|
||||||
|
WHERE view_schema != '_timescaledb_internal'
|
||||||
|
$BODY$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION get_cagg_interval(table_name TEXT, table_schema TEXT DEFAULT 'public')
|
||||||
|
RETURNS INTERVAL
|
||||||
|
LANGUAGE SQL
|
||||||
|
STABLE
|
||||||
|
AS $BODY$
|
||||||
|
SELECT bucket_width * INTERVAL '1us'
|
||||||
|
FROM _timescaledb_catalog.continuous_agg
|
||||||
|
WHERE table_schema = user_view_schema AND table_name = user_view_name
|
||||||
|
LIMIT 1
|
||||||
|
$BODY$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION get_jsonb_table_columns(in_table_name TEXT, in_table_schema TEXT DEFAULT 'public')
|
||||||
|
RETURNS JSONB
|
||||||
|
LANGUAGE SQL
|
||||||
|
STABLE
|
||||||
|
AS $BODY$
|
||||||
|
SELECT jsonb_agg(column_name)
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_name = in_table_name AND table_schema = in_table_schema
|
||||||
|
$BODY$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION aggregate_choice(hypertable TEXT, selection_interval INTERVAL, aggregate_types JSONB, hypertable_schema TEXT DEFAULT 'public')
|
||||||
|
RETURNS JSONB
|
||||||
|
LANGUAGE SQL
|
||||||
|
STABLE
|
||||||
|
AS $BODY$
|
||||||
|
WITH available_tables AS (
|
||||||
|
SELECT table_name, table_schema, get_cagg_interval(table_name, table_schema) AS table_interval, get_jsonb_table_columns(table_name, table_schema) AS table_columns
|
||||||
|
FROM get_hypertable_caggs(hypertable, hypertable_schema)
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT hypertable AS table_name, hypertable_schema AS table_schema, INTERVAL '0s' AS table_interval, get_jsonb_table_columns(hypertable, hypertable_schema) AS table_columns
|
||||||
|
), available_aggregates AS (
|
||||||
|
SELECT *
|
||||||
|
FROM jsonb_array_elements(aggregate_types) AS j (aggregate_option)
|
||||||
|
)
|
||||||
|
SELECT jsonb_build_object('table_name', table_name,
|
||||||
|
'table_schema', table_schema,
|
||||||
|
'table_interval', table_interval,
|
||||||
|
'interval_matched', table_interval = selection_interval) || aggregate_option
|
||||||
|
FROM available_tables, available_aggregates, (VALUES(1),(-1)) AS swp(interval_swap)
|
||||||
|
WHERE table_columns@>(aggregate_option->'with_columns') AND (table_interval*interval_swap) <= selection_interval
|
||||||
|
ORDER BY (table_interval*interval_swap) DESC
|
||||||
|
LIMIT 1
|
||||||
|
$BODY$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION ts_autoagg(hypertable TEXT, data_interval INTERVAL,
|
||||||
|
aggregate_choices JSONB,
|
||||||
|
groupby_clause TEXT, filter_query TEXT,
|
||||||
|
hypertable_schema TEXT DEFAULT 'public',
|
||||||
|
time_column TEXT DEFAULT 'time')
|
||||||
|
RETURNS SETOF RECORD
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
STABLE
|
||||||
|
AS $BODY$
|
||||||
|
DECLARE
|
||||||
|
selected_parameters jsonb;
|
||||||
|
|
||||||
|
aggregator_column TEXT;
|
||||||
|
BEGIN
|
||||||
|
SELECT aggregate_choice(hypertable, data_interval, aggregate_choices, hypertable_schema) INTO selected_parameters;
|
||||||
|
|
||||||
|
IF NOT FOUND THEN
|
||||||
|
RAISE EXCEPTION 'No fitting hypertable or aggregate found for given columns and table %!', hypertable;
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RAISE NOTICE 'Using parameter set %', selected_parameters;
|
||||||
|
|
||||||
|
aggregator_column := selected_parameters->>'aggregate';
|
||||||
|
|
||||||
|
IF aggregator_column IS NULL THEN
|
||||||
|
RAISE EXCEPTION 'No aggregator given!' USING HINT = 'Supply a "aggregate" field in the JSON aggregate object'
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN QUERY EXECUTE format($qry$
|
||||||
|
SELECT time_bucket($1, %I) AS time, %s, %s
|
||||||
|
FROM %I.%I
|
||||||
|
%s
|
||||||
|
GROUP BY 1, %s
|
||||||
|
ORDER BY 1
|
||||||
|
$qry$, time_column, groupby_clause, aggregator_column,
|
||||||
|
selected_parameters->>'table_schema', selected_parameters->>'table_name',
|
||||||
|
filter_query,
|
||||||
|
groupby_clause) USING data_interval;
|
||||||
|
END;
|
||||||
|
$BODY$;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT *
|
||||||
|
FROM ts_autoagg('cpu', INTERVAL '9m', $aggs$
|
||||||
|
[
|
||||||
|
{"with_columns":["values"], "aggregate":"avg(value) AS value"},
|
||||||
|
{"with_columns":["value_avg"], "aggregate":"avg(value_avg) AS value"}
|
||||||
|
]
|
||||||
|
$aggs$,
|
||||||
|
'tags',
|
||||||
|
$$
|
||||||
|
WHERE time BETWEEN NOW()-INTERVAL'30d' AND NOW()-INTERVAL'10d'
|
||||||
|
AND tags@>'{"metric":"usage_idle","cpu":"cpu-total"}'
|
||||||
|
$$) AS (time TIMESTAMP, tags JSONB, value DOUBLE PRECISION);
|
Loading…
Add table
Add a link
Reference in a new issue