does not yet work
authorBernhard Tittelbach <xro@realraum.at>
Wed, 2 Oct 2013 21:37:08 +0000 (21:37 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Wed, 2 Oct 2013 21:37:08 +0000 (21:37 +0000)
go/r3-eventbroker_zmq/main.go
go/r3-eventbroker_zmq/make_deploy.zsh [new file with mode: 0644]
go/r3-eventbroker_zmq/metamovement.go [new file with mode: 0644]
go/r3-eventbroker_zmq/presence.go [new file with mode: 0644]
go/r3-eventbroker_zmq/sockettoevent.go
go/r3-eventbroker_zmq/zeromq.go

index 3145c6c..dbd35eb 100644 (file)
@@ -6,7 +6,7 @@ import (
     "fmt"
     "os"
     "flag"
-    "time"
+    //~ "time"
     "log/syslog"
     "log"
     pubsub "github.com/tuxychandru/pubsub"
@@ -37,11 +37,11 @@ func usage() {
 }
 
 func init() {
-    flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://wuzzler.realraum.at:4242", "zmq door event publish addr")
-    flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://:4243", "zmq public/listen socket addr for incoming sensor data")
-    flag.StringVar(&pub_port_, "pubport", "tcp://:4244", "zmq port publishing consodilated events")
+    flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://torwaechter.realraum.at:4242", "zmq door event publish addr")
+    flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data")
+    flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events")
     flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups")
-    flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local1 facility")
+    flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility")
     flag.Usage = usage
     flag.Parse()
 }
@@ -55,25 +55,30 @@ func main() {
     
     if use_syslog_ {
         var logerr error
-        Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0)
+        Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0)
+        //~ Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0)
         if logerr != nil { panic(logerr) }
         Syslog_.Print("started")
         defer Syslog_.Print("exiting")
     }
     
     ps := pubsub.New(3)
-    ticker := time.NewTicker(time.Duration(5) * time.Minute)
+    //~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
     publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
     
+    go MetaEventRoutine_Movement(ps, 10, 20, 10)
+    go MetaEventRoutine_Presence(ps)
     
     for {
+        log.Print("for loop")
         select {
             case subin := <- sub_in_chans.In():
                 ParseSocketInputLine(subin, ps, keylookup_socket)
-            case <- ticker.C:
-                MakeTimeTick(ps)
+            //~ case <- ticker.C:
+                //~ MakeTimeTick(ps)
             case event_interface := <- publish_these_events_chan:
                 data, err := FormatEventForSocket(event_interface)
+                log.Print("publishing", data)
                 if err != nil {
                     Syslog_.Print(err)
                     continue
diff --git a/go/r3-eventbroker_zmq/make_deploy.zsh b/go/r3-eventbroker_zmq/make_deploy.zsh
new file mode 100644 (file)
index 0000000..746215b
--- /dev/null
@@ -0,0 +1,7 @@
+#!/bin/zsh
+export GO386=387
+export CGO_ENABLED=1
+go-linux-386 clean
+#go-linux-386 build
+#strip ${PWD:t}
+go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
diff --git a/go/r3-eventbroker_zmq/metamovement.go b/go/r3-eventbroker_zmq/metamovement.go
new file mode 100644 (file)
index 0000000..98d8e8d
--- /dev/null
@@ -0,0 +1,46 @@
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+    "time"
+    //~ "./brain"
+    pubsub "github.com/tuxychandru/pubsub"
+    "container/ring"
+    )
+
+type SomethingReallyIsMoving struct {
+    Movement bool
+    Ts int64
+}
+
+
+func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int , threshold uint32) {
+    var last_movement int64
+    movement_window := ring.New(granularity+1)
+    events_chan := ps.Sub("movement")
+    myticker := time.NewTicker(time.Duration(gran_duration) * time.Second)
+    
+    for { select {
+        case event := <- events_chan:
+            switch event.(type) {
+                case MovementSensorUpdate:
+                    movement_window.Value =  (uint32) (movement_window.Value.(uint32)  + 1)
+            }
+        case <- myticker.C:
+            movement_window.Prev().Value = (uint32) (0)
+            movement_window = movement_window.Next()
+            var movsum uint32 = 0
+            movement_window.Do(func(v interface{}){if v != nil {movsum += v.(uint32)}})
+            ts :=  time.Now().Unix()
+            if movsum > threshold {
+                ps.Pub( SomethingReallyIsMoving{true,ts}, "movement")
+                last_movement = ts
+            }
+            
+            if last_movement > 0 && ts - last_movement < 3600*6 && ts - last_movement > 3600*3 {
+                last_movement = 0
+                ps.Pub( SomethingReallyIsMoving{false, ts}, "movement")
+            }
+    } }
+}
\ No newline at end of file
diff --git a/go/r3-eventbroker_zmq/presence.go b/go/r3-eventbroker_zmq/presence.go
new file mode 100644 (file)
index 0000000..953af11
--- /dev/null
@@ -0,0 +1,72 @@
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+    "time"
+    //~ "./brain"
+    pubsub "github.com/tuxychandru/pubsub"
+    )
+
+type PresenceUpdate struct {
+    Present bool
+    Ts int64
+}
+
+type doorstate struct {
+    locked bool
+    shut bool
+}
+
+func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
+    //~ var last_door_cmd *DoorCommandEvent
+    var last_presence bool
+    var last_movement, last_buttonpress int64
+    doorstatemap := make(map[byte]doorstate,1)
+
+    events_chan := ps.Sub("door", "doorcmd", "buttons", "movement")
+    
+    for event := range(events_chan) {
+        new_presence := last_presence
+        ts := time.Now().Unix()
+        switch evnt := event.(type) {
+            case SomethingReallyIsMoving:
+                if evnt.Movement {
+                    last_movement = evnt.Ts
+                } else {
+                    last_movement = 0
+                }
+            case ButtonPressUpdate:
+                last_buttonpress = evnt.Ts
+                new_presence = true
+            //~ case DoorCommandEvent:
+                //~ last_door_cmd = &evnt
+            case DoorLockUpdate:
+                doorstatemap[evnt.DoorID]=doorstate{locked:evnt.Locked, shut:doorstatemap[evnt.DoorID].shut}
+            case DoorAjarUpdate:
+                doorstatemap[evnt.DoorID]=doorstate{locked:doorstatemap[evnt.DoorID].locked, shut:evnt.Shut}
+        }
+        
+        any_door_unlocked := false
+        any_door_ajar := false
+        for _, ds := range(doorstatemap) {
+            if ds.locked == false {any_door_unlocked = true }
+            if ds.shut == false {any_door_ajar = true }
+        }
+        
+        if new_presence != last_presence {
+            //... skip state check .. we had a definite presence event
+        } else if any_door_unlocked || any_door_ajar {
+            new_presence = true
+        } else if last_movement != 0 || ts - last_buttonpress < 200 {
+            new_presence = true
+        } else {
+            new_presence = false
+        }
+        
+        if new_presence != last_presence {
+            last_presence = new_presence
+            ps.Pub(PresenceUpdate{new_presence, ts} , "presence")
+        }
+    }
+}
\ No newline at end of file
index f2e7a20..84dff4a 100644 (file)
@@ -10,6 +10,7 @@ import (
     "encoding/json"
     pubsub "github.com/tuxychandru/pubsub"
     zmq "github.com/vaughan0/go-zmq"    
+    "log"
     )
 
 var (
@@ -26,13 +27,6 @@ var (
 )
 
 
-type PresenceUpdate struct {
-    Present bool
-    Ts int64
-}
-func (s PresenceUpdate) Serialize() string
-
-
 type DoorLockUpdate struct {
     DoorID byte
     Locked bool
@@ -100,12 +94,13 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
     var tidbit interface{}
     ts := time.Now().Unix()
     if len(lines) < 1 { return }
+    log.Print("ParseSocketInputLine",string(lines[0]))
     switch string(lines[0]) {
         case "State:":
-            if len(lines) < 2 { continue }
+            if len(lines) < 2 { return }
             parseSocketInputLine_State(lines[1:], ps, ts)
         case "Status:":
-            if len(lines) < 3 { continue }
+            if len(lines) < 3 { return }
             tidbit = DoorLockUpdate{0, string(lines[1]) == "closed", ts}
             //~ brn.Oboite("door", tidbit)
             ps.Pub(tidbit, "door")
@@ -113,23 +108,21 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
             //~ brn.Oboite("door", tidbit)
             ps.Pub(tidbit, "door")            
         case "Info(card):":
-            if len(lines) < 3 { continue }
-            if string(lines[2]) != "found" {
-                continue
-            }
+            if len(lines) < 3 { return }
+            if string(lines[2]) != "found" { return }
             match_cardid := re_cardid_.FindSubmatch(lines[1])
             if len(match_cardid) > 1 {
                 // PreCondition: same thread/goroutinge as created keylookup_socket !!!!
-                nick, err := keylookup_socket.LookupCardIdNick(match_cardid[1])
+                nick, err := LookupCardIdNick(keylookup_socket, match_cardid[1])
                 if err != nil {
                     Syslog_.Print("CardID Lookup Error",err)
-                    nick := "Unresolvable KeyID"
+                    nick = "Unresolvable KeyID"
                 }
                 // new event: toggle by user nick using card
                 ps.Pub(DoorCommandEvent{"toggle", "Card", nick, ts},"doorcmd")
             }
         case "Info(ajar):":
-            if len(lines) < 5 { continue }
+            if len(lines) < 5 { return }
             tidbit = DoorAjarUpdate{0, string(lines[4]) == "shut", ts}
             //~ brn.Oboite("door", tidbit)
             ps.Pub(tidbit, "door")                    
index 55aa201..09720e1 100644 (file)
@@ -78,7 +78,7 @@ func zmqsHandleError(chans *zmq.Channels) {
     }
 }
 
-func (sock *zmq.Socket) ZmqsRequestAnswer(request [][]byte) (answer [][]byte) {
+func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) {
     if err := sock.Send(request); err != nil {
         panic(err)
     }
@@ -89,18 +89,18 @@ func (sock *zmq.Socket) ZmqsRequestAnswer(request [][]byte) (answer [][]byte) {
     return parts
 }
 
-func (s *zmq.Socket) LookupCardIdNick(hexbytes []byte) (nick string, error) {
-    answ := s.ZmqsRequestAnswer([][]byte{hexbytes})
+func LookupCardIdNick(s *zmq.Socket, hexbytes []byte) (string, error) {
+    answ := ZmqsRequestAnswer(s, [][]byte{hexbytes})
     if len(answ) == 0 {
         return "", errors.New("Empty reply received")
     }    
-    if answ[0] == []byte("ERROR") {
+    if bytes.Compare(answ[0], []byte("ERROR")) == 0 {
         return "", errors.New(string(bytes.Join(answ[1:],[]byte(" "))))
     }
-    if answ[0] !=  []byte("RESULT") || len(answ) != 3{
+    if bytes.Compare(answ[0], []byte("RESULT")) != 0 || len(answ) != 3{
         return "", errors.New("Unknown reply received")
     }
-    if answ[1] !=  hexbytes {
+    if bytes.Compare(answ[1], hexbytes) != 0 {
         return "", errors.New("Wrong reply received")
     }
     return string(answ[2]), nil