--- /dev/null
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+ "fmt"
+ "os"
+ "flag"
+ "log"
+ "bufio"
+ "bytes"
+)
+
+
+// ---------- Main Code -------------
+
+var (
+ cmd_port_ string
+ sub_port_ string
+)
+
+func usage() {
+ fmt.Fprintf(os.Stderr, "Usage: door_client_zmq\n")
+ flag.PrintDefaults()
+}
+
+func init() {
+ flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path")
+ flag.StringVar(&sub_port_, "pubport", "gmp://*:6666", "zmq subscribe/listen socket path")
+ flag.Usage = usage
+ flag.Parse()
+}
+
+func LineReader(out chan <- [][]byte, stdin * os.File) {
+ linescanner := bufio.NewScanner(stdin)
+ linescanner.Split(bufio.ScanLines)
+ for linescanner.Scan() {
+ if err := linescanner.Err(); err != nil {
+ log.Print(err)
+ close(out)
+ return
+ }
+ text := bytes.Fields([]byte(linescanner.Text()))
+ if len(text) == 0 {
+ continue
+ }
+ out <- text
+ }
+}
+
+func main() {
+ cmd_chans, sub_chans := ZmqsInit(cmd_port_, sub_port_)
+ defer cmd_chans.Close()
+ defer sub_chans.Close()
+ var listen bool
+ var ignore_next bool
+
+ user_input_chan := make(chan [][]byte, 1)
+ go LineReader(user_input_chan, os.Stdin)
+ defer os.Stdin.Close()
+
+ for {
+ select {
+ case input, input_open := (<- user_input_chan):
+ if input_open {
+ if len(input) == 0 { continue }
+ switch string(input[0]) {
+ case "listen":
+ listen = true
+ fmt.Println("Now listening")
+ case "quit":
+ break
+ default:
+ ignore_next = true
+ cmd_chans.Out() <- input
+ fmt.Println( <- cmd_chans.In())
+ }
+ } else {
+ break
+ }
+ case pubsubstuff := <- sub_chans.In():
+ if ignore_next {
+ ignore_next = false
+ continue
+ }
+ if listen {
+ fmt.Println(pubsubstuff)
+ }
+ }
+ }
+}
--- /dev/null
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+ zmq "github.com/vaughan0/go-zmq"
+ )
+
+// ---------- ZeroMQ Code -------------
+
+func ZmqsInit(cmd_port, sub_port string) (cmd_chans, pub_chans *zmq.Channels) {
+
+ cmd_ctx, err := zmq.NewContext()
+ if err != nil {
+ panic(err)
+ }
+ //close only on panic, otherwise leave open:
+ defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }()
+
+ pub_ctx, err := zmq.NewContext()
+ if err != nil {
+ panic(err)
+ }
+ defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }()
+
+ cmd_sock, err := cmd_ctx.Socket(zmq.Req)
+ if err != nil {
+ panic(err)
+ }
+ defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }()
+
+ pub_sock, err := pub_ctx.Socket(zmq.Sub)
+ if err != nil {
+ panic(err)
+ }
+ defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }()
+
+ if err = cmd_sock.Bind(cmd_port); err != nil {
+ panic(err)
+ }
+
+ if err = pub_sock.Bind(sub_port); err != nil {
+ panic(err)
+ }
+
+ cmd_chans = cmd_sock.Channels()
+ pub_chans = cmd_sock.Channels()
+ go zmqsHandleError(cmd_chans, pub_chans)
+ return
+}
+
+func zmqsHandleError(cmd_chans, pub_chans *zmq.Channels) {
+ for {
+ select {
+ case cmd_error := <- cmd_chans.Errors():
+ cmd_chans.Close()
+ pub_chans.Close()
+ panic(cmd_error)
+ case pub_error := <- pub_chans.Errors():
+ cmd_chans.Close()
+ pub_chans.Close()
+ panic(pub_error)
+ }
+ }
+}
\ No newline at end of file