feat: ✨ add proper last_used_time updating to caching table
This commit is contained in:
parent
2a20ba8b76
commit
cd9e9deab3
1 changed files with 42 additions and 10 deletions
|
@ -7,7 +7,12 @@ require 'json'
|
||||||
module Timeseries
|
module Timeseries
|
||||||
module Hoarder
|
module Hoarder
|
||||||
class CachingTable < Table
|
class CachingTable < Table
|
||||||
|
attr_accessor :retention_time
|
||||||
|
attr_reader :db
|
||||||
|
|
||||||
def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
|
def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60)
|
||||||
|
raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database
|
||||||
|
|
||||||
@content_name = content_name
|
@content_name = content_name
|
||||||
@id_column = content_name + '_id'
|
@id_column = content_name + '_id'
|
||||||
|
|
||||||
|
@ -18,16 +23,18 @@ module Timeseries
|
||||||
@tag_access_updates = {}
|
@tag_access_updates = {}
|
||||||
|
|
||||||
@tag_access_update_delay = tag_access_update_delay
|
@tag_access_update_delay = tag_access_update_delay
|
||||||
|
|
||||||
|
@retention_time = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def table_creation
|
def table_creation
|
||||||
@pg.exec("CREATE TABLE ts_hoarder.#{@table_name} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )")
|
@pg.exec("CREATE TABLE #{@table_id} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, #{@content_name}_extra JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )")
|
||||||
|
|
||||||
@pg.exec("CREATE INDEX ON ts_hoarder.#{@table_name} USING GIN ( #{@content_name} )")
|
@pg.exec("CREATE INDEX ON #{@table_id} USING GIN ( #{@content_name} )")
|
||||||
end
|
end
|
||||||
|
|
||||||
def load_cache_content
|
def load_cache_content
|
||||||
@pg.exec("SELECT * FROM ts_hoarder.#{@table_name}") do |results|
|
@pg.exec("SELECT * FROM #{@table_id} WHERE (NOW() - last_used) < INTERVAL '3h'") do |results|
|
||||||
results.each do |tuple|
|
results.each do |tuple|
|
||||||
|
|
||||||
tags = JSON.parse(tuple[@content_name])
|
tags = JSON.parse(tuple[@content_name])
|
||||||
|
@ -46,16 +53,16 @@ module Timeseries
|
||||||
returned_id = nil
|
returned_id = nil
|
||||||
|
|
||||||
@pg.transaction do
|
@pg.transaction do
|
||||||
@pg.exec("LOCK TABLE ts_hoarder.#{@table_name}")
|
@pg.exec("LOCK TABLE #{@table_id}")
|
||||||
|
|
||||||
res = @pg.exec_params("SELECT * FROM ts_hoarder.#{@table_name} WHERE #{@content_name} = $1::jsonb", [tags.to_json])
|
res = @pg.exec_params("SELECT * FROM #{@table_id} WHERE #{@content_name} = $1::jsonb", [tags.to_json])
|
||||||
|
|
||||||
if(res.num_tuples >= 1)
|
if(res.num_tuples >= 1)
|
||||||
returned_id = res[0][@id_column]
|
returned_id = res[0][@id_column]
|
||||||
@known_tags[tags] = returned_id
|
@known_tags[tags] = returned_id
|
||||||
@tag_access_times[tags] = Time.parse(res[0]['last_used'])
|
@tag_access_times[tags] = Time.parse(res[0]['last_used'])
|
||||||
else
|
else
|
||||||
res = @pg.exec_params("INSERT INTO ts_hoarder.#{@table_name} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json])
|
res = @pg.exec_params("INSERT INTO #{@table_id} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json])
|
||||||
|
|
||||||
returned_id = res[0][@id_column]
|
returned_id = res[0][@id_column]
|
||||||
@known_tags[tags] = returned_id
|
@known_tags[tags] = returned_id
|
||||||
|
@ -66,12 +73,37 @@ module Timeseries
|
||||||
returned_id
|
returned_id
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def update_use_times()
|
||||||
|
time_column = []
|
||||||
|
id_column = []
|
||||||
|
|
||||||
|
@tag_access_updates.each do |tag, value|
|
||||||
|
time_column << @tag_access_times[tag].to_s
|
||||||
|
id_column << @known_tags[tag]
|
||||||
|
end
|
||||||
|
@tag_access_updates = {}
|
||||||
|
|
||||||
|
time_column = '{'+time_column.join(',')+'}'
|
||||||
|
id_column = '{'+id_column.join(',')+'}'
|
||||||
|
|
||||||
|
update_query = <<~SQL
|
||||||
|
WITH update_data AS (
|
||||||
|
SELECT unnest($1::timestamptz[]) AS last_used, unnest($2::int[]) AS id
|
||||||
|
)
|
||||||
|
UPDATE #{@table_id} AS o
|
||||||
|
SET last_used = GREATEST(o.last_used, n.last_used)
|
||||||
|
FROM update_data AS n
|
||||||
|
WHERE #{@id_column} = n.id
|
||||||
|
SQL
|
||||||
|
|
||||||
|
@pg.exec_params(update_query, [time_column, id_column]);
|
||||||
|
end
|
||||||
|
|
||||||
def [](tags)
|
def [](tags)
|
||||||
access_time = Time.now()
|
access_time = Time.now()
|
||||||
if(((@tag_access_times[tags] || Time.at(0)) - Time.now()) > @tag_access_update_delay)
|
|
||||||
@tag_access_times[tags] = access_time
|
@tag_access_times[tags] = access_time
|
||||||
@tag_access_updates[tags] = true
|
@tag_access_updates[tags] = true
|
||||||
end
|
|
||||||
|
|
||||||
known_id = @known_tags[tags]
|
known_id = @known_tags[tags]
|
||||||
return known_id unless known_id.nil?
|
return known_id unless known_id.nil?
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue