From 2416b163f05370fcffb2f62bea000265d10aac42 Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Thu, 3 Oct 2013 21:59:02 +0000 Subject: [PATCH] so far so good --- go/r3-eventbroker_zmq/main.go | 1 + go/r3-eventbroker_zmq/make_deploy.zsh | 2 +- go/r3-netstatus/main.go | 44 ++++---- go/r3-netstatus/make_deploy.zsh | 2 +- go/r3-netstatus/r3xmppbot/r3xmppbot.go | 8 +- go/r3-netstatus/sockettoevent.go | 172 +++++++++----------------------- go/r3-netstatus/webstatus.go | 18 ++-- go/r3-netstatus/zeromq.go | 47 +++++++++ 8 files changed, 137 insertions(+), 157 deletions(-) create mode 100644 go/r3-netstatus/zeromq.go diff --git a/go/r3-eventbroker_zmq/main.go b/go/r3-eventbroker_zmq/main.go index aaaf8bd..8ab1858 100644 --- a/go/r3-eventbroker_zmq/main.go +++ b/go/r3-eventbroker_zmq/main.go @@ -66,6 +66,7 @@ func main() { } ps := pubsub.New(3) + defer ps.Shutdown() //~ ticker := time.NewTicker(time.Duration(5) * time.Minute) publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement") diff --git a/go/r3-eventbroker_zmq/make_deploy.zsh b/go/r3-eventbroker_zmq/make_deploy.zsh index 746215b..f43a274 100644 --- a/go/r3-eventbroker_zmq/make_deploy.zsh +++ b/go/r3-eventbroker_zmq/make_deploy.zsh @@ -4,4 +4,4 @@ export CGO_ENABLED=1 go-linux-386 clean #go-linux-386 build #strip ${PWD:t} -go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/ +go-linux-386 build -ldflags "-s" && rsync -v --progress ${PWD:t} wuzzler.realraum.at:/flash/tuer/ diff --git a/go/r3-netstatus/main.go b/go/r3-netstatus/main.go index c199744..ff94189 100644 --- a/go/r3-netstatus/main.go +++ b/go/r3-netstatus/main.go @@ -9,6 +9,7 @@ import ( "time" "fmt" //~ "./brain" + r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events" ) type SpaceState struct { @@ -19,11 +20,11 @@ type SpaceState struct { } var ( - presence_socket_path_ string xmpp_presence_events_chan_ chan interface{} xmpp_login_ struct {jid string; pass string} xmpp_bot_authstring_ string xmpp_state_save_dir_ string + r3eventssub_port_ string button_press_timeout_ int64 = 3600 ) @@ -33,8 +34,8 @@ func init() { flag.StringVar(&xmpp_login_.jid, "xjid", "realrauminfo@realraum.at/Tuer", "XMPP Bot Login JID") flag.StringVar(&xmpp_login_.pass, "xpass", "", "XMPP Bot Login Password") flag.StringVar(&xmpp_bot_authstring_, "xbotauth", "", "String that user use to authenticate themselves to the bot") - flag.StringVar(&presence_socket_path_,"presencesocket", "/var/run/tuer/presence.socket", "Path to presence socket") flag.StringVar(&xmpp_state_save_dir_,"xstatedir","/flash/var/lib/r3netstatus/", "Directory to save XMPP bot state in") + flag.StringVar(&r3eventssub_port_, "eventsubport", "tcp://wuzzler.realraum.at:4244", "zmq address to subscribe r3events") flag.Parse() } @@ -71,35 +72,38 @@ func EventToXMPP(ps *pubsub.PubSub, xmpp_presence_events_chan_ chan <- interface present_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowOnline,"Somebody is present"} notpresent_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowNotAvailabe,"Nobody is here"} button_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowFreeForChat, "The button has been pressed :-)"} - + xmpp_presence_events_chan_ <- r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowNotAvailabe, "Nobody is here"} - + for eventinterface := range(events) { + fmt.Println("event2xmpp", eventinterface) switch event := eventinterface.(type) { - case PresenceUpdate: + case r3events.PresenceUpdate: present = event.Present xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3OnlineOnlyInfo, RememberAsStatus: true} if present { xmpp_presence_events_chan_ <- present_status } else { xmpp_presence_events_chan_ <- notpresent_status - } - case DoorCommandEvent: + } + case r3events.DoorCommandEvent: if len(event.Who) > 0 && len(event.Using) > 0 { who = fmt.Sprintf("%s (%s)",event.Who, event.Using) } else { who = event.Who } xmpp_presence_events_chan_ <- fmt.Sprintln("DoorCommand:",event.Command, "using", event.Using, "by", event.Who, time.Unix(event.Ts,0)) - case DoorStatusUpdate: + case r3events.DoorLockUpdate: locked = event.Locked + xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3DebugInfo, RememberAsStatus: true} + case r3events.DoorAjarUpdate: shut = event.Shut xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3DebugInfo, RememberAsStatus: true} - case ButtonPressUpdate: + case r3events.ButtonPressUpdate: xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: button_msg, DistributeLevel: r3xmppbot.R3OnlineOnlyInfo} xmpp_presence_events_chan_ <- button_status last_buttonpress = event.Ts - case TimeTick: + case r3events.TimeTick: if present && last_buttonpress > 0 && time.Now().Unix() - last_buttonpress > button_press_timeout_ { xmpp_presence_events_chan_ <- present_status last_buttonpress = 0 @@ -109,15 +113,19 @@ func EventToXMPP(ps *pubsub.PubSub, xmpp_presence_events_chan_ chan <- interface } func main() { + zmqctx, zmqsub := ZmqsInit(r3eventssub_port_) + defer zmqctx.Close() + if zmqsub != nil {defer zmqsub.Close()} + if zmqsub == nil { + panic("zmq sockets must not be nil !!") + } + var xmpperr error var bot *r3xmppbot.XmppBot bot, xmpp_presence_events_chan_, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true) - - newlinequeue := make(chan string, 1) ps := pubsub.New(1) - //~ brn := brain.New() - defer close(newlinequeue) defer ps.Shutdown() + //~ brn := brain.New() //~ defer brn.Shutdown() go EventToWeb(ps) @@ -128,15 +136,15 @@ func main() { fmt.Println(xmpperr) fmt.Println("XMPP Bot disabled") } - go ReadFromUSocket(presence_socket_path_, newlinequeue) + ticker := time.NewTicker(time.Duration(7) * time.Minute) for { select { - case e := <-newlinequeue: - ParseSocketInputLine(e, ps) //, brn) + case e := <-zmqsub.In(): + ParseZMQr3Event(e, ps) //, brn) case <-ticker.C: - ps.Pub(TimeTick{time.Now().Unix()}, "updateinterval") + ps.Pub(r3events.TimeTick{time.Now().Unix()}, "updateinterval") } } } diff --git a/go/r3-netstatus/make_deploy.zsh b/go/r3-netstatus/make_deploy.zsh index 408b049..24be54e 100644 --- a/go/r3-netstatus/make_deploy.zsh +++ b/go/r3-netstatus/make_deploy.zsh @@ -3,4 +3,4 @@ export GO386=387 go-linux-386 clean #go-linux-386 build #strip ${PWD:t} -go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/ +go-linux-386 build -ldflags "-s" && rsync --progress -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/ diff --git a/go/r3-netstatus/r3xmppbot/r3xmppbot.go b/go/r3-netstatus/r3xmppbot/r3xmppbot.go index 8ee0775..3e6f210 100644 --- a/go/r3-netstatus/r3xmppbot/r3xmppbot.go +++ b/go/r3-netstatus/r3xmppbot/r3xmppbot.go @@ -77,7 +77,7 @@ func (botdata *XmppBot) makeXMPPPresence(to, ptype, show, status string) *xmpp.P gen_status = nil } else { gen_status = &xmpp.Generic{Chardata: status} - } + } return &xmpp.Presence{Header: xmppmsgheader, Show: gen_show, Status: gen_status} } @@ -202,10 +202,10 @@ func (botdata *XmppBot) handleEventsforXMPP(xmppout chan <- xmpp.Stanza, presenc xmppout <- botdata.makeXMPPMessage(to, pec, nil) } } - + case XMPPStatusEvent: xmppout <- botdata.makeXMPPPresence("", "", pec.Show, pec.Status) - + case XMPPMsgEvent: if pec.RememberAsStatus { last_status_msg = &pec.Msg @@ -333,7 +333,7 @@ func (botdata *XmppBot) handleIncomingXMPPStanzas(xmppin <- chan xmpp.Stanza, xm switch stanza.GetHeader().Type { case "subscribe": xmppout <- botdata.makeXMPPPresence(stanza.GetHeader().From, "subscribed", "", "") - jabber_events <- JabberEvent{stanza.GetHeader().From, true, R3NoChange, false} + jabber_events <- JabberEvent{stanza.GetHeader().From, true, R3NoChange, false} xmppout <- botdata.makeXMPPPresence(stanza.GetHeader().From, "subscribe", "", "") case "unsubscribe", "unsubscribed": jabber_events <- JabberEvent{stanza.GetHeader().From, false, R3NeverInfo, false} diff --git a/go/r3-netstatus/sockettoevent.go b/go/r3-netstatus/sockettoevent.go index ed3c918..8782177 100644 --- a/go/r3-netstatus/sockettoevent.go +++ b/go/r3-netstatus/sockettoevent.go @@ -4,133 +4,53 @@ package main import ( pubsub "github.com/tuxychandru/pubsub" - "regexp" - "strconv" - "bufio" - "time" + //~ "bufio" + //~ "time" //~ "./brain" - "net" + //~ "net" + "encoding/json" + "log" + r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events" ) -var ( - re_presence_ *regexp.Regexp = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?") - re_state_ *regexp.Regexp = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*") - re_infocard_ *regexp.Regexp = regexp.MustCompile("Info\(card\): card\(([a-fA-F0-9]+)\) (found|not found).*") - re_infoajar_ *regexp.Regexp = regexp.MustCompile("Info\(ajar\): door is now (ajar|shut)") - re_command_ *regexp.Regexp = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?") - re_button_ *regexp.Regexp = regexp.MustCompile("PanicButton|button\\d?") - re_temp_ *regexp.Regexp = regexp.MustCompile("temp0: (\\d+\\.\\d+)") - re_photo_ *regexp.Regexp = regexp.MustCompile("photo0: (\\d+)") -) - - -type PresenceUpdate struct { - Present bool - Ts int64 -} - -type DoorStatusUpdate struct { - Locked bool - Shut bool - Ts int64 -} - -type DoorCommandEvent struct { - Command string - Using string - Who string - Ts int64 -} - -type ButtonPressUpdate struct { - Buttonindex int - Ts int64 -} - -type TempSensorUpdate struct { - Sensorindex int - Value float64 - Ts int64 +func ParseZMQr3Event(lines [][]byte, ps *pubsub.PubSub) { //, brn *brain.Brain) { + //log.Printf("ParseZMQr3Event: len: %d lines: %s", len(lines), lines) + if len(lines) != 2 { + return + } + switch string(lines[0]) { + case "PresenceUpdate": + evnt := new(r3events.PresenceUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "presence")} + case "IlluminationSensorUpdate" : + evnt := new(r3events.IlluminationSensorUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "sensors")} + case "TempSensorUpdate" : + evnt := new(r3events.TempSensorUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "sensors")} + case "MovementSensorUpdate" : + evnt := new(r3events.MovementSensorUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "movement")} + case "ButtonPressUpdate" : + evnt := new(r3events.ButtonPressUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "buttons")} + case "DoorLockUpdate" : + log.Print("DoorLockUpdate received") + evnt := new(r3events.DoorLockUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "door")} + case "DoorAjarUpdate" : + evnt := new(r3events.DoorAjarUpdate) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "door")} + case "DoorCommandEvent" : + evnt := new(r3events.DoorCommandEvent) + err := json.Unmarshal(lines[1],evnt) + if err == nil {ps.Pub(*evnt, "door")} + } } - -type IlluminationSensorUpdate struct { - Sensorindex int - Value int64 - Ts int64 -} - -type TimeTick struct { - Ts int64 -} - -type MovementSensorUpdate struct { - Sensorindex int - Ts int64 -} - -func ParseSocketInputLine(line string, ps *pubsub.PubSub) { //, brn *brain.Brain) { - match_presence := re_presence_.FindStringSubmatch(line) - match_status := re_status_.FindStringSubmatch(line) - match_command := re_command_.FindStringSubmatch(line) - match_button := re_button_.FindStringSubmatch(line) - match_temp := re_temp_.FindStringSubmatch(line) - match_photo := re_photo_.FindStringSubmatch(line) - - //~ log.Println("ParseSocketInputLine",line) - var tidbit interface{} - ts := time.Now().Unix() - if match_presence != nil { - if match_presence[2] != "" { ps.Pub(DoorStatusUpdate{match_presence[2] == "closed", true, ts}, "door"); } - tidbit = PresenceUpdate{match_presence[1] == "yes", ts} - //~ brn.Oboite("presence", tidbit) - ps.Pub(tidbit, "presence") - } else if match_status != nil { - tidbit = DoorStatusUpdate{match_status[1] == "closed", match_status[3] == "shut", ts} - //~ brn.Oboite("door", tidbit) - ps.Pub(tidbit, "door") - } else if match_command != nil { - tidbit = DoorCommandEvent{match_command[1], match_command[2], match_command[3], ts} - //~ brn.Oboite("doorcmd", tidbit) - ps.Pub(tidbit, "door") - } else if match_button != nil { - //~ brn.Oboite("button0", ts) - ps.Pub(ButtonPressUpdate{0, ts}, "buttons") - } else if match_temp != nil { - newtemp, err := strconv.ParseFloat((match_temp[1]), 32) - if err == nil { - //~ brn.Oboite( "temp0", newtemp) - ps.Pub(TempSensorUpdate{0, newtemp, ts}, "sensors") - } - } else if match_photo != nil { - newphoto, err := strconv.ParseInt(match_photo[1], 10, 32) - if err == nil { - //~ brn.Oboite("photo0", newphoto) - ps.Pub(IlluminationSensorUpdate{0, newphoto, ts}, "sensors") - } - } else if line == "movement" { - //~ brn.Oboite("movement", ts) - ps.Pub(MovementSensorUpdate{0, ts}, "movements") - } -} - -func ReadFromUSocket(path string, c chan string) { -ReOpenSocket: - for { - presence_socket, err := net.Dial("unix", path) - if err != nil { - //Waiting on Socket - time.Sleep(5 * time.Second) - continue ReOpenSocket - } - presence_reader := bufio.NewReader(presence_socket) - for { - line, err := presence_reader.ReadString('\n') - if err != nil { - //Socket closed - presence_socket.Close() - continue ReOpenSocket - } - c <- line - } - } -} \ No newline at end of file diff --git a/go/r3-netstatus/webstatus.go b/go/r3-netstatus/webstatus.go index 7becbb0..489d770 100644 --- a/go/r3-netstatus/webstatus.go +++ b/go/r3-netstatus/webstatus.go @@ -10,6 +10,7 @@ import ( "net/url" "log" "time" + r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events" ) @@ -73,23 +74,26 @@ func EventToWeb(ps *pubsub.PubSub) { events := ps.Sub("presence","door","sensors","buttons","updateinterval") for eventinterface := range(events) { + //log.Printf("EventToWeb: %s" , eventinterface) switch event := eventinterface.(type) { - case TimeTick: + case r3events.TimeTick: publishStateToWeb() - case PresenceUpdate: + case r3events.PresenceUpdate: statusstate.present = event.Present publishStateToWeb() - case DoorStatusUpdate: - spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterLock", "Türschloß", event.Locked)) + case r3events.DoorAjarUpdate: spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterAjarSensor", "Türkontakt", event.Shut)) publishStateToWeb() - case ButtonPressUpdate: + case r3events.DoorLockUpdate: + spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterLock", "Türschloß", event.Locked)) + publishStateToWeb() + case r3events.ButtonPressUpdate: statusstate.buttonpress_until = event.Ts + 3600 spaceapidata.AddSpaceEvent("PanicButton", "check-in", "The button has been pressed") publishStateToWeb() - case TempSensorUpdate: + case r3events.TempSensorUpdate: spaceapidata.MergeInSensor(spaceapi.MakeTempCSensor("Temp0","Decke", event.Value)) - case IlluminationSensorUpdate: + case r3events.IlluminationSensorUpdate: spaceapidata.MergeInSensor(spaceapi.MakeIlluminationSensor("Photodiode","Decke","1024V/5V", event.Value)) } } diff --git a/go/r3-netstatus/zeromq.go b/go/r3-netstatus/zeromq.go new file mode 100644 index 0000000..38d1732 --- /dev/null +++ b/go/r3-netstatus/zeromq.go @@ -0,0 +1,47 @@ +// (c) Bernhard Tittelbach, 2013 + +package main + +import ( + zmq "github.com/vaughan0/go-zmq" + ) + +// ---------- ZeroMQ Code ------------- + +func ZmqsInit(sub_port string) (ctx *zmq.Context, sub_chans *zmq.Channels) { + var err error + ctx, err = zmq.NewContext() + if err != nil { + panic(err) + } + //close only on later panic, otherwise leave open: + defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }() + + if len(sub_port) > 0 { + sub_sock, err := ctx.Socket(zmq.Sub) + if err != nil { + panic(err) + } + defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }() + + sub_sock.Subscribe([]byte{}) //subscribe empty filter -> aka to all messages + + if err = sub_sock.Connect(sub_port); err != nil { + panic(err) + } + + sub_chans = sub_sock.ChannelsBuffer(10) + go zmqsHandleError(sub_chans) + } else { + sub_chans = nil + } + + return +} + +func zmqsHandleError(chans *zmq.Channels) { + for error := range(chans.Errors()) { + chans.Close() + panic(error) + } +} -- 1.7.10.4