From 8e73e942c082365269f650cf5a489aaa07452f28 Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Sat, 5 Oct 2013 22:59:14 +0000 Subject: [PATCH] store sensorvalues to json on vex --- sensorvalues-to-json-zmq.py | 243 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100755 sensorvalues-to-json-zmq.py diff --git a/sensorvalues-to-json-zmq.py b/sensorvalues-to-json-zmq.py new file mode 100755 index 0000000..a8d26a1 --- /dev/null +++ b/sensorvalues-to-json-zmq.py @@ -0,0 +1,243 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +from __future__ import with_statement +import os +import os.path +import sys +import logging +import logging.handlers +import time +import threading +import signal +import ConfigParser +import traceback +import shutil +import zmq +import zmq.utils.jsonapi as json +import zmq.ssh.tunnel + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2) +lh_syslog.setFormatter(logging.Formatter('sensorvalues-to-json-zmq.py: %(levelname)s %(message)s')) +logger.addHandler(lh_syslog) +lh_stderr = logging.StreamHandler() +logger.addHandler(lh_stderr) + +######## Config File Data Class ############ + +class UWSConfig: + def __init__(self,configfile=None): + #Synchronisation + self.lock=threading.Lock() + self.finished_reading=threading.Condition(self.lock) + self.finished_writing=threading.Condition(self.lock) + self.currently_reading=0 + self.currently_writing=False + #Config Data + self.configfile=configfile + self.config_parser=ConfigParser.ConfigParser() + self.config_parser.add_section('json') + self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json") + self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json") + self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak") + self.config_parser.set('json','backup_every',"50") + self.config_parser.set('json','limit_list_len',"10000") + self.config_parser.add_section('zmq') + self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244") + self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000") + self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa") + self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate PresenceUpdate") + self.config_parser.add_section('debug') + self.config_parser.set('debug','enabled',"False") + self.config_mtime=0 + if not self.configfile is None: + try: + cf_handle = open(self.configfile,"r") + cf_handle.close() + except IOError: + self.writeConfigFile() + else: + self.checkConfigUpdates() + + def guardReading(self): + with self.lock: + while self.currently_writing: + self.finished_writing.wait() + self.currently_reading+=1 + + def unguardReading(self): + with self.lock: + self.currently_reading-=1 + self.finished_reading.notifyAll() + + def guardWriting(self): + with self.lock: + self.currently_writing=True + while self.currently_reading > 0: + self.finished_reading.wait() + + def unguardWriting(self): + with self.lock: + self.currently_writing=False + self.finished_writing.notifyAll() + + def checkConfigUpdates(self): + global logger + if self.configfile is None: + return + logging.debug("Checking Configfile mtime: "+self.configfile) + try: + mtime = os.path.getmtime(self.configfile) + except (IOError,OSError): + return + if self.config_mtime < mtime: + logging.debug("Reading Configfile") + self.guardWriting() + try: + self.config_parser.read(self.configfile) + self.config_mtime=os.path.getmtime(self.configfile) + except (ConfigParser.ParsingError, IOError), pe_ex: + logging.error("Error parsing Configfile: "+str(pe_ex)) + self.unguardWriting() + self.guardReading() + if self.config_parser.get('debug','enabled') == "True": + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + self.unguardReading() + + def writeConfigFile(self): + if self.configfile is None: + return + logging.debug("Writing Configfile "+self.configfile) + self.guardReading() + try: + cf_handle = open(self.configfile,"w") + self.config_parser.write(cf_handle) + cf_handle.close() + self.config_mtime=os.path.getmtime(self.configfile) + except IOError, io_ex: + logging.error("Error writing Configfile: "+str(io_ex)) + self.configfile=None + self.unguardReading() + + def __getattr__(self, name): + underscore_pos=name.find('_') + if underscore_pos < 0: + raise AttributeError + rv=None + self.guardReading() + try: + rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:]) + except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): + self.unguardReading() + finally: + self.unguardReading() + return rv + + +######## r3 ZMQ ############ + +def sendR3Message(socket, structname, datadict): + socket.send_multipart([structname, json.dumps(datadict)]) + +def decodeR3Message(multipart_msg): + try: + return (multipart_msg[0], json.loads(multipart_msg[1])) + except Exception, e: + logging.debug("decodeR3Message:"+str(e)) + return ("",{}) + +######## Main ############ + +def exitHandler(signum, frame): + logging.info("stopping") + try: + zmqsub.close() + zmqctx.destroy() + except: + pass + sys.exit(0) + +#signal.signal(signal.SIGTERM, exitHandler) +signal.signal(signal.SIGINT, exitHandler) +signal.signal(signal.SIGQUIT, exitHandler) + +logging.info("%s started" % os.path.basename(sys.argv[0])) + +if len(sys.argv) > 1: + uwscfg = UWSConfig(sys.argv[1]) +else: + uwscfg = UWSConfig() + +try: + with open(uwscfg.json_moveto_path,"rb") as fh: + sensor_store = json.loads(fh.read()) +except Exception, e: + logging.debug(e) + try: + with open(uwscfg.json_backup_path,"rb") as fh: + sensor_store = json.loads(fh.read()) + except Exception, e: + logging.debug(e) + sensor_store = {} + +while True: + try: + #Start zmq connection to publish / forward sensor data + zmqctx = zmq.Context() + zmqctx.linger = 0 + zmqsub = zmqctx.socket(zmq.SUB) + for topic in uwscfg.zmq_subscribe.split(" "): + zmqsub.setsockopt(zmq.SUBSCRIBE, topic) + #zmqsub.connect(uwscfg.zmq_remote_uri) + zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile) + backup_counter = 0 + + while True: + #receive sensor updates + data = zmqsub.recv_multipart() + (structname, dictdata) = decodeR3Message(data) + logging.debug("Got data: " + structname + ":"+ str(dictdata)) + + uwscfg.checkConfigUpdates() + + # store data in local dict of lists + if "Sensorindex" in dictdata: + structname += str(dictdata["Sensorindex"]) + del dictdata["Sensorindex"] + ts = dictdata["Ts"] + del dictdata["Ts"] + + if not structname in sensor_store: + sensor_store[structname] = [] + + sensor_store[structname].append((ts,dictdata.values()[0])) + backup_counter += 1 + + #cap list lenght + if uwscfg.json_limit_list_len: + if len(sensor_store[structname]) > uwscfg.json_limit_list_len: + sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:] + + # save sensor_store to json for apache + with open(uwscfg.json_write_path,"wb") as fh: + fh.truncate() + fh.write(json.dumps(sensor_store)) + if backup_counter > uwscfg.json_backup_every: + backup_counter = 0 + shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path) + shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path) + + except Exception, ex: + logging.error("main: "+str(ex)) + traceback.print_exc(file=sys.stdout) + try: + zmqsub.close() + zmqctx.destroy() + except: + pass + time.sleep(5) + + -- 1.7.10.4