sql_connection = $sql_connection; $this->hostname = $hostname; }

    /**
     * Prepare and execute a parameterised statement on the wrapped mysqli connection.
     *
     * @param string $qery     SQL text using `?` placeholders.
     * @param string $argtypes Type string for bind_param(); leave empty when nothing is bound.
     * @param mixed  ...$args  Values bound to the placeholders, in order.
     *
     * @return mixed The value of get_result(): a result set for SELECTs, false for
     *               statements that produce none (INSERT/DELETE).
     *
     * @throws \RuntimeException When preparing or executing the statement fails and
     *                           mysqli is not already configured to throw by itself.
     */
    private function _exec($qery, $argtypes = '', ...$args)
    {
        $stmt = $this->sql_connection->prepare($qery);
        if ($stmt === false) {
            // Without this guard a failed prepare() ends in "call to a member function on bool".
            throw new \RuntimeException('Failed to prepare statement: ' . $this->sql_connection->error);
        }

        if ($argtypes !== '') {
            $stmt->bind_param($argtypes, ...$args);
        }

        if ($stmt->execute() === false) {
            throw new \RuntimeException('Failed to execute statement: ' . $stmt->error);
        }

        return $stmt->get_result();
    }

    /**
     * Escape a tag value for the line format built by generate_lp_line():
     * commas, equals signs and whitespace are prefixed with a backslash.
     *
     * @param mixed $tag_value Raw tag value; it is cast to string before escaping.
     *
     * @return string The escaped value.
     */
    private function escape_lp_tag_value($tag_value)
    {
        return preg_replace('/([,=\s])/', '\\\\$0', (string) $tag_value);
    }

    /**
     * Ask the database server for its current time.
     *
     * @return int Unix timestamp as reported by `unix_timestamp(NOW())`.
     */
    public function get_current_timestamp()
    {
        $row = $this->_exec("SELECT unix_timestamp(NOW()) AS ctime")->fetch_assoc();

        return (int) $row['ctime'];
    }

    /**
     * Add $value to a counter in analytics_summations.
     *
     * The row is keyed by the 300-second bucket containing $timestamp, the
     * metric name and the JSON-encoded tag set; an existing row is incremented
     * through ON DUPLICATE KEY UPDATE.
     *
     * @param array      $tags      Tag name => value pairs, stored as JSON.
     * @param string     $counter   Metric name.
     * @param int|float  $value     Amount to add (bound as a double).
     * @param int|null   $timestamp Unix time of the sample; defaults to the database's current time.
     *
     * @return void
     */
    public function increment_counter($tags, $counter, $value = 1, $timestamp = null)
    {
        $timestamp ??= $this->get_current_timestamp();

        $qry = "INSERT INTO analytics_summations
                    ( time_bucket, metric, tags, metric_value )
                VALUES
                    ( from_unixtime(floor(? / 300) * 300), ?, ?, ? ) AS new
                ON DUPLICATE KEY UPDATE
                    metric_value=analytics_summations.metric_value + new.metric_value;";

        // Tags carry request data (path, agent, referrer). Substituting invalid UTF-8
        // keeps json_encode() from returning false and storing an empty tag string.
        $encoded_tags = json_encode($tags, JSON_INVALID_UTF8_SUBSTITUTE);

        $this->_exec($qry, "dssd", $timestamp, $counter, $encoded_tags, $value);
    }

    /**
     * Store a free-text event in analytics_events, stamped with the database's NOW().
     *
     * @param array  $event_tags Tag name => value pairs, stored as JSON.
     * @param string $event_text Event message.
     *
     * @return void
     */
    public function insert_event($event_tags, $event_text)
    {
        $qry = "INSERT INTO analytics_events ( event_time, tags, event_text ) VALUES (NOW(), ?, ?)";

        // Same invalid-UTF-8 protection as in increment_counter().
        $encoded_tags = json_encode($event_tags, JSON_INVALID_UTF8_SUBSTITUTE);

        $this->_exec($qry, "ss", $encoded_tags, $event_text);
    }

    /**
     * Record one request: bumps the 'access_sum' counter by one and adds $time
     * to the 'runtime' counter of the path.
     *
     * @param string    $path     Request path; an empty path is recorded as '/'.
     * @param string    $agent    Agent tag value.
     * @param string    $referrer Referrer tag value.
     * @param int|float $time     Amount added to the 'runtime' counter.
     * @param int       $status   Status tag value.
     *
     * @return void
     */
    public function log_path_access($path, $agent, $referrer, $time, $status = 200)
    {
        if ((string) $path === '') {
            $path = '/';
        }

        $this->increment_counter(
            [
                'host' => $this->hostname,
                'path' => $path,
                'agent' => $agent,
                'referrer' => $referrer,
                'status' => $status,
            ],
            'access_sum'
        );

        $this->increment_counter(
            [
                'host' => $this->hostname,
                'path' => $path,
            ],
            'runtime',
            $time
        );
    }

    /**
     * Record an error event for a path.
     *
     * @param string $path    Request path; an empty path is recorded as '/',
     *                        matching log_path_access().
     * @param mixed  $code    Error code stored as a tag.
     * @param string $message Event text.
     *
     * @return void
     */
    public function log_path_errcode($path, $code, $message)
    {
        if ((string) $path === '') {
            $path = '/';
        }

        $this->insert_event(
            [
                'host' => $this->hostname,
                'path' => $path,
                'code' => $code,
            ],
            $message
        );
    }

    /**
     * Build one line of the form `table[,tag=value...] field=value[,field=value...] timestamp`.
     *
     * String field values are double-quoted with `"` and `\` escaped and newlines
     * written as `\n`; integers get an `i` suffix; anything else is concatenated as-is.
     * NOTE(review): the layout looks like InfluxDB line protocol - confirm against the consumer.
     *
     * @param string            $table     Leading name of the line.
     * @param iterable|null     $tags      Tag name => value pairs (array or decoded JSON object).
     * @param iterable          $values    Field name => value pairs.
     * @param string|int        $timestamp Timestamp appended verbatim.
     *
     * @return string The assembled line, without a trailing newline.
     *
     * @throws Exception When a tag or field key contains characters other than word characters.
     */
    public function generate_lp_line($table, $tags, $values, $timestamp)
    {
        $line_tags = [];
        // json_decode() yields null for unparsable tag JSON; treat that as "no tags".
        foreach ($tags ?? [] as $tag_key => $tag_value) {
            if (!preg_match('/^[\w_]+$/', $tag_key)) {
                throw new Exception('Invalid line tag key (' . $tag_key . ')!');
            }

            $line_tags[] = $tag_key . '=' . $this->escape_lp_tag_value($tag_value);
        }

        $line_values = [];
        foreach ($values as $field_key => $field_value) {
            if (!preg_match('/^[\w_]+$/', $field_key)) {
                throw new Exception('Invalid line value key (' . $field_key . ')!');
            }

            if (is_string($field_value)) {
                // The character class must hold an escaped backslash; the previous
                // pattern '(["\])' was unterminated and made preg_replace() return null.
                $field_value = preg_replace('/(["\\\\])/', '\\\\$0', $field_value);
                $field_value = preg_replace('/\n/', '\\\\n', $field_value);
                $field_value = '"' . $field_value . '"';
            } elseif (is_int($field_value)) {
                $field_value = $field_value . 'i';
            }

            $line_values[] = $field_key . '=' . $field_value;
        }

        // Only emit the separating comma when there is at least one tag.
        $series = $table;
        if ($line_tags !== []) {
            $series .= ',' . implode(',', $line_tags);
        }

        return $series . ' ' . implode(',', $line_values) . ' ' . $timestamp;
    }

    /**
     * Export all counters and events older than six minutes as text lines
     * (one generate_lp_line() line per row) and optionally delete them.
     *
     * Everything runs inside one transaction; on any error the transaction is
     * rolled back and the error is rethrown.
     *
     * @param bool $delete When true, the exported rows are deleted.
     *
     * @return string Newline-terminated lines; empty string when nothing is due.
     *
     * @throws \Throwable Whatever the queries or line generation raised.
     */
    public function pop_analytics($delete = true)
    {
        $this->sql_connection->begin_transaction();

        try {
            $barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];
            $data_category = "access_metrics";
            $out_str = '';

            $result = $this->_exec(
                "SELECT * FROM analytics_summations
                 WHERE time_bucket < ?
                 ORDER BY metric, time_bucket DESC",
                "s",
                $barrier_time
            );
            while ($row = $result->fetch_assoc()) {
                $out_str .= $this->generate_lp_line(
                    $data_category,
                    json_decode($row['tags']),
                    [ $row['metric'] => $row['metric_value'] ],
                    strtotime($row['time_bucket']) . "000000000"
                ) . "\n";
            }

            $result = $this->_exec(
                "SELECT * FROM analytics_events
                 WHERE event_time < ?
                 ORDER BY event_time DESC",
                "s",
                $barrier_time
            );
            // Previously this loop never ran (no row was fetched from the events result)
            // and it read the summation columns 'metric' / 'time_bucket', which
            // insert_event() never writes. Events use event_time / event_text.
            // NOTE(review): the field name 'event_text' mirrors the column name - confirm
            // it is the key the downstream consumer expects.
            while ($row = $result->fetch_assoc()) {
                $out_str .= $this->generate_lp_line(
                    $data_category,
                    json_decode($row['tags']),
                    [ 'event_text' => $row['event_text'] ],
                    strtotime($row['event_time']) . "000000000"
                ) . "\n";
            }

            if ($delete) {
                // Use the same strict '<' as the SELECTs: '<=' also removed rows sitting
                // exactly on the barrier, which had not been exported.
                $this->_exec("DELETE FROM analytics_summations WHERE time_bucket < ?", "s", $barrier_time);
                // Exported events must go too, otherwise every call would re-export them.
                $this->_exec("DELETE FROM analytics_events WHERE event_time < ?", "s", $barrier_time);
            }

            $this->sql_connection->commit();

            return $out_str;
        } catch (\Throwable $th) {
            $this->sql_connection->rollback();
            throw $th;
        }
    }

    /**
     * Export all counters older than six minutes as JSON and optionally delete them.
     *
     * Output shape: a list of `{"metric": name, "data": [ {"time": bucket,
     * "data": [ {"tags": ..., "value": float}, ... ]}, ... ]}` entries, grouped by
     * metric and then by time bucket in query order.
     *
     * @param bool $delete When true, the exported rows are deleted.
     *
     * @return string|false Result of json_encode(); "[]" when nothing is due.
     *
     * @throws \Throwable Whatever the queries raised; the transaction is rolled back first.
     */
    public function pop_analytics_json($delete = true)
    {
        $this->sql_connection->begin_transaction();

        try {
            $barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];

            $result = $this->_exec(
                "SELECT * FROM analytics_summations
                 WHERE time_bucket < ?
                 ORDER BY metric, time_bucket DESC",
                "s",
                $barrier_time
            );

            $out_data = [];
            $metric_buckets = [];
            $bucket_entries = [];

            // fetch_assoc() gives null at the end and false on failure; both end the loop.
            $row = $result->fetch_assoc() ?: null;
            $current_metric = $row['metric'] ?? null;
            $current_time_bucket = $row['time_bucket'] ?? null;

            while ($row !== null) {
                $bucket_entries[] = [
                    'tags' => json_decode($row['tags']),
                    'value' => floatval($row['metric_value']),
                ];

                // Look one row ahead to detect where a bucket or a metric ends.
                $row = $result->fetch_assoc() ?: null;

                $metric_changed = ($row === null) || ($row['metric'] !== $current_metric);
                $bucket_changed = $metric_changed || ($row['time_bucket'] !== $current_time_bucket);

                if ($bucket_changed) {
                    $metric_buckets[] = [
                        'time' => $current_time_bucket,
                        'data' => $bucket_entries,
                    ];
                    $bucket_entries = [];
                    $current_time_bucket = $row['time_bucket'] ?? null;
                }

                if ($metric_changed) {
                    $out_data[] = [
                        'metric' => $current_metric,
                        'data' => $metric_buckets,
                    ];
                    $metric_buckets = [];
                    $current_metric = $row['metric'] ?? null;
                }
            }

            if ($delete) {
                // Same strict '<' as the SELECT so un-exported boundary rows survive.
                $this->_exec("DELETE FROM analytics_summations WHERE time_bucket < ?", "s", $barrier_time);
            }

            $this->sql_connection->commit();

            return json_encode($out_data);
        } catch (\Throwable $th) {
            $this->sql_connection->rollback();
            throw $th;
        }
    }

    /**
     * Export rows older than six minutes from the analytics_access_sums and
     * analytics_processing_time_sums tables as text lines, optionally deleting them.
     *
     * @param bool $delete When true, the exported rows are deleted from both tables.
     *
     * @return string Newline-terminated lines; empty string when nothing is due.
     *
     * @throws \Throwable Whatever the queries raised; the transaction is rolled back first.
     */
    public function pop_analytics_old($delete = true)
    {
        $this->sql_connection->begin_transaction();
        $out_data = "";

        try {
            $barrier_time = $this->_exec("SELECT NOW() - INTERVAL 6 MINUTE AS ctime")->fetch_assoc()['ctime'];

            $data = $this->_exec(
                "SELECT * FROM analytics_access_sums WHERE time_bucket < ?",
                "s",
                $barrier_time
            )->fetch_all(MYSQLI_ASSOC);
            $data_prefix = "analytics_access_sums";

            foreach ($data as $post_data) {
                $path = $post_data['request_path'];
                if ((string) $path === '') {
                    $path = '/';
                }

                // Tag values are escaped: agents and referrers routinely contain
                // spaces, commas or '=' which would otherwise split the line.
                $out_data .= $data_prefix
                    . ",host=" . $this->escape_lp_tag_value($post_data['host'])
                    . ",agent=" . $this->escape_lp_tag_value($post_data['agent']);
                $out_data .= ",path=" . $this->escape_lp_tag_value($path)
                    . ",referrer=" . $this->escape_lp_tag_value($post_data['referrer']);
                $out_data .= " access_sum=" . $post_data['access_sum'];
                $out_data .= " " . strtotime($post_data['time_bucket']) . "000000000\n";
            }

            $data = $this->_exec(
                "SELECT * FROM analytics_processing_time_sums WHERE time_bucket < ?",
                "s",
                $barrier_time
            )->fetch_all(MYSQLI_ASSOC);
            $data_prefix = "analytics_processing_time_sums";

            foreach ($data as $post_data) {
                $path = $post_data['request_path'];
                if ((string) $path === '') {
                    $path = '/';
                }

                $out_data .= $data_prefix . ",host=" . $this->escape_lp_tag_value($post_data['host']);
                $out_data .= ",path=" . $this->escape_lp_tag_value($path);
                $out_data .= " time_sum=" . $post_data['time_sum'];
                $out_data .= " " . strtotime($post_data['time_bucket']) . "000000000\n";
            }

            if ($delete) {
                // Same strict '<' as the SELECTs so un-exported boundary rows survive.
                $this->_exec("DELETE FROM analytics_access_sums WHERE time_bucket < ?", "s", $barrier_time);
                $this->_exec("DELETE FROM analytics_processing_time_sums WHERE time_bucket < ?", "s", $barrier_time);
            }

            $this->sql_connection->commit();

            return $out_data;
        } catch (\Throwable $th) {
            $this->sql_connection->rollback();
            throw $th;
        }
    }
}
?>