old-sensors new zmq events bridge
authorBernhard Tittelbach <xro@realraum.at>
Sat, 5 Oct 2013 00:35:14 +0000 (00:35 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Sat, 5 Oct 2013 00:35:14 +0000 (00:35 +0000)
bridge-old-sensors-zmq.py [new file with mode: 0755]
go/r3-eventbroker_zmq/sockettoevent.go

diff --git a/bridge-old-sensors-zmq.py b/bridge-old-sensors-zmq.py
new file mode 100755 (executable)
index 0000000..66de9af
--- /dev/null
@@ -0,0 +1,244 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+from __future__ import with_statement
+import os
+import os.path
+import sys
+import threading
+import logging
+import logging.handlers
+import time
+import signal
+import re
+import subprocess
+import ConfigParser
+import traceback
+import json
+import zmq
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+lh_syslog = logging.handlers.SysLogHandler(address="/dev/log",facility=logging.handlers.SysLogHandler.LOG_LOCAL2)
+lh_syslog.setFormatter(logging.Formatter('bridge-old-sensors.py: %(levelname)s %(message)s'))
+logger.addHandler(lh_syslog)
+lh_stderr = logging.StreamHandler()
+logger.addHandler(lh_stderr)
+
+######## Config File Data Class ############
+
+class UWSConfig:
+  def __init__(self,configfile=None):
+    #Synchronisation
+    self.lock=threading.Lock()
+    self.finished_reading=threading.Condition(self.lock)
+    self.finished_writing=threading.Condition(self.lock)
+    self.currently_reading=0
+    self.currently_writing=False
+    #Config Data
+    self.configfile=configfile
+    self.config_parser=ConfigParser.ConfigParser()
+    self.config_parser.add_section('sensors')
+    self.config_parser.set('sensors','remote_cmd',"ssh -i /flash/tuer/id_rsa -o PasswordAuthentication=no -o StrictHostKeyChecking=no %RHOST% %RSHELL% %RSOCKET%")
+    self.config_parser.set('sensors','remote_host',"root@slug.realraum.at")
+    self.config_parser.set('sensors','remote_socket',"/var/run/powersensordaemon/cmd.sock")
+    self.config_parser.set('sensors','remote_shell',"usocket")
+    self.config_parser.add_section('debug')
+    self.config_parser.set('debug','enabled',"False")
+    self.config_mtime=0
+    if not self.configfile is None:
+      try:
+        cf_handle = open(self.configfile,"r")
+        cf_handle.close()
+      except IOError:
+        self.writeConfigFile()
+      else:
+        self.checkConfigUpdates()
+    
+  def guardReading(self):
+    with self.lock:
+      while self.currently_writing:
+        self.finished_writing.wait()
+      self.currently_reading+=1
+
+  def unguardReading(self):
+    with self.lock:
+      self.currently_reading-=1
+      self.finished_reading.notifyAll()
+      
+  def guardWriting(self):
+    with self.lock:
+      self.currently_writing=True
+      while self.currently_reading > 0:
+        self.finished_reading.wait()
+    
+  def unguardWriting(self):
+    with self.lock:
+      self.currently_writing=False
+      self.finished_writing.notifyAll()
+    
+  def checkConfigUpdates(self):
+    global logger
+    if self.configfile is None:
+      return
+    logging.debug("Checking Configfile mtime: "+self.configfile)
+    try:
+      mtime = os.path.getmtime(self.configfile)
+    except (IOError,OSError):
+      return
+    if self.config_mtime < mtime:
+      logging.debug("Reading Configfile")
+      self.guardWriting()
+      try:
+        self.config_parser.read(self.configfile)
+        self.config_mtime=os.path.getmtime(self.configfile)
+      except (ConfigParser.ParsingError, IOError), pe_ex:
+        logging.error("Error parsing Configfile: "+str(pe_ex))
+      self.unguardWriting()
+      self.guardReading()
+      if self.config_parser.get('debug','enabled') == "True":
+        logger.setLevel(logging.DEBUG)
+      else:
+        logger.setLevel(logging.INFO)
+      self.unguardReading()
+
+  def writeConfigFile(self):
+    if self.configfile is None:
+      return
+    logging.debug("Writing Configfile "+self.configfile)
+    self.guardReading()
+    try:
+      cf_handle = open(self.configfile,"w")
+      self.config_parser.write(cf_handle)
+      cf_handle.close()
+      self.config_mtime=os.path.getmtime(self.configfile)
+    except IOError, io_ex:
+      logging.error("Error writing Configfile: "+str(io_ex))
+      self.configfile=None
+    self.unguardReading()
+
+  def __getattr__(self, name):
+    underscore_pos=name.find('_')
+    if underscore_pos < 0:
+      raise AttributeError
+    rv=None
+    self.guardReading()
+    try:
+      rv = self.config_parser.get(name[0:underscore_pos], name[underscore_pos+1:])
+    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
+      self.unguardReading()
+      raise AttributeError
+    self.unguardReading()
+    return rv
+
+
+######## r3 ZMQ ############
+
+def sendR3Message(socket, structname, datadict):
+    socket.send_multipart([structname, json.dumps(datadict)])
+
+######## Sensor Bridge ############
+tracksensor_running=True
+def trackSensorStatus(uwscfg, zmqsocket):
+  global sshp, tracksensor_running
+  RE_TEMP = re.compile(r'temp(\d): (\d+\.\d+)')
+  RE_PHOTO = re.compile(r'photo(\d): [^0-9]*?(\d+)',re.I)
+  RE_MOVEMENT = re.compile(r'movement|button\d?|PanicButton',re.I)
+  RE_ERROR = re.compile(r'Error: (.+)',re.I)
+  while tracksensor_running:
+    uwscfg.checkConfigUpdates()
+    sshp = None
+    try:
+      cmd = uwscfg.sensors_remote_cmd.replace("%RHOST%",uwscfg.sensors_remote_host).replace("%RSHELL%",uwscfg.sensors_remote_shell).replace("%RSOCKET%",uwscfg.sensors_remote_socket).split(" ")
+      logging.debug("trackSensorStatus: Executing: "+" ".join(cmd))
+      sshp = subprocess.Popen(cmd, bufsize=1024, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False)
+      logging.debug("trackSensorStatus: pid %d: running=%d" % (sshp.pid,sshp.poll() is None))
+      if not sshp.poll() is None:
+        raise Exception("trackSensorStatus: subprocess %d not started ?, returncode: %d" % (sshp.pid,sshp.returncode))
+      #sshp.stdin.write("listen movement\nlisten button\nlisten sensor\n")
+      time.sleep(5) #if we send listen bevor usocket is running, we will never get output
+      #sshp.stdin.write("listen all\n")
+      logging.debug("trackSensorStatus: send: listen movement, etc")
+      sshp.stdin.write("listen movement\n")
+      sshp.stdin.write("listen button\n")
+      sshp.stdin.write("listen sensor\n")
+      #sshp.stdin.write("sample temp0\n")
+      sshp.stdin.flush()
+      while tracksensor_running:
+        if not sshp.poll() is None:
+          raise Exception("trackSensorStatus: subprocess %d finished, returncode: %d" % (sshp.pid,sshp.returncode))
+        line = sshp.stdout.readline()
+        if len(line) < 1:
+          raise Exception("EOF on Subprocess, daemon seems to have quit, returncode: %d",sshp.returncode)
+        logging.debug("trackSensorStatus: Got Line: " + line)
+        m = RE_MOVEMENT.match(line)
+        if not m is None:
+          sendR3Message(zmqsocket, "MovementSensorUpdate", {"Sensorindex":0, "Ts":int(time.time())})
+          continue
+        m = RE_TEMP.match(line)
+        if not m is None:
+          sendR3Message(zmqsocket, "TempSensorUpdate", {"Sensorindex":int(m.group(1)), "Value":float(m.group(2)), "Ts":int(time.time())})
+          continue
+        m = RE_PHOTO.match(line)
+        if not m is None:
+          sendR3Message(zmqsocket, "IlluminationSensorUpdate", {"Sensorindex":int(m.group(1)), "Value":int(m.group(2)), "Ts":int(time.time())})
+          continue
+        m = RE_ERROR.match(line)
+        if not m is None:
+          logging.error("trackSensorStatus: got: "+line) 
+    except Exception, ex:
+      logging.error("trackSensorStatus: "+str(ex)) 
+      traceback.print_exc(file=sys.stdout)
+      if not sshp is None and sshp.poll() is None:
+        if sys.hexversion >= 0x020600F0:
+          sshp.terminate()
+        else:
+          subprocess.call(["kill",str(sshp.pid)])
+        time.sleep(1.5)
+        if sshp.poll() is None:
+          logging.error("trackSensorStatus: subprocess still alive, sending SIGKILL to pid %d" % (sshp.pid))
+          if sys.hexversion >= 0x020600F0:
+            sshp.kill()
+          else:
+            subprocess.call(["kill","-9",str(sshp.pid)])
+      time.sleep(5)  
+
+ ############ Main Routine ############
+
+def exitHandler(signum, frame):
+  global tracksensor_running, sshp
+  logging.info("Bridge stopping")
+  tracksensor_running=False
+  try:
+    if sys.hexversion >= 0x020600F0:
+      sshp.terminate()
+    else:
+      subprocess.call(["kill",str(sshp.pid)])
+  except:
+    pass
+  time.sleep(0.1)
+  sys.exit(0)
+  
+#signals proapbly don't work because of readline
+#signal.signal(signal.SIGTERM, exitHandler)
+signal.signal(signal.SIGINT, exitHandler)
+signal.signal(signal.SIGQUIT, exitHandler)
+
+logging.info("Sensor Bridge started")
+
+#option and only argument: path to config file
+if len(sys.argv) > 1:
+  uwscfg = UWSConfig(sys.argv[1])
+else:
+  uwscfg = UWSConfig()
+
+#Start zmq connection to publish / forward sensor data
+zmqctx = zmq.Context()
+zmqctx.linger = 0
+zmqpub = zmqctx.socket(zmq.PUB)
+zmqpub.connect("tcp://wuzzler.realraum.at:4243")
+
+#listen for sensor data and forward them
+trackSensorStatus(uwscfg, zmqpub)
+
+
+
index fbd74d5..2ebcc76 100644 (file)
@@ -4,7 +4,7 @@ package main
 
 import (
     "regexp"
-    "strconv"
+    //~ "strconv"
     "time"
     //~ "./brain"
     pubsub "github.com/tuxychandru/pubsub"
@@ -13,16 +13,16 @@ import (
     )
 
 var (
-       re_presence_    *regexp.Regexp     = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?")
-       re_state_      *regexp.Regexp     = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*")
-       re_status_      *regexp.Regexp     = regexp.MustCompile("Status: (closed|opened), (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing), (ajar|shut).*")
-       re_infocard_      *regexp.Regexp     = regexp.MustCompile("Info\\(card\\): card\\(([a-fA-F0-9]+)\\) (found|not found).*")
+       //~ re_presence_    *regexp.Regexp     = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?")
+       //~ re_state_      *regexp.Regexp     = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*")
+       //~ re_status_      *regexp.Regexp     = regexp.MustCompile("Status: (closed|opened), (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing), (ajar|shut).*")
+       //~ re_infocard_      *regexp.Regexp     = regexp.MustCompile("Info\\(card\\): card\\(([a-fA-F0-9]+)\\) (found|not found).*")
        re_cardid_      *regexp.Regexp     = regexp.MustCompile("card\\(([a-fA-F0-9]+)\\)")
-       re_infoajar_      *regexp.Regexp     = regexp.MustCompile("Info\\(ajar\\): door is now (ajar|shut)")
-       re_command_     *regexp.Regexp     = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?")
-       re_button_      *regexp.Regexp     = regexp.MustCompile("PanicButton|button\\d?")
-       re_temp_        *regexp.Regexp     = regexp.MustCompile("temp0: (\\d+\\.\\d+)")
-       re_photo_       *regexp.Regexp     = regexp.MustCompile("photo0: (\\d+)")
+       //~ re_infoajar_      *regexp.Regexp     = regexp.MustCompile("Info\\(ajar\\): door is now (ajar|shut)")
+       //~ re_command_     *regexp.Regexp     = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?")
+       //~ re_button_      *regexp.Regexp     = regexp.MustCompile("PanicButton|button\\d?")
+       //~ re_temp_        *regexp.Regexp     = regexp.MustCompile("temp0: (\\d+\\.\\d+)")
+       //~ re_photo_       *regexp.Regexp     = regexp.MustCompile("photo0: (\\d+)")
 )
 
 
@@ -76,12 +76,11 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
             ps.Pub(r3events.DoorAjarUpdate{string(lines[4]) == "shut", ts}, "door")
         case "open", "close", "toggle", "reset":
             ps.Pub(r3events.DoorCommandEvent{string(lines[0]), string(lines[1]), string(lines[2]), ts},"doorcmd")
-        case "photo0":
-            newphoto, err := strconv.ParseInt(string(lines[1]), 10, 32)
-            if err == nil {
-                // brn.Oboite("photo0", newphoto)
-                ps.Pub(r3events.IlluminationSensorUpdate{0, newphoto, ts}, "sensors")
-            }
+        //~ case "photo0:":
+            //~ newphoto, err := strconv.ParseInt(string(lines[1]), 10, 32)
+            //~ if err == nil {
+                //~ ps.Pub(r3events.IlluminationSensorUpdate{0, newphoto, ts}, "sensors")
+            //~ }
         case "IlluminationSensorUpdate","TempSensorUpdate":
             //try decode r3event
             evnt, err := r3events.UnmarshalByteByte2Event(lines)