diff --git a/lib/timeseries/hoarder/sql/aggregate_selectors.sql b/lib/timeseries/hoarder/sql/aggregate_selectors.sql new file mode 100644 index 0000000..412f98e --- /dev/null +++ b/lib/timeseries/hoarder/sql/aggregate_selectors.sql @@ -0,0 +1,127 @@ +CREATE OR REPLACE FUNCTION get_hypertable_caggs(target_table_name TEXT, target_table_schema TEXT DEFAULT 'public') +RETURNS TABLE (table_name TEXT, table_schema TEXT) +LANGUAGE SQL +STABLE +AS $BODY$ +WITH RECURSIVE cagg_tables as ( + select vcu1.view_schema, vcu1.view_name + from information_schema.view_column_usage vcu1 + where vcu1.table_schema = target_table_schema + and vcu1.table_name = target_table_name + + union all + + select vcu2.view_schema, vcu2.view_name + from information_schema.view_column_usage vcu2, cagg_tables t + where t.view_schema = vcu2.table_schema + and t.view_name = vcu2.table_name +) +SELECT DISTINCT view_name AS table_name, + view_schema AS table_schema + FROM cagg_tables + WHERE view_schema != '_timescaledb_internal' +$BODY$; + +CREATE OR REPLACE FUNCTION get_cagg_interval(table_name TEXT, table_schema TEXT DEFAULT 'public') +RETURNS INTERVAL +LANGUAGE SQL +STABLE +AS $BODY$ +SELECT bucket_width * INTERVAL '1us' +FROM _timescaledb_catalog.continuous_agg +WHERE table_schema = user_view_schema AND table_name = user_view_name +LIMIT 1 +$BODY$; + +CREATE OR REPLACE FUNCTION get_jsonb_table_columns(in_table_name TEXT, in_table_schema TEXT DEFAULT 'public') +RETURNS JSONB +LANGUAGE SQL +STABLE +AS $BODY$ +SELECT jsonb_agg(column_name) +FROM information_schema.columns +WHERE table_name = in_table_name AND table_schema = in_table_schema +$BODY$; + +CREATE OR REPLACE FUNCTION aggregate_choice(hypertable TEXT, selection_interval INTERVAL, aggregate_types JSONB, hypertable_schema TEXT DEFAULT 'public') +RETURNS JSONB +LANGUAGE SQL +STABLE +AS $BODY$ +WITH available_tables AS ( + SELECT table_name, table_schema, get_cagg_interval(table_name, table_schema) AS table_interval, get_jsonb_table_columns(table_name, table_schema) AS table_columns + FROM get_hypertable_caggs(hypertable, hypertable_schema) + + UNION ALL + + SELECT hypertable AS table_name, hypertable_schema AS table_schema, INTERVAL '0s' AS table_interval, get_jsonb_table_columns(hypertable, hypertable_schema) AS table_columns +), available_aggregates AS ( + SELECT * + FROM jsonb_array_elements(aggregate_types) AS j (aggregate_option) +) +SELECT jsonb_build_object('table_name', table_name, +'table_schema', table_schema, +'table_interval', table_interval, +'interval_matched', table_interval = selection_interval) || aggregate_option +FROM available_tables, available_aggregates, (VALUES(1),(-1)) AS swp(interval_swap) +WHERE table_columns@>(aggregate_option->'with_columns') AND (table_interval*interval_swap) <= selection_interval +ORDER BY (table_interval*interval_swap) DESC +LIMIT 1 +$BODY$; + +CREATE OR REPLACE FUNCTION ts_autoagg(hypertable TEXT, data_interval INTERVAL, + aggregate_choices JSONB, + groupby_clause TEXT, filter_query TEXT, + hypertable_schema TEXT DEFAULT 'public', + time_column TEXT DEFAULT 'time') +RETURNS SETOF RECORD +LANGUAGE plpgsql +STABLE +AS $BODY$ +DECLARE + selected_parameters jsonb; + + aggregator_column TEXT; +BEGIN + SELECT aggregate_choice(hypertable, data_interval, aggregate_choices, hypertable_schema) INTO selected_parameters; + + IF NOT FOUND THEN + RAISE EXCEPTION 'No fitting hypertable or aggregate found for given columns and table %!', hypertable; + RETURN; + END IF; + + RAISE NOTICE 'Using parameter set %', selected_parameters; + + aggregator_column := selected_parameters->>'aggregate'; + + IF aggregator_column IS NULL THEN + RAISE EXCEPTION 'No aggregator given!' USING HINT = 'Supply a "aggregate" field in the JSON aggregate object' + RETURN; + END IF; + + RETURN QUERY EXECUTE format($qry$ + SELECT time_bucket($1, %I) AS time, %s, %s + FROM %I.%I + %s + GROUP BY 1, %s + ORDER BY 1 + $qry$, time_column, groupby_clause, aggregator_column, + selected_parameters->>'table_schema', selected_parameters->>'table_name', + filter_query, + groupby_clause) USING data_interval; +END; +$BODY$; + + +SELECT * +FROM ts_autoagg('cpu', INTERVAL '9m', $aggs$ + [ + {"with_columns":["values"], "aggregate":"avg(value) AS value"}, + {"with_columns":["value_avg"], "aggregate":"avg(value_avg) AS value"} + ] +$aggs$, +'tags', +$$ + WHERE time BETWEEN NOW()-INTERVAL'30d' AND NOW()-INTERVAL'10d' + AND tags@>'{"metric":"usage_idle","cpu":"cpu-total"}' +$$) AS (time TIMESTAMP, tags JSONB, value DOUBLE PRECISION);