brain remembers latest events (note design-problem: does not distinguish between...
authorBernhard Tittelbach <xro@realraum.at>
Thu, 3 Oct 2013 21:59:06 +0000 (21:59 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Thu, 3 Oct 2013 21:59:06 +0000 (21:59 +0000)
go/r3-eventbroker_zmq/main.go
go/r3-eventbroker_zmq/metamovement.go
go/r3-eventbroker_zmq/presence.go
go/r3-eventbroker_zmq/r3events/marshal_events.go
go/r3-eventbroker_zmq/smallbrain.go [new file with mode: 0644]
go/r3-eventbroker_zmq/zeromq.go
go/r3-netstatus/main.go
go/r3-netstatus/r3xmppbot/r3xmppbot.go
go/r3-netstatus/zeromq.go
tuer_presence.initscript

index 8ab1858..447ac1c 100644 (file)
@@ -28,6 +28,7 @@ var (
     sensorssub_port_ string
     pub_port_ string
     keylookup_addr_ string
+    brain_listen_addr_ string
     use_syslog_ bool
     Syslog_ *log.Logger
 )
@@ -42,6 +43,7 @@ func init() {
     flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data")
     flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events")
     flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups")
+    flag.StringVar(&brain_listen_addr_, "brainlisten", "tcp://*:4245", "address to listen for requests about latest stored event")
     flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility")
     flag.Usage = usage
     flag.Parse()
@@ -56,6 +58,7 @@ func main() {
     if sub_in_chans == nil || pub_out_socket == nil || keylookup_socket == nil {
         panic("zmq sockets must not be nil !!")
     }
+
     if use_syslog_ {
         var logerr error
         Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0)
@@ -65,14 +68,17 @@ func main() {
         defer Syslog_.Print("exiting")
     }
 
-    ps := pubsub.New(3)
-    defer ps.Shutdown()
+    ps := pubsub.New(10)
+    defer ps.Shutdown() // ps.Shutdown should be called before zmq_ctx.Close(), since it will cause goroutines to shutdown and close zqm_sockets which is needed for zmq_ctx.Close() to return
     //~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
-    publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
+
+    store_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
+    go BrainCenter(zmqctx, brain_listen_addr_, store_these_events_chan)
 
     go MetaEventRoutine_Movement(ps, 10, 20, 10)
     go MetaEventRoutine_Presence(ps)
 
+    publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
     for {
         select {
             case subin := <- sub_in_chans.In():
index d908758..7210e6d 100644 (file)
@@ -15,6 +15,7 @@ func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int
     var last_movement int64
     movement_window := ring.New(granularity+1)
     events_chan := ps.Sub("movement")
+    defer ps.Unsub(events_chan, "movement")
     myticker := time.NewTicker(time.Duration(gran_duration) * time.Second)
 
     for { select {
index aac99f5..be7b12f 100644 (file)
@@ -7,6 +7,7 @@ import (
     //~ "./brain"
     pubsub "github.com/tuxychandru/pubsub"
     "./r3events"
+    //~ "log"
     )
 
 type doorstate struct {
@@ -21,8 +22,10 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
     doorstatemap := make(map[int]doorstate,1)
 
     events_chan := ps.Sub("door", "doorcmd", "buttons", "movement")
+    defer ps.Unsub(events_chan, "door", "doorcmd", "buttons", "movement")
 
     for event := range(events_chan) {
+        //~ log.Printf("Presence: %s - %s", event, doorstatemap)
         new_presence := last_presence
         ts := time.Now().Unix()
         switch evnt := event.(type) {
@@ -59,7 +62,7 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) {
         } else {
             new_presence = false
         }
-
+        //~ log.Printf("Presence: new: %s , last:%s", new_presence, last_presence)
         if new_presence != last_presence {
             last_presence = new_presence
             ps.Pub(r3events.PresenceUpdate{new_presence, ts} , "presence")
index d00d462..dd23d53 100644 (file)
@@ -9,17 +9,20 @@ import (
     "strings"
     )
 
+func NameOfStruct(evi interface{}) (name string) {
+    etype := fmt.Sprintf("%T", evi)
+    etype_lastsep := strings.LastIndex(etype,".")
+    return etype[etype_lastsep+1:] //works in all cases for etype_lastsep in range -1 to len(etype)-1
+}
 
 func MarshalEvent2ByteByte(event_interface interface{}) (data [][]byte, err error) {
     var msg []byte
-    fmt.Printf("%T%+v\n", event_interface, event_interface)
+    //~ fmt.Printf("%T%+v\n", event_interface, event_interface)
        msg, err = json.Marshal(event_interface)
        if err != nil {
                return
        }
-    etype := fmt.Sprintf("%T", event_interface)
-    etype_lastsep := strings.LastIndex(etype,".")
-    data = [][]byte{[]byte(etype[etype_lastsep+1:]), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1
+    data = [][]byte{[]byte(NameOfStruct(event_interface)), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1
     return
 }
 
diff --git a/go/r3-eventbroker_zmq/smallbrain.go b/go/r3-eventbroker_zmq/smallbrain.go
new file mode 100644 (file)
index 0000000..0996d78
--- /dev/null
@@ -0,0 +1,41 @@
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+    zmq "github.com/vaughan0/go-zmq"
+    "./r3events"
+    "log"
+)
+
+type hippocampus map[string]interface{}
+
+func BrainCenter( zmq_ctx *zmq.Context, listen_addr string, event_chan <- chan interface{} ) {
+    zbrain_chans, err := ZmqsBindNewReplySocket(zmq_ctx, listen_addr)
+    if err != nil { panic(err) }
+    defer zbrain_chans.Close()
+    h := make(hippocampus,5)
+
+    for { select {
+        case event, ec_still_open := <- event_chan:
+            if ! ec_still_open { return }
+            h[r3events.NameOfStruct(event)] = event
+            log.Printf("Brain: stored %s, %s", r3events.NameOfStruct(event), event)
+
+        case brain_request := <- zbrain_chans.In():
+            if len(brain_request) == 0 { continue }
+            requested_eventname := string(brain_request[0])
+            log.Printf("Brain: received request: %s", requested_eventname)
+            retr_event, is_in_map := h[requested_eventname]
+            if is_in_map {
+                data, err := r3events.MarshalEvent2ByteByte(retr_event)
+                if err == nil {
+                    zbrain_chans.Out() <- data
+                    continue
+                } else {
+                    if Syslog_ != nil {Syslog_.Print("BrainCenter", err)}
+                }
+            }
+            zbrain_chans.Out() <- [][]byte{[]byte("UNKNOWN")}
+    } }
+}
index e13e6f8..81952c9 100644 (file)
@@ -80,6 +80,24 @@ func zmqsHandleError(chans *zmq.Channels) {
     }
 }
 
+func ZmqsBindNewReplySocket(ctx *zmq.Context, addr string) (chans *zmq.Channels, err error) {
+    if len(addr) == 0 {
+        return nil, errors.New("No listen address given")
+    }
+    sock, err := ctx.Socket(zmq.Rep)
+    if err != nil { return nil, err}
+
+    if err = sock.Bind(addr); err != nil {
+        sock.Close()
+        return nil, err
+    }
+
+    chans = sock.ChannelsBuffer(10)
+    go zmqsHandleError(chans)
+
+    return chans, nil
+}
+
 func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) {
     if err := sock.Send(request); err != nil {
         panic(err)
index ff94189..f387789 100644 (file)
@@ -26,6 +26,7 @@ var (
     xmpp_state_save_dir_ string
     r3eventssub_port_ string
     button_press_timeout_ int64 = 3600
+    brain_connect_addr_ string
 )
 
 //-------
@@ -36,6 +37,7 @@ func init() {
     flag.StringVar(&xmpp_bot_authstring_, "xbotauth", "", "String that user use to authenticate themselves to the bot")
     flag.StringVar(&xmpp_state_save_dir_,"xstatedir","/flash/var/lib/r3netstatus/",  "Directory to save XMPP bot state in")
     flag.StringVar(&r3eventssub_port_, "eventsubport", "tcp://wuzzler.realraum.at:4244", "zmq address to subscribe r3events")
+    flag.StringVar(&brain_connect_addr_, "brainconnect", "tcp://wuzzler.realraum.at:4245", "address to ask about most recent stored events")
     flag.Parse()
 }
 
@@ -123,6 +125,7 @@ func main() {
     var xmpperr error
     var bot *r3xmppbot.XmppBot
     bot, xmpp_presence_events_chan_, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true)
+
     ps := pubsub.New(1)
     defer ps.Shutdown()
     //~ brn := brain.New()
@@ -137,8 +140,15 @@ func main() {
         fmt.Println("XMPP Bot disabled")
     }
 
-    ticker := time.NewTicker(time.Duration(7) * time.Minute)
+    // --- get update on most recent events ---
+    answ := ZmqsAskQuestionsAndClose(zmqctx, brain_connect_addr_, [][][]byte{[][]byte{[]byte("DoorLockUpdate")}, [][]byte{[]byte("DoorAjarUpdate")}, [][]byte{[]byte("DoorCommandEvent")}, [][]byte{[]byte("PresenceUpdate")}, [][]byte{[]byte("IlluminationSensorUpdate")}, [][]byte{[]byte("TempSensorUpdate")}})
+    for _, a := range(answ) {
+        //~ fmt.Println("recent event:", a)
+        ParseZMQr3Event(a, ps)
+    }
 
+    // --- receive and distribute events ---
+    ticker := time.NewTicker(time.Duration(7) * time.Minute)
     for {
     select {
         case e := <-zmqsub.In():
index 3e6f210..9963485 100644 (file)
@@ -12,16 +12,16 @@ import (
     "path"
 )
 
-//~ type StdLogger struct {
-//~ }
+type StdLogger struct {
+}
 
-//~ func (s *StdLogger) Log(v ...interface{}) {
-        //~ log.Println(v...)
-//~ }
+func (s *StdLogger) Log(v ...interface{}) {
+        log.Println(v...)
+}
 
-//~ func (s *StdLogger) Logf(fmt string, v ...interface{}) {
-        //~ log.Printf(fmt, v...)
-//~ }
+func (s *StdLogger) Logf(fmt string, v ...interface{}) {
+        log.Printf(fmt, v...)
+}
 
 
 func (botdata *XmppBot) makeXMPPMessage(to string, message interface{}, subject interface{}) *xmpp.Message {
index 38d1732..86f4206 100644 (file)
@@ -45,3 +45,34 @@ func zmqsHandleError(chans *zmq.Channels) {
         panic(error)
     }
 }
+
+func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) {
+    if err := sock.Send(request); err != nil {
+        panic(err)
+    }
+    parts, err := sock.Recv()
+    if err != nil {
+        panic(err)
+    }
+    return parts
+}
+
+func ZmqsAskQuestionsAndClose(ctx *zmq.Context, addr string, questions [][][]byte) [][][]byte {
+    if len(addr) == 0 || ctx == nil { return nil }
+
+    req_sock, err := ctx.Socket(zmq.Req)
+    if err != nil {
+        return nil
+    }
+    defer req_sock.Close()
+
+    if err = req_sock.Connect(addr); err != nil {
+        return nil
+    }
+
+    rv := make([][][]byte, len(questions))
+    for index, q := range(questions) {
+        rv[index] = ZmqsRequestAnswer(req_sock, q)
+    }
+    return rv
+}
\ No newline at end of file
index 6ef6d49..0972666 100755 (executable)
@@ -9,13 +9,12 @@
 # Default-Stop:      0 1 6
 ### END INIT INFO
 
-EXE_TRACK=/flash/tuer/track-presence.py
-
-CFG_TRACK=/flash/tuer/track-presence.cfg
+EXE_TRACK=/flash/tuer/r3-eventbroker_zmq
+CFG_TRACK=""
 
 . /etc/default/tuer
 
-PIDFILE_TRACK=${DIR_RUN}/track-presence.pid
+PIDFILE_TRACK=${DIR_RUN}/r3-eventbroker_zmq.pid
 test -f $EXE_TRACK  || exit 1
 if [ ! -d $DIR_RUN ]; then
        mkdir -p $DIR_RUN || exit 2
@@ -30,12 +29,12 @@ fi
 
 case "$1" in
 start) 
-       log_daemon_msg "Starting door daemon" "track-presence"
-        start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR --startas $EXE_TRACK  -- $CFG_TRACK
+       log_daemon_msg "Starting r3 event broker" "r3-eventbroker_zmq"
+        start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR -g $DOOR_GRP --startas $EXE_TRACK  -- $CFG_TRACK
         log_end_msg $? 
        ;;
 stop)
-       log_daemon_msg "Stopping door daemon" "track-presence"
+       log_daemon_msg "Stopping r3 event broker" "r3-eventbroker_zmq"
         start-stop-daemon --stop --quiet --pidfile $PIDFILE_TRACK -m --retry TERM/1/TERM/1/KILL 
         log_end_msg $? 
         ;;