type export advancement
authorBernhard Tittelbach <xro@realraum.at>
Thu, 3 Oct 2013 00:29:13 +0000 (00:29 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Thu, 3 Oct 2013 00:29:13 +0000 (00:29 +0000)
go/r3-eventbroker_zmq/main.go
go/r3-eventbroker_zmq/make_deploy.zsh
go/r3-eventbroker_zmq/metamovement.go
go/r3-eventbroker_zmq/presence.go
go/r3-eventbroker_zmq/sockettoevent.go
go/r3-eventbroker_zmq/zeromq.go

index 6d1a5cd..af67170 100644 (file)
@@ -48,15 +48,13 @@ func init() {
 
 func main() {
     zmqctx, sub_in_chans, pub_out_socket, keylookup_socket := ZmqsInit(doorsub_addr_, sensorssub_port_, pub_port_, keylookup_addr_)
+    if sub_in_chans != nil {defer sub_in_chans.Close()}
     defer zmqctx.Close()
-    defer sub_in_chans.Close()
-    defer pub_out_socket.Close()
-    defer keylookup_socket.Close()
-    
+    if pub_out_socket != nil {defer pub_out_socket.Close()}
+    if keylookup_socket != nil {defer keylookup_socket.Close()}
     if sub_in_chans == nil || pub_out_socket == nil || keylookup_socket == nil {
         panic("zmq sockets must not be nil !!")
     }
-    
     if use_syslog_ {
         var logerr error
         Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0)
@@ -65,22 +63,15 @@ func main() {
         Syslog_.Print("started")
         defer Syslog_.Print("exiting")
     }
-    
+
     ps := pubsub.New(3)
     //~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
     publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
-    
+
     go MetaEventRoutine_Movement(ps, 10, 20, 10)
     go MetaEventRoutine_Presence(ps)
-    
-    for xx := range(sub_in_chans.In()) {
-        log.Print(xx)
-        Syslog_.Print(xx)
-    }
-    
-    
+
     for {
-        log.Print("for loop")
         select {
             case subin := <- sub_in_chans.In():
                 ParseSocketInputLine(subin, ps, keylookup_socket)
@@ -88,9 +79,9 @@ func main() {
                 //~ MakeTimeTick(ps)
             case event_interface := <- publish_these_events_chan:
                 data, err := FormatEventForSocket(event_interface)
-                log.Print("publishing", data)
+                log.Printf("publishing %s",data)
                 if err != nil {
-                    Syslog_.Print(err)
+                    if Syslog_ != nil {Syslog_.Print(err)}
                     continue
                 }
                 if err := pub_out_socket.Send(data); err != nil {
@@ -99,6 +90,4 @@ func main() {
         }
     }
 
-    //~ nick, err := keylookup_socket.LookupCardIdNick(keyhexid)
-    
 }
index a890077..746215b 100644 (file)
@@ -1,7 +1,7 @@
 #!/bin/zsh
 export GO386=387
 export CGO_ENABLED=1
-go clean
+go-linux-386 clean
 #go-linux-386 build
 #strip ${PWD:t}
-go build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
+go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
index 98d8e8d..71aa209 100644 (file)
@@ -20,7 +20,7 @@ func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int
     movement_window := ring.New(granularity+1)
     events_chan := ps.Sub("movement")
     myticker := time.NewTicker(time.Duration(gran_duration) * time.Second)
-    
+
     for { select {
         case event := <- events_chan:
             switch event.(type) {
@@ -37,7 +37,7 @@ func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int
                 ps.Pub( SomethingReallyIsMoving{true,ts}, "movement")
                 last_movement = ts
             }
-            
+
             if last_movement > 0 && ts - last_movement < 3600*6 && ts - last_movement > 3600*3 {
                 last_movement = 0
                 ps.Pub( SomethingReallyIsMoving{false, ts}, "movement")
index 953af11..154c6d2 100644 (file)
@@ -22,10 +22,10 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
     //~ var last_door_cmd *DoorCommandEvent
     var last_presence bool
     var last_movement, last_buttonpress int64
-    doorstatemap := make(map[byte]doorstate,1)
+    doorstatemap := make(map[int]doorstate,1)
 
     events_chan := ps.Sub("door", "doorcmd", "buttons", "movement")
-    
+
     for event := range(events_chan) {
         new_presence := last_presence
         ts := time.Now().Unix()
@@ -46,14 +46,14 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
             case DoorAjarUpdate:
                 doorstatemap[evnt.DoorID]=doorstate{locked:doorstatemap[evnt.DoorID].locked, shut:evnt.Shut}
         }
-        
+
         any_door_unlocked := false
         any_door_ajar := false
         for _, ds := range(doorstatemap) {
             if ds.locked == false {any_door_unlocked = true }
             if ds.shut == false {any_door_ajar = true }
         }
-        
+
         if new_presence != last_presence {
             //... skip state check .. we had a definite presence event
         } else if any_door_unlocked || any_door_ajar {
@@ -63,7 +63,7 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
         } else {
             new_presence = false
         }
-        
+
         if new_presence != last_presence {
             last_presence = new_presence
             ps.Pub(PresenceUpdate{new_presence, ts} , "presence")
index 84dff4a..6664a97 100644 (file)
@@ -9,8 +9,9 @@ import (
     //~ "./brain"
     "encoding/json"
     pubsub "github.com/tuxychandru/pubsub"
-    zmq "github.com/vaughan0/go-zmq"    
+    zmq "github.com/vaughan0/go-zmq"
     "log"
+    "fmt"
     )
 
 var (
@@ -28,13 +29,13 @@ var (
 
 
 type DoorLockUpdate struct {
-    DoorID byte
+    DoorID int
     Locked bool
     Ts int64
 }
 
 type DoorAjarUpdate struct {
-    DoorID byte
+    DoorID int
     Shut bool
     Ts int64
 }
@@ -85,7 +86,7 @@ func parseSocketInputLine_State(lines [][]byte, ps *pubsub.PubSub, ts int64) {
         case "timeout":   //after open | after close
         case "opening":
         case "closing":
-        default:    
+        default:
     }
 }
 
@@ -94,7 +95,7 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
     var tidbit interface{}
     ts := time.Now().Unix()
     if len(lines) < 1 { return }
-    log.Print("ParseSocketInputLine",string(lines[0]))
+    log.Printf("ParseSocketInputLine: %s %s",string(lines[0]), lines[1:])
     switch string(lines[0]) {
         case "State:":
             if len(lines) < 2 { return }
@@ -106,7 +107,7 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
             ps.Pub(tidbit, "door")
             tidbit = DoorAjarUpdate{0, string(lines[len(lines)-2]) == "shut", ts}
             //~ brn.Oboite("door", tidbit)
-            ps.Pub(tidbit, "door")            
+            ps.Pub(tidbit, "door")
         case "Info(card):":
             if len(lines) < 3 { return }
             if string(lines[2]) != "found" { return }
@@ -125,7 +126,7 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z
             if len(lines) < 5 { return }
             tidbit = DoorAjarUpdate{0, string(lines[4]) == "shut", ts}
             //~ brn.Oboite("door", tidbit)
-            ps.Pub(tidbit, "door")                    
+            ps.Pub(tidbit, "door")
         case "open", "close", "toggle", "reset":
             ps.Pub(DoorCommandEvent{string(lines[0]), string(lines[1]), string(lines[2]), ts},"doorcmd")
         case "photo0":
@@ -142,11 +143,15 @@ func MakeTimeTick(ps *pubsub.PubSub) {
 }
 
 func FormatEventForSocket(event_interface interface{}) (data [][]byte, err error) {
-       msg, err := json.Marshal(data)
+    var msg []byte
+    fmt.Printf("%T%+v\n", event_interface, event_interface)
+    etype := fmt.Sprintf("%T", event_interface)[5:]
+       msg, err = json.Marshal(map[string]interface{}{etype: event_interface})
        if err != nil {
                return
        }
-    return [][]byte{msg}, nil
+    data = [][]byte{msg}
+    return
 }
 
     //~ match_presence := re_presence_.FindStringSubmatch(line)
index 09720e1..e13e6f8 100644 (file)
@@ -26,6 +26,8 @@ func ZmqsInit(sub_connect_port, sub_listen_port, pub_port, keylookup_port string
         }
         defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }()
 
+        sub_sock.Subscribe([]byte{}) //subscribe empty filter -> aka to all messages
+
            if err = sub_sock.Bind(sub_listen_port); err != nil {
             panic(err)
         }
@@ -41,7 +43,7 @@ func ZmqsInit(sub_connect_port, sub_listen_port, pub_port, keylookup_port string
     }
 
     if len(pub_port) > 0 {
-        pub_sock, err := ctx.Socket(zmq.Pub)
+        pub_sock, err = ctx.Socket(zmq.Pub)
         if err != nil {
             panic(err)
         }
@@ -55,7 +57,7 @@ func ZmqsInit(sub_connect_port, sub_listen_port, pub_port, keylookup_port string
     }
 
     if len(keylookup_port) > 0 {
-        keylookup_sock, err := ctx.Socket(zmq.Req)
+        keylookup_sock, err = ctx.Socket(zmq.Req)
         if err != nil {
             panic(err)
         }
@@ -93,7 +95,7 @@ func LookupCardIdNick(s *zmq.Socket, hexbytes []byte) (string, error) {
     answ := ZmqsRequestAnswer(s, [][]byte{hexbytes})
     if len(answ) == 0 {
         return "", errors.New("Empty reply received")
-    }    
+    }
     if bytes.Compare(answ[0], []byte("ERROR")) == 0 {
         return "", errors.New(string(bytes.Join(answ[1:],[]byte(" "))))
     }