self.config_parser.add_section('json')
self.config_parser.set('json','write_path',"/dev/shm/wget/r3sensors.json")
self.config_parser.set('json','moveto_path',"/dev/shm/www/r3sensors.json")
- self.config_parser.set('json','backup_path',"var/www/r3sensors.json.bak")
+ self.config_parser.set('json','backup_path',"/home/guests/realraum.wirdorange.org/r3sensors.json.bak")
self.config_parser.set('json','backup_every',"50")
self.config_parser.set('json','limit_list_len',"10000")
+ self.config_parser.set('json','updateinterval',"30")
self.config_parser.add_section('zmq')
- self.config_parser.set('zmq','remote_uri',"tcp://wuzzler.realraum.at:4244")
- self.config_parser.set('zmq','sshtunnel',"realraum@wuzzler.realraum.at:22000")
+ self.config_parser.set('zmq','remote_uri',"tcp://torwaechter.realraum.at:4244")
+ self.config_parser.set('zmq','sshtunnel',"realraum@torwaechter.realraum.at:22000")
self.config_parser.set('zmq','sshkeyfile',"/home/guests/realraum.wirdorange.org/id_rsa")
- self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate PresenceUpdate")
+ self.config_parser.set('zmq','subscribe',"TempSensorUpdate IlluminationSensorUpdate DustSensorUpdate RelativeHumiditySensorUpdate MovementSensorUpdate")
self.config_parser.add_section('debug')
self.config_parser.set('debug','enabled',"False")
self.config_mtime=0
pass
sys.exit(0)
-#signal.signal(signal.SIGTERM, exitHandler)
-signal.signal(signal.SIGINT, exitHandler)
-signal.signal(signal.SIGQUIT, exitHandler)
+time_column_name_="Time"
+latest_values_ = {}
+sensor_store_ = {}
+sensor_cols_num_ = {} #stores number of columns for a sensor not counting Time (x-axis) column. AKA the number of data-rows. Equals highest SensorIndex +1
+reset_these_structnames_ = {}
-logging.info("%s started" % os.path.basename(sys.argv[0]))
+def addEventToTempLastValueStore(structname, msgdata):
+ global latest_values_
+ sensorindex = int(msgdata["Sensorindex"]) if "Sensorindex" in msgdata else 0
+ if not structname in latest_values_:
+ latest_values_[structname]=[]
+ if not structname in sensor_cols_num_ or sensor_cols_num_[structname] < sensorindex +1:
+ sensor_cols_num_[structname] = sensorindex +1
+ if len(latest_values_[structname]) < sensor_cols_num_[structname]:
+ latest_values_[structname] += [0] * (sensor_cols_num_[structname] - len(latest_values_[structname]))
+ expandSensorStoreLists(structname, sensor_cols_num_[structname])
+ # store Value in temp last value store:
+ try:
+ del msgdata["Sensorindex"]
+ except:
+ pass
+ try:
+ del msgdata["Ts"]
+ except:
+ pass
+ if len(msgdata) > 0:
+ #store first value that is not Sensorindex or Ts into store
+ latest_values_[structname][sensorindex] = msgdata.values()[0]
+ else:
+ #if that value does not exist, (i.e. movementevent), count event occurances
+ latest_values_[structname][sensorindex] += 1
+ reset_these_structnames_[structname] = True
+
+
+def cleanTempLastValueOfMovementValues():
+ global latest_values_
+ for k in reset_these_structnames_.keys():
+ latest_values_[k] = [0] * sensor_cols_num_[k]
+
+
+def expandSensorStoreLists(structname, newlength):
+ global sensor_store_
+ if not structname in sensor_store_:
+ sensor_store_[structname]=[]
+ #remove old headings so we can add them again below
+ try:
+ if sensor_store_[structname][0][0] == time_column_name_:
+ sensor_store_[structname].pop(0)
+ except:
+ pass
+ #expand all previous value lists
+ newlength_including_time = newlength +1
+ sensor_store_[structname] = map(lambda l: l[:newlength_including_time] + ([0] * (newlength_including_time - len(l))) , sensor_store_[structname])
+
+
+def addEventsToStore():
+ global sensor_store_
+ ts = int(time.time())
+ for structname in latest_values_.keys():
+ if not structname in sensor_store_:
+ sensor_store_[structname]=[]
+
+ #if missing, add Header List [Time, 0, 1, 2]
+ if len(sensor_store_[structname]) == 0 or len(sensor_store_[structname][0]) < 2 or sensor_store_[structname][0][0] != time_column_name_:
+ sensor_store_[structname].insert(0,[time_column_name_] + list(map(lambda n: "Sensor %d"%n,range(0,sensor_cols_num_[structname]))))
+
+ # add values
+ try:
+ # if latest values are identical, just update timestamp
+ if sensor_store_[structname][-1][1:] == latest_values_[structname] and sensor_store_[structname][-1][1:] == sensor_store_[structname][-2][1:]:
+ sensor_store_[structname].pop()
+ except:
+ pass
+ sensor_store_[structname].append([ts] + latest_values_[structname])
+
+ #cap list lenght
+ if uwscfg.json_limit_list_len:
+ if len(sensor_store_[structname]) > uwscfg.json_limit_list_len:
+ sensor_store_[structname] = sensor_store_[structname][- uwscfg.json_limit_list_len:]
+
+
+if __name__ == "__main__":
+ #signal.signal(signal.SIGTERM, exitHandler)
+ signal.signal(signal.SIGINT, exitHandler)
+ signal.signal(signal.SIGQUIT, exitHandler)
+
+ logging.info("%s started" % os.path.basename(sys.argv[0]))
-if len(sys.argv) > 1:
- uwscfg = UWSConfig(sys.argv[1])
-else:
- uwscfg = UWSConfig()
+ if len(sys.argv) > 1:
+ uwscfg = UWSConfig(sys.argv[1])
+ else:
+ uwscfg = UWSConfig()
-try:
- with open(uwscfg.json_moveto_path,"rb") as fh:
- sensor_store = json.loads(fh.read())
-except Exception, e:
- logging.debug(e)
try:
- with open(uwscfg.json_backup_path,"rb") as fh:
- sensor_store = json.loads(fh.read())
+ with open(uwscfg.json_moveto_path,"rb") as fh:
+ sensor_store_ = json.loads(fh.read())
except Exception, e:
logging.debug(e)
- sensor_store = {}
+ try:
+ with open(uwscfg.json_backup_path,"rb") as fh:
+ sensor_store_ = json.loads(fh.read())
+ except Exception, e:
+ logging.debug(e)
-while True:
- try:
- #Start zmq connection to publish / forward sensor data
- zmqctx = zmq.Context()
- zmqctx.linger = 0
- zmqsub = zmqctx.socket(zmq.SUB)
- for topic in uwscfg.zmq_subscribe.split(" "):
- zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
- #zmqsub.connect(uwscfg.zmq_remote_uri)
- zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
- backup_counter = 0
+
+ for k in set(sensor_store_.keys()).difference(set(uwscfg.zmq_subscribe.split(" "))):
+ del sensor_store_[k] # del old sensordata of sensor we do not subscribe to
+
+ for k in sensor_store_.keys():
+ try:
+ if len(sensor_store_[k][0]) > 1:
+ sensor_cols_num_[k] = len(sensor_store_[k][0]) -1
+ except:
+ pass
while True:
- #receive sensor updates
- data = zmqsub.recv_multipart()
- (structname, dictdata) = decodeR3Message(data)
- logging.debug("Got data: " + structname + ":"+ str(dictdata))
+ try:
+ #Start zmq connection to publish / forward sensor data
+ zmqctx = zmq.Context()
+ zmqctx.linger = 0
+ zmqsub = zmqctx.socket(zmq.SUB)
+ for topic in uwscfg.zmq_subscribe.split(" "):
+ zmqsub.setsockopt(zmq.SUBSCRIBE, topic)
+ if uwscfg.zmq_sshtunnel:
+ zmq.ssh.tunnel.tunnel_connection(zmqsub, uwscfg.zmq_remote_uri, uwscfg.zmq_sshtunnel, keyfile=uwscfg.zmq_sshkeyfile)
+ else:
+ zmqsub.connect(uwscfg.zmq_remote_uri)
+ backup_counter = 0
+ last_update = int(time.time())
- uwscfg.checkConfigUpdates()
+ while True:
+ #receive sensor updates
+ data = zmqsub.recv_multipart()
+ (structname, dictdata) = decodeR3Message(data)
+ logging.debug("Got data: " + structname + ":"+ str(dictdata))
- # store data in local dict of lists
- if "Sensorindex" in dictdata:
- structname += str(dictdata["Sensorindex"])
- del dictdata["Sensorindex"]
- ts = dictdata["Ts"]
- del dictdata["Ts"]
+ uwscfg.checkConfigUpdates()
- if not structname in sensor_store:
- sensor_store[structname] = []
+ addEventToTempLastValueStore(structname, dictdata)
- sensor_store[structname].append((ts,dictdata.values()[0]))
- backup_counter += 1
+ logging.debug("lastdata:"+str(latest_values_))
+ if int(time.time()) - last_update < int(uwscfg.json_updateinterval):
+ continue
- #cap list lenght
- if uwscfg.json_limit_list_len:
- if len(sensor_store[structname]) > uwscfg.json_limit_list_len:
- sensor_store[structname] = sensor_store[structname][- uwscfg.json_limit_list_len:]
-
- # save sensor_store to json for apache
- with open(uwscfg.json_write_path,"wb") as fh:
- fh.truncate()
- fh.write(json.dumps(sensor_store))
- if backup_counter > uwscfg.json_backup_every:
- backup_counter = 0
- shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
- shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
-
- except Exception, ex:
- logging.error("main: "+str(ex))
- traceback.print_exc(file=sys.stdout)
- try:
- zmqsub.close()
- zmqctx.destroy()
- except:
- pass
- time.sleep(5)
+ logging.debug("update interval elapsed")
+ last_update = int(time.time())
+
+ addEventsToStore()
+ cleanTempLastValueOfMovementValues()
+ logging.debug("post-cleanMovement lastdata:"+str(latest_values_))
+
+ backup_counter += 1
+ # save sensor_store_ to json for apache
+ with open(uwscfg.json_write_path,"wb") as fh:
+ fh.truncate()
+ fh.write(json.dumps(sensor_store_))
+ if backup_counter > uwscfg.json_backup_every:
+ backup_counter = 0
+ shutil.copy(uwscfg.json_write_path, uwscfg.json_backup_path)
+ shutil.move(uwscfg.json_write_path, uwscfg.json_moveto_path)
+
+ except Exception, ex:
+ logging.error("main: "+str(ex))
+ traceback.print_exc(file=sys.stdout)
+ try:
+ zmqsub.close()
+ zmqctx.destroy()
+ except:
+ pass
+ time.sleep(5)