feat: ✨ begin work on everything :P

parent 03b77c7395
commit dca5944549

6 changed files with 445 additions and 6 deletions

telegraf_psql_ingestor.rb (new file, 247 lines)
@@ -0,0 +1,247 @@
require 'pg'
require 'json'
require 'influxparser'

Process.setproctitle('telegraf-tsdb-ingest')

# The connection string is the first CLI argument (ARGV[0]; ARGV[1]
# would skip it, since Ruby's ARGV excludes the program name).
CONNECT_URL = ARGV[0] || "user=postgres dbname=ingestor_test"
SCHEMA = 'telegraf_ingest'

$pg = PG.connect(CONNECT_URL)

# Cache of fully-qualified table names that already exist, so repeated
# runs can skip DDL.
$known_tables = {}

# Tags stored inline as JSONB on each row instead of being interned,
# because they would blow up the tag-set table's cardinality.
$high_cardinality_tags = {
  'process_name' => true,
  'pid' => true
}
# Tags describing where a metric came from; they form the sources table.
$source_tags = {
  'host' => true,
  'location' => true
}

def ensure_schema_exists(schema)
  $pg.exec("CREATE SCHEMA IF NOT EXISTS #{schema}")
end
ensure_schema_exists(SCHEMA)

# Seed $known_tables from the catalog.
def grab_table_list()
  $pg.exec("SELECT * FROM pg_catalog.pg_tables WHERE schemaname IN ('#{SCHEMA}', 'public');") do |result|
    result.each do |tuple|
      table = tuple['schemaname'] + '.' + tuple['tablename']
      $known_tables[table] = true
    end
  end
end

grab_table_list

# Interns values (sources, tag-sets, metrics) into small lookup tables
# and caches value -> id mappings in memory.
class DedupContainer
  attr_reader :cache

  def initialize(pg, dataname, datatype)
    @pg = pg

    @dataname = dataname
    @datatype = datatype

    @tablename = "#{SCHEMA}.#{@dataname}s"
    @id_column = "#{@dataname}_id"

    @cache = {}

    setup_table unless $known_tables[@tablename]
  end

  def setup_table()
    @pg.exec("CREATE TABLE #{@tablename} ( #{@id_column} SERIAL PRIMARY KEY, #{@dataname} #{@datatype} UNIQUE)")
    # JSONB columns get a GIN index, everything else a plain btree.
    @pg.exec("CREATE INDEX ON #{@tablename} #{@datatype == 'JSONB' ? 'USING GIN' : ''} (#{@dataname})")
  end

  def load_table()
    @pg.exec("SELECT * FROM #{@tablename}") do |result|
      result.each do |tuple|
        # Result values arrive as strings; cast the id to match add_key.
        @cache[tuple[@dataname]] = tuple[@id_column].to_i
      end
    end
  end

  def add_key(key)
    key_str = key
    key_str = key.to_json if @datatype == 'JSONB'

    # SELECT first, INSERT on a miss. This is racy between concurrent
    # ingestors; INSERT ... ON CONFLICT would make it atomic.
    id_res = @pg.exec_params("SELECT #{@id_column} FROM #{@tablename} WHERE #{@dataname} = $1::#{@datatype}", [key_str])
    if id_res.ntuples == 0
      id_res = @pg.exec_params("INSERT INTO #{@tablename}(#{@dataname}) VALUES ($1::#{@datatype}) RETURNING #{@id_column}", [key_str])
    end

    key_id = id_res[0][@id_column].to_i

    @cache[key] = key_id

    key_id
  end

  def [](key)
    r = @cache[key]
    return r unless r.nil?

    add_key key
  end
end
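
# For illustration (not executed anywhere in this file), the first run
# of DedupContainer.new($pg, 'metric', 'VARCHAR') issues roughly:
#
#   CREATE TABLE telegraf_ingest.metrics (
#     metric_id SERIAL PRIMARY KEY,
#     metric VARCHAR UNIQUE
#   );
#   CREATE INDEX ON telegraf_ingest.metrics (metric);
#
# After that, each unseen metric name costs one SELECT (plus an INSERT
# on a miss) before it is served from @cache.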

# One TimescaleDB hypertable per measurement, hidden behind a view that
# joins the interned ids back to their values.
class TimeseriesTable
  attr_reader :internal_tablename

  def initialize(pg, tablename)
    @pg = pg

    # NOTE: tablename comes straight from the input stream and is
    # interpolated into DDL below, so only trusted input should reach it.
    @tablename = tablename
    @internal_tablename = "#{SCHEMA}._timeseries_#{tablename}"

    @chunk_time_interval = '1d'
    @compression_interval = '2d'

    @retention_time = '6 months'

    setup_tables unless $known_tables[@internal_tablename]
  end

  def setup_tables
    @pg.exec <<-SQL
      CREATE TABLE #{@internal_tablename} (
        time TIMESTAMPTZ NOT NULL,
        source_id INT NOT NULL,
        tag_id INT NOT NULL,
        metric_id INT NOT NULL,
        high_cardinality_tags JSONB,
        ts_value NUMERIC,

        FOREIGN KEY (source_id) REFERENCES #{SCHEMA}.sources (source_id),
        FOREIGN KEY (tag_id) REFERENCES #{SCHEMA}.tags (tag_id),
        FOREIGN KEY (metric_id) REFERENCES #{SCHEMA}.metrics (metric_id)
      )
    SQL
    @pg.exec "CREATE INDEX ON #{@internal_tablename} (metric_id, source_id, tag_id)"
    @pg.exec "SELECT * FROM create_hypertable('#{@internal_tablename}', 'time', chunk_time_interval => INTERVAL '#{@chunk_time_interval}')"

    # Compress chunks older than two days, drop data after six months.
    @pg.exec "ALTER TABLE #{@internal_tablename} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tag_id, metric_id, high_cardinality_tags')"

    @pg.exec "SELECT * FROM add_compression_policy('#{@internal_tablename}', INTERVAL '#{@compression_interval}')"
    @pg.exec "SELECT * FROM add_retention_policy('#{@internal_tablename}', INTERVAL '#{@retention_time}')"

    @pg.exec <<-SQL
      CREATE VIEW #{@tablename} AS (
        SELECT time, source, tag, metric, high_cardinality_tags, ts_value
        FROM #{@internal_tablename}
        INNER JOIN #{SCHEMA}.sources USING (source_id)
        INNER JOIN #{SCHEMA}.tags USING (tag_id)
        INNER JOIN #{SCHEMA}.metrics USING (metric_id)
      )
    SQL
  end
end
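
# With the view in place, a series can be queried by name without
# touching the id columns. A sketch, assuming a series called 'cpu'
# with a metric 'usage_idle' has been ingested (both names are
# illustrative):
#
#   SELECT time, source, ts_value
#   FROM cpu
#   WHERE metric = 'usage_idle'
#   ORDER BY time DESC
#   LIMIT 10;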

$known_sources = DedupContainer.new($pg, 'source', 'JSONB')
$known_metrics = DedupContainer.new($pg, 'metric', 'VARCHAR')
$known_tags = DedupContainer.new($pg, 'tag', 'JSONB')

# One TimeseriesTable per measurement name, created lazily on first sight.
$timeseries = {}

# Debug output: dump the table cache and exercise a source lookup.
puts $known_tables
puts $known_sources

puts $known_sources[{"host" => "xnm-core.lucidragons.de"}]

def ingest_line(line)
  begin
    line = InfluxParser.parse_point(line)
  rescue => e
    STDERR.puts "Error in line protocol parsing: #{e}"
    return
  end

  series = $timeseries[line['series']]
  if series.nil?
    series = TimeseriesTable.new($pg, line['series'])
    $timeseries[line['series']] = series
  end

  line_source_tags = {}
  line_series_tags = {}
  line_high_cardinality_tags = {}

  tags = line['tags']
  values = line['values']

  # Fold "metric=<name> value=<x>" pairs into a direct "<name>=<x>" field.
  if tags.include? 'metric' and values.include? 'value'
    values[tags['metric']] = values['value']

    tags.delete 'metric'
    values.delete 'value'
  end

  # Route each tag to its destination: source table, inline JSONB
  # column, or the regular tag-set table.
  tags.each do |tag, tag_value|
    if $source_tags[tag]
      line_source_tags[tag] = tag_value
    elsif $high_cardinality_tags[tag]
      line_high_cardinality_tags[tag] = tag_value
    else
      line_series_tags[tag] = tag_value
    end
  end

  line_high_cardinality_tags = nil if line_high_cardinality_tags.empty?

  # Line protocol timestamps are nanoseconds since the epoch.
  timestamp = Time.at(line['timestamp'].to_f * 1e-9)

  line_source_id = $known_sources[line_source_tags]
  line_series_id = $known_tags[line_series_tags]

  metric_ids_array = []
  values_array = []

  values.each do |metric, value|
    next unless value.is_a? Numeric

    metric_ids_array << $known_metrics[metric]
    values_array << value
  end

  puts "Inserting into #{series.internal_tablename}"

  # Postgres array literals; unnest() below fans them out into one row
  # per metric.
  metric_ids_array = '{' + metric_ids_array.join(',') + '}'
  values_array = '{' + values_array.join(',') + '}'

  insert_statement = <<-SQL
    INSERT INTO #{series.internal_tablename}
      (time, source_id, tag_id, metric_id, ts_value, high_cardinality_tags)
    VALUES ($1::timestamptz, $2::int, $3::int, unnest($4::int[]), unnest($5::numeric[]), $6::jsonb)
  SQL
  # &. keeps the column SQL NULL when there are no high-cardinality tags
  # (nil.to_json would store the JSON literal null instead).
  $pg.exec_params(insert_statement, [timestamp, line_source_id, line_series_id, metric_ids_array, values_array, line_high_cardinality_tags&.to_json])
end
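
# Worked example (hypothetical input; the tag and field names are only
# illustrative). Given this line protocol on stdin:
#
#   procstat,host=web1,process_name=nginx cpu_usage=1.5,rss=4096 1700000000000000000
#
# ingest_line routes 'host' into the sources lookup (source_id),
# 'process_name' into the inline JSONB column, interns the (here empty)
# remaining tag-set, and the unnest() insert fans out into two rows:
# one for cpu_usage and one for rss, both at the nanosecond timestamp
# converted to a timestamptz.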

$stdin.sync = true

lines_queue = Queue.new

# Reader thread: block on stdin and hand completed lines to the queue.
# gets returns nil on EOF, so stop the loop instead of queueing nils.
Thread.new do
  while (line = STDIN.gets)
    lines_queue << line
  end
end

# Writer loop: every 10 seconds, drain whatever has accumulated inside
# a single transaction.
loop do
  sleep 10
  next if lines_queue.empty?

  $pg.transaction do
    ingest_line(lines_queue.pop) until lines_queue.empty?
  end
end
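
# One way to drive this script (a sketch, not part of this commit;
# plugin choice, paths, and the connection string are assumptions) is
# Telegraf's long-running execd output with line-protocol serialization:
#
#   [[outputs.execd]]
#     command = ["ruby", "/opt/ingest/telegraf_psql_ingestor.rb", "user=postgres dbname=ingestor_test"]
#     data_format = "influx"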