#!php -f
<?php
/**
* @package tools
* @license http://www.opensource.org/licenses/gpl-license.php GNU Public License
*/
/**
* This file is part of volkszaehler.org
*
* volkszaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkszaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* Path to Volkszaehler
*
* Absolute Path or relative to current workdir
*/
define('VZCOMPRESS2_VZPATH', '../../');
//VZ_DIR is only a placeholder here: it is defined (empty) when nothing else
//defined it yet, so that volkszaehler.conf.php doesn't throw warnings
if(defined('VZ_DIR') === false) {
    define('VZ_DIR', '');
}
/**
 * Thins out old datapoints in the volkszaehler `data` table.
 *
 * For every channel and every age threshold of its compression scheme the
 * datapoints are walked in fixed time windows; all datapoints inside one
 * window are replaced by a single datapoint carrying the aggregated value.
 * Constructing an instance immediately runs the whole job.
 */
class vzcompress2 {
    /** @var array Runtime options (compressscheme, verbose, caching, sleep, channels) */
    private $config;
    /** @var array The `db` section of volkszaehler.conf.php */
    private $sql_config;
    /** @var PDO Database connection */
    private $sql;
    /** @var array|false Rows of `entities` with class "channel" (false if the query failed) */
    private $channels;
    /** @var array Decoded entries of EntityDefinition.json */
    private $entities;
    /** @var int Number of datapoints removed so far */
    private $purgecounter = 0;
    /** @var string strftime() format used for all console output */
    private $timestr = '%x %X';

    /**
     * Interpreter (short class name) => SQL aggregate used to merge datapoints.
     * A value of false means channels of that interpreter are skipped silently.
     */
    static private $sensortypemap = array(
        'SensorInterpreter' => 'AVG',
        'MeterInterpreter' => 'SUM',
        'CounterInterpreter' => 'MAX',
        'AggregatorInterpreter' => false
    );

    /**
     * Fills in defaults for missing options and runs the compression.
     *
     * @param array $config Options: 'compressscheme' (channel id or 'default'
     *                      => array(age in seconds => window in seconds)),
     *                      'verbose', 'caching' (directory or false),
     *                      'sleep' (microseconds before each query) and
     *                      optionally 'channels' (ids to restrict the run to)
     */
    public function __construct($config = array()) {
        if(!isset($config['compressscheme'])) {
            $config['compressscheme']['default'] = array( //Definition for all other channels
                (7*24*60*60) => (1*60), //Older than 7 Days Datapoint per 1 Minute
                (30*24*60*60) => (5*60), //Older than 30 Days Datapoint per 5 Minutes
                (6*30*24*60*60) => (15*60), //Older than 6 Month Datapoint per 15 Minutes
                (365*24*60*60) => (30*60), //Older than 1 Year Datapoint per 30 Minutes
            );
        }
        if(!isset($config['verbose'])) $config['verbose'] = true;
        if(!isset($config['caching'])) $config['caching'] = false;
        if(!isset($config['sleep'])) $config['sleep'] = 0;
        $this->config = $config;

        $this->sql_config_load();
        $this->sql_connect();
        $this->cache_init();
        $this->sql_getChannels();
        $this->json_getEntities();
        $this->compress();
    }

    /**
     * Reads the database settings from volkszaehler.conf.php.
     *
     * The file is included inside this method, so the $config it defines is a
     * local variable and does not clash with $this->config.
     */
    private function sql_config_load() {
        require(VZCOMPRESS2_VZPATH.'etc/volkszaehler.conf.php');
        $this->sql_config = $config['db'];
    }

    /**
     * Opens the PDO connection; a connection failure raises E_USER_ERROR.
     */
    private function sql_connect() {
        //Let's hack a hopefully valid DSN based on configuration
        $dsn = str_replace('pdo_', '', $this->sql_config['driver']).':dbname='.$this->sql_config['dbname'].';host='.$this->sql_config['host'];
        try {
            $this->sql = new PDO($dsn, $this->sql_config['user'], $this->sql_config['password']);
        } catch (PDOException $e) {
            trigger_error('Connection to database failed: ' . $e->getMessage(), E_USER_ERROR);
        }
    }

    /**
     * Prepares and executes one statement after sleeping config['sleep'] microseconds.
     *
     * @param string $qry    SQL, optionally with positional (?) placeholders
     * @param array  $params Values for the placeholders
     * @return array|false All fetched rows (an empty array for statements
     *                     without a result set), or false on failure
     */
    private function sql_simplequery($qry, $params = array()) {
        usleep($this->config['sleep']);
        try {
            $stmt = $this->sql->prepare($qry);
            if(!$stmt) return false;
            if(!$stmt->execute($params)) {
                trigger_error('SQL query failed: '.implode(' ', $stmt->errorInfo()), E_USER_WARNING);
                return false;
            }
            //DELETE/UPDATE have no result set - fetching from them is an error on some drivers
            $out = ($stmt->columnCount() > 0) ? $stmt->fetchAll() : array();
            $stmt->closeCursor();
            return $out;
        } catch (PDOException $e) {
            //PDO may be in exception mode; callers expect false so they can roll back
            trigger_error('SQL query failed: '.$e->getMessage(), E_USER_WARNING);
            return false;
        }
    }

    /**
     * Loads all entities of class "channel" into $this->channels.
     */
    private function sql_getChannels() {
        $this->channels = $this->sql_simplequery("SELECT * FROM `entities` WHERE `class` = 'channel';");
    }

    /**
     * Loads the entity definitions (name => interpreter) shipped with volkszaehler.
     *
     * On read or decode failure a warning is raised and the list stays empty.
     */
    private function json_getEntities() {
        $this->entities = array();
        $file = VZCOMPRESS2_VZPATH.'lib/Definition/EntityDefinition.json';
        $json = file_get_contents($file);
        if($json === false) {
            trigger_error('Could not read entity definitions from '.$file, E_USER_WARNING);
            return;
        }
        //The JSON-File contains comments which violates the spec and derps PHPs decoder
        //Remove the headers... (limit 2: only the first separator splits header and data)
        $parts = explode("\n[\n", $json, 2);
        $json = (count($parts) > 1) ? '['.$parts[1] : $parts[0];
        $entities = json_decode($json);
        if(!is_array($entities)) {
            trigger_error('Could not decode entity definitions from '.$file, E_USER_WARNING);
            return;
        }
        $this->entities = $entities;
    }

    /**
     * Normalises and validates the cache directory.
     *
     * Ensures a trailing slash, creates the directory when missing and
     * disables caching (with a warning) if it cannot be used.
     */
    private function cache_init() {
        if(!$this->config['caching']) return;

        $dir = $this->config['caching'];
        if(substr($dir, -1) != '/') $dir .= '/';
        $this->config['caching'] = $dir;

        $problem = null;
        if(file_exists($dir)) {
            if(!is_dir($dir)) {
                $problem = 'Not a directory';
            }elseif(!is_writable($dir)) {
                $problem = 'Not writable';
            }
        }elseif(!mkdir($dir, 0755, true)) {
            $problem = 'Could not create directory';
        }
        if($problem !== null) {
            trigger_error('Can not cache to '.$dir.' - '.$problem, E_USER_WARNING);
            $this->config['caching'] = false;
        }
    }

    /**
     * Remembers up to which timestamp a channel was compressed for one age threshold.
     *
     * @param string    $chanid   Channel id
     * @param int       $timebase Age threshold in seconds (part of the file name)
     * @param float|int $last     Timestamp in milliseconds
     * @return bool false if caching is off, an argument is 0 or writing failed
     */
    private function cache_write($chanid, $timebase, $last) {
        if(!$this->config['caching']) return false;
        if($timebase == 0 || $last == 0) return false;
        return file_put_contents($this->config['caching'].$chanid.'.'.$timebase, $this->ms($last)) !== false;
    }

    /**
     * Reads the timestamp stored by cache_write().
     *
     * @param string $chanid   Channel id
     * @param int    $timebase Age threshold in seconds
     * @return float|false Timestamp in milliseconds, false if nothing is cached
     */
    private function cache_read($chanid, $timebase) {
        if(!$this->config['caching']) return false;
        $file = $this->config['caching'].$chanid.'.'.$timebase;
        if(!file_exists($file)) return false;
        return (float)file_get_contents($file);
    }

    /**
     * Formats a whole-numbered value (millisecond timestamp) as plain digits.
     *
     * Casting a float to string depends on the "precision" ini setting and
     * may produce exponent notation; this never does.
     *
     * @param float|int $value
     * @return string
     */
    private function ms($value) {
        return sprintf('%.0F', $value);
    }

    /**
     * Formats a unix timestamp (seconds) for console output.
     *
     * @param float|int|null $timestamp Seconds since epoch, null for "now"
     * @return string
     */
    private function fmt_time($timestamp = null) {
        if($timestamp === null) return strftime($this->timestr);
        //Cast explicitly: fractional floats are not accepted silently by newer PHP versions
        return strftime($this->timestr, (int)$timestamp);
    }

    /**
     * Processes all (or the configured subset of) channels and prints a summary.
     */
    private function compress() {
        if(!is_array($this->channels)) {
            trigger_error('Could not load channels from database', E_USER_WARNING);
            return;
        }
        $start = time();
        foreach($this->channels as $channel) {
            if(isset($this->config['channels']) && !in_array($channel['id'], $this->config['channels'])) continue;
            echo $this->fmt_time().' - Processing Sensor ID '.$channel['id'].'...'."\n";
            $this->process($channel);
        }
        echo $this->fmt_time().' - Done. Purged '.$this->purgecounter.' Datapoints from '.count($this->channels).' Channels in '.(time()-$start).' Seconds'."\n";
    }

    /**
     * Compresses one channel according to its compression scheme.
     *
     * The scheme maps "older than N seconds" to a window length in seconds.
     * Each threshold covers the datapoints between itself and the next older
     * threshold (the oldest one reaches back to timestamp 0). Windows are
     * (start, end], so the datapoint sitting exactly on the first window start
     * is left untouched.
     *
     * @param array $channel Row of the `entities` table (uses 'id' and 'type')
     * @return false|null false if the channel was skipped or a query failed
     */
    private function process($channel) {
        //What type of sensor?
        $type = null;
        foreach($this->entities as $entity) {
            if($entity->name == $channel['type']) {
                $type = str_replace('Volkszaehler\\Interpreter\\', '', $entity->interpreter);
                break;
            }
        }
        if($type === null) {
            trigger_error('Could not detect interpreter for type '.$channel['type'], E_USER_WARNING);
            return false;
        }
        if(!isset(self::$sensortypemap[$type])) {
            trigger_error('Interpreter '.$type.' is currently not supported', E_USER_WARNING);
            return false;
        }
        //Only values from the static map above ever reach the SQL text
        $sqlfunc = self::$sensortypemap[$type];
        if($sqlfunc === false) return false;

        //Detect compressscheme
        $schemes = $this->config['compressscheme'];
        if(isset($schemes[$channel['id']])) {
            $cs = $schemes[$channel['id']];
        }elseif(isset($schemes['default'])) {
            $cs = $schemes['default'];
        }else{
            trigger_error('No compression scheme configured for channel '.$channel['id'], E_USER_WARNING);
            return false;
        }

        //Prepare compressscheme
        ksort($cs);
        $times = array_keys($cs);
        $times[] = 0; //Sentinel: the oldest range has no lower bound
        $timestamp = time(); //Local timestamp should be consistent during our transactions

        //Run compression passes
        $passes = count($times)-1;
        for($i=0; $i<$passes; $i++) {
            $interval = $cs[$times[$i]];
            if($interval <= 0) continue; //Would divide by zero and never advance

            //Range of this pass in milliseconds
            $upper = ($timestamp-$times[$i])*1000;
            $lower = ($times[$i+1] > 0) ? (($timestamp-$times[$i+1])*1000) : 0;

            //Step 1: Detect oldest and newest dataset
            $datatimes = $this->sql_simplequery(
                "SELECT MIN(`timestamp`) as `min`, MAX(`timestamp`) as `max` FROM `data` WHERE `channel_id` = ? AND `timestamp` <= ? AND `timestamp` > ?",
                array($channel['id'], $this->ms($upper), $this->ms($lower))
            );
            $min = isset($datatimes[0]['min']) ? (float)$datatimes[0]['min'] : 0.0;
            $max = isset($datatimes[0]['max']) ? (float)$datatimes[0]['max'] : 0.0;
            if($max == 0) {
                echo $this->fmt_time().' - Skipping compression pass for datapoints between '.$this->fmt_time($lower/1000).' and '.$this->fmt_time($upper/1000).' using a '.$interval.' second timeframe: No Datapoints found'."\n";
                continue;
            }

            //Caching
            $lastrun = (float)$this->cache_read($channel['id'], $times[$i]);
            if($lastrun && $lastrun >= $min) {
                echo $this->fmt_time().' - Skipping datapoints between '.$this->fmt_time($min/1000).' and '.$this->fmt_time($lastrun/1000).' (Cached)'."\n";
                $min = $lastrun;
            }
            echo $this->fmt_time().' - Compressing datapoints between '.$this->fmt_time($min/1000).' and '.$this->fmt_time($max/1000).' using a '.$interval.' second timeframe'."\n";

            //Step 2: Loop new possible timeframes
            $curtime = $min;
            $lastpurgecount = $this->purgecounter;
            $steps = (($max/1000)-($min/1000))/$interval;
            if($steps <= 0) continue; //Nothing (new) to compress in this range
            $step = 0;
            $passstart = time();
            do {
                //Step 2.1: Increase timestamps
                $lastcurtime = $curtime;
                $step++;
                $curtime += $interval*1000;
                $window = array($channel['id'], $this->ms($lastcurtime), $this->ms($curtime));

                //Print status
                if($this->config['verbose']) echo "\r".$this->fmt_time().' - Processing: '.$this->fmt_time($lastcurtime/1000).' - '.$this->fmt_time($curtime/1000).' ('.min(100, round(100/$steps*$step)).'%)... ';

                //Step 2.1: Get new Value for timeframe
                $newset = $this->sql_simplequery(
                    "SELECT ".$sqlfunc."(`value`) as `newval`, COUNT(`value`) as `datapoints`, MIN(`id`) as updateid FROM `data` WHERE `channel_id` = ? AND `timestamp` > ? AND `timestamp` <= ?;",
                    $window
                );

                //Step 2.2: Skip if current timeframe has no or already just one datapoint
                if(!$newset || $newset[0]['datapoints'] < 2) continue;

                $this->sql->beginTransaction();

                //Step 2.3: Delete old Datapoints
                $deleted = $this->sql_simplequery(
                    "DELETE FROM `data` WHERE `channel_id` = ? AND `timestamp` > ? AND `timestamp` <= ? AND `id` != ?;",
                    array_merge($window, array($newset[0]['updateid']))
                );
                if($deleted === false) {
                    $this->sql->rollBack();
                    trigger_error('SQL FAILURE', E_USER_ERROR);
                    return false; //Only reached if an error handler keeps the script alive
                }

                //Step 2.4: Update oldest Datapoint
                // Note: Use UPDATE instead of INSERT to avoid filling up our id-pool
                $updated = $this->sql_simplequery(
                    "UPDATE `data` SET `timestamp` = ?, `value` = ? WHERE `channel_id` = ? AND `id` = ?;",
                    array($this->ms($curtime-1), $newset[0]['newval'], $channel['id'], $newset[0]['updateid'])
                );
                if($updated === false) {
                    $this->sql->rollBack();
                    trigger_error('SQL FAILURE', E_USER_ERROR);
                    return false;
                }

                //Step 2.6 Commit to Database
                $this->sql->commit();
                $this->purgecounter += ($newset[0]['datapoints']-1);
            }while($curtime <= $max);

            echo "\r".$this->fmt_time().' - Removed '.($this->purgecounter-$lastpurgecount).' Datapoints in '.(time()-$passstart).' Seconds. '."\n";
            $this->cache_write($channel['id'], $times[$i], $max);
        }
    }
}
/**
* Sample Configuration
*/
//Dates in the console output are formatted with the first locale that is available
setlocale(LC_ALL, 'de_DE.UTF-8', 'de_DE@euro', 'de_DE', 'de', 'ge');

$config = array();

//Show times/percentage - should be disabled on slow TTYs
$config['verbose'] = true;

//Path or false
$config['caching'] = '/tmp/vzcompress2/';

//Microseconds to sleep between requests
$config['sleep'] = 500;

//If defined only these channels are compressed - note that IDs are strings
//$config['channels'] = array('1', '2', '3');

//Per channel: age in seconds => one datapoint per this many seconds
$config['compressscheme'] = array(
    //Definition for Channel ID 1 would use the same layout as 'default':
    // '1' => array(
    //     //...see below...
    // ),

    //Definition for all other channels
    'default' => array(
        7 * 86400   => 60,      //Older than 7 Days Datapoint per 1 Minute
        30 * 86400  => 5 * 60,  //Older than 30 Days Datapoint per 5 Minutes
        180 * 86400 => 15 * 60, //Older than 6 Month Datapoint per 15 Minutes
        365 * 86400 => 30 * 60, //Older than 1 Year Datapoint per 30 Minutes
    ),
);

//Constructing the object runs the whole compression job
$test = new vzcompress2($config);
?>