From 03f11406495d2de6340f0331f91ebfd25fbe67ed Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Wed, 2 Oct 2013 21:37:08 +0000 Subject: [PATCH] does not yet work --- go/r3-eventbroker_zmq/main.go | 23 ++++++---- go/r3-eventbroker_zmq/make_deploy.zsh | 7 ++++ go/r3-eventbroker_zmq/metamovement.go | 46 ++++++++++++++++++++ go/r3-eventbroker_zmq/presence.go | 72 ++++++++++++++++++++++++++++++++ go/r3-eventbroker_zmq/sockettoevent.go | 25 ++++------- go/r3-eventbroker_zmq/zeromq.go | 12 +++--- 6 files changed, 154 insertions(+), 31 deletions(-) create mode 100644 go/r3-eventbroker_zmq/make_deploy.zsh create mode 100644 go/r3-eventbroker_zmq/metamovement.go create mode 100644 go/r3-eventbroker_zmq/presence.go diff --git a/go/r3-eventbroker_zmq/main.go b/go/r3-eventbroker_zmq/main.go index 3145c6c..dbd35eb 100644 --- a/go/r3-eventbroker_zmq/main.go +++ b/go/r3-eventbroker_zmq/main.go @@ -6,7 +6,7 @@ import ( "fmt" "os" "flag" - "time" + //~ "time" "log/syslog" "log" pubsub "github.com/tuxychandru/pubsub" @@ -37,11 +37,11 @@ func usage() { } func init() { - flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://wuzzler.realraum.at:4242", "zmq door event publish addr") - flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://:4243", "zmq public/listen socket addr for incoming sensor data") - flag.StringVar(&pub_port_, "pubport", "tcp://:4244", "zmq port publishing consodilated events") + flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://torwaechter.realraum.at:4242", "zmq door event publish addr") + flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data") + flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events") flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups") - flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local1 facility") + flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility") flag.Usage = usage flag.Parse() } @@ -55,25 +55,30 @@ func main() { if use_syslog_ { var logerr error - Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0) + Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0) + //~ Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0) if logerr != nil { panic(logerr) } Syslog_.Print("started") defer Syslog_.Print("exiting") } ps := pubsub.New(3) - ticker := time.NewTicker(time.Duration(5) * time.Minute) + //~ ticker := time.NewTicker(time.Duration(5) * time.Minute) publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement") + go MetaEventRoutine_Movement(ps, 10, 20, 10) + go MetaEventRoutine_Presence(ps) for { + log.Print("for loop") select { case subin := <- sub_in_chans.In(): ParseSocketInputLine(subin, ps, keylookup_socket) - case <- ticker.C: - MakeTimeTick(ps) + //~ case <- ticker.C: + //~ MakeTimeTick(ps) case event_interface := <- publish_these_events_chan: data, err := FormatEventForSocket(event_interface) + log.Print("publishing", data) if err != nil { Syslog_.Print(err) continue diff --git a/go/r3-eventbroker_zmq/make_deploy.zsh b/go/r3-eventbroker_zmq/make_deploy.zsh new file mode 100644 index 0000000..746215b --- /dev/null +++ b/go/r3-eventbroker_zmq/make_deploy.zsh @@ -0,0 +1,7 @@ +#!/bin/zsh +export GO386=387 +export CGO_ENABLED=1 +go-linux-386 clean +#go-linux-386 build +#strip ${PWD:t} +go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/ diff --git a/go/r3-eventbroker_zmq/metamovement.go b/go/r3-eventbroker_zmq/metamovement.go new file mode 100644 index 0000000..98d8e8d --- /dev/null +++ b/go/r3-eventbroker_zmq/metamovement.go @@ -0,0 +1,46 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + "time" + //~ "./brain" + pubsub "github.com/tuxychandru/pubsub" + "container/ring" + ) + +type SomethingReallyIsMoving struct { + Movement bool + Ts int64 +} + + +func MetaEventRoutine_Movement(ps *pubsub.PubSub, granularity, gran_duration int , threshold uint32) { + var last_movement int64 + movement_window := ring.New(granularity+1) + events_chan := ps.Sub("movement") + myticker := time.NewTicker(time.Duration(gran_duration) * time.Second) + + for { select { + case event := <- events_chan: + switch event.(type) { + case MovementSensorUpdate: + movement_window.Value = (uint32) (movement_window.Value.(uint32) + 1) + } + case <- myticker.C: + movement_window.Prev().Value = (uint32) (0) + movement_window = movement_window.Next() + var movsum uint32 = 0 + movement_window.Do(func(v interface{}){if v != nil {movsum += v.(uint32)}}) + ts := time.Now().Unix() + if movsum > threshold { + ps.Pub( SomethingReallyIsMoving{true,ts}, "movement") + last_movement = ts + } + + if last_movement > 0 && ts - last_movement < 3600*6 && ts - last_movement > 3600*3 { + last_movement = 0 + ps.Pub( SomethingReallyIsMoving{false, ts}, "movement") + } + } } +} \ No newline at end of file diff --git a/go/r3-eventbroker_zmq/presence.go b/go/r3-eventbroker_zmq/presence.go new file mode 100644 index 0000000..953af11 --- /dev/null +++ b/go/r3-eventbroker_zmq/presence.go @@ -0,0 +1,72 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + "time" + //~ "./brain" + pubsub "github.com/tuxychandru/pubsub" + ) + +type PresenceUpdate struct { + Present bool + Ts int64 +} + +type doorstate struct { + locked bool + shut bool +} + +func MetaEventRoutine_Presence(ps *pubsub.PubSub) { + //~ var last_door_cmd *DoorCommandEvent + var last_presence bool + var last_movement, last_buttonpress int64 + doorstatemap := make(map[byte]doorstate,1) + + events_chan := ps.Sub("door", "doorcmd", "buttons", "movement") + + for event := range(events_chan) { + new_presence := last_presence + ts := time.Now().Unix() + switch evnt := event.(type) { + case SomethingReallyIsMoving: + if evnt.Movement { + last_movement = evnt.Ts + } else { + last_movement = 0 + } + case ButtonPressUpdate: + last_buttonpress = evnt.Ts + new_presence = true + //~ case DoorCommandEvent: + //~ last_door_cmd = &evnt + case DoorLockUpdate: + doorstatemap[evnt.DoorID]=doorstate{locked:evnt.Locked, shut:doorstatemap[evnt.DoorID].shut} + case DoorAjarUpdate: + doorstatemap[evnt.DoorID]=doorstate{locked:doorstatemap[evnt.DoorID].locked, shut:evnt.Shut} + } + + any_door_unlocked := false + any_door_ajar := false + for _, ds := range(doorstatemap) { + if ds.locked == false {any_door_unlocked = true } + if ds.shut == false {any_door_ajar = true } + } + + if new_presence != last_presence { + //... skip state check .. we had a definite presence event + } else if any_door_unlocked || any_door_ajar { + new_presence = true + } else if last_movement != 0 || ts - last_buttonpress < 200 { + new_presence = true + } else { + new_presence = false + } + + if new_presence != last_presence { + last_presence = new_presence + ps.Pub(PresenceUpdate{new_presence, ts} , "presence") + } + } +} \ No newline at end of file diff --git a/go/r3-eventbroker_zmq/sockettoevent.go b/go/r3-eventbroker_zmq/sockettoevent.go index f2e7a20..84dff4a 100644 --- a/go/r3-eventbroker_zmq/sockettoevent.go +++ b/go/r3-eventbroker_zmq/sockettoevent.go @@ -10,6 +10,7 @@ import ( "encoding/json" pubsub "github.com/tuxychandru/pubsub" zmq "github.com/vaughan0/go-zmq" + "log" ) var ( @@ -26,13 +27,6 @@ var ( ) -type PresenceUpdate struct { - Present bool - Ts int64 -} -func (s PresenceUpdate) Serialize() string - - type DoorLockUpdate struct { DoorID byte Locked bool @@ -100,12 +94,13 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z var tidbit interface{} ts := time.Now().Unix() if len(lines) < 1 { return } + log.Print("ParseSocketInputLine",string(lines[0])) switch string(lines[0]) { case "State:": - if len(lines) < 2 { continue } + if len(lines) < 2 { return } parseSocketInputLine_State(lines[1:], ps, ts) case "Status:": - if len(lines) < 3 { continue } + if len(lines) < 3 { return } tidbit = DoorLockUpdate{0, string(lines[1]) == "closed", ts} //~ brn.Oboite("door", tidbit) ps.Pub(tidbit, "door") @@ -113,23 +108,21 @@ func ParseSocketInputLine(lines [][]byte, ps *pubsub.PubSub, keylookup_socket *z //~ brn.Oboite("door", tidbit) ps.Pub(tidbit, "door") case "Info(card):": - if len(lines) < 3 { continue } - if string(lines[2]) != "found" { - continue - } + if len(lines) < 3 { return } + if string(lines[2]) != "found" { return } match_cardid := re_cardid_.FindSubmatch(lines[1]) if len(match_cardid) > 1 { // PreCondition: same thread/goroutinge as created keylookup_socket !!!! - nick, err := keylookup_socket.LookupCardIdNick(match_cardid[1]) + nick, err := LookupCardIdNick(keylookup_socket, match_cardid[1]) if err != nil { Syslog_.Print("CardID Lookup Error",err) - nick := "Unresolvable KeyID" + nick = "Unresolvable KeyID" } // new event: toggle by user nick using card ps.Pub(DoorCommandEvent{"toggle", "Card", nick, ts},"doorcmd") } case "Info(ajar):": - if len(lines) < 5 { continue } + if len(lines) < 5 { return } tidbit = DoorAjarUpdate{0, string(lines[4]) == "shut", ts} //~ brn.Oboite("door", tidbit) ps.Pub(tidbit, "door") diff --git a/go/r3-eventbroker_zmq/zeromq.go b/go/r3-eventbroker_zmq/zeromq.go index 55aa201..09720e1 100644 --- a/go/r3-eventbroker_zmq/zeromq.go +++ b/go/r3-eventbroker_zmq/zeromq.go @@ -78,7 +78,7 @@ func zmqsHandleError(chans *zmq.Channels) { } } -func (sock *zmq.Socket) ZmqsRequestAnswer(request [][]byte) (answer [][]byte) { +func ZmqsRequestAnswer(sock *zmq.Socket, request [][]byte) (answer [][]byte) { if err := sock.Send(request); err != nil { panic(err) } @@ -89,18 +89,18 @@ func (sock *zmq.Socket) ZmqsRequestAnswer(request [][]byte) (answer [][]byte) { return parts } -func (s *zmq.Socket) LookupCardIdNick(hexbytes []byte) (nick string, error) { - answ := s.ZmqsRequestAnswer([][]byte{hexbytes}) +func LookupCardIdNick(s *zmq.Socket, hexbytes []byte) (string, error) { + answ := ZmqsRequestAnswer(s, [][]byte{hexbytes}) if len(answ) == 0 { return "", errors.New("Empty reply received") } - if answ[0] == []byte("ERROR") { + if bytes.Compare(answ[0], []byte("ERROR")) == 0 { return "", errors.New(string(bytes.Join(answ[1:],[]byte(" ")))) } - if answ[0] != []byte("RESULT") || len(answ) != 3{ + if bytes.Compare(answ[0], []byte("RESULT")) != 0 || len(answ) != 3{ return "", errors.New("Unknown reply received") } - if answ[1] != hexbytes { + if bytes.Compare(answ[1], hexbytes) != 0 { return "", errors.New("Wrong reply received") } return string(answ[2]), nil -- 1.7.10.4