Compare commits
6 commits
dca5944549
...
2e74de359c
Author | SHA1 | Date | |
---|---|---|---|
2e74de359c | |||
3d15464521 | |||
637bb865b6 | |||
cd9e9deab3 | |||
2a20ba8b76 | |||
30eab9065e |
8 changed files with 166 additions and 17 deletions
5
Gemfile
5
Gemfile
|
@ -7,4 +7,9 @@ gemspec
|
||||||
|
|
||||||
gem "rake", "~> 13.0"
|
gem "rake", "~> 13.0"
|
||||||
|
|
||||||
|
gem "pry"
|
||||||
|
|
||||||
|
gem "debug", "~> 1.0"
|
||||||
|
gem "solargraph", "~> 0.48"
|
||||||
|
|
||||||
gem "rubocop", "~> 1.21"
|
gem "rubocop", "~> 1.21"
|
||||||
|
|
52
Gemfile.lock
52
Gemfile.lock
|
@ -9,15 +9,43 @@ GEM
|
||||||
remote: https://rubygems.org/
|
remote: https://rubygems.org/
|
||||||
specs:
|
specs:
|
||||||
ast (2.4.2)
|
ast (2.4.2)
|
||||||
|
backport (1.2.0)
|
||||||
|
benchmark (0.2.1)
|
||||||
|
coderay (1.1.3)
|
||||||
|
debug (1.8.0)
|
||||||
|
irb (>= 1.5.0)
|
||||||
|
reline (>= 0.3.1)
|
||||||
|
diff-lcs (1.5.0)
|
||||||
|
e2mmap (0.1.0)
|
||||||
influxparser (0.0.5)
|
influxparser (0.0.5)
|
||||||
|
io-console (0.6.0)
|
||||||
|
irb (1.6.4)
|
||||||
|
reline (>= 0.3.0)
|
||||||
|
jaro_winkler (1.5.4)
|
||||||
json (2.6.3)
|
json (2.6.3)
|
||||||
|
kramdown (2.4.0)
|
||||||
|
rexml
|
||||||
|
kramdown-parser-gfm (1.1.0)
|
||||||
|
kramdown (~> 2.0)
|
||||||
|
method_source (1.0.0)
|
||||||
|
nokogiri (1.14.3-x86_64-linux)
|
||||||
|
racc (~> 1.4)
|
||||||
parallel (1.22.1)
|
parallel (1.22.1)
|
||||||
parser (3.2.2.0)
|
parser (3.2.2.0)
|
||||||
ast (~> 2.4.1)
|
ast (~> 2.4.1)
|
||||||
pg (1.5.3)
|
pg (1.5.3)
|
||||||
|
pry (0.14.2)
|
||||||
|
coderay (~> 1.1)
|
||||||
|
method_source (~> 1.0)
|
||||||
|
racc (1.6.2)
|
||||||
rainbow (3.1.1)
|
rainbow (3.1.1)
|
||||||
rake (13.0.6)
|
rake (13.0.6)
|
||||||
|
rbs (2.8.4)
|
||||||
regexp_parser (2.7.0)
|
regexp_parser (2.7.0)
|
||||||
|
reline (0.3.3)
|
||||||
|
io-console (~> 0.5)
|
||||||
|
reverse_markdown (2.1.1)
|
||||||
|
nokogiri
|
||||||
rexml (3.2.5)
|
rexml (3.2.5)
|
||||||
rubocop (1.50.1)
|
rubocop (1.50.1)
|
||||||
json (~> 2.3)
|
json (~> 2.3)
|
||||||
|
@ -32,15 +60,37 @@ GEM
|
||||||
rubocop-ast (1.28.0)
|
rubocop-ast (1.28.0)
|
||||||
parser (>= 3.2.1.0)
|
parser (>= 3.2.1.0)
|
||||||
ruby-progressbar (1.13.0)
|
ruby-progressbar (1.13.0)
|
||||||
|
solargraph (0.49.0)
|
||||||
|
backport (~> 1.2)
|
||||||
|
benchmark
|
||||||
|
bundler (~> 2.0)
|
||||||
|
diff-lcs (~> 1.4)
|
||||||
|
e2mmap
|
||||||
|
jaro_winkler (~> 1.5)
|
||||||
|
kramdown (~> 2.3)
|
||||||
|
kramdown-parser-gfm (~> 1.1)
|
||||||
|
parser (~> 3.0)
|
||||||
|
rbs (~> 2.0)
|
||||||
|
reverse_markdown (~> 2.0)
|
||||||
|
rubocop (~> 1.38)
|
||||||
|
thor (~> 1.0)
|
||||||
|
tilt (~> 2.0)
|
||||||
|
yard (~> 0.9, >= 0.9.24)
|
||||||
|
thor (1.2.1)
|
||||||
|
tilt (2.1.0)
|
||||||
unicode-display_width (2.4.2)
|
unicode-display_width (2.4.2)
|
||||||
|
yard (0.9.34)
|
||||||
|
|
||||||
PLATFORMS
|
PLATFORMS
|
||||||
x86_64-linux
|
x86_64-linux
|
||||||
|
|
||||||
DEPENDENCIES
|
DEPENDENCIES
|
||||||
|
debug (~> 1.0)
|
||||||
|
pry
|
||||||
rake (~> 13.0)
|
rake (~> 13.0)
|
||||||
rubocop (~> 1.21)
|
rubocop (~> 1.21)
|
||||||
|
solargraph (~> 0.48)
|
||||||
timeseries-hoarder!
|
timeseries-hoarder!
|
||||||
|
|
||||||
BUNDLED WITH
|
BUNDLED WITH
|
||||||
2.3.26
|
2.3.22
|
||||||
|
|
12
bin/console
12
bin/console
|
@ -7,9 +7,11 @@ require "timeseries/hoarder"
|
||||||
# You can add fixtures and/or initialization code here to make experimenting
|
# You can add fixtures and/or initialization code here to make experimenting
|
||||||
# with your gem easier. You can also use a different console, if you like.
|
# with your gem easier. You can also use a different console, if you like.
|
||||||
|
|
||||||
# (If you use this, don't forget to add pry to your Gemfile!)
|
$pg = PG.connect('user=postgres dbname=tshoard_test')
|
||||||
# require "pry"
|
$pg.exec("DROP TABLE IF EXISTS ts_hoarder.test CASCADE")
|
||||||
# Pry.start
|
|
||||||
|
|
||||||
require "irb"
|
$db = Timeseries::Hoarder::Database.new($pg)
|
||||||
IRB.start(__FILE__)
|
|
||||||
|
# (If you use this, don't forget to add pry to your Gemfile!)
|
||||||
|
require "pry"
|
||||||
|
Pry.start
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
require_relative "hoarder/version"
|
require_relative "hoarder/version"
|
||||||
|
|
||||||
|
require_relative 'hoarder/TimeseriesDatabase.rb'
|
||||||
|
|
||||||
module Timeseries
|
module Timeseries
|
||||||
module Hoarder
|
module Hoarder
|
||||||
class Error < StandardError; end
|
class Error < StandardError; end
|
||||||
|
|
|
@ -7,7 +7,12 @@ require 'json'
|
||||||
module Timeseries
|
module Timeseries
|
||||||
module Hoarder
|
module Hoarder
|
||||||
class CachingTable < Table
|
class CachingTable < Table
|
||||||
|
attr_accessor :retention_time
|
||||||
|
attr_reader :db
|
||||||
|
|
||||||
def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
|
def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
|
||||||
|
raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database
|
||||||
|
|
||||||
@content_name = content_name
|
@content_name = content_name
|
||||||
@id_column = content_name + '_id'
|
@id_column = content_name + '_id'
|
||||||
|
|
||||||
|
@ -18,16 +23,18 @@ module Timeseries
|
||||||
@tag_access_updates = {}
|
@tag_access_updates = {}
|
||||||
|
|
||||||
@tag_access_update_delay = tag_access_update_delay
|
@tag_access_update_delay = tag_access_update_delay
|
||||||
|
|
||||||
|
@retention_time = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def table_creation
|
def table_creation
|
||||||
@pg.exec("CREATE TABLE ts_hoarder.#{@table_name} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )")
|
@pg.exec("CREATE TABLE #{@table_id} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, #{@content_name}_extra JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )")
|
||||||
|
|
||||||
@pg.exec("CREATE INDEX ON ts_hoarder.#{@table_name} USING GIN ( #{@content_name} )")
|
@pg.exec("CREATE INDEX ON #{@table_id} USING GIN ( #{@content_name} )")
|
||||||
end
|
end
|
||||||
|
|
||||||
def load_cache_content
|
def load_cache_content
|
||||||
@pg.exec("SELECT * FROM ts_hoarder.#{@table_name}") do |results|
|
@pg.exec("SELECT * FROM #{@table_id} WHERE (NOW() - last_used) < INTERVAL '3h'") do |results|
|
||||||
results.each do |tuple|
|
results.each do |tuple|
|
||||||
|
|
||||||
tags = JSON.parse(tuple[@content_name])
|
tags = JSON.parse(tuple[@content_name])
|
||||||
|
@ -46,16 +53,16 @@ module Timeseries
|
||||||
returned_id = nil
|
returned_id = nil
|
||||||
|
|
||||||
@pg.transaction do
|
@pg.transaction do
|
||||||
@pg.exec("LOCK TABLE ts_hoarder.#{@table_name}")
|
@pg.exec("LOCK TABLE #{@table_id}")
|
||||||
|
|
||||||
res = @pg.exec_params("SELECT * FROM ts_hoarder.#{@table_name} WHERE #{@content_name} = $1::jsonb", [tags.to_json])
|
res = @pg.exec_params("SELECT * FROM #{@table_id} WHERE #{@content_name} = $1::jsonb", [tags.to_json])
|
||||||
|
|
||||||
if(res.num_tuples >= 1)
|
if(res.num_tuples >= 1)
|
||||||
returned_id = res[0][@id_column]
|
returned_id = res[0][@id_column]
|
||||||
@known_tags[tags] = returned_id
|
@known_tags[tags] = returned_id
|
||||||
@tag_access_times[tags] = Time.parse(res[0]['last_used'])
|
@tag_access_times[tags] = Time.parse(res[0]['last_used'])
|
||||||
else
|
else
|
||||||
res = @pg.exec_params("INSERT INTO ts_hoarder.#{@table_name} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json])
|
res = @pg.exec_params("INSERT INTO #{@table_id} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json])
|
||||||
|
|
||||||
returned_id = res[0][@id_column]
|
returned_id = res[0][@id_column]
|
||||||
@known_tags[tags] = returned_id
|
@known_tags[tags] = returned_id
|
||||||
|
@ -66,12 +73,37 @@ module Timeseries
|
||||||
returned_id
|
returned_id
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def update_use_times()
|
||||||
|
time_column = []
|
||||||
|
id_column = []
|
||||||
|
|
||||||
|
@tag_access_updates.each do |tag, value|
|
||||||
|
time_column << @tag_access_times[tag].to_s
|
||||||
|
id_column << @known_tags[tag]
|
||||||
|
end
|
||||||
|
@tag_access_updates = {}
|
||||||
|
|
||||||
|
time_column = '{'+time_column.join(',')+'}'
|
||||||
|
id_column = '{'+id_column.join(',')+'}'
|
||||||
|
|
||||||
|
update_query = <<~SQL
|
||||||
|
WITH update_data AS (
|
||||||
|
SELECT unnest($1::timestamptz[]) AS last_used, unnest($2::int[]) AS id
|
||||||
|
)
|
||||||
|
UPDATE #{@table_id} AS o
|
||||||
|
SET last_used = GREATEST(o.last_used, n.last_used)
|
||||||
|
FROM update_data AS n
|
||||||
|
WHERE #{@id_column} = n.id
|
||||||
|
SQL
|
||||||
|
|
||||||
|
@pg.exec_params(update_query, [time_column, id_column]);
|
||||||
|
end
|
||||||
|
|
||||||
def [](tags)
|
def [](tags)
|
||||||
access_time = Time.now()
|
access_time = Time.now()
|
||||||
if(((@tag_access_times[tags] || Time.at(0)) - Time.now()) > @tag_access_update_delay)
|
|
||||||
@tag_access_times[tags] = access_time
|
@tag_access_times[tags] = access_time
|
||||||
@tag_access_updates[tags] = true
|
@tag_access_updates[tags] = true
|
||||||
end
|
|
||||||
|
|
||||||
known_id = @known_tags[tags]
|
known_id = @known_tags[tags]
|
||||||
return known_id unless known_id.nil?
|
return known_id unless known_id.nil?
|
||||||
|
|
|
@ -3,10 +3,20 @@
|
||||||
module Timeseries
|
module Timeseries
|
||||||
module Hoarder
|
module Hoarder
|
||||||
class Table
|
class Table
|
||||||
|
attr_reader :table_name, :table_schema, :table_id
|
||||||
|
|
||||||
def initialize(db, table_name, table_schema = "public")
|
def initialize(db, table_name, table_schema = "public")
|
||||||
|
raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database
|
||||||
|
|
||||||
|
if (not table_name.is_a? String) or (not table_schema.is_a? String)
|
||||||
|
raise ArgumentError, "Table name and schema must be strings!"
|
||||||
|
end
|
||||||
|
|
||||||
@table_name = table_name
|
@table_name = table_name
|
||||||
@table_schema = table_schema
|
@table_schema = table_schema
|
||||||
|
|
||||||
|
@table_id = "\"#{@table_schema}\".\"#{@table_name}\""
|
||||||
|
|
||||||
@db = db
|
@db = db
|
||||||
@pg = @db.pg
|
@pg = @db.pg
|
||||||
|
|
||||||
|
@ -21,7 +31,7 @@ module Timeseries
|
||||||
@pg.transaction do
|
@pg.transaction do
|
||||||
@pg.exec("SELECT pg_advisory_lock(0)")
|
@pg.exec("SELECT pg_advisory_lock(0)")
|
||||||
|
|
||||||
r = @pg.exec_params("SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = $2", [@table_name, @schema_name])
|
r = @pg.exec_params("SELECT 1 FROM information_schema.tables WHERE table_name = $1 AND table_schema = $2", [@table_name, @table_schema])
|
||||||
|
|
||||||
if r.num_tuples >= 1
|
if r.num_tuples >= 1
|
||||||
@created = true
|
@created = true
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
require 'pg'
|
require 'pg'
|
||||||
|
|
||||||
require_relative 'CachingTable.rb'
|
require_relative 'CachingTable.rb'
|
||||||
|
require_relative 'TimeseriesTable.rb'
|
||||||
|
|
||||||
module Timeseries
|
module Timeseries
|
||||||
module Hoarder
|
module Hoarder
|
||||||
|
@ -12,9 +13,20 @@ module Timeseries
|
||||||
def initialize(pg)
|
def initialize(pg)
|
||||||
@pg = pg
|
@pg = pg
|
||||||
|
|
||||||
|
@pg.exec("CREATE EXTENSION IF NOT EXISTS timescaledb")
|
||||||
|
@pg.exec("CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit")
|
||||||
|
|
||||||
@pg.exec("CREATE SCHEMA IF NOT EXISTS ts_hoarder")
|
@pg.exec("CREATE SCHEMA IF NOT EXISTS ts_hoarder")
|
||||||
|
|
||||||
@data_sources = CachingTable.new(self, 'sources', 'source')
|
@data_sources = CachingTable.new(self, 'sources', 'source')
|
||||||
|
|
||||||
|
@series = {}
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_timeseries(name, **opts)
|
||||||
|
return @series[name] if @series.include? name
|
||||||
|
|
||||||
|
new_series = SeriesTable.new(self, name, **opts)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
36
lib/timeseries/hoarder/TimeseriesTable.rb
Normal file
36
lib/timeseries/hoarder/TimeseriesTable.rb
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
|
||||||
|
require_relative 'Table.rb'
|
||||||
|
require_relative 'CachingTable.rb'
|
||||||
|
|
||||||
|
module Timeseries
|
||||||
|
module Hoarder
|
||||||
|
class SeriesTable < Table
|
||||||
|
def initialize(db, name, **opts)
|
||||||
|
@chunk_time = opts[:chunk_time] || '3h'
|
||||||
|
@compress_chunk_time = opts[:compress_chunk_time] || '24h'
|
||||||
|
|
||||||
|
@name = name
|
||||||
|
@tags = CachingTable.new(db, name + '_tags')
|
||||||
|
|
||||||
|
super(db, name, 'ts_hoarder')
|
||||||
|
end
|
||||||
|
|
||||||
|
def table_creation
|
||||||
|
@pg.exec("CREATE TABLE #{@table_id} ( time TIMESTAMPTZ NOT NULL, source_id INTEGER NOT NULL, tags_id INTEGER NOT NULL, value DOUBLE PRECISION NOT NULL)")
|
||||||
|
|
||||||
|
@pg.exec_params("SELECT create_hypertable('#{@table_schema}.#{@table_name}', 'time', chunk_time_interval => $1::interval)", [@chunk_time])
|
||||||
|
@pg.exec_params("ALTER TABLE #{@table_id} SET (timescaledb.compress, timescaledb.compress_segmentby = 'source_id, tags_id', timescaledb.compress_chunk_time_interval='#{@compress_chunk_time}')")
|
||||||
|
|
||||||
|
view_sql = <<SQL
|
||||||
|
CREATE OR REPLACE VIEW #{@name} AS
|
||||||
|
SELECT time, source, tags, value
|
||||||
|
FROM #{@table_id}
|
||||||
|
INNER JOIN #{@tags.table_id} USING ( tags_id )
|
||||||
|
INNER JOIN #{@db.data_sources.table_id} USING ( source_id )
|
||||||
|
SQL
|
||||||
|
|
||||||
|
@pg.exec(view_sql)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue