diff --git a/Gemfile b/Gemfile index 8744b7c..fc09e1c 100644 --- a/Gemfile +++ b/Gemfile @@ -7,4 +7,9 @@ gemspec gem "rake", "~> 13.0" +gem "pry" + +gem "debug", "~> 1.0" +gem "solargraph", "~> 0.48" + gem "rubocop", "~> 1.21" diff --git a/Gemfile.lock b/Gemfile.lock index 8d89f4d..7da520e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -9,15 +9,43 @@ GEM remote: https://rubygems.org/ specs: ast (2.4.2) + backport (1.2.0) + benchmark (0.2.1) + coderay (1.1.3) + debug (1.8.0) + irb (>= 1.5.0) + reline (>= 0.3.1) + diff-lcs (1.5.0) + e2mmap (0.1.0) influxparser (0.0.5) + io-console (0.6.0) + irb (1.6.4) + reline (>= 0.3.0) + jaro_winkler (1.5.4) json (2.6.3) + kramdown (2.4.0) + rexml + kramdown-parser-gfm (1.1.0) + kramdown (~> 2.0) + method_source (1.0.0) + nokogiri (1.14.3-x86_64-linux) + racc (~> 1.4) parallel (1.22.1) parser (3.2.2.0) ast (~> 2.4.1) pg (1.5.3) + pry (0.14.2) + coderay (~> 1.1) + method_source (~> 1.0) + racc (1.6.2) rainbow (3.1.1) rake (13.0.6) + rbs (2.8.4) regexp_parser (2.7.0) + reline (0.3.3) + io-console (~> 0.5) + reverse_markdown (2.1.1) + nokogiri rexml (3.2.5) rubocop (1.50.1) json (~> 2.3) @@ -32,15 +60,37 @@ GEM rubocop-ast (1.28.0) parser (>= 3.2.1.0) ruby-progressbar (1.13.0) + solargraph (0.49.0) + backport (~> 1.2) + benchmark + bundler (~> 2.0) + diff-lcs (~> 1.4) + e2mmap + jaro_winkler (~> 1.5) + kramdown (~> 2.3) + kramdown-parser-gfm (~> 1.1) + parser (~> 3.0) + rbs (~> 2.0) + reverse_markdown (~> 2.0) + rubocop (~> 1.38) + thor (~> 1.0) + tilt (~> 2.0) + yard (~> 0.9, >= 0.9.24) + thor (1.2.1) + tilt (2.1.0) unicode-display_width (2.4.2) + yard (0.9.34) PLATFORMS x86_64-linux DEPENDENCIES + debug (~> 1.0) + pry rake (~> 13.0) rubocop (~> 1.21) + solargraph (~> 0.48) timeseries-hoarder! BUNDLED WITH - 2.3.26 + 2.3.22 diff --git a/bin/console b/bin/console index b426583..900ae5d 100755 --- a/bin/console +++ b/bin/console @@ -7,9 +7,11 @@ require "timeseries/hoarder" # You can add fixtures and/or initialization code here to make experimenting # with your gem easier. You can also use a different console, if you like. -# (If you use this, don't forget to add pry to your Gemfile!) -# require "pry" -# Pry.start +$pg = PG.connect('user=postgres dbname=tshoard_test') +$pg.exec("DROP TABLE IF EXISTS ts_hoarder.test CASCADE") -require "irb" -IRB.start(__FILE__) +$db = Timeseries::Hoarder::Database.new($pg) + +# (If you use this, don't forget to add pry to your Gemfile!) +require "pry" +Pry.start diff --git a/lib/timeseries/hoarder.rb b/lib/timeseries/hoarder.rb index 39236de..e6944ef 100644 --- a/lib/timeseries/hoarder.rb +++ b/lib/timeseries/hoarder.rb @@ -2,6 +2,8 @@ require_relative "hoarder/version" +require_relative 'hoarder/TimeseriesDatabase.rb' + module Timeseries module Hoarder class Error < StandardError; end diff --git a/lib/timeseries/hoarder/CachingTable.rb b/lib/timeseries/hoarder/CachingTable.rb index 273b96e..c8f967e 100644 --- a/lib/timeseries/hoarder/CachingTable.rb +++ b/lib/timeseries/hoarder/CachingTable.rb @@ -7,7 +7,12 @@ require 'json' module Timeseries module Hoarder class CachingTable < Table + attr_accessor :retention_time + attr_reader :db + def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60) + raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database + @content_name = content_name @id_column = content_name + '_id' @@ -18,16 +23,18 @@ module Timeseries @tag_access_updates = {} @tag_access_update_delay = tag_access_update_delay + + @retention_time = nil end def table_creation - @pg.exec("CREATE TABLE ts_hoarder.#{@table_name} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )") + @pg.exec("CREATE TABLE #{@table_id} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, #{@content_name}_extra JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )") - @pg.exec("CREATE INDEX ON ts_hoarder.#{@table_name} USING GIN ( #{@content_name} )") + @pg.exec("CREATE INDEX ON #{@table_id} USING GIN ( #{@content_name} )") end def load_cache_content - @pg.exec("SELECT * FROM ts_hoarder.#{@table_name}") do |results| + @pg.exec("SELECT * FROM #{@table_id} WHERE (NOW() - last_used) < INTERVAL '3h'") do |results| results.each do |tuple| tags = JSON.parse(tuple[@content_name]) @@ -46,16 +53,16 @@ module Timeseries returned_id = nil @pg.transaction do - @pg.exec("LOCK TABLE ts_hoarder.#{@table_name}") + @pg.exec("LOCK TABLE #{@table_id}") - res = @pg.exec_params("SELECT * FROM ts_hoarder.#{@table_name} WHERE #{@content_name} = $1::jsonb", [tags.to_json]) + res = @pg.exec_params("SELECT * FROM #{@table_id} WHERE #{@content_name} = $1::jsonb", [tags.to_json]) if(res.num_tuples >= 1) returned_id = res[0][@id_column] @known_tags[tags] = returned_id @tag_access_times[tags] = Time.parse(res[0]['last_used']) else - res = @pg.exec_params("INSERT INTO ts_hoarder.#{@table_name} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json]) + res = @pg.exec_params("INSERT INTO #{@table_id} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json]) returned_id = res[0][@id_column] @known_tags[tags] = returned_id @@ -66,12 +73,37 @@ module Timeseries returned_id end + def update_use_times() + time_column = [] + id_column = [] + + @tag_access_updates.each do |tag, value| + time_column << @tag_access_times[tag].to_s + id_column << @known_tags[tag] + end + @tag_access_updates = {} + + time_column = '{'+time_column.join(',')+'}' + id_column = '{'+id_column.join(',')+'}' + + update_query = <<~SQL + WITH update_data AS ( + SELECT unnest($1::timestamptz[]) AS last_used, unnest($2::int[]) AS id + ) + UPDATE #{@table_id} AS o + SET last_used = GREATEST(o.last_used, n.last_used) + FROM update_data AS n + WHERE #{@id_column} = n.id + SQL + + @pg.exec_params(update_query, [time_column, id_column]); + end + def [](tags) access_time = Time.now() - if(((@tag_access_times[tags] || Time.at(0)) - Time.now()) > @tag_access_update_delay) - @tag_access_times[tags] = access_time - @tag_access_updates[tags] = true - end + + @tag_access_times[tags] = access_time + @tag_access_updates[tags] = true known_id = @known_tags[tags] return known_id unless known_id.nil? diff --git a/lib/timeseries/hoarder/Table.rb b/lib/timeseries/hoarder/Table.rb index 094804e..3c6688a 100644 --- a/lib/timeseries/hoarder/Table.rb +++ b/lib/timeseries/hoarder/Table.rb @@ -3,10 +3,20 @@ module Timeseries module Hoarder class Table + attr_reader :table_name, :table_schema, :table_id + def initialize(db, table_name, table_schema = "public") + raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database + + if (not table_name.is_a? String) or (not table_schema.is_a? String) + raise ArgumentError, "Table name and schema must be strings!" + end + @table_name = table_name @table_schema = table_schema + @table_id = "\"#{@table_schema}\".\"#{@table_name}\"" + @db = db @pg = @db.pg @@ -21,7 +31,7 @@ module Timeseries @pg.transaction do @pg.exec("SELECT pg_advisory_lock(0)") - r = @pg.exec_params("SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = $2", [@table_name, @schema_name]) + r = @pg.exec_params("SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = $2", [@table_name, @table_schema]) if r.num_tuples >= 1 @created = true diff --git a/lib/timeseries/hoarder/TimeseriesDatabase.rb b/lib/timeseries/hoarder/TimeseriesDatabase.rb index d925ae3..64f3067 100644 --- a/lib/timeseries/hoarder/TimeseriesDatabase.rb +++ b/lib/timeseries/hoarder/TimeseriesDatabase.rb @@ -2,6 +2,7 @@ require 'pg' require_relative 'CachingTable.rb' +require_relative 'TimeseriesTable.rb' module Timeseries module Hoarder @@ -12,9 +13,20 @@ module Timeseries def initialize(pg) @pg = pg + @pg.exec("CREATE EXTENSION IF NOT EXISTS timescaledb") + @pg.exec("CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit") + @pg.exec("CREATE SCHEMA IF NOT EXISTS ts_hoarder") @data_sources = CachingTable.new(self, 'sources', 'source') + + @series = {} + end + + def add_timeseries(name, **opts) + return @series[name] if @series.include? name + + new_series = SeriesTable.new(self, name, **opts) end end end diff --git a/lib/timeseries/hoarder/TimeseriesTable.rb b/lib/timeseries/hoarder/TimeseriesTable.rb new file mode 100644 index 0000000..6e9eebd --- /dev/null +++ b/lib/timeseries/hoarder/TimeseriesTable.rb @@ -0,0 +1,36 @@ + +require_relative 'Table.rb' +require_relative 'CachingTable.rb' + +module Timeseries + module Hoarder + class SeriesTable < Table + def initialize(db, name, **opts) + @chunk_time = opts[:chunk_time] || '3h' + @compress_chunk_time = opts[:compress_chunk_time] || '24h' + + @name = name + @tags = CachingTable.new(db, name + '_tags') + + super(db, name, 'ts_hoarder') + end + + def table_creation + @pg.exec("CREATE TABLE #{@table_id} ( time TIMESTAMPTZ NOT NULL, source_id INTEGER NOT NULL, tags_id INTEGER NOT NULL, value DOUBLE PRECISION NOT NULL)") + + @pg.exec_params("SELECT create_hypertable('#{@table_schema}.#{@table_name}', 'time', chunk_time_interval => $1::interval)", [@chunk_time]) + @pg.exec_params("ALTER TABLE #{@table_id} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tags_id', timescaledb.compress_chunk_time_interval='#{@compress_chunk_time}')") + + view_sql = <