require 'pg'
require 'json'
require 'influxparser'

Process.setproctitle('telegraf-tsdb-ingest')

# Connection string comes from the first CLI argument (ARGV[0] in Ruby).
CONNECT_URL = ARGV[0] || 'user=postgres dbname=ingestor_test'
SCHEMA = 'telegraf_ingest'

$pg = PG.connect(CONNECT_URL)
$known_tables = {}

# Tags that would explode series cardinality are stored inline as JSONB
# instead of being deduplicated into their own table.
$high_cardinality_tags = { 'process_name' => true, 'pid' => true }
# Tags that identify where a measurement came from.
$source_tags = { 'host' => true, 'location' => true }

def ensure_schema_exists(schema)
  $pg.exec("CREATE SCHEMA IF NOT EXISTS #{schema}")
end
ensure_schema_exists(SCHEMA)

# Cache the existing tables so we only issue CREATE TABLE for new ones.
def grab_table_list
  $pg.exec("SELECT * FROM pg_catalog.pg_tables WHERE schemaname IN ('#{SCHEMA}', 'public')") do |result|
    result.each do |tuple|
      table = tuple['schemaname'] + '.' + tuple['tablename']
      $known_tables[table] = true
    end
  end
end
grab_table_list

# Maps arbitrary keys (tag sets, metric names, ...) to small integer ids,
# creating and caching rows in a per-kind lookup table on demand.
class DedupContainer
  attr_reader :cache

  def initialize(pg, dataname, datatype)
    @pg = pg
    @dataname = dataname
    @datatype = datatype
    @tablename = "#{SCHEMA}.#{@dataname}s"
    @id_column = "#{@dataname}_id"
    @cache = {}
    setup_table unless $known_tables[@tablename]
  end

  def setup_table
    @pg.exec("CREATE TABLE #{@tablename} (
      #{@id_column} SERIAL PRIMARY KEY,
      #{@dataname} #{@datatype} UNIQUE)")
    # JSONB keys get a GIN index so containment queries stay fast.
    @pg.exec("CREATE INDEX ON #{@tablename} #{@datatype == 'JSONB' ? 'USING GIN' : ''} (#{@dataname})")
  end

  # Warm the in-memory cache from the database (not called at startup;
  # the cache also fills lazily through add_key).
  def load_table
    @pg.exec("SELECT * FROM #{@tablename}") do |result|
      result.each do |tuple|
        @cache[tuple[@dataname]] = tuple[@id_column]
      end
    end
  end

  def add_key(key)
    key_str = key
    key_str = key.to_json if @datatype == 'JSONB'
    # Select first, insert if missing. Not an atomic upsert; this assumes
    # a single writer per lookup table.
    insert_statement = "INSERT INTO #{@tablename}(#{@dataname}) VALUES ($1::#{@datatype}) RETURNING #{@id_column}"
    id_res = @pg.exec_params("SELECT #{@id_column} FROM #{@tablename} WHERE #{@dataname} = $1::#{@datatype}", [key_str])
    id_res = @pg.exec_params(insert_statement, [key_str]) if id_res.ntuples == 0
    key_id = id_res[0][@id_column].to_i
    @cache[key] = key_id
    key_id
  end

  def [](key)
    r = @cache[key]
    return r unless r.nil?
    add_key key
  end
end
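# Illustrative use of DedupContainer (the names below are hypothetical):
# repeated lookups of the same key are served from the in-memory cache, so
# each distinct key costs at most one SELECT/INSERT round trip.
#
#   hosts = DedupContainer.new($pg, 'source', 'JSONB')
#   id_a  = hosts[{ 'host' => 'db1' }]  # first lookup inserts and caches
#   id_b  = hosts[{ 'host' => 'db1' }]  # answered from @cache; id_a == id_b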
# One hypertable per Telegraf measurement, normalized against the dedup
# tables, plus a convenience view that joins the ids back to their values.
class TimeseriesTable
  attr_reader :internal_tablename

  def initialize(pg, tablename)
    @pg = pg
    @tablename = tablename
    @internal_tablename = "#{SCHEMA}._timeseries_#{tablename}"
    @chunk_time_interval = '1d'
    @compression_interval = '2d'
    @retention_time = '6 months'
    setup_tables unless $known_tables[@internal_tablename]
  end

  def setup_tables
    @pg.exec <<-SQL
      CREATE TABLE #{@internal_tablename} (
        time TIMESTAMPTZ NOT NULL,
        source_id INT NOT NULL,
        tag_id INT NOT NULL,
        metric_id INT NOT NULL,
        high_cardinality_tags JSONB,
        ts_value NUMERIC,
        FOREIGN KEY (source_id) REFERENCES #{SCHEMA}.sources (source_id),
        FOREIGN KEY (tag_id) REFERENCES #{SCHEMA}.tags (tag_id),
        FOREIGN KEY (metric_id) REFERENCES #{SCHEMA}.metrics (metric_id)
      )
    SQL
    @pg.exec "CREATE INDEX ON #{@internal_tablename} (metric_id, source_id, tag_id)"
    # Turn the table into a TimescaleDB hypertable and attach compression
    # and retention policies.
    @pg.exec "SELECT * FROM create_hypertable('#{@internal_tablename}', 'time', chunk_time_interval => INTERVAL '#{@chunk_time_interval}')"
    @pg.exec "ALTER TABLE #{@internal_tablename} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tag_id, metric_id, high_cardinality_tags')"
    @pg.exec "SELECT * FROM add_compression_policy('#{@internal_tablename}', INTERVAL '#{@compression_interval}')"
    @pg.exec "SELECT * FROM add_retention_policy('#{@internal_tablename}', INTERVAL '#{@retention_time}')"
    @pg.exec <<-SQL
      CREATE VIEW #{@tablename} AS (
        SELECT time, source, tag, metric, high_cardinality_tags, ts_value
        FROM #{@internal_tablename}
        INNER JOIN #{SCHEMA}.sources USING (source_id)
        INNER JOIN #{SCHEMA}.tags USING (tag_id)
        INNER JOIN #{SCHEMA}.metrics USING (metric_id)
      )
    SQL
  end
end

$known_sources = DedupContainer.new($pg, 'source', 'JSONB')
$known_metrics = DedupContainer.new($pg, 'metric', 'VARCHAR')
$known_tags = DedupContainer.new($pg, 'tag', 'JSONB')
$timeseries = {}

# Debug output to verify connectivity and the dedup round trip.
puts $known_tables
puts $known_sources
puts $known_sources[{ 'host' => 'xnm-core.lucidragons.de' }]
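# How ingest_line splits a line onto the schema, with an illustrative
# (made-up) line protocol sample:
#
#   procstat,host=db1,location=rack1,pid=4242 cpu_usage=1.5,mem_rss=1048576 1700000000000000000
#
#   host/location       -> sources row (JSONB)                -> source_id
#   pid                 -> high_cardinality_tags (inline JSONB, not deduped)
#   remaining tags      -> tags row (JSONB)                   -> tag_id
#   each numeric field  -> metrics row (VARCHAR) + one ts_value per row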
def ingest_line(line)
  begin
    line = InfluxParser.parse_point(line)
  rescue => e
    STDERR.puts "Error in line protocol parsing: #{e}"
    return
  end

  # Create the hypertable for this measurement on first sight.
  series = $timeseries[line['series']]
  if series.nil?
    series = TimeseriesTable.new($pg, line['series'])
    $timeseries[line['series']] = series
  end

  line_source_tags = {}
  line_series_tags = {}
  line_high_cardinality_tags = {}

  tags = line['tags']
  values = line['values']

  # Some inputs emit a generic 'value' field plus a 'metric' tag; fold those
  # into a named field so they deduplicate like ordinary metrics.
  if tags.include?('metric') && values.include?('value')
    values[tags['metric']] = values['value']
    tags.delete 'metric'
    values.delete 'value'
  end

  tags.each do |tag, tag_value|
    if $source_tags[tag]
      line_source_tags[tag] = tag_value
    elsif $high_cardinality_tags[tag]
      line_high_cardinality_tags[tag] = tag_value
    else
      line_series_tags[tag] = tag_value
    end
  end
  line_high_cardinality_tags = nil if line_high_cardinality_tags.empty?

  # Line protocol timestamps are nanoseconds since the epoch.
  timestamp = Time.at(line['timestamp'].to_f * 1e-9)

  line_source_id = $known_sources[line_source_tags]
  line_series_id = $known_tags[line_series_tags]

  metric_ids_array = []
  values_array = []
  values.each do |metric, value|
    next unless value.is_a? Numeric
    metric_ids_array << $known_metrics[metric]
    values_array << value
  end

  puts "Inserting into #{series.internal_tablename}"

  # Build PostgreSQL array literals so a single INSERT with unnest() covers
  # every field of the line.
  metric_ids_array = '{' + metric_ids_array.join(',') + '}'
  values_array = '{' + values_array.join(',') + '}'
  insert_statement = <<-SQL
    INSERT INTO #{series.internal_tablename}
      (time, source_id, tag_id, metric_id, ts_value, high_cardinality_tags)
    VALUES ($1::timestamptz, $2::int, $3::int,
            unnest($4::int[]), unnest($5::numeric[]), $6::jsonb)
  SQL
  $pg.exec_params(insert_statement, [timestamp, line_source_id, line_series_id,
                                     metric_ids_array, values_array,
                                     line_high_cardinality_tags.to_json])
end

$stdin.sync = true
lines_queue = Queue.new

# Read stdin on its own thread so slow inserts never block the producer;
# stop cleanly at EOF instead of queueing nils forever.
Thread.new do
  while (line = STDIN.gets)
    lines_queue << line
  end
end

# Flush the queue every 10 seconds, batching all pending lines into one
# transaction.
loop do
  sleep 10
  next if lines_queue.empty?
  $pg.transaction do
    ingest_line(lines_queue.pop) until lines_queue.empty?
  end
end
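# Usage sketch (assumptions: the `influxparser` gem is installed, the target
# database has the timescaledb extension enabled, and the script file is
# named ingest.rb). Telegraf can feed this script through any output that
# writes influx line protocol to stdout, e.g. [[outputs.file]] with
# files = ["stdout"]:
#
#   telegraf --config telegraf.conf | ruby ingest.rb 'host=localhost dbname=ingestor_test'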