From 7be7cb4517b411371b18375fd181d48df2184739 Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Thu, 26 Sep 2013 17:05:20 +0000 Subject: [PATCH] unfinished --- go/r3-eventbroker_zmq/design.txt | 5 + go/r3-eventbroker_zmq/main.go | 65 +++++++++++++ go/r3-eventbroker_zmq/presence.txt | 29 ++++++ go/r3-eventbroker_zmq/sockettoevent.go | 164 ++++++++++++++++++++++++++++++++ go/r3-eventbroker_zmq/zeromq.go | 118 +++++++++++++++++++++++ 5 files changed, 381 insertions(+) create mode 100644 go/r3-eventbroker_zmq/design.txt create mode 100644 go/r3-eventbroker_zmq/main.go create mode 100644 go/r3-eventbroker_zmq/presence.txt create mode 100644 go/r3-eventbroker_zmq/sockettoevent.go create mode 100644 go/r3-eventbroker_zmq/zeromq.go diff --git a/go/r3-eventbroker_zmq/design.txt b/go/r3-eventbroker_zmq/design.txt new file mode 100644 index 0000000..44602b9 --- /dev/null +++ b/go/r3-eventbroker_zmq/design.txt @@ -0,0 +1,5 @@ +This daemon should furthermore be the only one ! to regexp parse door firmware string messages, door command string messages and sensor string notices from different sources. +Thus this should be the only place we have to change code when regular expressions change or new sources are added or ip addresses change. +The data is then exported as go-struct like events. Internally (to add i.e. presence) and externally via 0mq pub +Thus the same events are usable in different services written in different languages +(instead of just one language where we could import a common library for regexp parsing again and again) diff --git a/go/r3-eventbroker_zmq/main.go b/go/r3-eventbroker_zmq/main.go new file mode 100644 index 0000000..dfdaff5 --- /dev/null +++ b/go/r3-eventbroker_zmq/main.go @@ -0,0 +1,65 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + "fmt" + "os" + "flag" + "time" + "log/syslog" + "log" +) + +//~ func StringArrayToByteArray(ss []string) [][]byte { + //~ bb := make([][]byte, len(ss)) + //~ for index, s := range(ss) { + //~ bb[index] = []byte(s) + //~ } + //~ return bb +//~ } + +// ---------- Main Code ------------- + +var ( + doorsub_addr_ string + sensorssub_port_ string + pub_port_ string + keylookup_addr_ string + use_syslog_ bool + Syslog_ *log.Logger +) + +func usage() { + fmt.Fprintf(os.Stderr, "Usage: zmq_broker_event_transformer [options]\n") + flag.PrintDefaults() +} + +func init() { + flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://wuzzler.realraum.at:4242", "zmq door event publish addr") + flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://:4243", "zmq public/listen socket addr for incoming sensor data") + flag.StringVar(&pub_port_, "pubport", "tcp://:4244", "zmq port publishing consodilated events") + flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups") + flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local1 facility") + flag.Usage = usage + flag.Parse() +} + +func main() { + zmqctx, sub_in_chans, pub_out_socket, keylookup_socket := ZmqsInit(doorsub_addr_, sensorssub_port_, pub_port_, keylookup_addr_) + defer zmqctx.Close() + defer sub_in_chans.Close() + defer pub_out_socket.Close() + defer keylookup_socket.Close() + + if use_syslog_ { + var logerr error + Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0) + if logerr != nil { panic(logerr) } + Syslog_.Print("started") + defer Syslog_.Print("exiting") + } + + //~ nick, err := keylookup_socket.LookupCardIdNick(keyhexid) + +} diff --git a/go/r3-eventbroker_zmq/presence.txt b/go/r3-eventbroker_zmq/presence.txt new file mode 100644 index 0000000..ee70af3 --- /dev/null +++ b/go/r3-eventbroker_zmq/presence.txt @@ -0,0 +1,29 @@ + +Presence Meta Event Injector: + events indicating presence: + - front door ajar + - door lock using manual movement closing (unless door ajar, assume we would not use key to close from inside ) + - door unlock (any method, manual, key, card, phone, ssh, etc) + - panic button press + - door toggle button press + - back door ajar + - movement sensor (maybe threshold number movements within 5 minutes) + + events indicating somebody left: + - door closed with card, phone, ssh (while backdor and frontdoor shut) + - no movement within 3 hours and movement within the last 6 hours (disable trigger if sensor is broken) + + events indicating "alarm state" / special message: + - Panic Button pressend + - Sudden rise in Temp-Sensor-Value + - Sudden rise in Dust/Smoke-Sensor-Value + + +Movement Meta Event Injector: + movement sensor (maybe threshold number movements within 5 minutes) -> Movement Passed Threshold + no movement within 3 hours and movement within the last 6 hours -> Movement Absence Passed Threshold + +Sensor Spike Event Injector: + monitors sensor values and calculates running average mean, stddev over last hours, + raises Event is value spikes, aka rised beyond mean +- stddev within 2 min (enables after 1 hour of collecting data) + diff --git a/go/r3-eventbroker_zmq/sockettoevent.go b/go/r3-eventbroker_zmq/sockettoevent.go new file mode 100644 index 0000000..e4bfc8e --- /dev/null +++ b/go/r3-eventbroker_zmq/sockettoevent.go @@ -0,0 +1,164 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + pubsub "github.com/tuxychandru/pubsub" + "regexp" + "strconv" + "bufio" + "time" + //~ "./brain" + "net" + ) + +var ( + re_presence_ *regexp.Regexp = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?") + re_state_ *regexp.Regexp = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*") + re_status_ *regexp.Regexp = regexp.MustCompile("Status: (closed|opened) (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing) (ajar|shut).*") + re_infocard_ *regexp.Regexp = regexp.MustCompile("Info\(card\): card\(([a-fA-F0-9]+)\) (found|not found).*") + re_cardid_ *regexp.Regexp = regexp.MustCompile("card\(([a-fA-F0-9]+)\)") + re_infoajar_ *regexp.Regexp = regexp.MustCompile("Info\(ajar\): door is now (ajar|shut)") + re_command_ *regexp.Regexp = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?") + re_button_ *regexp.Regexp = regexp.MustCompile("PanicButton|button\\d?") + re_temp_ *regexp.Regexp = regexp.MustCompile("temp0: (\\d+\\.\\d+)") + re_photo_ *regexp.Regexp = regexp.MustCompile("photo0: (\\d+)") +) + + +type PresenceUpdate struct { + Present bool + Ts int64 +} + +type DoorLockUpdate struct { + DoorID byte + Locked bool + Ts int64 +} + +type DoorAjarUpdate struct { + DoorID byte + Shut bool + Ts int64 +} + +type DoorCommandEvent struct { + Command string + Using string + Who string + Ts int64 +} + +type ButtonPressUpdate struct { + Buttonindex int + Ts int64 +} + +type TempSensorUpdate struct { + Sensorindex int + Value float64 + Ts int64 +} + +type IlluminationSensorUpdate struct { + Sensorindex int + Value int64 + Ts int64 +} + +type TimeTick struct { + Ts int64 +} + +type MovementSensorUpdate struct { + Sensorindex int + Ts int64 +} + +func parseSocketInputLine_State(lines [][]byte, ps *pubsub.PubSub, ts uint64) { + switch string(lines[0]) { + case "closed": + ps.Pub(DoorLockUpdate{0, true, ts}, "door") + case "opened": + ps.Pub(DoorLockUpdate{0, false, ts}, "door") + case "manual": //movement + case "error": + case "reset": + ps.Pub(DoorLockUpdate{0, true, ts}, "door") + case "timeout": //after open | after close + case "opening": + case "closing": + default: + } +} + + +func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub) { //, brn *brain.Brain) { + var tidbit interface{} + ts := time.Now().Unix() + if len(lines) < 1 { return } + switch string(lines[0]) { + case "State:": + if len(lines) < 2 { continue } + parseSocketInputLine_State(lines[1:], ps, ts) + case "Status:": + if len(lines) < 3 { continue } + tidbit = DoorLockUpdate{0, lines[1] == []byte("closed"), ts} + //~ brn.Oboite("door", tidbit) + ps.Pub(tidbit, "door") + tidbit = DoorAjarUpdate{0, lines[-1] == []byte("shut"), ts} + //~ brn.Oboite("door", tidbit) + ps.Pub(tidbit, "door") + case "Info(card):": + if len(lines) < 3 { continue } + if lines[2] != []byte("found") { + continue + } + match_cardid := re_cardid_.FindSubmatch(lines[1]) + if len(match_cardid) > 1 { + // PreCondition: same thread/goroutinge as created keylookup_socket !!!! + nick, err := keylookup_socket.LookupCardIdNick(match_cardid[1]) + if err != nil { + Syslog_.Print("CardID Lookup Error",err) + nick := "Unresolvable KeyID" + } + // new event: toggle by user nick using card + ps.Pub(DoorCommandEvent{"toggle", "Card", nick, ts},"doorcmd") + } + case "Info(ajar):": + if len(lines) < 5 { continue } + DoorAjarUpdate{0, match_status[4] == []byte("shut"), ts} + //~ brn.Oboite("door", tidbit) + ps.Pub(tidbit, "door") + case "open", "close", "toggle", "reset": + ps.Pub(DoorCommandEvent{string(lines[0]), string(lines[1]), string(lines[2]), ts},"doorcmd") + } + + + //~ match_presence := re_presence_.FindStringSubmatch(line) + //~ match_status := re_status_.FindStringSubmatch(line) + //~ match_command := re_command_.FindStringSubmatch(line) + //~ match_button := re_button_.FindStringSubmatch(line) + //~ match_temp := re_temp_.FindStringSubmatch(line) + //~ match_photo := re_photo_.FindStringSubmatch(line) + //~ if match_button != nil { + //~ // brn.Oboite("button0", ts) + //~ ps.Pub(ButtonPressUpdate{0, ts}, "buttons") + //~ } else if match_temp != nil { + //~ newtemp, err := strconv.ParseFloat((match_temp[1]), 32) + //~ if err == nil { + //~ // brn.Oboite( "temp0", newtemp) + //~ ps.Pub(TempSensorUpdate{0, newtemp, ts}, "sensors") + //~ } + //~ } else if match_photo != nil { + //~ newphoto, err := strconv.ParseInt(match_photo[1], 10, 32) + //~ if err == nil { + //~ // brn.Oboite("photo0", newphoto) + //~ ps.Pub(IlluminationSensorUpdate{0, newphoto, ts}, "sensors") + //~ } + //~ } else if line == "movement" { + //~ // brn.Oboite("movement", ts) + //~ ps.Pub(MovementSensorUpdate{0, ts}, "movements") + //~ } +} diff --git a/go/r3-eventbroker_zmq/zeromq.go b/go/r3-eventbroker_zmq/zeromq.go new file mode 100644 index 0000000..2e28d50 --- /dev/null +++ b/go/r3-eventbroker_zmq/zeromq.go @@ -0,0 +1,118 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + zmq "github.com/vaughan0/go-zmq" + "bytes" + "error" + ) + +// ---------- ZeroMQ Code ------------- + +type ReqSocket *zmq.Socket +type PubSocket *zmq.Socket + +func ZmqsInit(sub_connect_port, sub_listen_port, pub_port, keylookup_port string) (ctx *zmq.Context, sub_chans *zmq.Channels, pub_sock PubSocket, keylookup_sock ReqSocket) { + var err error + ctx, err = zmq.NewContext() + if err != nil { + panic(err) + } + //close only on later panic, otherwise leave open: + defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }() + + if len(sub_port) > 0 { + sub_sock, err := ctx.Socket(zmq.Sub) + if err != nil { + panic(err) + } + defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }() + + if err = sub_sock.Bind(sub_listen_port); err != nil { + panic(err) + } + + if err = sub_sock.Connect(sub_connect_port); err != nil { + panic(err) + } + + sub_chans = sub_sock.ChannelsBuffer(10) + go zmqsHandleError(sub_chans) + } else { + sub_chans = nil + } + + if len(pub_port) > 0 { + pub_sock, err := ctx.Socket(zmq.Pub) + if err != nil { + panic(err) + } + defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }() + + if err = pub_sock.Bind(pub_port); err != nil { + panic(err) + } + } else { + pub_sock = nil + } + + if len(keylookup_port) > 0 { + keylookup_sock, err := ctx.Socket(zmq.Req) + if err != nil { + panic(err) + } + defer func() { if r:= recover(); r != nil { keylookup_sock.Close(); panic(r) } }() + + if err = keylookup_sock.Connect(keylookup_port); err != nil { + panic(err) + } + } else { + keylookup_sock = nil + } + + return +} + +func zmqsHandleError(chans *zmq.Channels) { + for error := range(chans.Errors()) { + chans.Close() + panic(error) + } +} + +func (s ReqSocket) ZmqsRequestAnswer(request [][]byte) (answer []][]byte) { + sock := s.(*zmq.Socket) + if err = sock.Send(request); err != nil { + panic(err) + } + parts, err := sock.Recv() + if err != nil { + panic(err) + } + return parts +} + +func (s PubSocket) ZmqsPublish(msg [][]byte) { + sock := s.(*zmq.Socket) + if err = sock.Send(msg); err != nil { + panic(err) + } +} + +func (s ReqSocket) LookupCardIdNick(hexbytes []byte) (nick string, error) { + answ := s.ZmqsRequestAnswer([][]byte{hexbytes}) + if len(answ) == 0 { + return "", errors.New("Empty reply received") + } + if answ[0] == []byte("ERROR") { + return "", errors.New(string(bytes.Join(answ[1:]))) + } + if answ[0] != []byte("RESULT") || len(answ) != 3{ + return "", errors.New("Unknown reply received") + } + if answ[1] != hexbytes { + return "", errors.New("Wrong reply received") + } + return string(answ[2]), nil +} \ No newline at end of file -- 1.7.10.4