2e28d50c276208286ea439c2d3bca91277cba914
[svn42.git] / go / r3-eventbroker_zmq / zeromq.go
1 // (c) Bernhard Tittelbach, 2013
2
3 package main
4
5 import (
6     zmq "github.com/vaughan0/go-zmq"
7     "bytes"
8     "error"
9  )
10
11 // ---------- ZeroMQ Code -------------
12
13 type ReqSocket *zmq.Socket
14 type PubSocket *zmq.Socket
15
16 func ZmqsInit(sub_connect_port, sub_listen_port, pub_port, keylookup_port string)  (ctx *zmq.Context, sub_chans *zmq.Channels, pub_sock PubSocket, keylookup_sock ReqSocket) {
17     var err error
18     ctx, err = zmq.NewContext()
19     if err != nil {
20         panic(err)
21     }
22     //close only on later panic, otherwise leave open:
23     defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }()
24
25     if len(sub_port) > 0 {
26         sub_sock, err := ctx.Socket(zmq.Sub)
27         if err != nil {
28             panic(err)
29         }
30         defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }()
31
32             if err = sub_sock.Bind(sub_listen_port); err != nil {
33             panic(err)
34         }
35
36             if err = sub_sock.Connect(sub_connect_port); err != nil {
37             panic(err)
38         }
39
40         sub_chans = sub_sock.ChannelsBuffer(10)
41         go zmqsHandleError(sub_chans)
42     } else {
43         sub_chans = nil
44     }
45
46     if len(pub_port) > 0 {
47         pub_sock, err := ctx.Socket(zmq.Pub)
48         if err != nil {
49             panic(err)
50         }
51         defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }()
52
53         if err = pub_sock.Bind(pub_port); err != nil {
54             panic(err)
55         }
56     } else {
57         pub_sock = nil
58     }
59
60     if len(keylookup_port) > 0 {
61         keylookup_sock, err := ctx.Socket(zmq.Req)
62         if err != nil {
63             panic(err)
64         }
65         defer func() { if r:= recover(); r != nil { keylookup_sock.Close(); panic(r) } }()
66
67         if err = keylookup_sock.Connect(keylookup_port); err != nil {
68             panic(err)
69         }
70     } else {
71         keylookup_sock = nil
72     }
73
74     return
75 }
76
77 func zmqsHandleError(chans *zmq.Channels) {
78     for error := range(chans.Errors()) {
79         chans.Close()
80         panic(error)
81     }
82 }
83
84 func (s ReqSocket) ZmqsRequestAnswer(request [][]byte) (answer []][]byte) {
85     sock := s.(*zmq.Socket)
86     if err = sock.Send(request); err != nil {
87         panic(err)
88     }
89     parts, err := sock.Recv()
90     if err != nil {
91         panic(err)
92     }
93     return parts
94 }
95
96 func (s PubSocket) ZmqsPublish(msg [][]byte) {
97     sock := s.(*zmq.Socket)
98     if err = sock.Send(msg); err != nil {
99         panic(err)
100     }
101 }
102
103 func (s ReqSocket) LookupCardIdNick(hexbytes []byte) (nick string, error) {
104     answ := s.ZmqsRequestAnswer([][]byte{hexbytes})
105     if len(answ) == 0 {
106         return "", errors.New("Empty reply received")
107     }    
108     if answ[0] == []byte("ERROR") {
109         return "", errors.New(string(bytes.Join(answ[1:])))
110     }
111     if answ[0] !=  []byte("RESULT") || len(answ) != 3{
112         return "", errors.New("Unknown reply received")
113     }
114     if answ[1] !=  hexbytes {
115         return "", errors.New("Wrong reply received")
116     }
117     return string(answ[2]), nil
118 }