#!/usr/bin/env php
<?php
/**
 * vzcompress2 - thins out old rows of the volkszaehler.org `data` table by
 * merging all datapoints of a time window into a single datapoint.
 *
 * @package tools
 * @license http://www.opensource.org/licenses/gpl-license.php GNU Public License
 */
/**
 * This file is part of volkszaehler.org
 *
 * volkszaehler.org is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * any later version.
 *
 * volkszaehler.org is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
 */

/**
 * Path to Volkszaehler
 *
 * Absolute Path or relative to current workdir
 */
define('VZCOMPRESS2_VZPATH', '../../');

//Dummy VZ_DIR so volkszaehler.conf.php doesn't throw warnings
if(!defined('VZ_DIR')) define('VZ_DIR', '');

/**
 * Compresses the datapoints of all (or selected) channels.
 *
 * Constructing an instance runs the complete job: load the database
 * configuration, connect, prepare the cache directory, read the channel list
 * and the entity definitions, then compress every channel.
 */
class vzcompress2 {
    /** @var array Effective configuration (defaults merged in by the constructor) */
    private $config;
    /** @var array The 'db' section of etc/volkszaehler.conf.php */
    private $sql_config;
    /** @var PDO Database connection */
    private $sql;
    /** @var array Rows of `entities` with class 'channel' */
    private $channels;
    /** @var array Decoded content of EntityDefinition.json */
    private $entities;
    /** @var int Number of datapoints deleted so far */
    private $purgecounter = 0;
    /** @var string strftime() format used for all console output */
    private $timestr = '%x %X';

    /**
     * Maps an interpreter class name (without namespace) to the SQL aggregate
     * function used to merge datapoints. false = channel type is skipped.
     */
    static private $sensortypemap = array(
        'SensorInterpreter' => 'AVG',
        'MeterInterpreter' => 'SUM',
        'CounterInterpreter' => 'MAX',
        'AggregatorInterpreter' => false
    );

    /**
     * Applies configuration defaults and immediately runs the compression.
     *
     * @param array $config Optional keys:
     *   'compressscheme' array channel id (or 'default') => array(age in seconds => timeframe in seconds)
     *   'verbose'        bool  print a progress line per timeframe (default true)
     *   'caching'        string|false  cache directory or false (default false)
     *   'sleep'          int   microseconds to sleep before every query (default 0)
     *   'channels'       array if set, only these channel ids are processed
     */
    public function __construct($config = array()) {
        if(!isset($config['compressscheme'])) {
            $config['compressscheme']['default'] = array( //Definition for all other channels
                (7*24*60*60) => (1*60), //Older than 7 Days Datapoint per 1 Minute
                (30*24*60*60) => (5*60), //Older than 30 Days Datapoint per 5 Minutes
                (6*30*24*60*60) => (15*60), //Older than 6 Month Datapoint per 15 Minutes
                (365*24*60*60) => (30*60), //Older than 1 Year Datapoint per 30 Minutes
            );
        }

        if(!isset($config['verbose'])) $config['verbose']=true;
        if(!isset($config['caching'])) $config['caching']=false;
        if(!isset($config['sleep'])) $config['sleep']=0;

        $this->config = $config;

        $this->sql_config_load();
        $this->sql_connect();
        $this->cache_init();
        $this->sql_getChannels();
        $this->json_getEntities();
        $this->compress();
    }

    /**
     * Reads the database settings from the volkszaehler configuration file.
     * The required file is expected to define a local $config array.
     */
    private function sql_config_load() {
        require(VZCOMPRESS2_VZPATH.'etc/volkszaehler.conf.php');
        $this->sql_config = $config['db'];
    }

    /**
     * Opens the PDO connection; raises E_USER_ERROR if that fails.
     */
    private function sql_connect() {
        //Let's hack a hopefully valid DSN based on configuration
        $dsn = str_replace('pdo_', '', $this->sql_config['driver']).':dbname='.$this->sql_config['dbname'].';host='.$this->sql_config['host'];

        try {
            $this->sql = new PDO($dsn, $this->sql_config['user'], $this->sql_config['password']);
        } catch (PDOException $e) {
            trigger_error('Connection to database failed: ' . $e->getMessage(), E_USER_ERROR);
        }
    }

    /**
     * Formats a unix timestamp (seconds) with the configured format.
     *
     * The value is truncated to an integer explicitly so fractional
     * timestamps do not rely on implicit float-to-int conversion.
     *
     * @param int|float|null $seconds Unix timestamp, null for the current time
     * @return string
     */
    private function formatTime($seconds = null) {
        if($seconds === null) return strftime($this->timestr);
        return strftime($this->timestr, (int)$seconds);
    }

    /**
     * Prepares and executes one statement after the configured sleep.
     *
     * @param string $qry    SQL text, may contain positional (?) placeholders
     * @param array  $params Values bound to the placeholders, in order
     * @return array|false All result rows (empty array for statements without
     *                     a result set), or false if the statement failed
     */
    private function sql_simplequery($qry, $params = array()) {
        usleep($this->config['sleep']);

        try {
            $stmt = $this->sql->prepare($qry);
            if(!$stmt) return false;

            if(!$stmt->execute($params)) {
                var_dump($stmt->errorInfo());
                return false;
            }

            //DELETE/UPDATE produce no result set - fetching from them is an error on some drivers
            $out = ($stmt->columnCount() > 0) ? $stmt->fetchAll() : array();
            $stmt->closeCursor();
            return $out;
        } catch (PDOException $e) {
            //PDO may be in exception mode; callers rely on the false return to roll back
            var_dump($e->getMessage());
            return false;
        }
    }

    /**
     * Loads all entities of class 'channel' into $this->channels.
     */
    private function sql_getChannels() {
        $channels = $this->sql_simplequery("SELECT * FROM `entities` WHERE `class` = 'channel';");
        $this->channels = is_array($channels) ? $channels : array();
    }

    /**
     * Loads lib/Definition/EntityDefinition.json into $this->entities.
     * On any failure a warning is raised and the entity list stays empty.
     */
    private function json_getEntities() {
        $this->entities = array();
        $file = VZCOMPRESS2_VZPATH.'lib/Definition/EntityDefinition.json';

        $json = is_readable($file) ? file_get_contents($file) : false;
        if($json === false) {
            trigger_error('Can not read entity definitions from '.$file, E_USER_WARNING);
            return;
        }

        //The JSON-File contains comments which violates the spec and derps PHPs decoder
        //Remove the headers... (limit 2: keep everything after the first opening bracket)
        $parts = explode("\n[\n", $json, 2);
        $json = (count($parts) > 1) ? '['.$parts[1] : $parts[0];

        $entities = json_decode($json);
        if(!is_array($entities)) {
            trigger_error('Can not decode entity definitions from '.$file, E_USER_WARNING);
            return;
        }
        $this->entities = $entities;
    }

    /**
     * Validates or creates the cache directory. Caching is disabled (with a
     * warning) if the path is not a writable directory or cannot be created.
     */
    private function cache_init() {
        if(!$this->config['caching']) return;

        if(substr($this->config['caching'], -1) != '/') $this->config['caching'].='/';
        $dir = $this->config['caching'];

        if(file_exists($dir)) {
            if(!is_dir($dir)) {
                trigger_error('Can not cache to '.$dir.' - Not a directory', E_USER_WARNING);
                $this->config['caching'] = false;
            }elseif(!is_writable($dir)) {
                trigger_error('Can not cache to '.$dir.' - Not writable', E_USER_WARNING);
                $this->config['caching'] = false;
            }
        }elseif(!mkdir($dir, 0755, true)) {
            trigger_error('Can not cache to '.$dir.' - Could not create directory', E_USER_WARNING);
            $this->config['caching'] = false;
        }
    }

    /**
     * Stores the newest processed timestamp for a channel/pass combination.
     *
     * @param mixed $chanid   Channel id
     * @param int   $timebase Age threshold (seconds) identifying the pass
     * @param float $last     Newest timestamp (milliseconds) that was processed
     * @return false|null false if nothing was written because caching is off or a value is 0
     */
    private function cache_write($chanid, $timebase, $last) {
        if(!$this->config['caching']) return false;
        if($timebase == 0 || $last == 0) return false;

        file_put_contents($this->config['caching'].$chanid.'.'.$timebase, $last);
    }

    /**
     * Reads the timestamp stored by cache_write().
     *
     * @param mixed $chanid   Channel id
     * @param int   $timebase Age threshold (seconds) identifying the pass
     * @return float|false Cached timestamp, false if caching is off or no entry exists
     */
    private function cache_read($chanid, $timebase) {
        if(!$this->config['caching']) return false;

        $file = $this->config['caching'].$chanid.'.'.$timebase;
        if(!file_exists($file)) return false;

        return (float)file_get_contents($file);
    }

    /**
     * Runs process() for every channel (or only those listed in
     * $config['channels']) and prints a summary.
     */
    private function compress() {
        $start = time();
        $processed = 0;

        //Compare ids as strings: the database driver may return them as int or string
        $only = isset($this->config['channels']) ? array_map('strval', (array)$this->config['channels']) : null;

        foreach($this->channels as $channel) {
            if($only !== null && !in_array((string)$channel['id'], $only, true)) continue;

            echo $this->formatTime().' - Processing Sensor ID '.$channel['id'].'...'."\n";
            $this->process($channel);
            $processed++;
        }

        echo $this->formatTime().' - Done. Purged '.$this->purgecounter.' Datapoints from '.$processed.' Channels in '.(time()-$start).' Seconds'."\n";
    }

    /**
     * Compresses one channel according to its compression scheme.
     *
     * For every scheme entry (age => timeframe) the datapoints older than
     * "age" but younger than the next larger age are walked in steps of
     * "timeframe" seconds; all datapoints of one step are replaced by a
     * single datapoint carrying the aggregated value.
     *
     * @param array $channel Row of the `entities` table (uses 'id' and 'type')
     * @return false|null false if the channel type is unknown or unsupported
     */
    private function process($channel) {
        //What type of sensor?
        $type = null;
        foreach($this->entities as $entity) {
            if(isset($entity->name, $entity->interpreter) && $entity->name == $channel['type']) {
                $type = str_replace('Volkszaehler\\Interpreter\\', '', $entity->interpreter);
                break;
            }
        }

        if($type === null) {
            trigger_error('Could not detect interpreter for type '.$channel['type'], E_USER_WARNING);
            return false;
        }
        if(!isset(self::$sensortypemap[$type])) {
            trigger_error('Interpreter '.$type.' is currently not supported', E_USER_WARNING);
            return false;
        }

        //Only ever one of the literals from $sensortypemap, so safe to put into the SQL text
        $sqlfunc = self::$sensortypemap[$type];
        if($sqlfunc === false) return false;

        //Detect compressscheme
        if(isset($this->config['compressscheme'][$channel['id']])) {
            $cs = $this->config['compressscheme'][$channel['id']];
        }else{
            $cs = $this->config['compressscheme']['default'];
        }

        //Prepare compressscheme: ages ascending, trailing 0 = "no lower limit" for the oldest pass
        ksort($cs);
        $times = array_keys($cs);
        $times[] = 0;

        $timestamp = time(); //Local timestamp should be consistent during our transactions

        //Run compression passes
        for($i=0; $i<count($times)-1; $i++) {
            $timeframe = $cs[$times[$i]];
            //A timeframe of 0 (or less) would divide by zero and never advance the loop below
            if($timeframe <= 0) continue;

            //Bounds of this pass in milliseconds: (lower, upper]
            $upper = ($timestamp-$times[$i])*1000;
            $lower = ($times[$i+1] > 0) ? (($timestamp-$times[$i+1])*1000) : 0;

            //Step 1: Detect oldest and newest datapoint of this pass
            $datatimes = $this->sql_simplequery(
                "SELECT MIN(`timestamp`) as `min`, MAX(`timestamp`) as `max` FROM `data` WHERE `channel_id` = ? AND `timestamp` <= ? AND `timestamp` > ?",
                array($channel['id'], $upper, $lower)
            );
            if($datatimes === false) {
                trigger_error('Could not determine time range for channel '.$channel['id'], E_USER_WARNING);
                continue;
            }

            $min = isset($datatimes[0]['min']) ? (float)$datatimes[0]['min'] : 0.0;
            $max = isset($datatimes[0]['max']) ? (float)$datatimes[0]['max'] : 0.0;

            if($max == 0) {
                echo $this->formatTime().' - Skipping compression pass for datapoints between '.$this->formatTime($lower/1000).' and '.$this->formatTime($upper/1000).' using a '.$timeframe.' second timeframe: No Datapoints found'."\n";
                continue;
            }

            //Caching: everything up to the cached timestamp was handled by an earlier run
            $lastrun = (float)$this->cache_read($channel['id'], $times[$i]);
            if($lastrun && $lastrun >= $min) {
                echo $this->formatTime().' - Skipping datapoints between '.$this->formatTime($min/1000).' and '.$this->formatTime($lastrun/1000).' (Cached)'."\n";
                $min = $lastrun;
            }

            echo $this->formatTime().' - Compressing datapoints between '.$this->formatTime($min/1000).' and '.$this->formatTime($max/1000).' using a '.$timeframe.' second timeframe'."\n";

            //Step 2: Loop new possible timeframes
            $curtime = $min;
            $lastpurgecount = $this->purgecounter;
            $steps = (($max/1000)-($min/1000))/$timeframe;
            if($steps <= 0) continue; //Nothing (new) to do in this pass
            $step = 0;

            $passstart = time();

            do {
                //Step 2.1: Increase timestamps
                $lastcurtime = $curtime;
                $step++;
                $curtime += $timeframe*1000;

                //Print status ($steps may be fractional, so clamp the last step to 100%)
                if($this->config['verbose']) echo "\r".$this->formatTime().' - Processing: '.$this->formatTime($lastcurtime/1000).' - '.$this->formatTime($curtime/1000).' ('.min(100, round(100/$steps*$step)).'%)... ';

                //Step 2.2: Get new Value for timeframe
                $newset = $this->sql_simplequery(
                    "SELECT ".$sqlfunc."(`value`) as `newval`, COUNT(`value`) as `datapoints`, MIN(`id`) as updateid FROM `data` WHERE `channel_id` = ? AND `timestamp` > ? AND `timestamp` <= ?;",
                    array($channel['id'], $lastcurtime, $curtime)
                );

                //Step 2.3: Skip if current timeframe has no or already just one datapoint
                if(!is_array($newset) || count($newset) == 0 || $newset[0]['datapoints'] < 2) continue;

                $this->sql->beginTransaction();

                //Step 2.4: Delete old Datapoints (all but the one that gets updated below)
                $deleted = $this->sql_simplequery(
                    "DELETE FROM `data` WHERE `channel_id` = ? AND `timestamp` > ? AND `timestamp` <= ? AND `id` != ?;",
                    array($channel['id'], $lastcurtime, $curtime, $newset[0]['updateid'])
                );
                if($deleted === false) {
                    $this->sql->rollBack();
                    trigger_error('SQL FAILURE', E_USER_ERROR);
                    return false; //Only reached if an error handler resumed execution
                }

                $this->purgecounter += ($newset[0]['datapoints']-1);

                //Step 2.5: Update oldest Datapoint
                // Note: Use UPDATE instead of INSERT to avoid filling up our id-pool
                $updated = $this->sql_simplequery(
                    "UPDATE `data` SET `timestamp` = ?, `value` = ? WHERE `channel_id` = ? AND `id` = ?;",
                    array($curtime-1, $newset[0]['newval'], $channel['id'], $newset[0]['updateid'])
                );
                if($updated === false) {
                    $this->sql->rollBack();
                    trigger_error('SQL FAILURE', E_USER_ERROR);
                    return false; //Only reached if an error handler resumed execution
                }

                //Step 2.6 Commit to Database
                $this->sql->commit();
            }while($curtime <= $max);

            echo "\r".$this->formatTime().' - Removed '.($this->purgecounter-$lastpurgecount).' Datapoints in '.(time()-$passstart).' Seconds. '."\n";
            $this->cache_write($channel['id'], $times[$i], $max);
        }
    }
}

/**
 * Sample Configuration
 */

setlocale(LC_ALL, 'de_DE.UTF-8', 'de_DE@euro', 'de_DE', 'de', 'ge');

$config = array(
    'verbose' => true, //Show times/percentage - should be disabled on slow TTYs
    'caching' => '/tmp/vzcompress2/', //Path or false
    'sleep' => 500, //Microseconds to sleep between requests
    //'channels' => array( //If defined only these channels are compressed
    //    '1', '2', '3' //Note that IDs are strings
    //),
    'compressscheme' => array(
        // '1' => array( //Definition for Channel ID 1
        //     //...see below...
        // ),
        'default' => array( //Definition for all other channels
            (7*24*60*60) => (1*60), //Older than 7 Days Datapoint per 1 Minute
            (30*24*60*60) => (5*60), //Older than 30 Days Datapoint per 5 Minutes
            (6*30*24*60*60) => (15*60), //Older than 6 Month Datapoint per 15 Minutes
            (365*24*60*60) => (30*60), //Older than 1 Year Datapoint per 30 Minutes
        )
    )
);

$test = new vzcompress2($config);