From: Bernhard Tittelbach Date: Wed, 6 Nov 2013 16:19:04 +0000 (+0000) Subject: ask for missed events X-Git-Url: https://git.realraum.at/?a=commitdiff_plain;h=cb2342c75d929f11f901e6f8e3d44f6bdc0261b4;p=svn42.git ask for missed events --- diff --git a/go/r3-netstatus/main.go b/go/r3-netstatus/main.go index 85309ae..cc63daa 100644 --- a/go/r3-netstatus/main.go +++ b/go/r3-netstatus/main.go @@ -10,6 +10,7 @@ import ( "fmt" //~ "./brain" r3events "svn.spreadspace.org/realraum/go.svn/r3events" + zmq "github.com/vaughan0/go-zmq" ) type SpaceState struct { @@ -65,11 +66,9 @@ func composeDoorLockMessage(locked bool, frontshut r3events.DoorAjarUpdate, door } } -func EventToXMPP(ps *pubsub.PubSub, xmpp_presence_events_chan chan <- interface{}) { - events := ps.Sub("presence","door","buttons","updateinterval") +func EventToXMPP(events <- chan interface{}, xmpp_presence_events_chan chan <- interface{}) { defer func() { - ps.Unsub(events, "presence","door","buttons","updateinterval") if x := recover(); x != nil { //defer ist called _after_ EventToXMPP function has returned. Thus we recover after returning from this function. Syslog_.Printf("handleIncomingXMPPStanzas: run time panic: %v", x) @@ -125,14 +124,16 @@ func EventToXMPP(ps *pubsub.PubSub, xmpp_presence_events_chan chan <- interface{ } } -func RunXMPPBot(ps *pubsub.PubSub) { +func RunXMPPBot(ps *pubsub.PubSub, zmqctx *zmq.Context) { var xmpperr error var bot *r3xmppbot.XmppBot var xmpp_presence_events_chan chan interface{} + psevents := ps.Sub("presence","door","buttons","updateinterval") for { bot, xmpp_presence_events_chan, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true) if xmpperr == nil { - EventToXMPP(ps, xmpp_presence_events_chan) + QueryLatestEventsAndInjectThem(ps, zmqctx) + EventToXMPP(psevents, xmpp_presence_events_chan) bot.StopBot() } else { Syslog_.Printf("Error starting XMPP Bot: %s", xmpperr.Error()) @@ -148,6 +149,20 @@ func ParseZMQr3Event(lines [][]byte, ps *pubsub.PubSub) { ps.Pub(evnt, pubsubcat) } +func QueryLatestEventsAndInjectThem(ps *pubsub.PubSub, zmqctx *zmq.Context) { + answ := ZmqsAskQuestionsAndClose(zmqctx, brain_connect_addr_, [][][]byte{ + [][]byte{[]byte("DoorCommandEvent")}, + [][]byte{[]byte("DoorLockUpdate")}, + [][]byte{[]byte("DoorAjarUpdate")}, + [][]byte{[]byte("BackdoorAjarUpdate")}, + [][]byte{[]byte("PresenceUpdate")}, + [][]byte{[]byte("IlluminationSensorUpdate")}, + [][]byte{[]byte("TempSensorUpdate")}}) + for _, a := range(answ) { + ParseZMQr3Event(a, ps) + } +} + func main() { if enable_syslog_ { LogEnableSyslog(); r3xmppbot.LogEnableSyslog() } if enable_debug_ { LogEnableDebuglog(); r3xmppbot.LogEnableDebuglog() } @@ -160,19 +175,15 @@ func main() { panic("zmq sockets must not be nil !!") } - ps := pubsub.New(1) + ps := pubsub.New(10) defer ps.Shutdown() //~ brn := brain.New() //~ defer brn.Shutdown() go EventToWeb(ps) - go RunXMPPBot(ps) - // --- get update on most recent events --- - answ := ZmqsAskQuestionsAndClose(zmqctx, brain_connect_addr_, [][][]byte{[][]byte{[]byte("DoorCommandEvent")}, [][]byte{[]byte("DoorLockUpdate")}, [][]byte{[]byte("DoorAjarUpdate")}, [][]byte{[]byte("PresenceUpdate")}, [][]byte{[]byte("IlluminationSensorUpdate")}, [][]byte{[]byte("TempSensorUpdate")}}) - for _, a := range(answ) { - ParseZMQr3Event(a, ps) - } + QueryLatestEventsAndInjectThem(ps, zmqctx) + go RunXMPPBot(ps, zmqctx) // --- receive and distribute events --- ticker := time.NewTicker(time.Duration(7) * time.Minute)