--bugs but cgo and zmq together still suck
authorBernhard Tittelbach <xro@realraum.at>
Tue, 24 Sep 2013 07:53:30 +0000 (07:53 +0000)
committerBernhard Tittelbach <xro@realraum.at>
Tue, 24 Sep 2013 07:53:30 +0000 (07:53 +0000)
go/door_client_zmq/main.go
go/door_client_zmq/zeromq.go
go/door_daemon_zmq/handle_commands.go
go/door_daemon_zmq/main.go
go/door_daemon_zmq/serial_tty.go
go/door_daemon_zmq/zeromq.go

index 2c2afe3..193c643 100644 (file)
@@ -25,8 +25,8 @@ func usage() {
 }
 
 func init() {
-    flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path")
-    flag.StringVar(&sub_port_, "pubport", "gmp://*:6666", "zmq subscribe/listen socket path")
+    flag.StringVar(&cmd_port_, "cmdport", "tcp://127.0.0.1:3232", "zmq command socket path")
+    flag.StringVar(&sub_port_, "pubport", "pgm://233.252.1.42:4242", "zmq subscribe/listen socket path")
     flag.Usage = usage
     flag.Parse()
 }
@@ -34,13 +34,14 @@ func init() {
 func LineReader(out chan <- [][]byte, stdin * os.File) {
     linescanner := bufio.NewScanner(stdin)
     linescanner.Split(bufio.ScanLines)
+    defer close(out)
     for linescanner.Scan() {
         if err := linescanner.Err(); err != nil {
             log.Print(err)
-            close(out)
             return
         }
-        text := bytes.Fields([]byte(linescanner.Text()))
+        //text := bytes.Fields(linescanner.Bytes()) //this returns a slice (aka pointer, no array deep-copy here)
+        text := bytes.Fields([]byte(linescanner.Text())) //this allocates a string and slices it -> no race-condition with overwriting any data
         if len(text) == 0 {
             continue
         }
@@ -48,6 +49,11 @@ func LineReader(out chan <- [][]byte, stdin * os.File) {
     }
 }
 
+func ByteArrayToString(bb [][]byte) string {
+    b := bytes.Join(bb, []byte(" "))
+    return string(b)
+}
+
 func main() {
     cmd_chans, sub_chans := ZmqsInit(cmd_port_, sub_port_)
     defer cmd_chans.Close()
@@ -69,16 +75,21 @@ func main() {
                         listen = true
                         fmt.Println("Now listening")
                     case "quit":
-                        break
+                        os.Exit(0)
                     default:
                         ignore_next = true
                         cmd_chans.Out() <- input
-                        fmt.Println( <- cmd_chans.In())
+                        log.Print("input sent")
+                        reply := <- cmd_chans.In()
+                        log.Print("reply received")
+                        fmt.Println(ByteArrayToString(reply))
                 }
             } else {
-                break
+                os.Exit(0)
             }
         case pubsubstuff := <- sub_chans.In():
+            log.Print("pubsubstuff",pubsubstuff)
+            if len(pubsubstuff) == 0 { continue}
             if ignore_next {
                 ignore_next = false
                 continue
index 4b3b87d..485e3f2 100644 (file)
@@ -4,62 +4,61 @@ package main
 
 import (
     zmq "github.com/vaughan0/go-zmq"
+    "time"
  )
 
 // ---------- ZeroMQ Code -------------
 
-func ZmqsInit(cmd_port, sub_port string)  (cmd_chans, pub_chans *zmq.Channels) {
+func ZmqsInit(cmd_port, sub_port string)  (cmd_chans, sub_chans *zmq.Channels) {
 
-    cmd_ctx, err := zmq.NewContext()
+    ctx, err := zmq.NewContext()
     if err != nil {
         panic(err)
     }
     //close only on panic, otherwise leave open:
-    defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }()
+    defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }()
 
-    pub_ctx, err := zmq.NewContext()
-    if err != nil {
-        panic(err)
-    }
-    defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }()
-
-    cmd_sock, err := cmd_ctx.Socket(zmq.Req)
+    cmd_sock, err := ctx.Socket(zmq.Req)
     if err != nil {
         panic(err)
     }
     defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }()
 
-    pub_sock, err := pub_ctx.Socket(zmq.Sub)
+    cmd_sock.SetRecvTimeout(2 * time.Second)
+    cmd_sock.SetSendTimeout(2 * time.Second)
+
+    sub_sock, err := ctx.Socket(zmq.Sub)
     if err != nil {
         panic(err)
     }
-    defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }()
+    defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }()
 
-    if err = cmd_sock.Bind(cmd_port); err != nil {
+    if err = cmd_sock.Connect(cmd_port); err != nil {
         panic(err)
     }
 
-    if err = pub_sock.Bind(sub_port); err != nil {
+    if err = sub_sock.Connect(sub_port); err != nil {
         panic(err)
     }
 
-    cmd_chans = cmd_sock.Channels()
-    pub_chans = cmd_sock.Channels()
-    go zmqsHandleError(cmd_chans, pub_chans)
+    cmd_chans = cmd_sock.ChannelsBuffer(10)
+    sub_chans = cmd_sock.ChannelsBuffer(10)
+
+    go zmqsHandleError(cmd_chans, sub_chans)
     return
 }
 
-func zmqsHandleError(cmd_chans, pub_chans *zmq.Channels) {
+func zmqsHandleError(cmd_chans, sub_chans *zmq.Channels) {
     for {
         select {
             case cmd_error := <- cmd_chans.Errors():
                 cmd_chans.Close()
-                pub_chans.Close()
+                sub_chans.Close()
                 panic(cmd_error)
-            case pub_error := <- pub_chans.Errors():
+            case sub_error := <- sub_chans.Errors():
                 cmd_chans.Close()
-                pub_chans.Close()
-                panic(pub_error)
+                sub_chans.Close()
+                panic(sub_error)
         }
     }
 }
\ No newline at end of file
index 2ac73ac..5e5d6e1 100644 (file)
@@ -46,7 +46,7 @@ func checkCmdStatus(tokens [][]byte) (error) {
     return nil
 }
 
-func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan string, serial_rd chan [][]byte) ([][]byte, error){
+func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan string) ([][]byte, error){
     if len(tokens) < 1 {
         return nil, errors.New("No Command to handle")
     }
@@ -63,6 +63,6 @@ func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan strin
 
     topub <- tokens
     serial_wr <- dch.FirmwareChar
-    fw_reply := <- serial_rd
+    fw_reply := GetLastSerialLine()
     return fw_reply, nil
 }
index 9fa4531..0ff877c 100644 (file)
@@ -6,7 +6,7 @@ import (
     "fmt"
     "os"
     "flag"
-    "log"
+    //~ "log"
 )
 
 //~ func StringArrayToByteArray(ss []string) [][]byte {
@@ -30,8 +30,8 @@ func usage() {
 }
 
 func init() {
-    flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path")
-    flag.StringVar(&pub_port_, "pubport", "gmp://*:6666", "zmq public/listen socket path")
+    flag.StringVar(&cmd_port_, "cmdport", "tcp://127.0.01:3232", "zmq command socket path")
+    flag.StringVar(&pub_port_, "pubport", "pgm://233.252.1.42:4242", "zmq public/listen socket path")
     flag.Usage = usage
     flag.Parse()
 }
@@ -46,23 +46,28 @@ func main() {
 
     cmd_chans, pub_chans := ZmqsInit(cmd_port_, pub_port_)
 
-    serial_wr, serial_rd, err := OpenAndHandleSerial(args[0], pub_chans.Out())
+    serial_wr, err := OpenAndHandleSerial(args[0], pub_chans.Out())
+    defer close(serial_wr)
     if err != nil {
-        close(serial_wr)
         panic(err)
     }
 
-    serial_wr <- "f"
-    firmware_version := <- serial_rd
-    log.Print("Firmware version:", firmware_version)
+    //~ serial_wr <- "f"
+    //~ firmware_version := <- serial_rd
+    //~ log.Print("Firmware version:", firmware_version)
 
     for incoming_request := range cmd_chans.In() {
-        reply, err := HandleCommand(incoming_request, pub_chans.Out(), serial_wr, serial_rd)
+        //~ log.Print(incoming_request)
+        reply, err := HandleCommand(incoming_request, pub_chans.Out(), serial_wr)
          if err != nil {
-            cmd_chans.Out() <- [][]byte{[]byte("ERROR"), []byte(err.Error())}
-            log.Print(err)
+            //~ log.Print(err)
+            out_msg := [][]byte{[]byte("ERROR"), []byte(err.Error())}
+            cmd_chans.Out() <- out_msg
+            //~ log.Print("sent error")
          } else {
+            //~ log.Print(reply)
             cmd_chans.Out() <- reply
+            //~ log.Print("sent reply")
          }
     }
 }
index 4ec786a..ddd41e6 100644 (file)
@@ -9,6 +9,7 @@ import (
     "os"
     "svn.spreadspace.org/realraum/go.svn/termios"
     "log"
+    "sync"
 )
 
 // ---------- Serial TTY Code -------------
@@ -24,37 +25,60 @@ func openTTY(name string) (*os.File, error) {
     return file, nil
 }
 
-func SerialWriter(in <- chan string, serial * os.File) {
+func serialWriter(in <- chan string, serial * os.File) {
     for totty := range(in) {
         serial.WriteString(totty)
         serial.Sync()
     }
 }
 
-func SerialReader(out chan <- [][]byte, topub chan <- [][]byte, serial * os.File) {
+var last_read_serial_input [][]byte = [][]byte{{}}
+var last_read_mutex sync.Mutex
+
+func serialReader(topub chan <- [][]byte, serial * os.File) {
     linescanner := bufio.NewScanner(serial)
     linescanner.Split(bufio.ScanLines)
     for linescanner.Scan() {
         if err := linescanner.Err(); err != nil {
             panic(fmt.Sprintf("Error in read from serial: %v\n",err.Error()))
         }
+        fmt.Println("read text", linescanner.Text())
         text := bytes.Fields([]byte(linescanner.Text()))
         if len(text) == 0 {
             continue
         }
-        out <- text
+        //~ for len(serial_read) > 5 {
+            //~ //drain channel before putting new line into it
+            //~ //thus we make sure "out" only ever holds the last line
+            //~ //thus the buffer never blocks and we don't need to read from out unless we need it
+            //~ // BUT: don't drain the chan dry, or we might have a race condition resulting in a deadlock
+            //~ <- serial_read
+        //~ }
+        last_read_mutex.Lock()
+        last_read_serial_input = text
+        fmt.Println("Put Text", text)
+        last_read_mutex.Unlock()
         topub <- text
     }
 }
 
-func OpenAndHandleSerial(filename string, topub chan <- [][]byte) (chan string, chan [][]byte, error) {
+//TODO: improve this, make it work for multiple open serial devices
+func GetLastSerialLine() [][]byte {
+    var last_line_pointer [][]byte
+    last_read_mutex.Lock()
+    last_line_pointer = last_read_serial_input
+    last_read_mutex.Unlock()
+    fmt.Println("Retrieve Text", last_line_pointer)
+    return last_line_pointer
+}
+
+func OpenAndHandleSerial(filename string, topub chan <- [][]byte) (chan string, error) {
     serial, err :=openTTY(filename)
     if err != nil {
-        return nil, nil, err
+        return nil, err
     }
-    var wr chan string
-    var rd chan [][]byte
-    go SerialWriter(wr, serial)
-    go SerialReader(rd, topub, serial)
-    return wr, rd, nil
+    wr := make(chan string)
+    go serialWriter(wr, serial)
+    go serialReader(topub, serial)
+    return wr, nil
 }
index cec7089..8f20ec4 100644 (file)
@@ -4,32 +4,30 @@ package main
 
 import (
     zmq "github.com/vaughan0/go-zmq"
+    "time"
  )
 
 // ---------- ZeroMQ Code -------------
 
 func ZmqsInit(cmd_port, pub_port string)  (cmd_chans, pub_chans *zmq.Channels) {
 
-    cmd_ctx, err := zmq.NewContext()
+    ctx, err := zmq.NewContext()
     if err != nil {
         panic(err)
     }
     //close only on panic, otherwise leave open:
-    defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }()
+    defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }()
 
-    pub_ctx, err := zmq.NewContext()
-    if err != nil {
-        panic(err)
-    }
-    defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }()
-
-    cmd_sock, err := cmd_ctx.Socket(zmq.Rep)
+    cmd_sock, err := ctx.Socket(zmq.Rep)
     if err != nil {
         panic(err)
     }
     defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }()
 
-    pub_sock, err := pub_ctx.Socket(zmq.Pub)
+    cmd_sock.SetRecvTimeout(2 * time.Second)
+    cmd_sock.SetSendTimeout(2 * time.Second)
+
+    pub_sock, err := ctx.Socket(zmq.Pub)
     if err != nil {
         panic(err)
     }
@@ -43,8 +41,8 @@ func ZmqsInit(cmd_port, pub_port string)  (cmd_chans, pub_chans *zmq.Channels) {
         panic(err)
     }
 
-    cmd_chans = cmd_sock.Channels()
-    pub_chans = cmd_sock.Channels()
+    cmd_chans = cmd_sock.ChannelsBuffer(10)
+    pub_chans = cmd_sock.ChannelsBuffer(10)
     go zmqsHandleError(cmd_chans, pub_chans)
     return
 }