timeseries-hoarder/telegraf_psql_ingestor.rb
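
# Telegraf -> TimescaleDB ingestor: reads metrics in InfluxDB line protocol from
# STDIN (e.g. piped from a Telegraf exec/file output), deduplicates sources, tag
# sets and metric names into lookup tables, and writes the samples into one
# TimescaleDB hypertable per measurement under the `telegraf_ingest` schema.
# The PostgreSQL connection string is taken from ARGV[1] and defaults to a local
# `ingestor_test` database.
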
require 'pg'
require 'json'
require 'influxparser'
Process.setproctitle('telegraf-tsdb-ingest')
CONNECT_URL = ARGV[1] || "user=postgres dbname=ingestor_test"
SCHEMA = 'telegraf_ingest'
$pg = PG.connect(CONNECT_URL)
$known_tables = {}
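# Tag routing: tags listed in $source_tags identify where a sample came from and
# are deduplicated into the sources table; tags in $high_cardinality_tags are
# presumably too high-cardinality to deduplicate and are stored inline as JSONB;
# everything else becomes part of the deduplicated tag set.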
$high_cardinality_tags = {
  'process_name' => true,
  'pid' => true
}
$source_tags = {
  'host' => true,
  'location' => true
}
def ensure_schema_exists(schema)
  $pg.exec("CREATE SCHEMA IF NOT EXISTS #{schema}")
end
ensure_schema_exists(SCHEMA)
def grab_table_list
  # Remember every existing table so CREATE TABLE is not re-issued on startup.
  $pg.exec("SELECT * FROM pg_catalog.pg_tables WHERE schemaname IN ('#{SCHEMA}', 'public');") do |result|
    result.each do |tuple|
      table = tuple['schemaname'] + '.' + tuple['tablename']
      $known_tables[table] = true
    end
  end
end
grab_table_list
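# DedupContainer backs one lookup table (sources, tags, metrics) in the ingest
# schema: each distinct value gets a SERIAL id once, and a small in-process cache
# maps values back to their ids so repeated lookups skip the database round trip.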
class DedupContainer
  attr_reader :cache

  def initialize(pg, dataname, datatype)
    @pg = pg
    @dataname = dataname
    @datatype = datatype
    @tablename = "#{SCHEMA}.#{@dataname}s"
    @id_column = "#{@dataname}_id"
    @cache = {}
    setup_table unless $known_tables[@tablename]
  end

  def setup_table
    @pg.exec("CREATE TABLE #{@tablename} ( #{@id_column} SERIAL PRIMARY KEY, #{@dataname} #{@datatype} UNIQUE)")
    # JSONB lookup columns get a GIN index; everything else uses the default btree.
    @pg.exec("CREATE INDEX ON #{@tablename} #{@datatype == 'JSONB' ? 'USING GIN' : ''} (#{@dataname})")
  end

  # Pre-warm the cache from an existing table. Currently unused; lookups fall
  # back to add_key on a cache miss instead.
  def load_table
    @pg.exec("SELECT * FROM #{@tablename}") do |result|
      result.each do |tuple|
        @cache[tuple[@dataname]] = tuple[@id_column]
      end
    end
  end

  def add_key(key)
    key_str = key
    key_str = key.to_json if @datatype == 'JSONB'
    insert_statement = "INSERT INTO #{@tablename}(#{@dataname}) VALUES ($1::#{@datatype}) RETURNING #{@id_column}"
    # Look the key up first; insert it only if it is not there yet.
    id_res = @pg.exec_params("SELECT #{@id_column} FROM #{@tablename} WHERE #{@dataname} = $1::#{@datatype}", [key_str])
    id_res = @pg.exec_params(insert_statement, [key_str]) if id_res.ntuples == 0
    key_id = id_res[0][@id_column].to_i
    @cache[key] = key_id
    key_id
  end

  def [](key)
    @cache[key] || add_key(key)
  end
end
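# TimeseriesTable wraps one measurement: a TimescaleDB hypertable in the ingest
# schema holding (time, source_id, tag_id, metric_id, high_cardinality_tags,
# ts_value) rows, plus a view under the measurement's own name that joins the
# ids back to their values. Chunking, native compression and a retention policy
# are configured when the table is first created.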
class TimeseriesTable
  attr_reader :internal_tablename

  def initialize(pg, tablename)
    @pg = pg
    @tablename = tablename
    @internal_tablename = "#{SCHEMA}._timeseries_#{tablename}"
    @chunk_time_interval = '1d'
    @compression_interval = '2d'
    @retention_time = '6 months'
    setup_tables unless $known_tables[@internal_tablename]
  end

  def setup_tables
    @pg.exec <<-SQL
      CREATE TABLE #{@internal_tablename} (
        time TIMESTAMPTZ NOT NULL,
        source_id INT NOT NULL,
        tag_id INT NOT NULL,
        metric_id INT NOT NULL,
        high_cardinality_tags JSONB,
        ts_value NUMERIC,
        FOREIGN KEY (source_id) REFERENCES #{SCHEMA}.sources (source_id),
        FOREIGN KEY (tag_id) REFERENCES #{SCHEMA}.tags (tag_id),
        FOREIGN KEY (metric_id) REFERENCES #{SCHEMA}.metrics (metric_id)
      )
    SQL
    @pg.exec "CREATE INDEX ON #{@internal_tablename} (metric_id, source_id, tag_id)"
    # Turn the plain table into a hypertable, then enable compression and retention.
    @pg.exec "SELECT * FROM create_hypertable('#{@internal_tablename}', 'time', chunk_time_interval => INTERVAL '#{@chunk_time_interval}')"
    @pg.exec "ALTER TABLE #{@internal_tablename} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tag_id, metric_id, high_cardinality_tags')"
    @pg.exec "SELECT * FROM add_compression_policy('#{@internal_tablename}', INTERVAL '#{@compression_interval}')"
    @pg.exec "SELECT * FROM add_retention_policy('#{@internal_tablename}', INTERVAL '#{@retention_time}')"
    # Readable view that resolves the ids back into source/tag/metric values.
    @pg.exec <<-SQL
      CREATE VIEW #{@tablename} AS (
        SELECT time, source, tag, metric, high_cardinality_tags, ts_value
        FROM #{@internal_tablename}
        INNER JOIN #{SCHEMA}.sources USING (source_id)
        INNER JOIN #{SCHEMA}.tags USING (tag_id)
        INNER JOIN #{SCHEMA}.metrics USING (metric_id)
      )
    SQL
  end
end
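# Shared lookup tables: sources and tag sets are deduplicated as JSONB documents,
# metric names as plain strings. Hypertables are created lazily per measurement
# and cached in $timeseries.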
$known_sources = DedupContainer.new($pg, 'source', 'JSONB')
$known_metrics = DedupContainer.new($pg, 'metric', 'VARCHAR')
$known_tags = DedupContainer.new($pg, 'tag', 'JSONB')
$timeseries = {}

# Startup sanity output: dump the known tables and exercise the source lookup once.
puts $known_tables
puts $known_sources
puts $known_sources[{"host" => "xnm-core.lucidragons.de"}]
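
# ingest_line handles one line-protocol sample, e.g. (illustrative input):
#   cpu,host=web01,location=lab usage_user=1.5,usage_system=0.7 1700000000000000000
# The measurement name selects (or creates) the hypertable, tags are split into
# source / deduplicated / high-cardinality groups, and every numeric field value
# becomes one row, unnested from arrays in a single INSERT. Measurement and tag
# names are interpolated straight into SQL, so trusted input (a local Telegraf
# instance) is assumed.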
def ingest_line(line)
  begin
    line = InfluxParser.parse_point(line)
  rescue => e
    STDERR.puts "Error in line protocol parsing: #{e}"
    return
  end

  # One hypertable per measurement, created lazily on first sight.
  series = $timeseries[line['series']]
  if series.nil?
    series = TimeseriesTable.new($pg, line['series'])
    $timeseries[line['series']] = series
  end

  line_source_tags = {}
  line_series_tags = {}
  line_high_cardinality_tags = {}
  tags = line['tags']
  values = line['values']

  # Some inputs encode the metric name as a 'metric' tag with a generic 'value'
  # field; fold that back into a named field.
  if tags.include?('metric') and values.include?('value')
    metric = tags['metric']
    values[metric] = values['value']
    tags.delete 'metric'
    values.delete 'value'
  end

  tags.each do |tag, tag_value|
    if $source_tags[tag]
      line_source_tags[tag] = tag_value
    elsif $high_cardinality_tags[tag]
      line_high_cardinality_tags[tag] = tag_value
    else
      line_series_tags[tag] = tag_value
    end
  end
  line_high_cardinality_tags = nil if line_high_cardinality_tags.empty?

  # Line protocol timestamps are in nanoseconds.
  timestamp = Time.at(line['timestamp'].to_f * 1e-9)
  line_source_id = $known_sources[line_source_tags]
  line_series_id = $known_tags[line_series_tags]

  metric_ids_array = []
  values_array = []
  values.each do |metric, value|
    next unless value.is_a? Numeric
    metric_ids_array << $known_metrics[metric]
    values_array << value
  end
  return if metric_ids_array.empty? # nothing numeric to store

  puts "Inserting into #{series.internal_tablename}"
  metric_ids_array = '{' + metric_ids_array.join(',') + '}'
  values_array = '{' + values_array.join(',') + '}'
  # unnest() expands the two arrays in lockstep, giving one row per metric.
  insert_statement = <<-SQL
    INSERT INTO #{series.internal_tablename}
      (time, source_id, tag_id, metric_id, ts_value, high_cardinality_tags)
    VALUES ($1::timestamptz, $2::int, $3::int, unnest($4::int[]), unnest($5::numeric[]), $6::jsonb)
  SQL
  # &.to_json keeps the column NULL (rather than JSON 'null') when there are no
  # high-cardinality tags.
  $pg.exec_params(insert_statement, [timestamp, line_source_id, line_series_id,
                                     metric_ids_array, values_array,
                                     line_high_cardinality_tags&.to_json])
end
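
# A reader thread pulls lines off STDIN into a queue; the main loop drains the
# queue every 10 seconds and ingests each batch inside a single transaction.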
$stdin.sync = true
lines_queue = Queue.new

Thread.new do
  loop do
    line = STDIN.gets
    break if line.nil? # EOF: stop reading instead of enqueueing nils forever
    lines_queue << line
  end
end

loop do
  sleep 10
  next if lines_queue.empty?
  $pg.transaction do
    ingest_line(lines_queue.pop) until lines_queue.empty?
  end
end