door_client_zmq
authorBernhard Tittelbach <xro@realraum.at>
Tue, 24 Sep 2013 03:24:36 +0000 (03:24 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Tue, 24 Sep 2013 03:24:36 +0000 (03:24 +0000)
go/door_client_zmq/main.go [new file with mode: 0644]
go/door_client_zmq/zeromq.go [new file with mode: 0644]

diff --git a/go/door_client_zmq/main.go b/go/door_client_zmq/main.go
new file mode 100644 (file)
index 0000000..4db2f1e
--- /dev/null
@@ -0,0 +1,91 @@
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+    "fmt"
+    "os"
+    "flag"
+    "log"
+    "bufio"
+    "bytes"
+)
+
+
+// ---------- Main Code -------------
+
+var (
+    cmd_port_ string
+    sub_port_ string
+)
+
+func usage() {
+    fmt.Fprintf(os.Stderr, "Usage: door_client_zmq\n")
+    flag.PrintDefaults()
+}
+
+func init() {
+    flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path")
+    flag.StringVar(&sub_port_, "pubport", "gmp://*:6666", "zmq subscribe/listen socket path")
+    flag.Usage = usage
+    flag.Parse()
+}
+
+func LineReader(out chan <- [][]byte, stdin * os.File) {
+    linescanner := bufio.NewScanner(stdin)
+    linescanner.Split(bufio.ScanLines)
+    for linescanner.Scan() {
+        if err := linescanner.Err(); err != nil {
+            log.Print(err)
+            close(out)
+            return
+        }
+        text := bytes.Fields([]byte(linescanner.Text()))
+        if len(text) == 0 {
+            continue
+        }
+        out <- text
+    }
+}
+
+func main() { 
+    cmd_chans, sub_chans := ZmqsInit(cmd_port_, sub_port_)
+    defer cmd_chans.Close()
+    defer sub_chans.Close()
+    var listen bool
+    var ignore_next bool
+
+    user_input_chan := make(chan [][]byte, 1)
+    go LineReader(user_input_chan, os.Stdin)
+    defer os.Stdin.Close()
+    
+    for {
+        select {
+        case input, input_open := (<- user_input_chan):
+            if input_open {
+                if len(input) == 0 { continue }
+                 switch string(input[0]) {
+                    case "listen":
+                        listen = true
+                        fmt.Println("Now listening")
+                    case "quit":
+                        break
+                    default:
+                        ignore_next = true
+                        cmd_chans.Out() <- input
+                        fmt.Println( <- cmd_chans.In())
+                }
+            } else {
+                break
+            }
+        case pubsubstuff := <- sub_chans.In():
+            if ignore_next {
+                ignore_next = false
+                continue
+            }
+            if listen {
+                fmt.Println(pubsubstuff)
+            }
+        }
+    }
+}
diff --git a/go/door_client_zmq/zeromq.go b/go/door_client_zmq/zeromq.go
new file mode 100644 (file)
index 0000000..b74ca5e
--- /dev/null
@@ -0,0 +1,65 @@
+// (c) Bernhard Tittelbach, 2013
+
+package main
+
+import (
+    zmq "github.com/vaughan0/go-zmq"
+ )
+
+// ---------- ZeroMQ Code -------------
+
+func ZmqsInit(cmd_port, sub_port string)  (cmd_chans, pub_chans *zmq.Channels) {
+
+    cmd_ctx, err := zmq.NewContext()
+    if err != nil {
+        panic(err)
+    }
+    //close only on panic, otherwise leave open:
+    defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }()
+    
+    pub_ctx, err := zmq.NewContext()
+    if err != nil {
+        panic(err)
+    }
+    defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }()
+    
+    cmd_sock, err := cmd_ctx.Socket(zmq.Req)
+    if err != nil {
+        panic(err)
+    }
+    defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }()
+
+    pub_sock, err := pub_ctx.Socket(zmq.Sub)
+    if err != nil {
+        panic(err)
+    }
+    defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }()
+
+    if err = cmd_sock.Bind(cmd_port); err != nil {
+        panic(err)
+    }
+
+    if err = pub_sock.Bind(sub_port); err != nil {
+        panic(err)
+    }
+    
+    cmd_chans = cmd_sock.Channels()
+    pub_chans = cmd_sock.Channels()
+    go zmqsHandleError(cmd_chans, pub_chans)
+    return
+}
+
+func zmqsHandleError(cmd_chans, pub_chans *zmq.Channels) {
+    for {
+        select {
+            case cmd_error := <- cmd_chans.Errors():
+                cmd_chans.Close()
+                pub_chans.Close()
+                panic(cmd_error)
+            case pub_error := <- pub_chans.Errors():
+                cmd_chans.Close()
+                pub_chans.Close()
+                panic(pub_error)
+        }
+    }
+}
\ No newline at end of file