sensorssub_port_ string
pub_port_ string
keylookup_addr_ string
+ brain_listen_addr_ string
use_syslog_ bool
Syslog_ *log.Logger
)
flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data")
flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events")
flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups")
+ flag.StringVar(&brain_listen_addr_, "brainlisten", "tcp://*:4245", "address to listen for requests about latest stored event")
flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility")
flag.Usage = usage
flag.Parse()
if sub_in_chans == nil || pub_out_socket == nil || keylookup_socket == nil {
panic("zmq sockets must not be nil !!")
}
+
if use_syslog_ {
var logerr error
Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0)
defer Syslog_.Print("exiting")
}
- ps := pubsub.New(3)
- defer ps.Shutdown()
+ ps := pubsub.New(10)
+ defer ps.Shutdown() // ps.Shutdown should be called before zmq_ctx.Close(), since it will cause goroutines to shutdown and close zqm_sockets which is needed for zmq_ctx.Close() to return
//~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
- publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
+
+ store_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
+ go BrainCenter(zmqctx, brain_listen_addr_, store_these_events_chan)
go MetaEventRoutine_Movement(ps, 10, 20, 10)
go MetaEventRoutine_Presence(ps)
+ publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
for {
select {
case subin := <- sub_in_chans.In():
var last_movement int64
movement_window := ring.New(granularity+1)
events_chan := ps.Sub("movement")
+ defer ps.Unsub(events_chan, "movement")
myticker := time.NewTicker(time.Duration(gran_duration) * time.Second)
for { select {
//~ "./brain"
pubsub "github.com/tuxychandru/pubsub"
"./r3events"
+ //~ "log"
)
type doorstate struct {
doorstatemap := make(map[int]doorstate,1)
events_chan := ps.Sub("door", "doorcmd", "buttons", "movement")
+ defer ps.Unsub(events_chan, "door", "doorcmd", "buttons", "movement")
for event := range(events_chan) {
+ //~ log.Printf("Presence: %s - %s", event, doorstatemap)
new_presence := last_presence
ts := time.Now().Unix()
switch evnt := event.(type) {
} else {
new_presence = false
}
-
+ //~ log.Printf("Presence: new: %s , last:%s", new_presence, last_presence)
if new_presence != last_presence {
last_presence = new_presence
ps.Pub(r3events.PresenceUpdate{new_presence, ts} , "presence")
"strings"
)
+func NameOfStruct(evi interface{}) (name string) {
+ etype := fmt.Sprintf("%T", evi)
+ etype_lastsep := strings.LastIndex(etype,".")
+ return etype[etype_lastsep+1:] //works in all cases for etype_lastsep in range -1 to len(etype)-1
+}
func MarshalEvent2ByteByte(event_interface interface{}) (data [][]byte, err error) {
var msg []byte
- fmt.Printf("%T%+v\n", event_interface, event_interface)
+ //~ fmt.Printf("%T%+v\n", event_interface, event_interface)
msg, err = json.Marshal(event_interface)
if err != nil {
return
}
- etype := fmt.Sprintf("%T", event_interface)
- etype_lastsep := strings.LastIndex(etype,".")
- data = [][]byte{[]byte(etype[etype_lastsep+1:]), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1
+ data = [][]byte{[]byte(NameOfStruct(event_interface)), msg} //works in all cases for etype_lastsep in range -1 to len(etype)-1
return
}
--- /dev/null
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+ zmq "github.com/vaughan0/go-zmq"
+ "./r3events"
+ "log"
+)
+
+type hippocampus map[string]interface{}
+
+func BrainCenter( zmq_ctx *zmq.Context, listen_addr string, event_chan <- chan interface{} ) {
+ zbrain_chans, err := ZmqsBindNewReplySocket(zmq_ctx, listen_addr)
+ if err != nil { panic(err) }
+ defer zbrain_chans.Close()
+ h := make(hippocampus,5)
+
+ for { select {
+ case event, ec_still_open := <- event_chan:
+ if ! ec_still_open { return }
+ h[r3events.NameOfStruct(event)] = event
+ log.Printf("Brain: stored %s, %s", r3events.NameOfStruct(event), event)
+
+ case brain_request := <- zbrain_chans.In():
+ if len(brain_request) == 0 { continue }
+ requested_eventname := string(brain_request[0])
+ log.Printf("Brain: received request: %s", requested_eventname)
+ retr_event, is_in_map := h[requested_eventname]
+ if is_in_map {
+ data, err := r3events.MarshalEvent2ByteByte(retr_event)
+ if err == nil {
+ zbrain_chans.Out() <- data
+ continue
+ } else {
+ if Syslog_ != nil {Syslog_.Print("BrainCenter", err)}
+ }
+ }
+ zbrain_chans.Out() <- [][]byte{[]byte("UNKNOWN")}
+ } }
+}
}
}
+func ZmqsBindNewReplySocket(ctx *zmq.Context, addr string) (chans *zmq.Channels, err error) {
+ if len(addr) == 0 {
+ return nil, errors.New("No listen address given")
+ }
+ sock, err := ctx.Socket(zmq.Rep)
+ if err != nil { return nil, err}
+
+ if err = sock.Bind(addr); err != nil {
+ sock.Close()
+ return nil, err
+ }
+
+ chans = sock.ChannelsBuffer(10)
+ go zmqsHandleError(chans)
+
+ return chans, nil
+}
+
func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) {
if err := sock.Send(request); err != nil {
panic(err)
xmpp_state_save_dir_ string
r3eventssub_port_ string
button_press_timeout_ int64 = 3600
+ brain_connect_addr_ string
)
//-------
flag.StringVar(&xmpp_bot_authstring_, "xbotauth", "", "String that user use to authenticate themselves to the bot")
flag.StringVar(&xmpp_state_save_dir_,"xstatedir","/flash/var/lib/r3netstatus/", "Directory to save XMPP bot state in")
flag.StringVar(&r3eventssub_port_, "eventsubport", "tcp://wuzzler.realraum.at:4244", "zmq address to subscribe r3events")
+ flag.StringVar(&brain_connect_addr_, "brainconnect", "tcp://wuzzler.realraum.at:4245", "address to ask about most recent stored events")
flag.Parse()
}
var xmpperr error
var bot *r3xmppbot.XmppBot
bot, xmpp_presence_events_chan_, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true)
+
ps := pubsub.New(1)
defer ps.Shutdown()
//~ brn := brain.New()
fmt.Println("XMPP Bot disabled")
}
- ticker := time.NewTicker(time.Duration(7) * time.Minute)
+ // --- get update on most recent events ---
+ answ := ZmqsAskQuestionsAndClose(zmqctx, brain_connect_addr_, [][][]byte{[][]byte{[]byte("DoorLockUpdate")}, [][]byte{[]byte("DoorAjarUpdate")}, [][]byte{[]byte("DoorCommandEvent")}, [][]byte{[]byte("PresenceUpdate")}, [][]byte{[]byte("IlluminationSensorUpdate")}, [][]byte{[]byte("TempSensorUpdate")}})
+ for _, a := range(answ) {
+ //~ fmt.Println("recent event:", a)
+ ParseZMQr3Event(a, ps)
+ }
+ // --- receive and distribute events ---
+ ticker := time.NewTicker(time.Duration(7) * time.Minute)
for {
select {
case e := <-zmqsub.In():
"path"
)
-//~ type StdLogger struct {
-//~ }
+type StdLogger struct {
+}
-//~ func (s *StdLogger) Log(v ...interface{}) {
- //~ log.Println(v...)
-//~ }
+func (s *StdLogger) Log(v ...interface{}) {
+ log.Println(v...)
+}
-//~ func (s *StdLogger) Logf(fmt string, v ...interface{}) {
- //~ log.Printf(fmt, v...)
-//~ }
+func (s *StdLogger) Logf(fmt string, v ...interface{}) {
+ log.Printf(fmt, v...)
+}
func (botdata *XmppBot) makeXMPPMessage(to string, message interface{}, subject interface{}) *xmpp.Message {
panic(error)
}
}
+
+func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) {
+ if err := sock.Send(request); err != nil {
+ panic(err)
+ }
+ parts, err := sock.Recv()
+ if err != nil {
+ panic(err)
+ }
+ return parts
+}
+
+func ZmqsAskQuestionsAndClose(ctx *zmq.Context, addr string, questions [][][]byte) [][][]byte {
+ if len(addr) == 0 || ctx == nil { return nil }
+
+ req_sock, err := ctx.Socket(zmq.Req)
+ if err != nil {
+ return nil
+ }
+ defer req_sock.Close()
+
+ if err = req_sock.Connect(addr); err != nil {
+ return nil
+ }
+
+ rv := make([][][]byte, len(questions))
+ for index, q := range(questions) {
+ rv[index] = ZmqsRequestAnswer(req_sock, q)
+ }
+ return rv
+}
\ No newline at end of file
# Default-Stop: 0 1 6
### END INIT INFO
-EXE_TRACK=/flash/tuer/track-presence.py
-
-CFG_TRACK=/flash/tuer/track-presence.cfg
+EXE_TRACK=/flash/tuer/r3-eventbroker_zmq
+CFG_TRACK=""
. /etc/default/tuer
-PIDFILE_TRACK=${DIR_RUN}/track-presence.pid
+PIDFILE_TRACK=${DIR_RUN}/r3-eventbroker_zmq.pid
test -f $EXE_TRACK || exit 1
if [ ! -d $DIR_RUN ]; then
mkdir -p $DIR_RUN || exit 2
case "$1" in
start)
- log_daemon_msg "Starting door daemon" "track-presence"
- start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR --startas $EXE_TRACK -- $CFG_TRACK
+ log_daemon_msg "Starting r3 event broker" "r3-eventbroker_zmq"
+ start-stop-daemon --start --quiet --pidfile $PIDFILE_TRACK -b -m -c $DOOR_USR -g $DOOR_GRP --startas $EXE_TRACK -- $CFG_TRACK
log_end_msg $?
;;
stop)
- log_daemon_msg "Stopping door daemon" "track-presence"
+ log_daemon_msg "Stopping r3 event broker" "r3-eventbroker_zmq"
start-stop-daemon --stop --quiet --pidfile $PIDFILE_TRACK -m --retry TERM/1/TERM/1/KILL
log_end_msg $?
;;