X-Git-Url: https://git.realraum.at/?p=svn42.git;a=blobdiff_plain;f=sensorvalues-to-json-zmq.py;fp=sensorvalues-to-json-zmq.py;h=0000000000000000000000000000000000000000;hp=2d42b5446d74f54ed4fe28a75facd237d125028f;hb=c775b5528f8f96e75c28264b9cea8525c6da9298;hpb=a87c91b7d4544703879b7ef6e472b863765e3949 diff --git a/sensorvalues-to-json-zmq.py b/sensorvalues-to-json-zmq.py deleted file mode 100755 index 2d42b54..0000000 --- a/sensorvalues-to-json-zmq.py +++ /dev/null @@ -1,335 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -from __future__ import with_statement -import os -import os.path -import sys -import logging -import logging.handlers -import time -import threading -import signal -import ConfigParser -import traceback -import shutil -import zmq -import zmq.utils.jsonapi as json -import zmq.ssh.tunnel - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2) -lh_syslog.setFormatter(logging.Formatter('sensorvalues-to-json-zmq.py: %(levelname)s %(message)s')) -logger.addHandler(lh_syslog) -lh_stderr = logging.StreamHandler() -logger.addHandler(lh_stderr) - -######## Config File Data Class ############ - -class UWSConfig: - def __init__(self,configfile=None): - #Synchronisation - self.lock=threading.Lock() - self.finished_reading=threading.Condition(self.lock) - self.finished_writing=threading.Condition(self.lock) - self.currently_reading=0 - self.currently_writing=False - #Config Data - self.configfile=configfile - self.config_parser=ConfigParser.ConfigParser() - self.config_parser.add_section('json') - self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json") - self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json") - self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/r3sensors.json.bak") - self.config_parser.set('json','backup_every',"50") - self.config_parser.set('json','limit_list_len',"10000") - self.config_parser.set('json','updateinterval',"30") - self.config_parser.add_section('zmq') - self.config_parser.set('zmq','remote_uri',"tcp://torwaechter.realraum.at:4244") - self.config_parser.set('zmq','sshtunnel',"realraum@torwaechter.realraum.at:22000") - self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa") - self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate") - self.config_parser.add_section('debug') - self.config_parser.set('debug','enabled',"False") - self.config_mtime=0 - if not self.configfile is None: - try: - cf_handle = open(self.configfile,"r") - cf_handle.close() - except IOError: - self.writeConfigFile() - else: - self.checkConfigUpdates() - - def guardReading(self): - with self.lock: - while self.currently_writing: - self.finished_writing.wait() - self.currently_reading+=1 - - def unguardReading(self): - with self.lock: - self.currently_reading-=1 - self.finished_reading.notifyAll() - - def guardWriting(self): - with self.lock: - self.currently_writing=True - while self.currently_reading > 0: - self.finished_reading.wait() - - def unguardWriting(self): - with self.lock: - self.currently_writing=False - self.finished_writing.notifyAll() - - def checkConfigUpdates(self): - global logger - if self.configfile is None: - return - logging.debug("Checking Configfile mtime: "+self.configfile) - try: - mtime = os.path.getmtime(self.configfile) - except (IOError,OSError): - return - if self.config_mtime < mtime: - logging.debug("Reading Configfile") - self.guardWriting() - try: - self.config_parser.read(self.configfile) - self.config_mtime=os.path.getmtime(self.configfile) - except (ConfigParser.ParsingError, IOError), pe_ex: - logging.error("Error parsing Configfile: "+str(pe_ex)) - self.unguardWriting() - self.guardReading() - if self.config_parser.get('debug','enabled') == "True": - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - self.unguardReading() - - def writeConfigFile(self): - if self.configfile is None: - return - logging.debug("Writing Configfile "+self.configfile) - self.guardReading() - try: - cf_handle = open(self.configfile,"w") - self.config_parser.write(cf_handle) - cf_handle.close() - self.config_mtime=os.path.getmtime(self.configfile) - except IOError, io_ex: - logging.error("Error writing Configfile: "+str(io_ex)) - self.configfile=None - self.unguardReading() - - def __getattr__(self, name): - underscore_pos=name.find('_') - if underscore_pos < 0: - raise AttributeError - rv=None - self.guardReading() - try: - rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:]) - except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): - self.unguardReading() - finally: - self.unguardReading() - return rv - - -######## r3 ZMQ ############ - -def sendR3Message(socket, structname, datadict): - socket.send_multipart([structname, json.dumps(datadict)]) - -def decodeR3Message(multipart_msg): - try: - return (multipart_msg[0], json.loads(multipart_msg[1])) - except Exception, e: - logging.debug("decodeR3Message:"+str(e)) - return ("",{}) - -######## Main ############ - -def exitHandler(signum, frame): - logging.info("stopping") - try: - zmqsub.close() - zmqctx.destroy() - except: - pass - sys.exit(0) - -time_column_name_="Time" -latest_values_ = {} -sensor_store_ = {} -sensor_cols_num_ = {} #stores number of columns for a sensor not counting Time (x-axis) column. AKA the number of data-rows. Equals highest SensorIndex +1 -reset_these_structnames_ = {} - -def addEventToTempLastValueStore(structname, msgdata): - global latest_values_ - sensorindex = int(msgdata["Sensorindex"]) if "Sensorindex" in msgdata else 0 - if not structname in latest_values_: - latest_values_[structname]=[] - if not structname in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex +1: - sensor_cols_num_[structname] = sensorindex +1 - if len(latest_values_[structname]) < sensor_cols_num_[structname]: - latest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(latest_values_[structname])) - expandSensorStoreLists(structname, sensor_cols_num_[structname]) - # store Value in temp last value store: - try: - del msgdata["Sensorindex"] - except: - pass - try: - del msgdata["Ts"] - except: - pass - if len(msgdata) > 0: - #store first value that is not Sensorindex or Ts into store - latest_values_[structname][sensorindex] = msgdata.values()[0] - else: - #if that value does not exist, (i.e. movementevent), count event occurances - latest_values_[structname][sensorindex] += 1 - reset_these_structnames_[structname] = True - - -def cleanTempLastValueOfMovementValues(): - global latest_values_ - for k in reset_these_structnames_.keys(): - latest_values_[k] = [0] * sensor_cols_num_[k] - - -def expandSensorStoreLists(structname, newlength): - global sensor_store_ - if not structname in sensor_store_: - sensor_store_[structname]=[] - #remove old headings so we can add them again below - try: - if sensor_store_[structname][0][0] == time_column_name_: - sensor_store_[structname].pop(0) - except: - pass - #expand all previous value lists - newlength_including_time = newlength +1 - sensor_store_[structname] = map(lambda l: l[:newlength_including_time] + ([0] * (newlength_including_time - len(l))) , sensor_store_[structname]) - - -def addEventsToStore(): - global sensor_store_ - ts = int(time.time()) - for structname in latest_values_.keys(): - if not structname in sensor_store_: - sensor_store_[structname]=[] - - #if missing, add Header List [Time, 0, 1, 2] - if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_: - sensor_store_[structname].insert(0,[time_column_name_] + list(map(lambda n: "Sensor %d"%n,range(0,sensor_cols_num_[structname])))) - - # add values - try: - # if latest values are identical, just update timestamp - if sensor_store_[structname][-1][1:] == latest_values_[structname] and sensor_store_[structname][-1][1:] == sensor_store_[structname][-2][1:]: - sensor_store_[structname].pop() - except: - pass - sensor_store_[structname].append([ts] + latest_values_[structname]) - - #cap list lenght - if uwscfg.json_limit_list_len: - if len(sensor_store_[structname]) > uwscfg.json_limit_list_len: - sensor_store_[structname] = sensor_store_[structname][- uwscfg.json_limit_list_len:] - - -if __name__ == "__main__": - #signal.signal(signal.SIGTERM, exitHandler) - signal.signal(signal.SIGINT, exitHandler) - signal.signal(signal.SIGQUIT, exitHandler) - - logging.info("%s started" % os.path.basename(sys.argv[0])) - - if len(sys.argv) > 1: - uwscfg = UWSConfig(sys.argv[1]) - else: - uwscfg = UWSConfig() - - try: - with open(uwscfg.json_moveto_path,"rb") as fh: - sensor_store_ = json.loads(fh.read()) - except Exception, e: - logging.debug(e) - try: - with open(uwscfg.json_backup_path,"rb") as fh: - sensor_store_ = json.loads(fh.read()) - except Exception, e: - logging.debug(e) - - - for k in set(sensor_store_.keys()).difference(set(uwscfg.zmq_subscribe.split(" "))): - del sensor_store_[k] # del old sensordata of sensor we do not subscribe to - - for k in sensor_store_.keys(): - try: - if len(sensor_store_[k][0]) > 1: - sensor_cols_num_[k] = len(sensor_store_[k][0]) -1 - except: - pass - - while True: - try: - #Start zmq connection to publish / forward sensor data - zmqctx = zmq.Context() - zmqctx.linger = 0 - zmqsub = zmqctx.socket(zmq.SUB) - for topic in uwscfg.zmq_subscribe.split(" "): - zmqsub.setsockopt(zmq.SUBSCRIBE, topic) - if uwscfg.zmq_sshtunnel: - zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile) - else: - zmqsub.connect(uwscfg.zmq_remote_uri) - backup_counter = 0 - last_update = int(time.time()) - - while True: - #receive sensor updates - data = zmqsub.recv_multipart() - (structname, dictdata) = decodeR3Message(data) - logging.debug("Got data: " + structname + ":"+ str(dictdata)) - - uwscfg.checkConfigUpdates() - - addEventToTempLastValueStore(structname, dictdata) - - logging.debug("lastdata:"+str(latest_values_)) - if int(time.time()) - last_update < int(uwscfg.json_updateinterval): - continue - - logging.debug("update interval elapsed") - last_update = int(time.time()) - - addEventsToStore() - cleanTempLastValueOfMovementValues() - logging.debug("post-cleanMovement lastdata:"+str(latest_values_)) - - backup_counter += 1 - # save sensor_store_ to json for apache - with open(uwscfg.json_write_path,"wb") as fh: - fh.truncate() - fh.write(json.dumps(sensor_store_)) - if backup_counter > uwscfg.json_backup_every: - backup_counter = 0 - shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path) - shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path) - - except Exception, ex: - logging.error("main: "+str(ex)) - traceback.print_exc(file=sys.stdout) - try: - zmqsub.close() - zmqctx.destroy() - except: - pass - time.sleep(5) - -