--- /dev/null
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+from __future__ import with_statement
+import os
+import os.path
+import sys
+import logging
+import logging.handlers
+import time
+import threading
+import signal
+import ConfigParser
+import traceback
+import shutil
+import zmq
+import zmq.utils.jsonapi as json
+import zmq.ssh.tunnel
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2)
+lh_syslog.setFormatter(logging.Formatter('sensorvalues-to-json-zmq.py: %(levelname)s %(message)s'))
+logger.addHandler(lh_syslog)
+lh_stderr = logging.StreamHandler()
+logger.addHandler(lh_stderr)
+
+######## Config File Data Class ############
+
+class UWSConfig:
+ def __init__(self,configfile=None):
+ #Synchronisation
+ self.lock=threading.Lock()
+ self.finished_reading=threading.Condition(self.lock)
+ self.finished_writing=threading.Condition(self.lock)
+ self.currently_reading=0
+ self.currently_writing=False
+ #Config Data
+ self.configfile=configfile
+ self.config_parser=ConfigParser.ConfigParser()
+ self.config_parser.add_section('json')
+ self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
+ self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
+ self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+ self.config_parser.set('json','backup_every',"50")
+ self.config_parser.set('json','limit_list_len',"10000")
+ self.config_parser.add_section('zmq')
+ self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
+ self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
+ self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa")
+ self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate PresenceUpdate")
+ self.config_parser.add_section('debug')
+ self.config_parser.set('debug','enabled',"False")
+ self.config_mtime=0
+ if not self.configfile is None:
+ try:
+ cf_handle = open(self.configfile,"r")
+ cf_handle.close()
+ except IOError:
+ self.writeConfigFile()
+ else:
+ self.checkConfigUpdates()
+
+ def guardReading(self):
+ with self.lock:
+ while self.currently_writing:
+ self.finished_writing.wait()
+ self.currently_reading+=1
+
+ def unguardReading(self):
+ with self.lock:
+ self.currently_reading-=1
+ self.finished_reading.notifyAll()
+
+ def guardWriting(self):
+ with self.lock:
+ self.currently_writing=True
+ while self.currently_reading > 0:
+ self.finished_reading.wait()
+
+ def unguardWriting(self):
+ with self.lock:
+ self.currently_writing=False
+ self.finished_writing.notifyAll()
+
+ def checkConfigUpdates(self):
+ global logger
+ if self.configfile is None:
+ return
+ logging.debug("Checking Configfile mtime: "+self.configfile)
+ try:
+ mtime = os.path.getmtime(self.configfile)
+ except (IOError,OSError):
+ return
+ if self.config_mtime < mtime:
+ logging.debug("Reading Configfile")
+ self.guardWriting()
+ try:
+ self.config_parser.read(self.configfile)
+ self.config_mtime=os.path.getmtime(self.configfile)
+ except (ConfigParser.ParsingError, IOError), pe_ex:
+ logging.error("Error parsing Configfile: "+str(pe_ex))
+ self.unguardWriting()
+ self.guardReading()
+ if self.config_parser.get('debug','enabled') == "True":
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+ self.unguardReading()
+
+ def writeConfigFile(self):
+ if self.configfile is None:
+ return
+ logging.debug("Writing Configfile "+self.configfile)
+ self.guardReading()
+ try:
+ cf_handle = open(self.configfile,"w")
+ self.config_parser.write(cf_handle)
+ cf_handle.close()
+ self.config_mtime=os.path.getmtime(self.configfile)
+ except IOError, io_ex:
+ logging.error("Error writing Configfile: "+str(io_ex))
+ self.configfile=None
+ self.unguardReading()
+
+ def __getattr__(self, name):
+ underscore_pos=name.find('_')
+ if underscore_pos < 0:
+ raise AttributeError
+ rv=None
+ self.guardReading()
+ try:
+ rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:])
+ except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
+ self.unguardReading()
+ finally:
+ self.unguardReading()
+ return rv
+
+
+######## r3 ZMQ ############
+
+def sendR3Message(socket, structname, datadict):
+ socket.send_multipart([structname, json.dumps(datadict)])
+
+def decodeR3Message(multipart_msg):
+ try:
+ return (multipart_msg[0], json.loads(multipart_msg[1]))
+ except Exception, e:
+ logging.debug("decodeR3Message:"+str(e))
+ return ("",{})
+
+######## Main ############
+
+def exitHandler(signum, frame):
+ logging.info("stopping")
+ try:
+ zmqsub.close()
+ zmqctx.destroy()
+ except:
+ pass
+ sys.exit(0)
+
+#signal.signal(signal.SIGTERM, exitHandler)
+signal.signal(signal.SIGINT, exitHandler)
+signal.signal(signal.SIGQUIT, exitHandler)
+
+logging.info("%s started" % os.path.basename(sys.argv[0]))
+
+if len(sys.argv) > 1:
+ uwscfg = UWSConfig(sys.argv[1])
+else:
+ uwscfg = UWSConfig()
+
+try:
+ with open(uwscfg.json_moveto_path,"rb") as fh:
+ sensor_store = json.loads(fh.read())
+except Exception, e:
+ logging.debug(e)
+ try:
+ with open(uwscfg.json_backup_path,"rb") as fh:
+ sensor_store = json.loads(fh.read())
+ except Exception, e:
+ logging.debug(e)
+ sensor_store = {}
+
+while True:
+ try:
+ #Start zmq connection to publish / forward sensor data
+ zmqctx = zmq.Context()
+ zmqctx.linger = 0
+ zmqsub = zmqctx.socket(zmq.SUB)
+ for topic in uwscfg.zmq_subscribe.split(" "):
+ zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
+ #zmqsub.connect(uwscfg.zmq_remote_uri)
+ zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+ backup_counter = 0
+
+ while True:
+ #receive sensor updates
+ data = zmqsub.recv_multipart()
+ (structname, dictdata) = decodeR3Message(data)
+ logging.debug("Got data: " + structname + ":"+ str(dictdata))
+
+ uwscfg.checkConfigUpdates()
+
+ # store data in local dict of lists
+ if "Sensorindex" in dictdata:
+ structname += str(dictdata["Sensorindex"])
+ del dictdata["Sensorindex"]
+ ts = dictdata["Ts"]
+ del dictdata["Ts"]
+
+ if not structname in sensor_store:
+ sensor_store[structname] = []
+
+ sensor_store[structname].append((ts,dictdata.values()[0]))
+ backup_counter += 1
+
+ #cap list lenght
+ if uwscfg.json_limit_list_len:
+ if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
+ sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
+
+ # save sensor_store to json for apache
+ with open(uwscfg.json_write_path,"wb") as fh:
+ fh.truncate()
+ fh.write(json.dumps(sensor_store))
+ if backup_counter > uwscfg.json_backup_every:
+ backup_counter = 0
+ shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+ shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+ except Exception, ex:
+ logging.error("main: "+str(ex))
+ traceback.print_exc(file=sys.stdout)
+ try:
+ zmqsub.close()
+ zmqctx.destroy()
+ except:
+ pass
+ time.sleep(5)
+
+