From cd9e9deab379cb5419e9ef972f1055c156f399f4 Mon Sep 17 00:00:00 2001 From: David Bailey Date: Thu, 18 May 2023 14:08:28 +0200 Subject: [PATCH] feat: :sparkles: add proper last_used_time updating to caching table --- lib/timeseries/hoarder/CachingTable.rb | 52 +++++++++++++++++++++----- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/lib/timeseries/hoarder/CachingTable.rb b/lib/timeseries/hoarder/CachingTable.rb index 273b96e..c8f967e 100644 --- a/lib/timeseries/hoarder/CachingTable.rb +++ b/lib/timeseries/hoarder/CachingTable.rb @@ -7,7 +7,12 @@ require 'json' module Timeseries module Hoarder class CachingTable < Table + attr_accessor :retention_time + attr_reader :db + def initialize(db, name, content_name = 'tags', tag_access_update_delay: 60) + raise ArgumentError, "DB needs to be a Timeseries::Hoarder::Database!" unless db.is_a? Database + @content_name = content_name @id_column = content_name + '_id' @@ -18,16 +23,18 @@ module Timeseries @tag_access_updates = {} @tag_access_update_delay = tag_access_update_delay + + @retention_time = nil end def table_creation - @pg.exec("CREATE TABLE ts_hoarder.#{@table_name} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )") + @pg.exec("CREATE TABLE #{@table_id} ( #{@id_column} SERIAL PRIMARY KEY, #{@content_name} JSONB, #{@content_name}_extra JSONB, created_at TIMESTAMPTZ, last_used TIMESTAMPTZ )") - @pg.exec("CREATE INDEX ON ts_hoarder.#{@table_name} USING GIN ( #{@content_name} )") + @pg.exec("CREATE INDEX ON #{@table_id} USING GIN ( #{@content_name} )") end def load_cache_content - @pg.exec("SELECT * FROM ts_hoarder.#{@table_name}") do |results| + @pg.exec("SELECT * FROM #{@table_id} WHERE (NOW() - last_used) < INTERVAL '3h'") do |results| results.each do |tuple| tags = JSON.parse(tuple[@content_name]) @@ -46,16 +53,16 @@ module Timeseries returned_id = nil @pg.transaction do - @pg.exec("LOCK TABLE ts_hoarder.#{@table_name}") + @pg.exec("LOCK TABLE #{@table_id}") - res = @pg.exec_params("SELECT * FROM ts_hoarder.#{@table_name} WHERE #{@content_name} = $1::jsonb", [tags.to_json]) + res = @pg.exec_params("SELECT * FROM #{@table_id} WHERE #{@content_name} = $1::jsonb", [tags.to_json]) if(res.num_tuples >= 1) returned_id = res[0][@id_column] @known_tags[tags] = returned_id @tag_access_times[tags] = Time.parse(res[0]['last_used']) else - res = @pg.exec_params("INSERT INTO ts_hoarder.#{@table_name} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json]) + res = @pg.exec_params("INSERT INTO #{@table_id} (#{@content_name}, created_at, last_used) VALUES ($1::jsonb, NOW(), NOW()) RETURNING #{@id_column}", [tags.to_json]) returned_id = res[0][@id_column] @known_tags[tags] = returned_id @@ -66,12 +73,37 @@ module Timeseries returned_id end + def update_use_times() + time_column = [] + id_column = [] + + @tag_access_updates.each do |tag, value| + time_column << @tag_access_times[tag].to_s + id_column << @known_tags[tag] + end + @tag_access_updates = {} + + time_column = '{'+time_column.join(',')+'}' + id_column = '{'+id_column.join(',')+'}' + + update_query = <<~SQL + WITH update_data AS ( + SELECT unnest($1::timestamptz[]) AS last_used, unnest($2::int[]) AS id + ) + UPDATE #{@table_id} AS o + SET last_used = GREATEST(o.last_used, n.last_used) + FROM update_data AS n + WHERE #{@id_column} = n.id + SQL + + @pg.exec_params(update_query, [time_column, id_column]); + end + def [](tags) access_time = Time.now() - if(((@tag_access_times[tags] || Time.at(0)) - Time.now()) > @tag_access_update_delay) - @tag_access_times[tags] = access_time - @tag_access_updates[tags] = true - end + + @tag_access_times[tags] = access_time + @tag_access_updates[tags] = true known_id = @known_tags[tags] return known_id unless known_id.nil?