aaaf8bd33ff3b7e4cd8426d095cfb82679ebab5a
[svn42.git] / go / r3-eventbroker_zmq / main.go
1 // (c) Bernhard Tittelbach, 2013
2
3 package main
4
5 import (
6     "fmt"
7     "os"
8     "flag"
9     //~ "time"
10     "log/syslog"
11     "log"
12     pubsub "github.com/tuxychandru/pubsub"
13     "./r3events"
14 )
15
16 //~ func StringArrayToByteArray(ss []string) [][]byte {
17     //~ bb := make([][]byte, len(ss))
18     //~ for index, s := range(ss) {
19         //~ bb[index] = []byte(s)
20     //~ }
21     //~ return bb
22 //~ }
23
24 // ---------- Main Code -------------
25
26 var (
27     doorsub_addr_ string
28     sensorssub_port_ string
29     pub_port_ string
30     keylookup_addr_ string
31     use_syslog_ bool
32     Syslog_ *log.Logger
33 )
34
35 func usage() {
36     fmt.Fprintf(os.Stderr, "Usage: zmq_broker_event_transformer [options]\n")
37     flag.PrintDefaults()
38 }
39
40 func init() {
41     flag.StringVar(&doorsub_addr_, "doorsubaddr", "tcp://torwaechter.realraum.at:4242", "zmq door event publish addr")
42     flag.StringVar(&sensorssub_port_, "sensorsubport", "tcp://*:4243", "zmq public/listen socket addr for incoming sensor data")
43     flag.StringVar(&pub_port_, "pubport", "tcp://*:4244", "zmq port publishing consodilated events")
44     flag.StringVar(&keylookup_addr_, "keylookupaddr", "ipc:///run/tuer/door_keyname.ipc", "address to use for key/name lookups")
45     flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local2 facility")
46     flag.Usage = usage
47     flag.Parse()
48 }
49
50 func main() {
51     zmqctx, sub_in_chans, pub_out_socket, keylookup_socket := ZmqsInit(doorsub_addr_, sensorssub_port_, pub_port_, keylookup_addr_)
52     if sub_in_chans != nil {defer sub_in_chans.Close()}
53     defer zmqctx.Close()
54     if pub_out_socket != nil {defer pub_out_socket.Close()}
55     if keylookup_socket != nil {defer keylookup_socket.Close()}
56     if sub_in_chans == nil || pub_out_socket == nil || keylookup_socket == nil {
57         panic("zmq sockets must not be nil !!")
58     }
59     if use_syslog_ {
60         var logerr error
61         Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | (18<<3), 0)
62         //~ Syslog_, logerr = syslog.NewLogger(syslog.LOG_INFO | syslog.LOG_LOCAL2, 0)
63         if logerr != nil { panic(logerr) }
64         Syslog_.Print("started")
65         defer Syslog_.Print("exiting")
66     }
67
68     ps := pubsub.New(3)
69     //~ ticker := time.NewTicker(time.Duration(5) * time.Minute)
70     publish_these_events_chan := ps.Sub("door", "doorcmd", "presence", "sensors", "buttons", "movement")
71
72     go MetaEventRoutine_Movement(ps, 10, 20, 10)
73     go MetaEventRoutine_Presence(ps)
74
75     for {
76         select {
77             case subin := <- sub_in_chans.In():
78                 ParseSocketInputLine(subin, ps, keylookup_socket)
79             //~ case <- ticker.C:
80                 //~ MakeTimeTick(ps)
81             case event_interface := <- publish_these_events_chan:
82                 data, err := r3events.MarshalEvent2ByteByte(event_interface)
83                 log.Printf("publishing %s",data)
84                 if err != nil {
85                     if Syslog_ != nil {Syslog_.Print(err)}
86                     continue
87                 }
88                 if err := pub_out_socket.Send(data); err != nil {
89                     panic(err)
90                 }
91         }
92     }
93
94 }