From 27659332271c75803abb004474a81144f4842cdc Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Tue, 8 Oct 2013 03:57:55 +0000 Subject: [PATCH] sensorvalues json format for google graph js lib --- sensorvalues-to-json-zmq.py | 206 +++++++++++++++++++++++++++++-------------- 1 file changed, 138 insertions(+), 68 deletions(-) diff --git a/sensorvalues-to-json-zmq.py b/sensorvalues-to-json-zmq.py index a8d26a1..6608086 100755 --- a/sensorvalues-to-json-zmq.py +++ b/sensorvalues-to-json-zmq.py @@ -40,9 +40,10 @@ class UWSConfig: self.config_parser.add_section('json') self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json") self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json") - self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak") + self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/public_html/r3sensors.json.bak") self.config_parser.set('json','backup_every',"50") self.config_parser.set('json','limit_list_len',"10000") + self.config_parser.set('json','updateinterval',"30") self.config_parser.add_section('zmq') self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244") self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000") @@ -160,84 +161,153 @@ def exitHandler(signum, frame): pass sys.exit(0) -#signal.signal(signal.SIGTERM, exitHandler) -signal.signal(signal.SIGINT, exitHandler) -signal.signal(signal.SIGQUIT, exitHandler) +time_column_name_="Time" +lastest_values_ = {} +sensor_store_ = {} +sensor_cols_num_ = {} #stores number of columns for a sensor not counting Time (x-axis) column. AKA the number of data-rows. Equals highest SensorIndex +1 +reset_these_structnames_ = {} + +def addEventToTempLastValueStore(structname, msgdata): + global lastest_values_ + sensorindex = msgdata["Sensorindex"] if "Sensorindex" in msgdata else 0 + if not structname in lastest_values_: + lastest_values_[structname]=[] + if not structname in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex +1: + sensor_cols_num_[structname] = sensorindex +1 + if len(lastest_values_[structname]) < sensor_cols_num_[structname]: + lastest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(lastest_values_[structname])) + expandSensorStoreLists(structname, sensor_cols_num_[structname]) + # store Value in temp last value store: + try: + del dictdata["Sensorindex"] + del dictdata["Ts"] + except: + pass + if len(msgdata) > 0: + #store first value that is not Sensorindex or Ts into store + lastest_values_[structname][sensorindex] = msgdata.values()[0] + else: + #if that value does not exist, (i.e. movementevent), count event occurances + lastest_values_[structname][sensorindex] += 1 + reset_these_structnames_[structname] = True -logging.info("%s started" % os.path.basename(sys.argv[0])) -if len(sys.argv) > 1: - uwscfg = UWSConfig(sys.argv[1]) -else: - uwscfg = UWSConfig() +def cleanTempLastValueOfMovementValues(): + global lastest_values_ + for k in reset_these_structnames_.keys(): + lastest_values_[k] = [0] * sensor_cols_num_[structname] -try: - with open(uwscfg.json_moveto_path,"rb") as fh: - sensor_store = json.loads(fh.read()) -except Exception, e: - logging.debug(e) + +def expandSensorStoreLists(structname, newlength): + global sensor_store_ + if not structname in sensor_store_: + sensor_store_[structname]=[] + #remove old headings so we can add them again below try: - with open(uwscfg.json_backup_path,"rb") as fh: - sensor_store = json.loads(fh.read()) - except Exception, e: - logging.debug(e) - sensor_store = {} + if sensor_store_[structname][0][0] == time_column_name_: + sensor_store_[structname][0].pop(0) + except: + pass + #expand all previous value lists + sensor_store_[structname] = map(lambda l: l + ([0] * (newlength +1 - len(l))) , sensor_store_[structname]) -while True: - try: - #Start zmq connection to publish / forward sensor data - zmqctx = zmq.Context() - zmqctx.linger = 0 - zmqsub = zmqctx.socket(zmq.SUB) - for topic in uwscfg.zmq_subscribe.split(" "): - zmqsub.setsockopt(zmq.SUBSCRIBE, topic) - #zmqsub.connect(uwscfg.zmq_remote_uri) - zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile) - backup_counter = 0 - while True: - #receive sensor updates - data = zmqsub.recv_multipart() - (structname, dictdata) = decodeR3Message(data) - logging.debug("Got data: " + structname + ":"+ str(dictdata)) - - uwscfg.checkConfigUpdates() - - # store data in local dict of lists - if "Sensorindex" in dictdata: - structname += str(dictdata["Sensorindex"]) - del dictdata["Sensorindex"] - ts = dictdata["Ts"] - del dictdata["Ts"] +def addEventsToStore(): + global sensor_store_ + ts = int(time.time()) + for structname in lastest_values_.keys(): + if not structname in sensor_store_: + sensor_store_[structname]=[] - if not structname in sensor_store: - sensor_store[structname] = [] + #if missing, add Header List [Time, 0, 1, 2] + if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_: + sensor_store_[structname].insert(0,[time_column_name_] + list(map(str,range(0,sensor_cols_num_[structname])))) - sensor_store[structname].append((ts,dictdata.values()[0])) - backup_counter += 1 + # add values + sensor_store_[structname].append([ts] + lastest_values_[structname]) #cap list lenght if uwscfg.json_limit_list_len: - if len(sensor_store[structname]) > uwscfg.json_limit_list_len: - sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:] - - # save sensor_store to json for apache - with open(uwscfg.json_write_path,"wb") as fh: - fh.truncate() - fh.write(json.dumps(sensor_store)) - if backup_counter > uwscfg.json_backup_every: - backup_counter = 0 - shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path) - shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path) - - except Exception, ex: - logging.error("main: "+str(ex)) - traceback.print_exc(file=sys.stdout) + if len(sensor_store_[structname]) > uwscfg.json_limit_list_len: + sensor_store_[structname] = sensor_store_[structname][- uwscfg.json_limit_list_len:] + + +if __name__ == "__main__": + #signal.signal(signal.SIGTERM, exitHandler) + signal.signal(signal.SIGINT, exitHandler) + signal.signal(signal.SIGQUIT, exitHandler) + + logging.info("%s started" % os.path.basename(sys.argv[0])) + + if len(sys.argv) > 1: + uwscfg = UWSConfig(sys.argv[1]) + else: + uwscfg = UWSConfig() + try: - zmqsub.close() - zmqctx.destroy() - except: - pass - time.sleep(5) + with open(uwscfg.json_moveto_path,"rb") as fh: + sensor_store = json.loads(fh.read()) + except Exception, e: + logging.debug(e) + try: + with open(uwscfg.json_backup_path,"rb") as fh: + sensor_store = json.loads(fh.read()) + except Exception, e: + logging.debug(e) + + while True: + try: + #Start zmq connection to publish / forward sensor data + zmqctx = zmq.Context() + zmqctx.linger = 0 + zmqsub = zmqctx.socket(zmq.SUB) + for topic in uwscfg.zmq_subscribe.split(" "): + zmqsub.setsockopt(zmq.SUBSCRIBE, topic) + if uwscfg.zmq_sshtunnel: + zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile) + else: + zmqsub.connect(uwscfg.zmq_remote_uri) + backup_counter = 0 + last_update = int(time.time()) + + while True: + #receive sensor updates + data = zmqsub.recv_multipart() + (structname, dictdata) = decodeR3Message(data) + logging.debug("Got data: " + structname + ":"+ str(dictdata)) + + uwscfg.checkConfigUpdates() + + addEventToTempLastValueStore(structname, dictdata) + + logging.debug("lastdata:"+str(lastest_values_)) + if int(time.time()) - last_update < int(uwscfg.json_updateinterval): + continue + + logging.debug("update interval elapsed") + last_update = int(time.time()) + + addEventsToStore() + cleanTempLastValueOfMovementValues() + + backup_counter += 1 + # save sensor_store to json for apache + with open(uwscfg.json_write_path,"wb") as fh: + fh.truncate() + fh.write(json.dumps(sensor_store_)) + if backup_counter > uwscfg.json_backup_every: + backup_counter = 0 + shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path) + shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path) + + except Exception, ex: + logging.error("main: "+str(ex)) + traceback.print_exc(file=sys.stdout) + try: + zmqsub.close() + zmqctx.destroy() + except: + pass + time.sleep(5) -- 1.7.10.4