timeseries-hoarder/lib/timeseries/hoarder/CachingTable.rb

115 lines
3 KiB
Ruby
Raw Normal View History

require_relative 'Table.rb'
require 'time'
require 'json'
module Timeseries
module Hoarder
class CachingTable < Table
attr_accessor :retention_time
attr_reader :db
# Sets up the table wrapper and its (initially empty) in-memory lookup state.
#
# @param db [Database] must be a Timeseries::Hoarder::Database
# @param name forwarded unchanged to Table#initialize
#   (presumably the table name -- confirm against Table)
# @param content_name [String] name of the JSONB content column; the id
#   column is named "<content_name>_id"
# @param tag_access_update_delay stored as-is; it is not read by any method
#   visible in this class (NOTE(review): unit is presumably seconds -- confirm)
# @raise [ArgumentError] when db is not a Database
def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
  unless db.is_a?(Database)
    raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!"
  end

  # Column names are assigned before calling super.
  # NOTE(review): presumably because Table#initialize can trigger
  # table_creation, which reads them -- confirm against Table.
  @content_name = content_name
  @id_column = content_name + '_id'

  # NOTE(review): 'ts_hoarder' is presumably the schema name -- confirm.
  super(db, name, 'ts_hoarder')

  @retention_time = nil
  @tag_access_update_delay = tag_access_update_delay

  @known_tags = {}         # tags => id column value
  @tag_access_times = {}   # tags => Time of the most recent access
  @tag_access_updates = {} # tags => true while an access still has to be written back
end
# Creates the backing table and a GIN index on its JSONB content column.
#
# The table has a serial primary key (@id_column), the JSONB content column
# (@content_name), a second JSONB column "<content_name>_extra", and the two
# timestamps created_at / last_used.
#
# @return whatever @pg.exec returns for the CREATE INDEX statement
def table_creation
  create_table_sql = "CREATE TABLE #{@table_id} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, #{@content_name}_extra JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )"
  create_index_sql = "CREATE INDEX ON #{@table_id} USING GIN ( #{@content_name} )"

  @pg.exec(create_table_sql)
  @pg.exec(create_index_sql)
end
# Fills the in-memory maps from rows whose last_used lies within the last
# three hours.
#
# For every such row the content column is parsed as JSON and used as the
# key under which the row id (@known_tags) and the parsed last_used time
# (@tag_access_times) are stored. Existing entries are not cleared first.
#
# @return [true]
def load_cache_content
  recent_rows_sql = "SELECT * FROM #{@table_id} WHERE (NOW() - last_used) < INTERVAL '3h'"

  @pg.exec(recent_rows_sql) do |rows|
    rows.each do |row|
      # JSON.parse yields string keys, so cached keys are string-keyed.
      cache_key = JSON.parse(row[@content_name])
      @known_tags[cache_key] = row[@id_column]
      @tag_access_times[cache_key] = Time.parse(row['last_used'])
    end
  end

  true
end
# Returns the id of the row holding `tags`, inserting a new row if none exists.
#
# A hit in @known_tags is answered without touching the database. Otherwise
# the table is locked inside a transaction, the row is looked up by JSONB
# equality, and a new row (created_at = last_used = NOW()) is inserted when
# the lookup finds nothing.
#
# The in-memory maps are only updated once the transaction block has
# finished without raising, and the cached access time is never moved
# backwards: if a newer access time is already recorded for `tags`, it is
# kept instead of the (older) last_used value stored in the database.
#
# @param tags [#to_json] tag set; used as cache key and serialised with
#   to_json for the queries
# @return the id column value as returned by the database driver
def create_entry(tags)
  return @known_tags[tags] if @known_tags.include? tags

  returned_id = nil
  last_used = nil

  @pg.transaction do
    # Hold a table lock for the rest of the transaction so a concurrent
    # writer cannot insert the same tag set between the SELECT and INSERT.
    @pg.exec("LOCK TABLE #{@table_id}")

    encoded_tags = tags.to_json
    res = @pg.exec_params("SELECT * FROM #{@table_id} WHERE #{@content_name} = $1::jsonb", [encoded_tags])

    if res.num_tuples >= 1
      returned_id = res[0][@id_column]
      last_used = Time.parse(res[0]['last_used'])
    else
      res = @pg.exec_params("INSERT INTO #{@table_id} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [encoded_tags])
      returned_id = res[0][@id_column]
      last_used = Time.now
    end
  end

  @known_tags[tags] = returned_id
  # Keep whichever timestamp is newer; an access recorded just before this
  # call must not be replaced by the stale value read from the table.
  @tag_access_times[tags] = [@tag_access_times[tags], last_used].compact.max
  returned_id
end
# Writes all pending access times back to the table in a single UPDATE.
#
# The set of pending tags (@tag_access_updates) is cleared first. Every
# pending tag that has both a known id and a recorded access time becomes
# one (last_used, id) pair; the pairs are passed as two array literals and
# unnested server-side. last_used is only ever moved forward (GREATEST).
#
# Tags without a known id or access time are skipped: a nil would leave an
# empty slot in the array literal (e.g. "{7,,8}") and make the whole
# statement fail. When no pair remains, no query is sent at all.
#
# @return the result of @pg.exec_params, or nil when there was nothing to write
def update_use_times
  pending_tags = @tag_access_updates.keys
  @tag_access_updates = {}

  rows = pending_tags.map do |tag|
    id = @known_tags[tag]
    used_at = @tag_access_times[tag]
    [used_at.to_s, id] unless id.nil? || used_at.nil?
  end.compact
  return nil if rows.empty?

  times, ids = rows.transpose
  time_column = "{#{times.join(',')}}"
  id_column = "{#{ids.join(',')}}"

  update_query = <<~SQL
    WITH update_data AS (
      SELECT unnest($1::timestamptz[]) AS last_used, unnest($2::int[]) AS id
    )
    UPDATE #{@table_id} AS o
    SET last_used = GREATEST(o.last_used, n.last_used)
    FROM update_data AS n
    WHERE #{@id_column} = n.id
  SQL

  @pg.exec_params(update_query, [time_column, id_column])
end
# Returns the id for `tags`, creating the row on first use, and records
# the access so that update_use_times can write it back later.
#
# The id is resolved before the access is recorded, for two reasons:
# * create_entry assigns @tag_access_times[tags] itself (for an existing row
#   it uses the last_used value stored in the table); stamping afterwards
#   keeps that older value from replacing the current access time.
# * if create_entry raises, the tag is not left marked as pending without
#   an id.
#
# @param tags tag set used as cache key
# @return the id column value for `tags`
def [](tags)
  known_id = @known_tags[tags]
  known_id = create_entry(tags) if known_id.nil?

  @tag_access_times[tags] = Time.now
  @tag_access_updates[tags] = true
  known_id
end
end
end
end