X-Git-Url: https://git.realraum.at/?p=svn42.git;a=blobdiff_plain;f=sensorvalues-to-json-zmq.py;h=d6bc31e912c84fd0a889638e5f80321a1892bd36;hp=a8d26a1faeb9dce3cbdbf787e71d626cc3cc74e7;hb=e5f623a144f65281490dbcf9757557e0f14a9402;hpb=8e73e942c082365269f650cf5a489aaa07452f28

diff --git a/sensorvalues-to-json-zmq.py b/sensorvalues-to-json-zmq.py
index a8d26a1..d6bc31e 100755
--- a/sensorvalues-to-json-zmq.py
+++ b/sensorvalues-to-json-zmq.py
@@ -40,14 +40,15 @@ class UWSConfig:
         self.config_parser.add_section('json')
         self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
         self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
-        self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+        self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/r3sensors.json.bak")
         self.config_parser.set('json','backup_every',"50")
         self.config_parser.set('json','limit_list_len',"10000")
+        self.config_parser.set('json','updateinterval',"30")
         self.config_parser.add_section('zmq')
         self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
         self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
         self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa")
-        self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate PresenceUpdate")
+        self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate")
         self.config_parser.add_section('debug')
         self.config_parser.set('debug','enabled',"False")
         self.config_mtime=0
@@ -160,84 +161,175 @@ def exitHandler(signum, frame):
         pass
     sys.exit(0)
 
-#signal.signal(signal.SIGTERM, exitHandler)
-signal.signal(signal.SIGINT, exitHandler)
-signal.signal(signal.SIGQUIT, exitHandler)
+time_column_name_="Time"
+latest_values_ = {}
+sensor_store_ = {}
+sensor_cols_num_ = {} #stores number of columns for a sensor not counting Time (x-axis) column. AKA the number of data-rows. Equals highest SensorIndex +1
+reset_these_structnames_ = {}
 
-logging.info("%s started" % os.path.basename(sys.argv[0]))
+def addEventToTempLastValueStore(structname, msgdata):
+    global latest_values_
+    sensorindex = int(msgdata["Sensorindex"]) if "Sensorindex" in msgdata else 0
+    if not structname in latest_values_:
+        latest_values_[structname]=[]
+    if not structname in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex +1:
+        sensor_cols_num_[structname] = sensorindex +1
+    if len(latest_values_[structname]) < sensor_cols_num_[structname]:
+        latest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(latest_values_[structname]))
+        expandSensorStoreLists(structname, sensor_cols_num_[structname])
+    # store Value in temp last value store:
+    try:
+        del msgdata["Sensorindex"]
+    except:
+        pass
+    try:
+        del msgdata["Ts"]
+    except:
+        pass
+    if len(msgdata) > 0:
+        #store first value that is not Sensorindex or Ts into store
+        latest_values_[structname][sensorindex] = msgdata.values()[0]
+    else:
+        #if that value does not exist, (i.e. movementevent), count event occurances
+        latest_values_[structname][sensorindex] += 1
+        reset_these_structnames_[structname] = True
+
+
+def cleanTempLastValueOfMovementValues():
+    global latest_values_
+    for k in reset_these_structnames_.keys():
+        latest_values_[k] = [0] * sensor_cols_num_[k]
+
+
+def expandSensorStoreLists(structname, newlength):
+    global sensor_store_
+    if not structname in sensor_store_:
+        sensor_store_[structname]=[]
+    #remove old headings so we can add them again below
+    try:
+        if sensor_store_[structname][0][0] == time_column_name_:
+            sensor_store_[structname].pop(0)
+    except:
+        pass
+    #expand all previous value lists
+    newlength_including_time = newlength +1
+    sensor_store_[structname] = map(lambda l: l[:newlength_including_time] + ([0] * (newlength_including_time - len(l))) , sensor_store_[structname])
+
+
+def addEventsToStore():
+    global sensor_store_
+    ts = int(time.time())
+    for structname in latest_values_.keys():
+        if not structname in sensor_store_:
+            sensor_store_[structname]=[]
+
+        #if missing, add Header List [Time, 0, 1, 2]
+        if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_:
+            sensor_store_[structname].insert(0,[time_column_name_] + list(map(lambda n: "Sensor %d"%n,range(0,sensor_cols_num_[structname]))))
+
+        # add values
+        try:
+            # if latest values are identical, just update timestamp
+            if sensor_store_[structname][-1][1:] == latest_values_[structname] and sensor_store_[structname][-1][1:] == sensor_store_[structname][-2][1:]:
+                sensor_store_[structname].pop()
+        except:
+            pass
+        sensor_store_[structname].append([ts] + latest_values_[structname])
+
+        #cap list lenght
+        if uwscfg.json_limit_list_len:
+            if len(sensor_store_[structname]) > uwscfg.json_limit_list_len:
+                sensor_store_[structname] = sensor_store_[structname][- uwscfg.json_limit_list_len:]
+
+
+if __name__ == "__main__":
+    #signal.signal(signal.SIGTERM, exitHandler)
+    signal.signal(signal.SIGINT, exitHandler)
+    signal.signal(signal.SIGQUIT, exitHandler)
+
+    logging.info("%s started" % os.path.basename(sys.argv[0]))
 
-if len(sys.argv) > 1:
-    uwscfg = UWSConfig(sys.argv[1])
-else:
-    uwscfg = UWSConfig()
+    if len(sys.argv) > 1:
+        uwscfg = UWSConfig(sys.argv[1])
+    else:
+        uwscfg = UWSConfig()
 
-try:
-    with open(uwscfg.json_moveto_path,"rb") as fh:
-        sensor_store = json.loads(fh.read())
-except Exception, e:
-    logging.debug(e)
     try:
-        with open(uwscfg.json_backup_path,"rb") as fh:
-            sensor_store = json.loads(fh.read())
+        with open(uwscfg.json_moveto_path,"rb") as fh:
+            sensor_store_ = json.loads(fh.read())
     except Exception, e:
         logging.debug(e)
-        sensor_store = {}
+        try:
+            with open(uwscfg.json_backup_path,"rb") as fh:
+                sensor_store_ = json.loads(fh.read())
+        except Exception, e:
+            logging.debug(e)
 
-while True:
-    try:
-        #Start zmq connection to publish / forward sensor data
-        zmqctx = zmq.Context()
-        zmqctx.linger = 0
-        zmqsub = zmqctx.socket(zmq.SUB)
-        for topic in uwscfg.zmq_subscribe.split(" "):
-            zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
-        #zmqsub.connect(uwscfg.zmq_remote_uri)
-        zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
-        backup_counter = 0
+
+    for k in set(sensor_store_.keys()).difference(set(uwscfg.zmq_subscribe.split(" "))):
+        del sensor_store_[k] # del old sensordata of sensor we do not subscribe to
+
+    for k in sensor_store_.keys():
+        try:
+            if len(sensor_store_[k][0]) > 1:
+                sensor_cols_num_[k] = len(sensor_store_[k][0]) -1
+        except:
+            pass
 
     while True:
-        #receive sensor updates
-        data = zmqsub.recv_multipart()
-        (structname, dictdata) = decodeR3Message(data)
-        logging.debug("Got data: " + structname + ":"+ str(dictdata))
+        try:
+            #Start zmq connection to publish / forward sensor data
+            zmqctx = zmq.Context()
+            zmqctx.linger = 0
+            zmqsub = zmqctx.socket(zmq.SUB)
+            for topic in uwscfg.zmq_subscribe.split(" "):
+                zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
+            if uwscfg.zmq_sshtunnel:
+                zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+            else:
+                zmqsub.connect(uwscfg.zmq_remote_uri)
+            backup_counter = 0
+            last_update = int(time.time())
 
-        uwscfg.checkConfigUpdates()
+            while True:
+                #receive sensor updates
+                data = zmqsub.recv_multipart()
+                (structname, dictdata) = decodeR3Message(data)
+                logging.debug("Got data: " + structname + ":"+ str(dictdata))
 
-        # store data in local dict of lists
-        if "Sensorindex" in dictdata:
-            structname += str(dictdata["Sensorindex"])
-            del dictdata["Sensorindex"]
-        ts = dictdata["Ts"]
-        del dictdata["Ts"]
+                uwscfg.checkConfigUpdates()
 
-        if not structname in sensor_store:
-            sensor_store[structname] = []
+                addEventToTempLastValueStore(structname, dictdata)
 
-        sensor_store[structname].append((ts,dictdata.values()[0]))
-        backup_counter += 1
+                logging.debug("lastdata:"+str(latest_values_))
+                if int(time.time()) - last_update < int(uwscfg.json_updateinterval):
+                    continue
 
-        #cap list lenght
-        if uwscfg.json_limit_list_len:
-            if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
-                sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
-
-        # save sensor_store to json for apache
-        with open(uwscfg.json_write_path,"wb") as fh:
-            fh.truncate()
-            fh.write(json.dumps(sensor_store))
-        if backup_counter > uwscfg.json_backup_every:
-            backup_counter = 0
-            shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
-        shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
-
-    except Exception, ex:
-        logging.error("main: "+str(ex))
-        traceback.print_exc(file=sys.stdout)
-        try:
-            zmqsub.close()
-            zmqctx.destroy()
-        except:
-            pass
-        time.sleep(5)
+                logging.debug("update interval elapsed")
+                last_update = int(time.time())
+
+                addEventsToStore()
+                cleanTempLastValueOfMovementValues()
+                logging.debug("post-cleanMovement lastdata:"+str(latest_values_))
+
+                backup_counter += 1
+                # save sensor_store_ to json for apache
+                with open(uwscfg.json_write_path,"wb") as fh:
+                    fh.truncate()
+                    fh.write(json.dumps(sensor_store_))
+                if backup_counter > uwscfg.json_backup_every:
+                    backup_counter = 0
+                    shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+                shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+        except Exception, ex:
+            logging.error("main: "+str(ex))
+            traceback.print_exc(file=sys.stdout)
+            try:
+                zmqsub.close()
+                zmqctx.destroy()
+            except:
+                pass
+            time.sleep(5)
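
Note (not part of the commit above): a minimal reader sketch for the r3sensors.json this script writes, assuming the layout produced by addEventsToStore() in the new code (per struct name, a header row followed by [Time, Sensor 0, Sensor 1, ...] rows) and the default json/moveto_path from the config defaults. The path and names below are illustrative only.

# reader sketch: dump the newest row per sensor struct from r3sensors.json
import json

with open("/dev/shm/www/r3sensors.json", "rb") as fh:   # default json/moveto_path (assumption)
    store = json.loads(fh.read())

for structname, rows in store.items():
    if len(rows) < 2:
        continue  # only the header row present, no samples yet
    header, latest = rows[0], rows[-1]
    # latest[0] is the unix timestamp, the remaining entries are the per-Sensorindex values
    print("%s @ %d: %s" % (structname, latest[0], list(zip(header[1:], latest[1:]))))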