store sensorvalues to json on vex
authorBernhard Tittelbach <xro@realraum.at>
Sat, 5 Oct 2013 22:59:14 +0000 (22:59 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Sat, 5 Oct 2013 22:59:14 +0000 (22:59 +0000)
sensorvalues-to-json-zmq.py [new file with mode: 0755]

diff --git a/sensorvalues-to-json-zmq.py b/sensorvalues-to-json-zmq.py
new file mode 100755 (executable)
index 0000000..a8d26a1
--- /dev/null
@@ -0,0 +1,243 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+from __future__ import with_statement
+import os
+import os.path
+import sys
+import logging
+import logging.handlers
+import time
+import threading
+import signal
+import ConfigParser
+import traceback
+import shutil
+import zmq
+import zmq.utils.jsonapi as json
+import zmq.ssh.tunnel
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2)
+lh_syslog.setFormatter(logging.Formatter('sensorvalues-to-json-zmq.py: %(levelname)s %(message)s'))
+logger.addHandler(lh_syslog)
+lh_stderr = logging.StreamHandler()
+logger.addHandler(lh_stderr)
+
+######## Config File Data Class ############
+
+class UWSConfig:
+  def __init__(self,configfile=None):
+    #Synchronisation
+    self.lock=threading.Lock()
+    self.finished_reading=threading.Condition(self.lock)
+    self.finished_writing=threading.Condition(self.lock)
+    self.currently_reading=0
+    self.currently_writing=False
+    #Config Data
+    self.configfile=configfile
+    self.config_parser=ConfigParser.ConfigParser()
+    self.config_parser.add_section('json')
+    self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
+    self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
+    self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+    self.config_parser.set('json','backup_every',"50")
+    self.config_parser.set('json','limit_list_len',"10000")
+    self.config_parser.add_section('zmq')
+    self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
+    self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
+    self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa")
+    self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate PresenceUpdate")
+    self.config_parser.add_section('debug')
+    self.config_parser.set('debug','enabled',"False")
+    self.config_mtime=0
+    if not self.configfile is None:
+      try:
+        cf_handle = open(self.configfile,"r")
+        cf_handle.close()
+      except IOError:
+        self.writeConfigFile()
+      else:
+        self.checkConfigUpdates()
+
+  def guardReading(self):
+    with self.lock:
+      while self.currently_writing:
+        self.finished_writing.wait()
+      self.currently_reading+=1
+
+  def unguardReading(self):
+    with self.lock:
+      self.currently_reading-=1
+      self.finished_reading.notifyAll()
+
+  def guardWriting(self):
+    with self.lock:
+      self.currently_writing=True
+      while self.currently_reading > 0:
+        self.finished_reading.wait()
+
+  def unguardWriting(self):
+    with self.lock:
+      self.currently_writing=False
+      self.finished_writing.notifyAll()
+
+  def checkConfigUpdates(self):
+    global logger
+    if self.configfile is None:
+      return
+    logging.debug("Checking Configfile mtime: "+self.configfile)
+    try:
+      mtime = os.path.getmtime(self.configfile)
+    except (IOError,OSError):
+      return
+    if self.config_mtime < mtime:
+      logging.debug("Reading Configfile")
+      self.guardWriting()
+      try:
+        self.config_parser.read(self.configfile)
+        self.config_mtime=os.path.getmtime(self.configfile)
+      except (ConfigParser.ParsingError, IOError), pe_ex:
+        logging.error("Error parsing Configfile: "+str(pe_ex))
+      self.unguardWriting()
+      self.guardReading()
+      if self.config_parser.get('debug','enabled') == "True":
+        logger.setLevel(logging.DEBUG)
+      else:
+        logger.setLevel(logging.INFO)
+      self.unguardReading()
+
+  def writeConfigFile(self):
+    if self.configfile is None:
+      return
+    logging.debug("Writing Configfile "+self.configfile)
+    self.guardReading()
+    try:
+      cf_handle = open(self.configfile,"w")
+      self.config_parser.write(cf_handle)
+      cf_handle.close()
+      self.config_mtime=os.path.getmtime(self.configfile)
+    except IOError, io_ex:
+      logging.error("Error writing Configfile: "+str(io_ex))
+      self.configfile=None
+    self.unguardReading()
+
+  def __getattr__(self, name):
+    underscore_pos=name.find('_')
+    if underscore_pos < 0:
+      raise AttributeError
+    rv=None
+    self.guardReading()
+    try:
+      rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:])
+    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
+      self.unguardReading()
+    finally:
+      self.unguardReading()
+    return rv
+
+
+######## r3 ZMQ ############
+
+def sendR3Message(socket, structname, datadict):
+    socket.send_multipart([structname, json.dumps(datadict)])
+
+def decodeR3Message(multipart_msg):
+    try:
+        return (multipart_msg[0], json.loads(multipart_msg[1]))
+    except Exception, e:
+        logging.debug("decodeR3Message:"+str(e))
+        return ("",{})
+
+######## Main ############
+
+def exitHandler(signum, frame):
+  logging.info("stopping")
+  try:
+    zmqsub.close()
+    zmqctx.destroy()
+  except:
+    pass
+  sys.exit(0)
+
+#signal.signal(signal.SIGTERM, exitHandler)
+signal.signal(signal.SIGINT, exitHandler)
+signal.signal(signal.SIGQUIT, exitHandler)
+
+logging.info("%s started" % os.path.basename(sys.argv[0]))
+
+if len(sys.argv) > 1:
+  uwscfg = UWSConfig(sys.argv[1])
+else:
+  uwscfg = UWSConfig()
+
+try:
+    with open(uwscfg.json_moveto_path,"rb") as fh:
+        sensor_store = json.loads(fh.read())
+except Exception, e:
+    logging.debug(e)
+    try:
+        with open(uwscfg.json_backup_path,"rb") as fh:
+            sensor_store = json.loads(fh.read())
+    except Exception, e:
+        logging.debug(e)
+        sensor_store = {}
+
+while True:
+  try:
+    #Start zmq connection to publish / forward sensor data
+    zmqctx = zmq.Context()
+    zmqctx.linger = 0
+    zmqsub = zmqctx.socket(zmq.SUB)
+    for topic in uwscfg.zmq_subscribe.split(" "):
+        zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
+    #zmqsub.connect(uwscfg.zmq_remote_uri)
+    zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+    backup_counter = 0
+
+    while True:
+        #receive sensor updates
+        data = zmqsub.recv_multipart()
+        (structname, dictdata) = decodeR3Message(data)
+        logging.debug("Got data: " + structname + ":"+ str(dictdata))
+
+        uwscfg.checkConfigUpdates()
+
+        # store data in local dict of lists
+        if "Sensorindex" in dictdata:
+            structname += str(dictdata["Sensorindex"])
+            del dictdata["Sensorindex"]
+        ts = dictdata["Ts"]
+        del dictdata["Ts"]
+
+        if not structname in sensor_store:
+            sensor_store[structname] = []
+
+        sensor_store[structname].append((ts,dictdata.values()[0]))
+        backup_counter += 1
+
+        #cap list lenght
+        if uwscfg.json_limit_list_len:
+            if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
+                sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
+
+        # save sensor_store to json for apache
+        with open(uwscfg.json_write_path,"wb") as fh:
+            fh.truncate()
+            fh.write(json.dumps(sensor_store))
+        if backup_counter > uwscfg.json_backup_every:
+            backup_counter = 0
+            shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+        shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+  except Exception, ex:
+    logging.error("main: "+str(ex))
+    traceback.print_exc(file=sys.stdout)
+    try:
+      zmqsub.close()
+      zmqctx.destroy()
+    except:
+      pass
+    time.sleep(5)
+
+