feat(database): ✨ add completely new analytics backend
This commit is contained in:
parent
31080cae2b
commit
5e2f0a7185
7 changed files with 446 additions and 130 deletions
334
www/src/db_handler/mysql_analytics_handler.php
Normal file
334
www/src/db_handler/mysql_analytics_handler.php
Normal file
|
@ -0,0 +1,334 @@
|
|||
<?php
|
||||
|
||||
require_once 'analytics_interface.php';
|
||||
|
||||
class MySQLAnalyticsHandler
|
||||
implements AnalyticsInterface {
|
||||
|
||||
private $sql_connection;
|
||||
private $hostname;
|
||||
|
||||
function __construct($sql_connection, $hostname) {
|
||||
$this->sql_connection = $sql_connection;
|
||||
$this->hostname = $hostname;
|
||||
}
|
||||
|
||||
private function _exec($qery, $argtypes = '', ...$args) {
|
||||
$stmt = $this->sql_connection->prepare($qery);
|
||||
|
||||
if($argtypes != ""){
|
||||
$stmt->bind_param($argtypes, ...$args);
|
||||
}
|
||||
$stmt->execute();
|
||||
|
||||
return $stmt->get_result();
|
||||
}
|
||||
|
||||
public function get_current_timestamp() {
|
||||
return (int)($this->_exec(
|
||||
"SELECT unix_timestamp(NOW()) AS ctime"
|
||||
)->fetch_assoc()['ctime']);
|
||||
}
|
||||
|
||||
public function increment_counter($tags, $counter, $value = 1, $timestamp = null) {
|
||||
$timestamp ??= $this->get_current_timestamp();
|
||||
|
||||
$qry =
|
||||
"INSERT INTO analytics_summations
|
||||
(
|
||||
time_bucket,
|
||||
metric,
|
||||
tags,
|
||||
metric_value
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
from_unixtime(floor(? / 300) * 300),
|
||||
?,
|
||||
?,
|
||||
?
|
||||
) AS new
|
||||
ON DUPLICATE KEY
|
||||
UPDATE metric_value=analytics_summations.metric_value + new.metric_value;
|
||||
";
|
||||
|
||||
$this->_exec($qry,
|
||||
"dssd",
|
||||
$timestamp, $counter, json_encode($tags), $value);
|
||||
}
|
||||
|
||||
public function insert_event($event_tags, $event_text) {
|
||||
$qry =
|
||||
"INSERT INTO analytics_events (
|
||||
event_time, tags, event_text
|
||||
)
|
||||
VALUES (NOW(), ?, ?)";
|
||||
|
||||
$this->_exec($qry, "ss",
|
||||
json_encode($event_tags), $event_text);
|
||||
}
|
||||
|
||||
public function log_path_access(
|
||||
$path,
|
||||
$agent,
|
||||
$referrer,
|
||||
$time) {
|
||||
|
||||
if(strlen($path) == 0) {
|
||||
$path = '/';
|
||||
}
|
||||
|
||||
$this->increment_counter([
|
||||
'host' => $this->hostname,
|
||||
'path' => $path,
|
||||
'agent' => $agent,
|
||||
'referrer' => $referrer,
|
||||
], 'access_sum');
|
||||
|
||||
$this->increment_counter([
|
||||
'host' => $this->hostname,
|
||||
'path' => $path
|
||||
], 'runtime', $time);
|
||||
}
|
||||
|
||||
public function log_path_errcode(
|
||||
$path, $code, $message) {
|
||||
|
||||
$this->insert_event([
|
||||
'host' => $this->hostname,
|
||||
'path' => $path,
|
||||
'code' => $code
|
||||
], $message);
|
||||
}
|
||||
|
||||
public function generate_lp_line($table, $tags, $values, $timestamp) {
|
||||
$out_str = $table;
|
||||
|
||||
$line_tags = [];
|
||||
foreach($tags AS $tag_key => $tag_value) {
|
||||
if(!preg_match('/^[\w_]+$/', $tag_key)) {
|
||||
throw new Exception('Invalid line tag key (' . $tag_key . ')!');
|
||||
}
|
||||
|
||||
$tag_value = preg_replace('/([,=\s])/', '\\\\$0', $tag_value);
|
||||
|
||||
$line_tags []= $tag_key . '=' . $tag_value;
|
||||
}
|
||||
|
||||
$line_values = [];
|
||||
foreach($values AS $tag_key => $tag_value) {
|
||||
if(!preg_match('/^[\w_]+$/', $tag_key)) {
|
||||
throw new Exception('Invalid line value key (' . $tag_key . ')!');
|
||||
}
|
||||
|
||||
if(gettype($tag_value) == 'string') {
|
||||
$tag_value = preg_replace('/(["\])/', '\\\\$0', $tag_value);
|
||||
$tag_value = preg_replace('/\n/', '\\\\n', $tag_value);
|
||||
$tag_value = '"' . $tag_value . '"';
|
||||
}
|
||||
elseif (gettype($tag_value) == 'integer') {
|
||||
$tag_value = $tag_value . 'i';
|
||||
}
|
||||
|
||||
$line_values []= $tag_key . '=' . $tag_value;
|
||||
}
|
||||
|
||||
return $table
|
||||
. ',' . implode(',', $line_tags)
|
||||
. ' ' . implode(',', $line_values)
|
||||
. ' ' . $timestamp;
|
||||
}
|
||||
|
||||
public function pop_analytics($delete = true) {
|
||||
$this->sql_connection->begin_transaction();
|
||||
|
||||
try {
|
||||
$barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];
|
||||
|
||||
$result = $this->_exec("
|
||||
SELECT *
|
||||
FROM analytics_summations
|
||||
WHERE time_bucket < ?
|
||||
ORDER BY metric, time_bucket DESC", "s", $barrier_time);
|
||||
|
||||
$data_category = "access_metrics";
|
||||
|
||||
$row = $result->fetch_assoc();
|
||||
$out_str = '';
|
||||
|
||||
while(isset($row)) {
|
||||
$row_tags = json_decode($row['tags']);
|
||||
$row_value = $row['metric_value'];
|
||||
|
||||
$row_metric = $row['metric'];
|
||||
|
||||
$out_str .= $this->generate_lp_line($data_category, $row_tags, [
|
||||
$row_metric => $row_value
|
||||
], strtotime($row['time_bucket']) . "000000000") . "\n";
|
||||
|
||||
$row = $result->fetch_assoc();
|
||||
}
|
||||
|
||||
$result = $this->_exec("
|
||||
SELECT *
|
||||
FROM analytics_events
|
||||
WHERE event_time < ?
|
||||
ORDER BY event_time DESC", "s", $barrier_time);
|
||||
|
||||
while(isset($row)) {
|
||||
$row_tags = json_decode($row['tags']);
|
||||
$row_value = $row['event_text'];
|
||||
|
||||
$row_metric = $row['metric'];
|
||||
|
||||
$out_str .= $this->generate_lp_line($data_category, $row_tags, [
|
||||
$row_metric => $row_value
|
||||
], strtotime($row['time_bucket']) . "000000000") . "\n";
|
||||
|
||||
$row = $result->fetch_assoc();
|
||||
}
|
||||
|
||||
if($delete) {
|
||||
$this->_exec("DELETE FROM analytics_summations WHERE time_bucket <= ?", "s", $barrier_time);
|
||||
}
|
||||
|
||||
$this->sql_connection->commit();
|
||||
|
||||
return $out_str;
|
||||
|
||||
} catch (\Throwable $th) {
|
||||
$this->sql_connection->rollback();
|
||||
throw $th;
|
||||
}
|
||||
}
|
||||
|
||||
public function pop_analytics_json($delete = true) {
|
||||
$this->sql_connection->begin_transaction();
|
||||
|
||||
try {
|
||||
$barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];
|
||||
|
||||
$out_data = [];
|
||||
|
||||
$result = $this->_exec("
|
||||
SELECT *
|
||||
FROM analytics_summations
|
||||
WHERE time_bucket < ?
|
||||
ORDER BY metric, time_bucket DESC", "s", $barrier_time);
|
||||
|
||||
$row = $result->fetch_assoc();
|
||||
|
||||
$current_metric_collection = [];
|
||||
$current_time_bucket_collection = [];
|
||||
|
||||
$current_metric = $row['metric'] ?? null;
|
||||
$current_time_bucket = $row['time_bucket'] ?? null;
|
||||
|
||||
while(isset($row)) {
|
||||
$current_time_bucket_collection[]= [
|
||||
'tags' => json_decode($row['tags']),
|
||||
'value' => floatval($row['metric_value'])
|
||||
];
|
||||
|
||||
$row = $result->fetch_assoc();
|
||||
|
||||
if(!isset($row)
|
||||
OR ($row['time_bucket'] != $current_time_bucket)
|
||||
OR ($row['metric'] != $current_metric)) {
|
||||
|
||||
$current_metric_collection []= [
|
||||
'time' => $current_time_bucket,
|
||||
'data' => $current_time_bucket_collection
|
||||
];
|
||||
|
||||
$current_time_bucket_collection = [];
|
||||
$current_time_bucket = $row['time_bucket'] ?? null;
|
||||
}
|
||||
if(!isset($row) OR ($row['metric'] != $current_metric)) {
|
||||
$out_data []= [
|
||||
'metric' => $current_metric,
|
||||
'data' => $current_metric_collection
|
||||
];
|
||||
$current_metric_collection = [];
|
||||
$current_metric = $row['metric'] ?? null;
|
||||
}
|
||||
}
|
||||
|
||||
if($delete) {
|
||||
$this->_exec("DELETE FROM analytics_summations WHERE time_bucket <= ?", "s", $barrier_time);
|
||||
}
|
||||
|
||||
$this->sql_connection->commit();
|
||||
|
||||
return json_encode($out_data);
|
||||
|
||||
} catch (\Throwable $th) {
|
||||
$this->sql_connection->rollback();
|
||||
throw $th;
|
||||
}
|
||||
}
|
||||
|
||||
public function pop_analytics_old($delete = true) {
|
||||
$this->sql_connection->begin_transaction();
|
||||
$out_data = "";
|
||||
|
||||
try {
|
||||
$barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];
|
||||
|
||||
$data = $this->_exec("
|
||||
SELECT *
|
||||
FROM analytics_access_sums
|
||||
WHERE time_bucket < ?
|
||||
", "s", $barrier_time)->fetch_all(MYSQLI_ASSOC);
|
||||
|
||||
$data_prefix="analytics_access_sums";
|
||||
|
||||
foreach($data AS $post_data) {
|
||||
$path = $post_data['request_path'];
|
||||
if($path == '') {
|
||||
$path = '/';
|
||||
}
|
||||
$out_data .= $data_prefix . ",host=" . $post_data['host'] . ",agent=".$post_data['agent'];
|
||||
$out_data .= ",path=".$path.",referrer=".$post_data['referrer'];
|
||||
|
||||
$out_data .= " access_sum=" . $post_data['access_sum'];
|
||||
$out_data .= " " . strtotime($post_data['time_bucket']) . "000000000\n";
|
||||
}
|
||||
|
||||
$data = $this->_exec("
|
||||
SELECT *
|
||||
FROM analytics_processing_time_sums
|
||||
WHERE time_bucket < ?
|
||||
", "s", $barrier_time)->fetch_all(MYSQLI_ASSOC);
|
||||
|
||||
$data_prefix="analytics_processing_time_sums";
|
||||
|
||||
foreach($data AS $post_data) {
|
||||
$path = $post_data['request_path'];
|
||||
if($path == '') {
|
||||
$path = '/';
|
||||
}
|
||||
$out_data .= $data_prefix . ",host=" . $post_data['host'];
|
||||
$out_data .= ",path=".$path;
|
||||
|
||||
$out_data .= " time_sum=" . $post_data['time_sum'];
|
||||
$out_data .= " " . strtotime($post_data['time_bucket']) . "000000000\n";
|
||||
}
|
||||
|
||||
if($delete) {
|
||||
$this->_exec("DELETE FROM analytics_access_sums WHERE time_bucket <= ?", "s", $barrier_time);
|
||||
$this->_exec("DELETE FROM analytics_processing_time_sums WHERE time_bucket <= ?", "s", $barrier_time);
|
||||
}
|
||||
|
||||
$this->sql_connection->commit();
|
||||
return $out_data;
|
||||
|
||||
} catch (\Throwable $th) {
|
||||
$this->sql_connection->rollback();
|
||||
|
||||
throw $th;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
?>
|
Loading…
Add table
Add a link
Reference in a new issue