From: Bernhard Tittelbach Date: Sat, 5 Oct 2013 00:35:14 +0000 (+0000) Subject: old-sensors new zmq events bridge X-Git-Url: https://git.realraum.at/?p=svn42.git;a=commitdiff_plain;h=123fe81b6402de719f54e1dff5c84485236e2a2d old-sensors new zmq events bridge --- diff --git a/bridge-old-sensors-zmq.py b/bridge-old-sensors-zmq.py new file mode 100755 index 0000000..66de9af --- /dev/null +++ b/bridge-old-sensors-zmq.py @@ -0,0 +1,244 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +from __future__ import with_statement +import os +import os.path +import sys +import threading +import logging +import logging.handlers +import time +import signal +import re +import subprocess +import ConfigParser +import traceback +import json +import zmq + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2) +lh_syslog.setFormatter(logging.Formatter('bridge-old-sensors.py: %(levelname)s %(message)s')) +logger.addHandler(lh_syslog) +lh_stderr = logging.StreamHandler() +logger.addHandler(lh_stderr) + +######## Config File Data Class ############ + +class UWSConfig: + def __init__(self,configfile=None): + #Synchronisation + self.lock=threading.Lock() + self.finished_reading=threading.Condition(self.lock) + self.finished_writing=threading.Condition(self.lock) + self.currently_reading=0 + self.currently_writing=False + #Config Data + self.configfile=configfile + self.config_parser=ConfigParser.ConfigParser() + self.config_parser.add_section('sensors') + self.config_parser.set('sensors','remote_cmd',"ssh -i /flash/tuer/id_rsa -o PasswordAuthentication=no -o StrictHostKeyChecking=no %RHOST% %RSHELL% %RSOCKET%") + self.config_parser.set('sensors','remote_host',"root@slug.realraum.at") + self.config_parser.set('sensors','remote_socket',"/var/run/powersensordaemon/cmd.sock") + self.config_parser.set('sensors','remote_shell',"usocket") + self.config_parser.add_section('debug') + self.config_parser.set('debug','enabled',"False") + self.config_mtime=0 + if not self.configfile is None: + try: + cf_handle = open(self.configfile,"r") + cf_handle.close() + except IOError: + self.writeConfigFile() + else: + self.checkConfigUpdates() + + def guardReading(self): + with self.lock: + while self.currently_writing: + self.finished_writing.wait() + self.currently_reading+=1 + + def unguardReading(self): + with self.lock: + self.currently_reading-=1 + self.finished_reading.notifyAll() + + def guardWriting(self): + with self.lock: + self.currently_writing=True + while self.currently_reading > 0: + self.finished_reading.wait() + + def unguardWriting(self): + with self.lock: + self.currently_writing=False + self.finished_writing.notifyAll() + + def checkConfigUpdates(self): + global logger + if self.configfile is None: + return + logging.debug("Checking Configfile mtime: "+self.configfile) + try: + mtime = os.path.getmtime(self.configfile) + except (IOError,OSError): + return + if self.config_mtime < mtime: + logging.debug("Reading Configfile") + self.guardWriting() + try: + self.config_parser.read(self.configfile) + self.config_mtime=os.path.getmtime(self.configfile) + except (ConfigParser.ParsingError, IOError), pe_ex: + logging.error("Error parsing Configfile: "+str(pe_ex)) + self.unguardWriting() + self.guardReading() + if self.config_parser.get('debug','enabled') == "True": + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + self.unguardReading() + + def writeConfigFile(self): + if self.configfile is None: + return + logging.debug("Writing Configfile "+self.configfile) + self.guardReading() + try: + cf_handle = open(self.configfile,"w") + self.config_parser.write(cf_handle) + cf_handle.close() + self.config_mtime=os.path.getmtime(self.configfile) + except IOError, io_ex: + logging.error("Error writing Configfile: "+str(io_ex)) + self.configfile=None + self.unguardReading() + + def __getattr__(self, name): + underscore_pos=name.find('_') + if underscore_pos < 0: + raise AttributeError + rv=None + self.guardReading() + try: + rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:]) + except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): + self.unguardReading() + raise AttributeError + self.unguardReading() + return rv + + +######## r3 ZMQ ############ + +def sendR3Message(socket, structname, datadict): + socket.send_multipart([structname, json.dumps(datadict)]) + +######## Sensor Bridge ############ +tracksensor_running=True +def trackSensorStatus(uwscfg, zmqsocket): + global sshp, tracksensor_running + RE_TEMP = re.compile(r'temp(\d): (\d+\.\d+)') + RE_PHOTO = re.compile(r'photo(\d): [^0-9]*?(\d+)',re.I) + RE_MOVEMENT = re.compile(r'movement|button\d?|PanicButton',re.I) + RE_ERROR = re.compile(r'Error: (.+)',re.I) + while tracksensor_running: + uwscfg.checkConfigUpdates() + sshp = None + try: + cmd = uwscfg.sensors_remote_cmd.replace("%RHOST%",uwscfg.sensors_remote_host).replace("%RSHELL%",uwscfg.sensors_remote_shell).replace("%RSOCKET%",uwscfg.sensors_remote_socket).split(" ") + logging.debug("trackSensorStatus: Executing: "+" ".join(cmd)) + sshp = subprocess.Popen(cmd, bufsize=1024, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False) + logging.debug("trackSensorStatus: pid %d: running=%d" % (sshp.pid,sshp.poll() is None)) + if not sshp.poll() is None: + raise Exception("trackSensorStatus: subprocess %d not started ?, returncode: %d" % (sshp.pid,sshp.returncode)) + #sshp.stdin.write("listen movement\nlisten button\nlisten sensor\n") + time.sleep(5) #if we send listen bevor usocket is running, we will never get output + #sshp.stdin.write("listen all\n") + logging.debug("trackSensorStatus: send: listen movement, etc") + sshp.stdin.write("listen movement\n") + sshp.stdin.write("listen button\n") + sshp.stdin.write("listen sensor\n") + #sshp.stdin.write("sample temp0\n") + sshp.stdin.flush() + while tracksensor_running: + if not sshp.poll() is None: + raise Exception("trackSensorStatus: subprocess %d finished, returncode: %d" % (sshp.pid,sshp.returncode)) + line = sshp.stdout.readline() + if len(line) < 1: + raise Exception("EOF on Subprocess, daemon seems to have quit, returncode: %d",sshp.returncode) + logging.debug("trackSensorStatus: Got Line: " + line) + m = RE_MOVEMENT.match(line) + if not m is None: + sendR3Message(zmqsocket, "MovementSensorUpdate", {"Sensorindex":0, "Ts":int(time.time())}) + continue + m = RE_TEMP.match(line) + if not m is None: + sendR3Message(zmqsocket, "TempSensorUpdate", {"Sensorindex":int(m.group(1)), "Value":float(m.group(2)), "Ts":int(time.time())}) + continue + m = RE_PHOTO.match(line) + if not m is None: + sendR3Message(zmqsocket, "IlluminationSensorUpdate", {"Sensorindex":int(m.group(1)), "Value":int(m.group(2)), "Ts":int(time.time())}) + continue + m = RE_ERROR.match(line) + if not m is None: + logging.error("trackSensorStatus: got: "+line) + except Exception, ex: + logging.error("trackSensorStatus: "+str(ex)) + traceback.print_exc(file=sys.stdout) + if not sshp is None and sshp.poll() is None: + if sys.hexversion >= 0x020600F0: + sshp.terminate() + else: + subprocess.call(["kill",str(sshp.pid)]) + time.sleep(1.5) + if sshp.poll() is None: + logging.error("trackSensorStatus: subprocess still alive, sending SIGKILL to pid %d" % (sshp.pid)) + if sys.hexversion >= 0x020600F0: + sshp.kill() + else: + subprocess.call(["kill","-9",str(sshp.pid)]) + time.sleep(5) + + ############ Main Routine ############ + +def exitHandler(signum, frame): + global tracksensor_running, sshp + logging.info("Bridge stopping") + tracksensor_running=False + try: + if sys.hexversion >= 0x020600F0: + sshp.terminate() + else: + subprocess.call(["kill",str(sshp.pid)]) + except: + pass + time.sleep(0.1) + sys.exit(0) + +#signals proapbly don't work because of readline +#signal.signal(signal.SIGTERM, exitHandler) +signal.signal(signal.SIGINT, exitHandler) +signal.signal(signal.SIGQUIT, exitHandler) + +logging.info("Sensor Bridge started") + +#option and only argument: path to config file +if len(sys.argv) > 1: + uwscfg = UWSConfig(sys.argv[1]) +else: + uwscfg = UWSConfig() + +#Start zmq connection to publish / forward sensor data +zmqctx = zmq.Context() +zmqctx.linger = 0 +zmqpub = zmqctx.socket(zmq.PUB) +zmqpub.connect("tcp://wuzzler.realraum.at:4243") + +#listen for sensor data and forward them +trackSensorStatus(uwscfg, zmqpub) + + + diff --git a/go/r3-eventbroker_zmq/sockettoevent.go b/go/r3-eventbroker_zmq/sockettoevent.go index fbd74d5..2ebcc76 100644 --- a/go/r3-eventbroker_zmq/sockettoevent.go +++ b/go/r3-eventbroker_zmq/sockettoevent.go @@ -4,7 +4,7 @@ package main import ( "regexp" - "strconv" + //~ "strconv" "time" //~ "./brain" pubsub "github.com/tuxychandru/pubsub" @@ -13,16 +13,16 @@ import ( ) var ( - re_presence_ *regexp.Regexp = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?") - re_state_ *regexp.Regexp = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*") - re_status_ *regexp.Regexp = regexp.MustCompile("Status: (closed|opened), (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing), (ajar|shut).*") - re_infocard_ *regexp.Regexp = regexp.MustCompile("Info\\(card\\): card\\(([a-fA-F0-9]+)\\) (found|not found).*") + //~ re_presence_ *regexp.Regexp = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?") + //~ re_state_ *regexp.Regexp = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*") + //~ re_status_ *regexp.Regexp = regexp.MustCompile("Status: (closed|opened), (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing), (ajar|shut).*") + //~ re_infocard_ *regexp.Regexp = regexp.MustCompile("Info\\(card\\): card\\(([a-fA-F0-9]+)\\) (found|not found).*") re_cardid_ *regexp.Regexp = regexp.MustCompile("card\\(([a-fA-F0-9]+)\\)") - re_infoajar_ *regexp.Regexp = regexp.MustCompile("Info\\(ajar\\): door is now (ajar|shut)") - re_command_ *regexp.Regexp = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?") - re_button_ *regexp.Regexp = regexp.MustCompile("PanicButton|button\\d?") - re_temp_ *regexp.Regexp = regexp.MustCompile("temp0: (\\d+\\.\\d+)") - re_photo_ *regexp.Regexp = regexp.MustCompile("photo0: (\\d+)") + //~ re_infoajar_ *regexp.Regexp = regexp.MustCompile("Info\\(ajar\\): door is now (ajar|shut)") + //~ re_command_ *regexp.Regexp = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?") + //~ re_button_ *regexp.Regexp = regexp.MustCompile("PanicButton|button\\d?") + //~ re_temp_ *regexp.Regexp = regexp.MustCompile("temp0: (\\d+\\.\\d+)") + //~ re_photo_ *regexp.Regexp = regexp.MustCompile("photo0: (\\d+)") ) @@ -76,12 +76,11 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z ps.Pub(r3events.DoorAjarUpdate{string(lines[4]) == "shut", ts}, "door") case "open", "close", "toggle", "reset": ps.Pub(r3events.DoorCommandEvent{string(lines[0]), string(lines[1]), string(lines[2]), ts},"doorcmd") - case "photo0": - newphoto, err := strconv.ParseInt(string(lines[1]), 10, 32) - if err == nil { - // brn.Oboite("photo0", newphoto) - ps.Pub(r3events.IlluminationSensorUpdate{0, newphoto, ts}, "sensors") - } + //~ case "photo0:": + //~ newphoto, err := strconv.ParseInt(string(lines[1]), 10, 32) + //~ if err == nil { + //~ ps.Pub(r3events.IlluminationSensorUpdate{0, newphoto, ts}, "sensors") + //~ } case "IlluminationSensorUpdate","TempSensorUpdate": //try decode r3event evnt, err := r3events.UnmarshalByteByte2Event(lines)