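#!/usr/bin/env ruby
# telegraf-tsdb-ingest: reads InfluxDB line protocol on stdin and stores it in
# TimescaleDB, deduplicating sources, tag sets, and metric names into integer
# ids so the per-measurement hypertables stay narrow.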
require 'pg'
require 'json'
require 'influxparser'

Process.setproctitle('telegraf-tsdb-ingest')

# First CLI argument is the libpq connection string.
CONNECT_URL = ARGV[0] || 'user=postgres dbname=ingestor_test'
SCHEMA = 'telegraf_ingest'

$pg = PG.connect(CONNECT_URL)

$known_tables = {}

# Tags that would explode cardinality if interned per distinct combination;
# they are stored inline as JSONB on each row instead.
$high_cardinality_tags = {
  'process_name' => true,
  'pid' => true
}

# Tags that identify where a measurement came from; deduplicated into the
# sources table.
$source_tags = {
  'host' => true,
  'location' => true
}

def ensure_schema_exists(schema)
  $pg.exec("CREATE SCHEMA IF NOT EXISTS #{schema}")
end
ensure_schema_exists(SCHEMA)

# Remember which tables already exist so DDL only runs on first sight.
def grab_table_list
  $pg.exec("SELECT * FROM pg_catalog.pg_tables WHERE schemaname IN ('#{SCHEMA}', 'public');") do |result|
    result.each do |tuple|
      table = tuple['schemaname'] + '.' + tuple['tablename']
      $known_tables[table] = true
    end
  end
end

grab_table_list
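
# DedupContainer interns a value (a JSONB tag set or a metric name) into its
# own lookup table and memoizes value -> id in process memory, so timeseries
# rows only carry integer foreign keys.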
class DedupContainer
  attr_reader :cache

  def initialize(pg, dataname, datatype)
    @pg = pg

    @dataname = dataname
    @datatype = datatype

    @tablename = "#{SCHEMA}.#{@dataname}s"
    @id_column = "#{@dataname}_id"

    @cache = {}

    setup_table unless $known_tables[@tablename]
  end

  def setup_table
    @pg.exec("CREATE TABLE #{@tablename} ( #{@id_column} SERIAL PRIMARY KEY, #{@dataname} #{@datatype} UNIQUE)")
    # JSONB columns get a GIN index so containment queries on tag sets stay cheap.
    @pg.exec("CREATE INDEX ON #{@tablename} #{@datatype == 'JSONB' ? 'USING GIN' : ''} (#{@dataname})")
  end

  # Warm the cache from existing rows. Currently unused: the cache fills
  # lazily through add_key instead.
  def load_table
    @pg.exec("SELECT * FROM #{@tablename}") do |result|
      result.each do |tuple|
        @cache[tuple[@dataname]] = tuple[@id_column]
      end
    end
  end

  def add_key(key)
    key_str = key
    key_str = key.to_json if @datatype == 'JSONB'

    # SELECT first, INSERT if missing. Not an atomic upsert: two concurrent
    # ingestors could still race between the two statements.
    id_res = @pg.exec_params("SELECT #{@id_column} FROM #{@tablename} WHERE #{@dataname} = $1::#{@datatype}", [key_str])
    if id_res.ntuples == 0
      id_res = @pg.exec_params("INSERT INTO #{@tablename}(#{@dataname}) VALUES ($1::#{@datatype}) RETURNING #{@id_column}", [key_str])
    end

    key_id = id_res[0][@id_column].to_i

    @cache[key] = key_id

    key_id
  end

  # Memoized lookup: returns the cached id, interning the key on first sight.
  def [](key)
    r = @cache[key]
    return r unless r.nil?

    add_key key
  end
end
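
# One TimescaleDB hypertable per measurement, hidden behind a view that joins
# the id columns back to their human-readable values.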
class TimeseriesTable
  attr_reader :internal_tablename

  def initialize(pg, tablename)
    @pg = pg

    # Note: tablename comes straight from the line protocol and is
    # interpolated into DDL below, so only trusted producers should feed
    # this ingestor.
    @tablename = tablename
    @internal_tablename = "#{SCHEMA}._timeseries_#{tablename}"

    @chunk_time_interval = '1d'
    @compression_interval = '2d'
    @retention_time = '6 months'

    setup_tables unless $known_tables[@internal_tablename]
  end

  def setup_tables
    @pg.exec <<-SQL
      CREATE TABLE #{@internal_tablename} (
        time TIMESTAMPTZ NOT NULL,
        source_id INT NOT NULL,
        tag_id INT NOT NULL,
        metric_id INT NOT NULL,
        high_cardinality_tags JSONB,
        ts_value NUMERIC,

        FOREIGN KEY (source_id) REFERENCES #{SCHEMA}.sources (source_id),
        FOREIGN KEY (tag_id) REFERENCES #{SCHEMA}.tags (tag_id),
        FOREIGN KEY (metric_id) REFERENCES #{SCHEMA}.metrics (metric_id)
      )
    SQL
    @pg.exec "CREATE INDEX ON #{@internal_tablename} (metric_id, source_id, tag_id)"
    @pg.exec "SELECT * FROM create_hypertable('#{@internal_tablename}', 'time', chunk_time_interval => INTERVAL '#{@chunk_time_interval}')"

    # Compress chunks older than two days, segmented by the id columns so
    # per-series scans stay cheap; drop data after six months.
    @pg.exec "ALTER TABLE #{@internal_tablename} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tag_id, metric_id, high_cardinality_tags')"

    @pg.exec "SELECT * FROM add_compression_policy('#{@internal_tablename}', INTERVAL '#{@compression_interval}')"
    @pg.exec "SELECT * FROM add_retention_policy('#{@internal_tablename}', INTERVAL '#{@retention_time}')"

    @pg.exec <<-SQL
      CREATE VIEW #{@tablename} AS (
        SELECT time, source, tag, metric, high_cardinality_tags, ts_value
        FROM #{@internal_tablename}
        INNER JOIN #{SCHEMA}.sources USING (source_id)
        INNER JOIN #{SCHEMA}.tags USING (tag_id)
        INNER JOIN #{SCHEMA}.metrics USING (metric_id)
      )
    SQL
  end
end

$known_sources = DedupContainer.new($pg, 'source', 'JSONB')
$known_metrics = DedupContainer.new($pg, 'metric', 'VARCHAR')
$known_tags = DedupContainer.new($pg, 'tag', 'JSONB')

$timeseries = {}

# Debug output, plus a smoke test that interning a source works end to end.
puts $known_tables
puts $known_sources

puts $known_sources[{"host" => "xnm-core.lucidragons.de"}]
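
# Parse one line of InfluxDB line protocol and write it out as narrow rows.
# A hypothetical input line, for illustration:
#   cpu,host=web01,location=dc1,pid=4242 usage_idle=97.1,usage_user=1.9 1700000000000000000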
def ingest_line(line)
  begin
    line = InfluxParser.parse_point(line)
  rescue => e
    STDERR.puts "Error in line protocol parsing: #{e}"
    return
  end

  # Lazily create one hypertable (plus view) per measurement name.
  series = $timeseries[line['series']]
  if series.nil?
    series = TimeseriesTable.new($pg, line['series'])
    $timeseries[line['series']] = series
  end

  line_source_tags = {}
  line_series_tags = {}
  line_high_cardinality_tags = {}

  tags = line['tags']
  values = line['values']

  # Some producers emit a generic 'value' field plus a 'metric' tag; fold
  # that back into a properly named field.
  if tags.include?('metric') && values.include?('value')
    values[tags.delete('metric')] = values.delete('value')
  end

  # Route each tag into its bucket: source identity, inline high-cardinality
  # storage, or the regular deduplicated tag set.
  tags.each do |tag, tag_value|
    if $source_tags[tag]
      line_source_tags[tag] = tag_value
    elsif $high_cardinality_tags[tag]
      line_high_cardinality_tags[tag] = tag_value
    else
      line_series_tags[tag] = tag_value
    end
  end

  line_high_cardinality_tags = nil if line_high_cardinality_tags.empty?

  # Line protocol timestamps are integer nanoseconds; Rational avoids the
  # rounding that float multiplication by 1e-9 would introduce.
  timestamp = Time.at(Rational(line['timestamp'].to_i, 1_000_000_000))

  line_source_id = $known_sources[line_source_tags]
  line_series_id = $known_tags[line_series_tags]

  # Pair up metric ids with numeric values; non-numeric fields (strings,
  # booleans) are skipped.
  metric_ids_array = []
  values_array = []

  values.each do |metric, value|
    next unless value.is_a? Numeric

    metric_ids_array << $known_metrics[metric]
    values_array << value
  end

  puts "Inserting into #{series.internal_tablename}"

  # Render both columns as Postgres array literals so one INSERT can
  # unnest() them into multiple rows.
  metric_ids_array = '{' + metric_ids_array.join(',') + '}'
  values_array = '{' + values_array.join(',') + '}'

  insert_statement = <<-SQL
    INSERT INTO #{series.internal_tablename}
      (time, source_id, tag_id, metric_id, ts_value, high_cardinality_tags)
    VALUES ($1::timestamptz, $2::int, $3::int, unnest($4::int[]), unnest($5::numeric[]), $6::jsonb)
  SQL
  $pg.exec_params(insert_statement, [timestamp, line_source_id, line_series_id, metric_ids_array, values_array, line_high_cardinality_tags.to_json])
end

$stdin.sync = true

lines_queue = Queue.new

# Reader thread: buffer stdin lines as they arrive; stop at EOF instead of
# spinning on nil.
Thread.new do
  while (line = STDIN.gets)
    lines_queue << line
  end
end

# Every 10 seconds, drain the queue and ingest the whole batch inside a
# single transaction.
loop do
  sleep 10
  next if lines_queue.empty?

  $pg.transaction do
    ingest_line(lines_queue.pop) until lines_queue.empty?
  end
end
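
# Example invocation (hypothetical script name and connection string):
#   some-line-protocol-source | ruby telegraf-tsdb-ingest.rb 'user=postgres dbname=ingestor_test'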