self.config_parser.add_section('json')
self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
- self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+ self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/public_html/r3sensors.json.bak")
self.config_parser.set('json','backup_every',"50")
self.config_parser.set('json','limit_list_len',"10000")
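+#updateinterval: seconds between writes of the aggregated JSON (checked in the main loop)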
+ self.config_parser.set('json','updateinterval',"30")
self.config_parser.add_section('zmq')
self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
pass
sys.exit(0)
-#signal.signal(signal.SIGTERM, exitHandler)
-signal.signal(signal.SIGINT, exitHandler)
-signal.signal(signal.SIGQUIT, exitHandler)
+time_column_name_ = "Time"
+lastest_values_ = {}
+sensor_store_ = {}
+sensor_cols_num_ = {} #number of data columns for a sensor, not counting the Time (x-axis) column; equals highest Sensorindex +1
+reset_these_structnames_ = {} #structnames whose event counters get reset after every update interval
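+# sensor_store_ layout per structname: one header row ["Time", "0", "1", ...]
+# followed by one data row [unix_ts, value_sensor0, value_sensor1, ...] per update interval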
+
+def addEventToTempLastValueStore(structname, msgdata):
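+    #remember the latest value (or event count) per structname and sensorindex
+    #so addEventsToStore() can emit one aggregated row per update interval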
+ global lastest_values_
+ sensorindex = msgdata["Sensorindex"] if "Sensorindex" in msgdata else 0
+    if structname not in lastest_values_:
+        lastest_values_[structname] = []
+    if structname not in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex + 1:
+        sensor_cols_num_[structname] = sensorindex + 1
+    if len(lastest_values_[structname]) < sensor_cols_num_[structname]:
+        lastest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(lastest_values_[structname]))
+    expandSensorStoreLists(structname, sensor_cols_num_[structname])
+    # store value in temp last value store;
+    # strip the meta fields first so only the actual sensor value(s) remain
+    try:
+        del msgdata["Sensorindex"]
+        del msgdata["Ts"]
+    except KeyError:
+        pass
+    if len(msgdata) > 0:
+        #store the first remaining value (i.e. neither Sensorindex nor Ts)
+        lastest_values_[structname][sensorindex] = msgdata.values()[0]
+    else:
+        #no value left (e.g. a movement event): count event occurrences instead
+        lastest_values_[structname][sensorindex] += 1
+        reset_these_structnames_[structname] = True
-logging.info("%s started" % os.path.basename(sys.argv[0]))
-if len(sys.argv) > 1:
- uwscfg = UWSConfig(sys.argv[1])
-else:
- uwscfg = UWSConfig()
+def cleanTempLastValueOfMovementValues():
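+    #zero the per-interval event counters (e.g. movement events) that
+    #addEventToTempLastValueStore() flagged in reset_these_structnames_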
+ global lastest_values_
+    for k in reset_these_structnames_.keys():
+        lastest_values_[k] = [0] * sensor_cols_num_[k]
-try:
- with open(uwscfg.json_moveto_path,"rb") as fh:
- sensor_store = json.loads(fh.read())
-except Exception, e:
- logging.debug(e)
+
+def expandSensorStoreLists(structname, newlength):
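+    #widen all stored rows of structname to newlength data columns (plus the
+    #Time column) whenever a new, higher Sensorindex shows up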
+ global sensor_store_
+    if structname not in sensor_store_:
+        sensor_store_[structname] = []
+    #remove the old heading row so addEventsToStore() re-adds it with the new width
try:
- with open(uwscfg.json_backup_path,"rb") as fh:
- sensor_store = json.loads(fh.read())
- except Exception, e:
- logging.debug(e)
- sensor_store = {}
+        if sensor_store_[structname][0][0] == time_column_name_:
+            sensor_store_[structname][0].pop(0)
+    except IndexError:
+        pass
+    #expand all previous rows to the new width (Time column + newlength value columns)
+    sensor_store_[structname] = map(lambda l: l + [0] * (newlength + 1 - len(l)), sensor_store_[structname])
-while True:
- try:
- #Start zmq connection to publish / forward sensor data
- zmqctx = zmq.Context()
- zmqctx.linger = 0
- zmqsub = zmqctx.socket(zmq.SUB)
- for topic in uwscfg.zmq_subscribe.split(" "):
- zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
- #zmqsub.connect(uwscfg.zmq_remote_uri)
- zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
- backup_counter = 0
- while True:
- #receive sensor updates
- data = zmqsub.recv_multipart()
- (structname, dictdata) = decodeR3Message(data)
- logging.debug("Got data: " + structname + ":"+ str(dictdata))
-
- uwscfg.checkConfigUpdates()
-
- # store data in local dict of lists
- if "Sensorindex" in dictdata:
- structname += str(dictdata["Sensorindex"])
- del dictdata["Sensorindex"]
- ts = dictdata["Ts"]
- del dictdata["Ts"]
+def addEventsToStore():
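+    #append one aggregated row [ts, v0, v1, ...] per structname from the
+    #temp last value store; lists are capped at json_limit_list_len below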
+ global sensor_store_
+ ts = int(time.time())
+ for structname in lastest_values_.keys():
+        if structname not in sensor_store_:
+            sensor_store_[structname] = []
- if not structname in sensor_store:
- sensor_store[structname] = []
+        #if missing, add header row ["Time", "0", "1", ...]
+        if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_:
+            sensor_store_[structname].insert(0, [time_column_name_] + list(map(str, range(0, sensor_cols_num_[structname]))))
- sensor_store[structname].append((ts,dictdata.values()[0]))
- backup_counter += 1
+ # add values
+ sensor_store_[structname].append([ts] + lastest_values_[structname])
-        #cap list lenght
-    if uwscfg.json_limit_list_len:
+        #cap list length
+        if uwscfg.json_limit_list_len:
- if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
- sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
-
- # save sensor_store to json for apache
- with open(uwscfg.json_write_path,"wb") as fh:
- fh.truncate()
- fh.write(json.dumps(sensor_store))
- if backup_counter > uwscfg.json_backup_every:
- backup_counter = 0
- shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
- shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
-
- except Exception, ex:
- logging.error("main: "+str(ex))
- traceback.print_exc(file=sys.stdout)
+            if len(sensor_store_[structname]) > int(uwscfg.json_limit_list_len):
+                sensor_store_[structname] = sensor_store_[structname][-int(uwscfg.json_limit_list_len):]
+
+
+if __name__ == "__main__":
+ #signal.signal(signal.SIGTERM, exitHandler)
+ signal.signal(signal.SIGINT, exitHandler)
+ signal.signal(signal.SIGQUIT, exitHandler)
+
+ logging.info("%s started" % os.path.basename(sys.argv[0]))
+
+ if len(sys.argv) > 1:
+ uwscfg = UWSConfig(sys.argv[1])
+ else:
+ uwscfg = UWSConfig()
+
try:
- zmqsub.close()
- zmqctx.destroy()
- except:
- pass
- time.sleep(5)
+        with open(uwscfg.json_moveto_path,"rb") as fh:
+            sensor_store_ = json.loads(fh.read())
+    except Exception, e:
+        logging.debug(e)
+        try:
+            with open(uwscfg.json_backup_path,"rb") as fh:
+                sensor_store_ = json.loads(fh.read())
+        except Exception, e:
+            logging.debug(e)
+
+ while True:
+ try:
+            #Start zmq subscription to receive sensor data
+ zmqctx = zmq.Context()
+ zmqctx.linger = 0
+ zmqsub = zmqctx.socket(zmq.SUB)
+ for topic in uwscfg.zmq_subscribe.split(" "):
+ zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
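+        #connect through an ssh tunnel if one is configured, otherwise connect directly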
+ if uwscfg.zmq_sshtunnel:
+ zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+ else:
+ zmqsub.connect(uwscfg.zmq_remote_uri)
+ backup_counter = 0
+ last_update = int(time.time())
+
+ while True:
+ #receive sensor updates
+ data = zmqsub.recv_multipart()
+ (structname, dictdata) = decodeR3Message(data)
+ logging.debug("Got data: " + structname + ":"+ str(dictdata))
+
+ uwscfg.checkConfigUpdates()
+
+ addEventToTempLastValueStore(structname, dictdata)
+
+ logging.debug("lastdata:"+str(lastest_values_))
+ if int(time.time()) - last_update < int(uwscfg.json_updateinterval):
+ continue
+
+ logging.debug("update interval elapsed")
+ last_update = int(time.time())
+
+ addEventsToStore()
+ cleanTempLastValueOfMovementValues()
+
+ backup_counter += 1
+ # save sensor_store to json for apache
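+            #resulting JSON, illustratively (structname keys depend on the received r3 messages):
+            #  {"SomeSensorUpdate": [["Time","0","1"], [1394550000, 21.5, 22.0], ...]}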
+ with open(uwscfg.json_write_path,"wb") as fh:
+ fh.truncate()
+ fh.write(json.dumps(sensor_store_))
+            if backup_counter > int(uwscfg.json_backup_every):
+ backup_counter = 0
+ shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+ shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+ except Exception, ex:
+ logging.error("main: "+str(ex))
+ traceback.print_exc(file=sys.stdout)
+ try:
+ zmqsub.close()
+ zmqctx.destroy()
+ except:
+ pass
+ time.sleep(5)