}
ps := pubsub.New(3)
+ defer ps.Shutdown()
//~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
go-linux-386 clean
#go-linux-386 build
#strip ${PWD:t}
-go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
+go-linux-386 build -ldflags "-s" && rsync -v --progress ${PWD:t} wuzzler.realraum.at:/flash/tuer/
"time"
"fmt"
//~ "./brain"
+ r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events"
)
type SpaceState struct {
}
var (
- presence_socket_path_ string
xmpp_presence_events_chan_ chan interface{}
xmpp_login_ struct {jid string; pass string}
xmpp_bot_authstring_ string
xmpp_state_save_dir_ string
+ r3eventssub_port_ string
button_press_timeout_ int64 = 3600
)
flag.StringVar(&xmpp_login_.jid, "xjid", "realrauminfo@realraum.at/Tuer", "XMPP Bot Login JID")
flag.StringVar(&xmpp_login_.pass, "xpass", "", "XMPP Bot Login Password")
flag.StringVar(&xmpp_bot_authstring_, "xbotauth", "", "String that user use to authenticate themselves to the bot")
- flag.StringVar(&presence_socket_path_,"presencesocket", "/var/run/tuer/presence.socket", "Path to presence socket")
flag.StringVar(&xmpp_state_save_dir_,"xstatedir","/flash/var/lib/r3netstatus/", "Directory to save XMPP bot state in")
+ flag.StringVar(&r3eventssub_port_, "eventsubport", "tcp://wuzzler.realraum.at:4244", "zmq address to subscribe r3events")
flag.Parse()
}
present_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowOnline,"Somebody is present"}
notpresent_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowNotAvailabe,"Nobody is here"}
button_status := r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowFreeForChat, "The button has been pressed :-)"}
-
+
xmpp_presence_events_chan_ <- r3xmppbot.XMPPStatusEvent{r3xmppbot.ShowNotAvailabe, "Nobody is here"}
-
+
for eventinterface := range(events) {
+ fmt.Println("event2xmpp", eventinterface)
switch event := eventinterface.(type) {
- case PresenceUpdate:
+ case r3events.PresenceUpdate:
present = event.Present
xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3OnlineOnlyInfo, RememberAsStatus: true}
if present {
xmpp_presence_events_chan_ <- present_status
} else {
xmpp_presence_events_chan_ <- notpresent_status
- }
- case DoorCommandEvent:
+ }
+ case r3events.DoorCommandEvent:
if len(event.Who) > 0 && len(event.Using) > 0 {
who = fmt.Sprintf("%s (%s)",event.Who, event.Using)
} else {
who = event.Who
}
xmpp_presence_events_chan_ <- fmt.Sprintln("DoorCommand:",event.Command, "using", event.Using, "by", event.Who, time.Unix(event.Ts,0))
- case DoorStatusUpdate:
+ case r3events.DoorLockUpdate:
locked = event.Locked
+ xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3DebugInfo, RememberAsStatus: true}
+ case r3events.DoorAjarUpdate:
shut = event.Shut
xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: composeMessage(present, locked, shut, who, event.Ts), DistributeLevel: r3xmppbot.R3DebugInfo, RememberAsStatus: true}
- case ButtonPressUpdate:
+ case r3events.ButtonPressUpdate:
xmpp_presence_events_chan_ <- r3xmppbot.XMPPMsgEvent{Msg: button_msg, DistributeLevel: r3xmppbot.R3OnlineOnlyInfo}
xmpp_presence_events_chan_ <- button_status
last_buttonpress = event.Ts
- case TimeTick:
+ case r3events.TimeTick:
if present && last_buttonpress > 0 && time.Now().Unix() - last_buttonpress > button_press_timeout_ {
xmpp_presence_events_chan_ <- present_status
last_buttonpress = 0
}
func main() {
+ zmqctx, zmqsub := ZmqsInit(r3eventssub_port_)
+ defer zmqctx.Close()
+ if zmqsub != nil {defer zmqsub.Close()}
+ if zmqsub == nil {
+ panic("zmq sockets must not be nil !!")
+ }
+
var xmpperr error
var bot *r3xmppbot.XmppBot
bot, xmpp_presence_events_chan_, xmpperr = r3xmppbot.NewStartedBot(xmpp_login_.jid, xmpp_login_.pass, xmpp_bot_authstring_, xmpp_state_save_dir_, true)
-
- newlinequeue := make(chan string, 1)
ps := pubsub.New(1)
- //~ brn := brain.New()
- defer close(newlinequeue)
defer ps.Shutdown()
+ //~ brn := brain.New()
//~ defer brn.Shutdown()
go EventToWeb(ps)
fmt.Println(xmpperr)
fmt.Println("XMPP Bot disabled")
}
- go ReadFromUSocket(presence_socket_path_, newlinequeue)
+
ticker := time.NewTicker(time.Duration(7) * time.Minute)
for {
select {
- case e := <-newlinequeue:
- ParseSocketInputLine(e, ps) //, brn)
+ case e := <-zmqsub.In():
+ ParseZMQr3Event(e, ps) //, brn)
case <-ticker.C:
- ps.Pub(TimeTick{time.Now().Unix()}, "updateinterval")
+ ps.Pub(r3events.TimeTick{time.Now().Unix()}, "updateinterval")
}
}
}
go-linux-386 clean
#go-linux-386 build
#strip ${PWD:t}
-go-linux-386 build -ldflags "-s" && rsync -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
+go-linux-386 build -ldflags "-s" && rsync --progress -v ${PWD:t} wuzzler.realraum.at:/flash/tuer/
gen_status = nil
} else {
gen_status = &xmpp.Generic{Chardata: status}
- }
+ }
return &xmpp.Presence{Header: xmppmsgheader, Show: gen_show, Status: gen_status}
}
xmppout <- botdata.makeXMPPMessage(to, pec, nil)
}
}
-
+
case XMPPStatusEvent:
xmppout <- botdata.makeXMPPPresence("", "", pec.Show, pec.Status)
-
+
case XMPPMsgEvent:
if pec.RememberAsStatus {
last_status_msg = &pec.Msg
switch stanza.GetHeader().Type {
case "subscribe":
xmppout <- botdata.makeXMPPPresence(stanza.GetHeader().From, "subscribed", "", "")
- jabber_events <- JabberEvent{stanza.GetHeader().From, true, R3NoChange, false}
+ jabber_events <- JabberEvent{stanza.GetHeader().From, true, R3NoChange, false}
xmppout <- botdata.makeXMPPPresence(stanza.GetHeader().From, "subscribe", "", "")
case "unsubscribe", "unsubscribed":
jabber_events <- JabberEvent{stanza.GetHeader().From, false, R3NeverInfo, false}
import (
pubsub "github.com/tuxychandru/pubsub"
- "regexp"
- "strconv"
- "bufio"
- "time"
+ //~ "bufio"
+ //~ "time"
//~ "./brain"
- "net"
+ //~ "net"
+ "encoding/json"
+ "log"
+ r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events"
)
-var (
- re_presence_ *regexp.Regexp = regexp.MustCompile("Presence: (yes|no)(?:, (opened|closed), (.+))?")
- re_state_ *regexp.Regexp = regexp.MustCompile("State: (closed|opened|manual movement|error|reset|timeout after open|timeout after close|opening|closing).*")
- re_infocard_ *regexp.Regexp = regexp.MustCompile("Info\(card\): card\(([a-fA-F0-9]+)\) (found|not found).*")
- re_infoajar_ *regexp.Regexp = regexp.MustCompile("Info\(ajar\): door is now (ajar|shut)")
- re_command_ *regexp.Regexp = regexp.MustCompile("(open|close|toggle|reset)(?: +(Card|Phone|SSH|ssh))?(?: +(.+))?")
- re_button_ *regexp.Regexp = regexp.MustCompile("PanicButton|button\\d?")
- re_temp_ *regexp.Regexp = regexp.MustCompile("temp0: (\\d+\\.\\d+)")
- re_photo_ *regexp.Regexp = regexp.MustCompile("photo0: (\\d+)")
-)
-
-
-type PresenceUpdate struct {
- Present bool
- Ts int64
-}
-
-type DoorStatusUpdate struct {
- Locked bool
- Shut bool
- Ts int64
-}
-
-type DoorCommandEvent struct {
- Command string
- Using string
- Who string
- Ts int64
-}
-
-type ButtonPressUpdate struct {
- Buttonindex int
- Ts int64
-}
-
-type TempSensorUpdate struct {
- Sensorindex int
- Value float64
- Ts int64
+func ParseZMQr3Event(lines [][]byte, ps *pubsub.PubSub) { //, brn *brain.Brain) {
+ //log.Printf("ParseZMQr3Event: len: %d lines: %s", len(lines), lines)
+ if len(lines) != 2 {
+ return
+ }
+ switch string(lines[0]) {
+ case "PresenceUpdate":
+ evnt := new(r3events.PresenceUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "presence")}
+ case "IlluminationSensorUpdate" :
+ evnt := new(r3events.IlluminationSensorUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "sensors")}
+ case "TempSensorUpdate" :
+ evnt := new(r3events.TempSensorUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "sensors")}
+ case "MovementSensorUpdate" :
+ evnt := new(r3events.MovementSensorUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "movement")}
+ case "ButtonPressUpdate" :
+ evnt := new(r3events.ButtonPressUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "buttons")}
+ case "DoorLockUpdate" :
+ log.Print("DoorLockUpdate received")
+ evnt := new(r3events.DoorLockUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "door")}
+ case "DoorAjarUpdate" :
+ evnt := new(r3events.DoorAjarUpdate)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "door")}
+ case "DoorCommandEvent" :
+ evnt := new(r3events.DoorCommandEvent)
+ err := json.Unmarshal(lines[1],evnt)
+ if err == nil {ps.Pub(*evnt, "door")}
+ }
}
-
-type IlluminationSensorUpdate struct {
- Sensorindex int
- Value int64
- Ts int64
-}
-
-type TimeTick struct {
- Ts int64
-}
-
-type MovementSensorUpdate struct {
- Sensorindex int
- Ts int64
-}
-
-func ParseSocketInputLine(line string, ps *pubsub.PubSub) { //, brn *brain.Brain) {
- match_presence := re_presence_.FindStringSubmatch(line)
- match_status := re_status_.FindStringSubmatch(line)
- match_command := re_command_.FindStringSubmatch(line)
- match_button := re_button_.FindStringSubmatch(line)
- match_temp := re_temp_.FindStringSubmatch(line)
- match_photo := re_photo_.FindStringSubmatch(line)
-
- //~ log.Println("ParseSocketInputLine",line)
- var tidbit interface{}
- ts := time.Now().Unix()
- if match_presence != nil {
- if match_presence[2] != "" { ps.Pub(DoorStatusUpdate{match_presence[2] == "closed", true, ts}, "door"); }
- tidbit = PresenceUpdate{match_presence[1] == "yes", ts}
- //~ brn.Oboite("presence", tidbit)
- ps.Pub(tidbit, "presence")
- } else if match_status != nil {
- tidbit = DoorStatusUpdate{match_status[1] == "closed", match_status[3] == "shut", ts}
- //~ brn.Oboite("door", tidbit)
- ps.Pub(tidbit, "door")
- } else if match_command != nil {
- tidbit = DoorCommandEvent{match_command[1], match_command[2], match_command[3], ts}
- //~ brn.Oboite("doorcmd", tidbit)
- ps.Pub(tidbit, "door")
- } else if match_button != nil {
- //~ brn.Oboite("button0", ts)
- ps.Pub(ButtonPressUpdate{0, ts}, "buttons")
- } else if match_temp != nil {
- newtemp, err := strconv.ParseFloat((match_temp[1]), 32)
- if err == nil {
- //~ brn.Oboite( "temp0", newtemp)
- ps.Pub(TempSensorUpdate{0, newtemp, ts}, "sensors")
- }
- } else if match_photo != nil {
- newphoto, err := strconv.ParseInt(match_photo[1], 10, 32)
- if err == nil {
- //~ brn.Oboite("photo0", newphoto)
- ps.Pub(IlluminationSensorUpdate{0, newphoto, ts}, "sensors")
- }
- } else if line == "movement" {
- //~ brn.Oboite("movement", ts)
- ps.Pub(MovementSensorUpdate{0, ts}, "movements")
- }
-}
-
-func ReadFromUSocket(path string, c chan string) {
-ReOpenSocket:
- for {
- presence_socket, err := net.Dial("unix", path)
- if err != nil {
- //Waiting on Socket
- time.Sleep(5 * time.Second)
- continue ReOpenSocket
- }
- presence_reader := bufio.NewReader(presence_socket)
- for {
- line, err := presence_reader.ReadString('\n')
- if err != nil {
- //Socket closed
- presence_socket.Close()
- continue ReOpenSocket
- }
- c <- line
- }
- }
-}
\ No newline at end of file
"net/url"
"log"
"time"
+ r3events "svn.spreadspace.org/realraum/go.svn/r3-eventbroker_zmq/r3events"
)
events := ps.Sub("presence","door","sensors","buttons","updateinterval")
for eventinterface := range(events) {
+ //log.Printf("EventToWeb: %s" , eventinterface)
switch event := eventinterface.(type) {
- case TimeTick:
+ case r3events.TimeTick:
publishStateToWeb()
- case PresenceUpdate:
+ case r3events.PresenceUpdate:
statusstate.present = event.Present
publishStateToWeb()
- case DoorStatusUpdate:
- spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterLock", "Türschloß", event.Locked))
+ case r3events.DoorAjarUpdate:
spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterAjarSensor", "Türkontakt", event.Shut))
publishStateToWeb()
- case ButtonPressUpdate:
+ case r3events.DoorLockUpdate:
+ spaceapidata.MergeInSensor(spaceapi.MakeDoorLockSensor("TorwaechterLock", "Türschloß", event.Locked))
+ publishStateToWeb()
+ case r3events.ButtonPressUpdate:
statusstate.buttonpress_until = event.Ts + 3600
spaceapidata.AddSpaceEvent("PanicButton", "check-in", "The button has been pressed")
publishStateToWeb()
- case TempSensorUpdate:
+ case r3events.TempSensorUpdate:
spaceapidata.MergeInSensor(spaceapi.MakeTempCSensor("Temp0","Decke", event.Value))
- case IlluminationSensorUpdate:
+ case r3events.IlluminationSensorUpdate:
spaceapidata.MergeInSensor(spaceapi.MakeIlluminationSensor("Photodiode","Decke","1024V/5V", event.Value))
}
}
--- /dev/null
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+ zmq "github.com/vaughan0/go-zmq"
+ )
+
+// ---------- ZeroMQ Code -------------
+
+func ZmqsInit(sub_port string) (ctx *zmq.Context, sub_chans *zmq.Channels) {
+ var err error
+ ctx, err = zmq.NewContext()
+ if err != nil {
+ panic(err)
+ }
+ //close only on later panic, otherwise leave open:
+ defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }()
+
+ if len(sub_port) > 0 {
+ sub_sock, err := ctx.Socket(zmq.Sub)
+ if err != nil {
+ panic(err)
+ }
+ defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }()
+
+ sub_sock.Subscribe([]byte{}) //subscribe empty filter -> aka to all messages
+
+ if err = sub_sock.Connect(sub_port); err != nil {
+ panic(err)
+ }
+
+ sub_chans = sub_sock.ChannelsBuffer(10)
+ go zmqsHandleError(sub_chans)
+ } else {
+ sub_chans = nil
+ }
+
+ return
+}
+
+func zmqsHandleError(chans *zmq.Channels) {
+ for error := range(chans.Errors()) {
+ chans.Close()
+ panic(error)
+ }
+}