From ff98cdc53bc87b9d4f3215df248c188a84cb2fb1 Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Thu, 3 Oct 2013 21:59:06 +0000 Subject: [PATCH] brain remembers latest events (note design-problem: does not distinguish between e.g. door ids) --- go/r3-eventbroker_zmq/main.go | 12 +++++-- go/r3-eventbroker_zmq/metamovement.go | 1 + go/r3-eventbroker_zmq/presence.go | 5 ++- go/r3-eventbroker_zmq/r3events/marshal_events.go | 11 +++--- go/r3-eventbroker_zmq/smallbrain.go | 41 ++++++++++++++++++++++ go/r3-eventbroker_zmq/zeromq.go | 18 ++++++++++ go/r3-netstatus/main.go | 12 ++++++- go/r3-netstatus/r3xmppbot/r3xmppbot.go | 16 ++++----- go/r3-netstatus/zeromq.go | 31 ++++++++++++++++ tuer_presence.initscript | 13 ++++--- 10 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 go/r3-eventbroker_zmq/smallbrain.go diff --git a/go/r3-eventbroker_zmq/main.go b/go/r3-eventbroker_zmq/main.go index 8ab1858..447ac1c 100644 --- a/go/r3-eventbroker_zmq/main.go +++ b/go/r3-eventbroker_zmq/main.go @@ -28,6 +28,7 @@ var ( sensorssub_port_ string pub_port_ string keylookup_addr_ string + brain_listen_addr_ string use_syslog_ bool Syslog_ *log.Logger ) @@ -42,6 +43,7 @@ func init() { flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data") flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events") flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups") + flag.StringVar(&brain_listen_addr_, "brainlisten", "tcp://*:4245", "address to listen for requests about latest stored event") flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility") flag.Usage = usage flag.Parse() @@ -56,6 +58,7 @@ func main() { if sub_in_chans == nil || pub_out_socket == nil || keylookup_socket == nil { panic("zmq sockets must not be nil !!") } + if use_syslog_ { var logerr error Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0) @@ -65,14 +68,17 @@ func main() { defer Syslog_.Print("exiting") } - ps := pubsub.New(3) - defer ps.Shutdown() + ps := pubsub.New(10) + defer ps.Shutdown() // ps.Shutdown should be called before zmq_ctx.Close(), since it will cause goroutines to shutdown and close zqm_sockets which is needed for zmq_ctx.Close() to return //~ ticker := time.NewTicker(time.Duration(5) * time.Minute) - publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement") + + store_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement") + go BrainCenter(zmqctx, brain_listen_addr_, store_these_events_chan) go MetaEventRoutine_Movement(ps, 10, 20, 10) go MetaEventRoutine_Presence(ps) + publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement") for { select { case subin := <- sub_in_chans.In(): diff --git a/go/r3-eventbroker_zmq/metamovement.go b/go/r3-eventbroker_zmq/metamovement.go index d908758..7210e6d 100644 --- a/go/r3-eventbroker_zmq/metamovement.go +++ b/go/r3-eventbroker_zmq/metamovement.go @@ -15,6 +15,7 @@ func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int var last_movement int64 movement_window := ring.New(granularity+1) events_chan := ps.Sub("movement") + defer ps.Unsub(events_chan, "movement") myticker := time.NewTicker(time.Duration(gran_duration) * time.Second) for { select { diff --git a/go/r3-eventbroker_zmq/presence.go b/go/r3-eventbroker_zmq/presence.go index aac99f5..be7b12f 100644 --- a/go/r3-eventbroker_zmq/presence.go +++ b/go/r3-eventbroker_zmq/presence.go @@ -7,6 +7,7 @@ import ( //~ "./brain" pubsub "github.com/tuxychandru/pubsub" "./r3events" + //~ "log" ) type doorstate struct { @@ -21,8 +22,10 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) { doorstatemap := make(map[int]doorstate,1) events_chan := ps.Sub("door", "doorcmd", "buttons", "movement") + defer ps.Unsub(events_chan, "door", "doorcmd", "buttons", "movement") for event := range(events_chan) { + //~ log.Printf("Presence: %s - %s", event, doorstatemap) new_presence := last_presence ts := time.Now().Unix() switch evnt := event.(type) { @@ -59,7 +62,7 @@ func MetaEventRoutine_Presence(ps *pubsub.PubSub) { } else { new_presence = false } - + //~ log.Printf("Presence: new: %s , last:%s", new_presence, last_presence) if new_presence != last_presence { last_presence = new_presence ps.Pub(r3events.PresenceUpdate{new_presence, ts} , "presence") diff --git a/go/r3-eventbroker_zmq/r3events/marshal_events.go b/go/r3-eventbroker_zmq/r3events/marshal_events.go index d00d462..dd23d53 100644 --- a/go/r3-eventbroker_zmq/r3events/marshal_events.go +++ b/go/r3-eventbroker_zmq/r3events/marshal_events.go @@ -9,17 +9,20 @@ import ( "strings" ) +func NameOfStruct(evi interface{}) (name string) { + etype := fmt.Sprintf("%T", evi) + etype_lastsep := strings.LastIndex(etype,".") + return etype[etype_lastsep+1:] //works in all cases for etype_lastsep in range -1 to len(etype)-1 +} func MarshalEvent2ByteByte(event_interface interface{}) (data [][]byte, err error) { var msg []byte - fmt.Printf("%T%+v\n", event_interface, event_interface) + //~ fmt.Printf("%T%+v\n", event_interface, event_interface) msg, err = json.Marshal(event_interface) if err != nil { return } - etype := fmt.Sprintf("%T", event_interface) - etype_lastsep := strings.LastIndex(etype,".") - data = [][]byte{[]byte(etype[etype_lastsep+1:]), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1 + data = [][]byte{[]byte(NameOfStruct(event_interface)), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1 return } diff --git a/go/r3-eventbroker_zmq/smallbrain.go b/go/r3-eventbroker_zmq/smallbrain.go new file mode 100644 index 0000000..0996d78 --- /dev/null +++ b/go/r3-eventbroker_zmq/smallbrain.go @@ -0,0 +1,41 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + zmq "github.com/vaughan0/go-zmq" + "./r3events" + "log" +) + +type hippocampus map[string]interface{} + +func BrainCenter( zmq_ctx *zmq.Context, listen_addr string, event_chan <- chan interface{} ) { + zbrain_chans, err := ZmqsBindNewReplySocket(zmq_ctx, listen_addr) + if err != nil { panic(err) } + defer zbrain_chans.Close() + h := make(hippocampus,5) + + for { select { + case event, ec_still_open := <- event_chan: + if ! ec_still_open { return } + h[r3events.NameOfStruct(event)] = event + log.Printf("Brain: stored %s, %s", r3events.NameOfStruct(event), event) + + case brain_request := <- zbrain_chans.In(): + if len(brain_request) == 0 { continue } + requested_eventname := string(brain_request[0]) + log.Printf("Brain: received request: %s", requested_eventname) + retr_event, is_in_map := h[requested_eventname] + if is_in_map { + data, err := r3events.MarshalEvent2ByteByte(retr_event) + if err == nil { + zbrain_chans.Out() <- data + continue + } else { + if Syslog_ != nil {Syslog_.Print("BrainCenter", err)} + } + } + zbrain_chans.Out() <- [][]byte{[]byte("UNKNOWN")} + } } +} diff --git a/go/r3-eventbroker_zmq/zeromq.go b/go/r3-eventbroker_zmq/zeromq.go index e13e6f8..81952c9 100644 --- a/go/r3-eventbroker_zmq/zeromq.go +++ b/go/r3-eventbroker_zmq/zeromq.go @@ -80,6 +80,24 @@ func zmqsHandleError(chans *zmq.Channels) { } } +func ZmqsBindNewReplySocket(ctx *zmq.Context, addr string) (chans *zmq.Channels, err error) { + if len(addr) == 0 { + return nil, errors.New("No listen address given") + } + sock, err := ctx.Socket(zmq.Rep) + if err != nil { return nil, err} + + if err = sock.Bind(addr); err != nil { + sock.Close() + return nil, err + } + + chans = sock.ChannelsBuffer(10) + go zmqsHandleError(chans) + + return chans, nil +} + func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) { if err := sock.Send(request); err != nil { panic(err) diff --git a/go/r3-netstatus/main.go b/go/r3-netstatus/main.go index ff94189..f387789 100644 --- a/go/r3-netstatus/main.go +++ b/go/r3-netstatus/main.go @@ -26,6 +26,7 @@ var ( xmpp_state_save_dir_ string r3eventssub_port_ string button_press_timeout_ int64 = 3600 + brain_connect_addr_ string ) //------- @@ -36,6 +37,7 @@ func init() { flag.StringVar(&xmpp_bot_authstring_, "xbotauth", "", "String that user use to authenticate themselves to the bot") flag.StringVar(&xmpp_state_save_dir_,"xstatedir","/flash/var/lib/r3netstatus/", "Directory to save XMPP bot state in") flag.StringVar(&r3eventssub_port_, "eventsubport", "tcp://wuzzler.realraum.at:4244", "zmq address to subscribe r3events") + flag.StringVar(&brain_connect_addr_, "brainconnect", "tcp://wuzzler.realraum.at:4245", "address to ask about most recent stored events") flag.Parse() } @@ -123,6 +125,7 @@ func main() { var xmpperr error var bot *r3xmppbot.XmppBot bot, xmpp_presence_events_chan_, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true) + ps := pubsub.New(1) defer ps.Shutdown() //~ brn := brain.New() @@ -137,8 +140,15 @@ func main() { fmt.Println("XMPP Bot disabled") } - ticker := time.NewTicker(time.Duration(7) * time.Minute) + // --- get update on most recent events --- + answ := ZmqsAskQuestionsAndClose(zmqctx, brain_connect_addr_, [][][]byte{[][]byte{[]byte("DoorLockUpdate")}, [][]byte{[]byte("DoorAjarUpdate")}, [][]byte{[]byte("DoorCommandEvent")}, [][]byte{[]byte("PresenceUpdate")}, [][]byte{[]byte("IlluminationSensorUpdate")}, [][]byte{[]byte("TempSensorUpdate")}}) + for _, a := range(answ) { + //~ fmt.Println("recent event:", a) + ParseZMQr3Event(a, ps) + } + // --- receive and distribute events --- + ticker := time.NewTicker(time.Duration(7) * time.Minute) for { select { case e := <-zmqsub.In(): diff --git a/go/r3-netstatus/r3xmppbot/r3xmppbot.go b/go/r3-netstatus/r3xmppbot/r3xmppbot.go index 3e6f210..9963485 100644 --- a/go/r3-netstatus/r3xmppbot/r3xmppbot.go +++ b/go/r3-netstatus/r3xmppbot/r3xmppbot.go @@ -12,16 +12,16 @@ import ( "path" ) -//~ type StdLogger struct { -//~ } +type StdLogger struct { +} -//~ func (s *StdLogger) Log(v ...interface{}) { - //~ log.Println(v...) -//~ } +func (s *StdLogger) Log(v ...interface{}) { + log.Println(v...) +} -//~ func (s *StdLogger) Logf(fmt string, v ...interface{}) { - //~ log.Printf(fmt, v...) -//~ } +func (s *StdLogger) Logf(fmt string, v ...interface{}) { + log.Printf(fmt, v...) +} func (botdata *XmppBot) makeXMPPMessage(to string, message interface{}, subject interface{}) *xmpp.Message { diff --git a/go/r3-netstatus/zeromq.go b/go/r3-netstatus/zeromq.go index 38d1732..86f4206 100644 --- a/go/r3-netstatus/zeromq.go +++ b/go/r3-netstatus/zeromq.go @@ -45,3 +45,34 @@ func zmqsHandleError(chans *zmq.Channels) { panic(error) } } + +func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) { + if err := sock.Send(request); err != nil { + panic(err) + } + parts, err := sock.Recv() + if err != nil { + panic(err) + } + return parts +} + +func ZmqsAskQuestionsAndClose(ctx *zmq.Context, addr string, questions [][][]byte) [][][]byte { + if len(addr) == 0 || ctx == nil { return nil } + + req_sock, err := ctx.Socket(zmq.Req) + if err != nil { + return nil + } + defer req_sock.Close() + + if err = req_sock.Connect(addr); err != nil { + return nil + } + + rv := make([][][]byte, len(questions)) + for index, q := range(questions) { + rv[index] = ZmqsRequestAnswer(req_sock, q) + } + return rv +} \ No newline at end of file diff --git a/tuer_presence.initscript b/tuer_presence.initscript index 6ef6d49..0972666 100755 --- a/tuer_presence.initscript +++ b/tuer_presence.initscript @@ -9,13 +9,12 @@ # Default-Stop: 0 1 6 ### END INIT INFO -EXE_TRACK=/flash/tuer/track-presence.py - -CFG_TRACK=/flash/tuer/track-presence.cfg +EXE_TRACK=/flash/tuer/r3-eventbroker_zmq +CFG_TRACK="" . /etc/default/tuer -PIDFILE_TRACK=${DIR_RUN}/track-presence.pid +PIDFILE_TRACK=${DIR_RUN}/r3-eventbroker_zmq.pid test -f $EXE_TRACK || exit 1 if [ ! -d $DIR_RUN ]; then mkdir -p $DIR_RUN || exit 2 @@ -30,12 +29,12 @@ fi case "$1" in start) - log_daemon_msg "Starting door daemon" "track-presence" - start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR --startas $EXE_TRACK -- $CFG_TRACK + log_daemon_msg "Starting r3 event broker" "r3-eventbroker_zmq" + start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR -g $DOOR_GRP --startas $EXE_TRACK -- $CFG_TRACK log_end_msg $? ;; stop) - log_daemon_msg "Stopping door daemon" "track-presence" + log_daemon_msg "Stopping r3 event broker" "r3-eventbroker_zmq" start-stop-daemon --stop --quiet --pidfile $PIDFILE_TRACK -m --retry TERM/1/TERM/1/KILL log_end_msg $? ;; -- 1.7.10.4