From dca59445490ffc4306086fe8006c1bec004473b9 Mon Sep 17 00:00:00 2001
From: xaseiresh
Date: Wed, 10 May 2023 09:11:23 +0200
Subject: [PATCH] feat: :sparkles: begin work on everything :P

---
 Gemfile.lock                                 |  46 +++++
 lib/timeseries/hoarder/CachingTable.rb       |  83 +++++++
 lib/timeseries/hoarder/Table.rb              |  42 ++++
 lib/timeseries/hoarder/TimeseriesDatabase.rb |  21 ++
 telegraf_psql_ingestor.rb                    | 247 +++++++++++++++++++
 timeseries-hoarder.gemspec                   |  12 +-
 6 files changed, 445 insertions(+), 6 deletions(-)
 create mode 100644 Gemfile.lock
 create mode 100644 lib/timeseries/hoarder/CachingTable.rb
 create mode 100644 lib/timeseries/hoarder/Table.rb
 create mode 100644 lib/timeseries/hoarder/TimeseriesDatabase.rb
 create mode 100644 telegraf_psql_ingestor.rb

diff --git a/Gemfile.lock b/Gemfile.lock
new file mode 100644
index 0000000..8d89f4d
--- /dev/null
+++ b/Gemfile.lock
@@ -0,0 +1,46 @@
+PATH
+  remote: .
+  specs:
+    timeseries-hoarder (0.1.0)
+      influxparser (~> 0.0.5)
+      pg (~> 1.5)
+
+GEM
+  remote: https://rubygems.org/
+  specs:
+    ast (2.4.2)
+    influxparser (0.0.5)
+    json (2.6.3)
+    parallel (1.22.1)
+    parser (3.2.2.0)
+      ast (~> 2.4.1)
+    pg (1.5.3)
+    rainbow (3.1.1)
+    rake (13.0.6)
+    regexp_parser (2.7.0)
+    rexml (3.2.5)
+    rubocop (1.50.1)
+      json (~> 2.3)
+      parallel (~> 1.10)
+      parser (>= 3.2.0.0)
+      rainbow (>= 2.2.2, < 4.0)
+      regexp_parser (>= 1.8, < 3.0)
+      rexml (>= 3.2.5, < 4.0)
+      rubocop-ast (>= 1.28.0, < 2.0)
+      ruby-progressbar (~> 1.7)
+      unicode-display_width (>= 2.4.0, < 3.0)
+    rubocop-ast (1.28.0)
+      parser (>= 3.2.1.0)
+    ruby-progressbar (1.13.0)
+    unicode-display_width (2.4.2)
+
+PLATFORMS
+  x86_64-linux
+
+DEPENDENCIES
+  rake (~> 13.0)
+  rubocop (~> 1.21)
+  timeseries-hoarder!
+
+BUNDLED WITH
+   2.3.26
diff --git a/lib/timeseries/hoarder/CachingTable.rb b/lib/timeseries/hoarder/CachingTable.rb
new file mode 100644
index 0000000..273b96e
--- /dev/null
+++ b/lib/timeseries/hoarder/CachingTable.rb
@@ -0,0 +1,83 @@
+
+require_relative 'Table.rb'
+
+require 'time'
+require 'json'
+
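+# Usage sketch (hypothetical, for illustration only; assumes the Database
+# wrapper from TimeseriesDatabase.rb and a reachable PostgreSQL instance):
+#
+#   pg = PG.connect("user=postgres dbname=ingestor_test")
+#   db = Timeseries::Hoarder::Database.new(pg)
+#   tags_table = Timeseries::Hoarder::CachingTable.new(db, 'tags')
+#
+#   # Hits the in-memory cache first; on a miss it does a locked
+#   # SELECT-then-INSERT against ts_hoarder.tags and returns the row id.
+#   tag_id = tags_table[{ 'host' => 'example-host' }]
+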
+module Timeseries
+  module Hoarder
+    class CachingTable < Table
+      def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
+        @content_name = content_name
+        @id_column = content_name + '_id'
+
+        super(db, name, 'ts_hoarder')
+
+        @known_tags = {}
+        @tag_access_times = {}
+        @tag_access_updates = {}
+
+        @tag_access_update_delay = tag_access_update_delay
+
+        # Pre-warm the in-memory cache from the backing table.
+        load_cache_content
+      end
+
+      def table_creation
+        @pg.exec("CREATE TABLE ts_hoarder.#{@table_name} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )")
+
+        @pg.exec("CREATE INDEX ON ts_hoarder.#{@table_name} USING GIN ( #{@content_name} )")
+      end
+
+      def load_cache_content
+        @pg.exec("SELECT * FROM ts_hoarder.#{@table_name}") do |results|
+          results.each do |tuple|
+            tags = JSON.parse(tuple[@content_name])
+
+            @known_tags[tags] = tuple[@id_column]
+            @tag_access_times[tags] = Time.parse(tuple['last_used'])
+          end
+        end
+
+        true
+      end
+
+      def create_entry(tags)
+        return @known_tags[tags] if @known_tags.include? tags
+
+        returned_id = nil
+
+        @pg.transaction do
+          @pg.exec("LOCK TABLE ts_hoarder.#{@table_name}")
+
+          res = @pg.exec_params("SELECT * FROM ts_hoarder.#{@table_name} WHERE #{@content_name} = $1::jsonb", [tags.to_json])
+
+          if res.ntuples >= 1
+            returned_id = res[0][@id_column]
+            @known_tags[tags] = returned_id
+            @tag_access_times[tags] = Time.parse(res[0]['last_used'])
+          else
+            res = @pg.exec_params("INSERT INTO ts_hoarder.#{@table_name} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json])
+
+            returned_id = res[0][@id_column]
+            @known_tags[tags] = returned_id
+            @tag_access_times[tags] = Time.now
+          end
+        end
+
+        returned_id
+      end
+
+      def [](tags)
+        access_time = Time.now
+        # Refresh the last_used marker only when the previous access is older
+        # than the configured delay.
+        if access_time - (@tag_access_times[tags] || Time.at(0)) > @tag_access_update_delay
+          @tag_access_times[tags] = access_time
+          @tag_access_updates[tags] = true
+        end
+
+        known_id = @known_tags[tags]
+        return known_id unless known_id.nil?
+
+        create_entry(tags)
+      end
+    end
+  end
+end
\ No newline at end of file
diff --git a/lib/timeseries/hoarder/Table.rb b/lib/timeseries/hoarder/Table.rb
new file mode 100644
index 0000000..094804e
--- /dev/null
+++ b/lib/timeseries/hoarder/Table.rb
@@ -0,0 +1,42 @@
+
+
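+# Base class for tables that create themselves on first use. Subclasses
+# override #table_creation with their DDL. A minimal sketch (hypothetical
+# subclass, not part of this commit):
+#
+#   class EventTable < Timeseries::Hoarder::Table
+#     def table_creation
+#       @pg.exec("CREATE TABLE #{@table_schema}.#{@table_name} ( id SERIAL PRIMARY KEY, payload JSONB )")
+#     end
+#   end
+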
+module Timeseries
+  module Hoarder
+    class Table
+      def initialize(db, table_name, table_schema = "public")
+        @table_name = table_name
+        @table_schema = table_schema
+
+        @db = db
+        @pg = @db.pg
+
+        @created = false
+
+        ensure_table_exists
+      end
+
+      def ensure_table_exists
+        return if @created
+
+        @pg.transaction do
+          # Transaction-scoped advisory lock; released automatically at
+          # COMMIT/ROLLBACK, unlike the session-scoped pg_advisory_lock.
+          @pg.exec("SELECT pg_advisory_xact_lock(0)")
+
+          r = @pg.exec_params("SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = $2", [@table_name, @table_schema])
+
+          # No early return out of this block: the pg gem would skip its
+          # COMMIT and the created table would be rolled back.
+          table_creation if r.ntuples.zero?
+
+          @created = true
+        end
+      end
+
+      def table_creation
+        raise NotImplementedError, "No table creation method provided!"
+      end
+    end
+  end
+end
\ No newline at end of file
diff --git a/lib/timeseries/hoarder/TimeseriesDatabase.rb b/lib/timeseries/hoarder/TimeseriesDatabase.rb
new file mode 100644
index 0000000..d925ae3
--- /dev/null
+++ b/lib/timeseries/hoarder/TimeseriesDatabase.rb
@@ -0,0 +1,21 @@
+
+require 'pg'
+
+require_relative 'CachingTable.rb'
+
+module Timeseries
+  module Hoarder
+    class Database
+      attr_reader :pg
+      attr_reader :data_sources
+
+      def initialize(pg)
+        @pg = pg
+
+        @pg.exec("CREATE SCHEMA IF NOT EXISTS ts_hoarder")
+
+        @data_sources = CachingTable.new(self, 'sources', 'source')
+      end
+    end
+  end
+end
\ No newline at end of file
diff --git a/telegraf_psql_ingestor.rb b/telegraf_psql_ingestor.rb
new file mode 100644
index 0000000..76abac1
--- /dev/null
+++ b/telegraf_psql_ingestor.rb
@@ -0,0 +1,247 @@
+
+require 'pg'
+require 'json'
+require 'influxparser'
+
+Process.setproctitle('telegraf-tsdb-ingest')
+
+# Optional first CLI argument (ARGV[0]) overrides the connection string.
+CONNECT_URL = ARGV[0] || "user=postgres dbname=ingestor_test"
+SCHEMA = 'telegraf_ingest'
+
+$pg = PG.connect(CONNECT_URL)
+
+$known_tables = {}
+
+$high_cardinality_tags = {
+  'process_name' => true,
+  'pid' => true
+}
+$source_tags = {
+  'host' => true,
+  'location' => true
+}
+
+def ensure_schema_exists(schema)
+  $pg.exec("CREATE SCHEMA IF NOT EXISTS #{schema}")
+end
+ensure_schema_exists(SCHEMA)
+
+def grab_table_list
+  $pg.exec("SELECT * FROM pg_catalog.pg_tables WHERE schemaname IN ('#{SCHEMA}', 'public')") do |result|
+    result.each do |tuple|
+      table = tuple['schemaname'] + '.' + tuple['tablename']
+      $known_tables[table] = true
+    end
+  end
+end
+
+grab_table_list
+
+class DedupContainer
+  attr_reader :cache
+
+  def initialize(pg, dataname, datatype)
+    @pg = pg
+
+    @dataname = dataname
+    @datatype = datatype
+
+    @tablename = "#{SCHEMA}.#{@dataname}s"
+    @id_column = "#{@dataname}_id"
+
+    @cache = {}
+
+    # Create the table on first run, otherwise pre-load the dedup cache.
+    if $known_tables[@tablename]
+      load_table
+    else
+      setup_table
+    end
+  end
+
+  def setup_table
+    @pg.exec("CREATE TABLE #{@tablename} ( #{@id_column} SERIAL PRIMARY KEY, #{@dataname} #{@datatype} UNIQUE)")
+    @pg.exec("CREATE INDEX ON #{@tablename} #{@datatype == 'JSONB' ? 'USING GIN' : ''} (#{@dataname})")
+  end
+
+  def load_table
+    @pg.exec("SELECT * FROM #{@tablename}") do |result|
+      result.each do |tuple|
+        @cache[tuple[@dataname]] = tuple[@id_column]
+      end
+    end
+  end
+
+  def add_key(key)
+    key_str = key
+    key_str = key.to_json if @datatype == 'JSONB'
+
+    upsert_statement = "INSERT INTO #{@tablename}(#{@dataname}) VALUES ($1::#{@datatype}) RETURNING #{@id_column}"
+
+    id_res = @pg.exec_params("SELECT #{@id_column} FROM #{@tablename} WHERE #{@dataname} = $1::#{@datatype}", [key_str])
+    id_res = @pg.exec_params(upsert_statement, [key_str]) if id_res.ntuples == 0
+
+    key_id = id_res[0][@id_column].to_i
+
+    @cache[key] = key_id
+
+    key_id
+  end
+
+  def [](key)
+    r = @cache[key]
+    return r unless r.nil?
+
+    add_key key
+  end
+end
+
+class TimeseriesTable
+  attr_reader :internal_tablename
+
+  def initialize(pg, tablename)
+    @pg = pg
+
+    @tablename = tablename
+    @internal_tablename = "#{SCHEMA}._timeseries_#{tablename}"
+
+    @chunk_time_interval = '1d'
+    @compression_interval = '2d'
+
+    @retention_time = '6 months'
+
+    setup_tables unless $known_tables[@internal_tablename]
+  end
+
+  def setup_tables
+    @pg.exec <<-SQL
+CREATE TABLE #{@internal_tablename} (
+	time TIMESTAMPTZ NOT NULL,
+	source_id INT NOT NULL,
+	tag_id INT NOT NULL,
+	metric_id INT NOT NULL,
+	high_cardinality_tags JSONB,
+	ts_value NUMERIC,
+
+	FOREIGN KEY (source_id) REFERENCES #{SCHEMA}.sources (source_id),
+	FOREIGN KEY (tag_id) REFERENCES #{SCHEMA}.tags (tag_id),
+	FOREIGN KEY (metric_id) REFERENCES #{SCHEMA}.metrics (metric_id)
+)
+SQL
+    @pg.exec "CREATE INDEX ON #{@internal_tablename} (metric_id, source_id, tag_id)"
+    @pg.exec "SELECT * FROM create_hypertable('#{@internal_tablename}', 'time', chunk_time_interval => INTERVAL '#{@chunk_time_interval}')"
+
+    @pg.exec "ALTER TABLE #{@internal_tablename} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tag_id, metric_id, high_cardinality_tags')"
+
+    @pg.exec "SELECT * FROM add_compression_policy('#{@internal_tablename}', INTERVAL '#{@compression_interval}')"
+    @pg.exec "SELECT * FROM add_retention_policy('#{@internal_tablename}', INTERVAL '#{@retention_time}')"
+
+    @pg.exec <<-SQL
+CREATE VIEW #{@tablename} AS (
+	SELECT time, source, tag, metric, high_cardinality_tags, ts_value
+	FROM #{@internal_tablename}
+	INNER JOIN #{SCHEMA}.sources USING (source_id)
+	INNER JOIN #{SCHEMA}.tags USING (tag_id)
+	INNER JOIN #{SCHEMA}.metrics USING (metric_id)
+)
+SQL
+  end
+end
+
+$known_sources = DedupContainer.new($pg, 'source', 'JSONB')
+$known_metrics = DedupContainer.new($pg, 'metric', 'VARCHAR')
+$known_tags = DedupContainer.new($pg, 'tag', 'JSONB')
+
+$timeseries = {}
+
+# Debug output during bring-up.
+puts $known_tables
+puts $known_sources
+
+puts $known_sources[{"host" => "xnm-core.lucidragons.de"}]
+
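+# Shape of a parsed point, for orientation. Given a (hypothetical) Telegraf
+# line-protocol input such as
+#
+#   cpu,host=example-host,cpu=cpu0 usage_user=1.2,usage_system=0.4 1683702683000000000
+#
+# InfluxParser.parse_point returns roughly:
+#
+#   { 'series'    => 'cpu',
+#     'tags'      => { 'host' => 'example-host', 'cpu' => 'cpu0' },
+#     'values'    => { 'usage_user' => 1.2, 'usage_system' => 0.4 },
+#     'timestamp' => '1683702683000000000' }
+#
+# 'host' is deduplicated into the sources table, the remaining tags into the
+# tags table, and each numeric field becomes one row via the unnest() below.
+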
+def ingest_line(line)
+  begin
+    line = InfluxParser.parse_point(line)
+  rescue => e
+    STDERR.puts "Error in line protocol parsing: #{e}"
+    return
+  end
+
+  series = $timeseries[line['series']]
+  if series.nil?
+    series = TimeseriesTable.new($pg, line['series'])
+    $timeseries[line['series']] = series
+  end
+
+  line_source_tags = {}
+  line_series_tags = {}
+  line_high_cardinality_tags = {}
+
+  tags = line['tags']
+  values = line['values']
+
+  # Fold a "metric" tag plus generic "value" field into a properly named field.
+  if tags.include? 'metric' and values.include? 'value'
+    values[tags['metric']] = values['value']
+
+    tags.delete 'metric'
+    values.delete 'value'
+  end
+
+  tags.each do |tag, tag_value|
+    if $source_tags[tag]
+      line_source_tags[tag] = tag_value
+    elsif $high_cardinality_tags[tag]
+      line_high_cardinality_tags[tag] = tag_value
+    else
+      line_series_tags[tag] = tag_value
+    end
+  end
+
+  line_high_cardinality_tags = nil if line_high_cardinality_tags.empty?
+
+  # Influx timestamps are nanoseconds since the epoch.
+  timestamp = Time.at(line['timestamp'].to_f * 1e-9)
+
+  line_source_id = $known_sources[line_source_tags]
+  line_series_id = $known_tags[line_series_tags]
+
+  metric_ids_array = []
+  values_array = []
+
+  values.each do |metric, value|
+    next unless value.is_a? Numeric
+
+    metric_ids_array << $known_metrics[metric]
+    values_array << value
+  end
+
+  puts "Inserting into #{series.internal_tablename}"
+
+  # PostgreSQL array literals; unnest() fans them out into one row per metric.
+  metric_ids_array = '{' + metric_ids_array.join(',') + '}'
+  values_array = '{' + values_array.join(',') + '}'
+
+  insert_statement = <<-SQL
+INSERT INTO #{series.internal_tablename}
+	(time, source_id, tag_id, metric_id, ts_value, high_cardinality_tags)
+	VALUES ($1::timestamptz, $2::int, $3::int, unnest($4::int[]), unnest($5::numeric[]), $6::jsonb)
+SQL
+  # &. keeps a SQL NULL (rather than the JSON literal 'null') when there are
+  # no high-cardinality tags.
+  $pg.exec_params(insert_statement, [timestamp, line_source_id, line_series_id, metric_ids_array, values_array, line_high_cardinality_tags&.to_json])
+end
+
+$stdin.sync = true
+
+lines_queue = Queue.new
+
+# Reader thread: stop at EOF instead of queueing endless nils.
+Thread.new do
+  while (input = STDIN.gets)
+    lines_queue << input
+  end
+end
+
+loop do
+  sleep 10
+  next if lines_queue.empty?
+
+  # Drain everything that accumulated during the interval in one transaction.
+  $pg.transaction do
+    ingest_line(lines_queue.pop) until lines_queue.empty?
+  end
+end
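+
+# Invocation sketch (hypothetical wiring): with Telegraf's outputs.file
+# plugin writing influx-format lines to stdout, the ingestor can be fed
+# through a plain pipe:
+#
+#   telegraf --config telegraf.conf | ruby telegraf_psql_ingestor.rb "user=postgres dbname=ingestor_test"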
diff --git a/timeseries-hoarder.gemspec b/timeseries-hoarder.gemspec
index 37cfd61..d832ada 100644
--- a/timeseries-hoarder.gemspec
+++ b/timeseries-hoarder.gemspec
@@ -8,16 +8,15 @@ Gem::Specification.new do |spec|
   spec.authors = ["xaseiresh"]
   spec.email = ["davidbailey.2889@gmail.com"]
 
-  spec.summary = "TODO: Write a short summary, because RubyGems requires one."
-  spec.description = "TODO: Write a longer description or delete this line."
-  spec.homepage = "TODO: Put your gem's website or public repo URL here."
+  spec.summary = "Quick & dirty time series ingestor gem"
+  spec.description = "Quickly, comfortably, reliably and flexibly ingest Influx-style time series into TimescaleDB"
+  spec.homepage = "https://forgejo.lucidragons.de/lucidergs/timeseries-hoarder"
   spec.required_ruby_version = ">= 2.6.0"
 
   spec.metadata["allowed_push_host"] = "TODO: Set to your gem server 'https://example.com'"
 
   spec.metadata["homepage_uri"] = spec.homepage
-  spec.metadata["source_code_uri"] = "TODO: Put your gem's public repo URL here."
-  spec.metadata["changelog_uri"] = "TODO: Put your gem's CHANGELOG.md URL here."
+  spec.metadata["source_code_uri"] = "https://forgejo.lucidragons.de/lucidergs/timeseries-hoarder"
 
   # Specify which files should be added to the gem when it is released.
   # The `git ls-files -z` loads the files in the RubyGem that have been added into git.
@@ -31,7 +30,8 @@ Gem::Specification.new do |spec|
   spec.require_paths = ["lib"]
 
   # Uncomment to register a new dependency of your gem
-  # spec.add_dependency "example-gem", "~> 1.0"
+  spec.add_dependency "pg", "~> 1.5"
+  spec.add_dependency "influxparser", "~> 0.0.5"
 
   # For more information and examples about making a new gem, check out our
   # guide at: https://bundler.io/guides/creating_gem.html