sensorvalues json format for google graph js lib
authorBernhard Tittelbach <xro@realraum.at>
Tue, 8 Oct 2013 03:57:55 +0000 (03:57 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Tue, 8 Oct 2013 03:57:55 +0000 (03:57 +0000)
sensorvalues-to-json-zmq.py

index a8d26a1..6608086 100755 (executable)
@@ -40,9 +40,10 @@ class UWSConfig:
     self.config_parser.add_section('json')
     self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
     self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
-    self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+    self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/public_html/r3sensors.json.bak")
     self.config_parser.set('json','backup_every',"50")
     self.config_parser.set('json','limit_list_len',"10000")
+    self.config_parser.set('json','updateinterval',"30")
     self.config_parser.add_section('zmq')
     self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
     self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
@@ -160,84 +161,153 @@ def exitHandler(signum, frame):
     pass
   sys.exit(0)
 
-#signal.signal(signal.SIGTERM, exitHandler)
-signal.signal(signal.SIGINT, exitHandler)
-signal.signal(signal.SIGQUIT, exitHandler)
+time_column_name_="Time"
+lastest_values_ = {}
+sensor_store_ = {}
+sensor_cols_num_ = {} #stores number of columns for a sensor not counting Time (x-axis) column. AKA the number of data-rows. Equals highest SensorIndex +1
+reset_these_structnames_ = {}
+
+def addEventToTempLastValueStore(structname, msgdata):
+    global lastest_values_
+    sensorindex = msgdata["Sensorindex"] if "Sensorindex" in msgdata else 0
+    if not structname in lastest_values_:
+        lastest_values_[structname]=[]
+    if not structname in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex +1:
+        sensor_cols_num_[structname] = sensorindex +1
+    if len(lastest_values_[structname]) < sensor_cols_num_[structname]:
+        lastest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(lastest_values_[structname]))
+        expandSensorStoreLists(structname, sensor_cols_num_[structname])
+    # store Value in temp last value store:
+    try:
+        del dictdata["Sensorindex"]
+        del dictdata["Ts"]
+    except:
+        pass
+    if len(msgdata) > 0:
+        #store first value that is not Sensorindex or Ts into store
+        lastest_values_[structname][sensorindex] = msgdata.values()[0]
+    else:
+        #if that value does not exist, (i.e. movementevent), count event occurances
+        lastest_values_[structname][sensorindex] += 1
+        reset_these_structnames_[structname] = True
 
-logging.info("%s started" % os.path.basename(sys.argv[0]))
 
-if len(sys.argv) > 1:
-  uwscfg = UWSConfig(sys.argv[1])
-else:
-  uwscfg = UWSConfig()
+def cleanTempLastValueOfMovementValues():
+    global lastest_values_
+    for k in reset_these_structnames_.keys():
+        lastest_values_[k] = [0] * sensor_cols_num_[structname]
 
-try:
-    with open(uwscfg.json_moveto_path,"rb") as fh:
-        sensor_store = json.loads(fh.read())
-except Exception, e:
-    logging.debug(e)
+
+def expandSensorStoreLists(structname, newlength):
+    global sensor_store_
+    if not structname in sensor_store_:
+        sensor_store_[structname]=[]
+    #remove old headings so we can add them again below
     try:
-        with open(uwscfg.json_backup_path,"rb") as fh:
-            sensor_store = json.loads(fh.read())
-    except Exception, e:
-        logging.debug(e)
-        sensor_store = {}
+        if sensor_store_[structname][0][0] == time_column_name_:
+            sensor_store_[structname][0].pop(0)
+    except:
+        pass
+    #expand all previous value lists
+    sensor_store_[structname] = map(lambda l: l + ([0] * (newlength +1 - len(l)))  , sensor_store_[structname])
 
-while True:
-  try:
-    #Start zmq connection to publish / forward sensor data
-    zmqctx = zmq.Context()
-    zmqctx.linger = 0
-    zmqsub = zmqctx.socket(zmq.SUB)
-    for topic in uwscfg.zmq_subscribe.split(" "):
-        zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
-    #zmqsub.connect(uwscfg.zmq_remote_uri)
-    zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
-    backup_counter = 0
 
-    while True:
-        #receive sensor updates
-        data = zmqsub.recv_multipart()
-        (structname, dictdata) = decodeR3Message(data)
-        logging.debug("Got data: " + structname + ":"+ str(dictdata))
-
-        uwscfg.checkConfigUpdates()
-
-        # store data in local dict of lists
-        if "Sensorindex" in dictdata:
-            structname += str(dictdata["Sensorindex"])
-            del dictdata["Sensorindex"]
-        ts = dictdata["Ts"]
-        del dictdata["Ts"]
+def addEventsToStore():
+    global sensor_store_
+    ts = int(time.time())
+    for structname in lastest_values_.keys():
+        if not structname in sensor_store_:
+            sensor_store_[structname]=[]
 
-        if not structname in sensor_store:
-            sensor_store[structname] = []
+        #if missing, add Header List [Time, 0, 1, 2]
+        if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_:
+            sensor_store_[structname].insert(0,[time_column_name_] + list(map(str,range(0,sensor_cols_num_[structname]))))
 
-        sensor_store[structname].append((ts,dictdata.values()[0]))
-        backup_counter += 1
+        # add values
+        sensor_store_[structname].append([ts] + lastest_values_[structname])
 
         #cap list lenght
         if uwscfg.json_limit_list_len:
-            if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
-                sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
-
-        # save sensor_store to json for apache
-        with open(uwscfg.json_write_path,"wb") as fh:
-            fh.truncate()
-            fh.write(json.dumps(sensor_store))
-        if backup_counter > uwscfg.json_backup_every:
-            backup_counter = 0
-            shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
-        shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
-
-  except Exception, ex:
-    logging.error("main: "+str(ex))
-    traceback.print_exc(file=sys.stdout)
+            if len(sensor_store_[structname]) > uwscfg.json_limit_list_len:
+                sensor_store_[structname] = sensor_store_[structname][- uwscfg.json_limit_list_len:]
+
+
+if __name__ == "__main__":
+    #signal.signal(signal.SIGTERM, exitHandler)
+    signal.signal(signal.SIGINT, exitHandler)
+    signal.signal(signal.SIGQUIT, exitHandler)
+
+    logging.info("%s started" % os.path.basename(sys.argv[0]))
+
+    if len(sys.argv) > 1:
+      uwscfg = UWSConfig(sys.argv[1])
+    else:
+      uwscfg = UWSConfig()
+
     try:
-      zmqsub.close()
-      zmqctx.destroy()
-    except:
-      pass
-    time.sleep(5)
+        with open(uwscfg.json_moveto_path,"rb") as fh:
+            sensor_store = json.loads(fh.read())
+    except Exception, e:
+        logging.debug(e)
+        try:
+            with open(uwscfg.json_backup_path,"rb") as fh:
+                sensor_store = json.loads(fh.read())
+        except Exception, e:
+            logging.debug(e)
+
+    while True:
+      try:
+        #Start zmq connection to publish / forward sensor data
+        zmqctx = zmq.Context()
+        zmqctx.linger = 0
+        zmqsub = zmqctx.socket(zmq.SUB)
+        for topic in uwscfg.zmq_subscribe.split(" "):
+            zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
+        if uwscfg.zmq_sshtunnel:
+            zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+        else:
+            zmqsub.connect(uwscfg.zmq_remote_uri)
+        backup_counter = 0
+        last_update = int(time.time())
+
+        while True:
+            #receive sensor updates
+            data = zmqsub.recv_multipart()
+            (structname, dictdata) = decodeR3Message(data)
+            logging.debug("Got data: " + structname + ":"+ str(dictdata))
+
+            uwscfg.checkConfigUpdates()
+
+            addEventToTempLastValueStore(structname, dictdata)
+
+            logging.debug("lastdata:"+str(lastest_values_))
+            if int(time.time()) - last_update < int(uwscfg.json_updateinterval):
+                continue
+
+            logging.debug("update interval elapsed")
+            last_update = int(time.time())
+
+            addEventsToStore()
+            cleanTempLastValueOfMovementValues()
+
+            backup_counter += 1
+            # save sensor_store to json for apache
+            with open(uwscfg.json_write_path,"wb") as fh:
+                fh.truncate()
+                fh.write(json.dumps(sensor_store_))
+            if backup_counter > uwscfg.json_backup_every:
+                backup_counter = 0
+                shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+            shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+      except Exception, ex:
+        logging.error("main: "+str(ex))
+        traceback.print_exc(file=sys.stdout)
+        try:
+          zmqsub.close()
+          zmqctx.destroy()
+        except:
+          pass
+        time.sleep(5)