From 02dbcb404955284e9849d80ea56e9c5e02534d40 Mon Sep 17 00:00:00 2001 From: Bernhard Tittelbach Date: Tue, 24 Sep 2013 07:53:30 +0000 Subject: [PATCH] --bugs but cgo and zmq together still suck --- go/door_client_zmq/main.go | 25 +++++++++++++------ go/door_client_zmq/zeromq.go | 43 ++++++++++++++++---------------- go/door_daemon_zmq/handle_commands.go | 4 +-- go/door_daemon_zmq/main.go | 27 +++++++++++--------- go/door_daemon_zmq/serial_tty.go | 44 +++++++++++++++++++++++++-------- go/door_daemon_zmq/zeromq.go | 22 ++++++++--------- 6 files changed, 101 insertions(+), 64 deletions(-) diff --git a/go/door_client_zmq/main.go b/go/door_client_zmq/main.go index 2c2afe3..193c643 100644 --- a/go/door_client_zmq/main.go +++ b/go/door_client_zmq/main.go @@ -25,8 +25,8 @@ func usage() { } func init() { - flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path") - flag.StringVar(&sub_port_, "pubport", "gmp://*:6666", "zmq subscribe/listen socket path") + flag.StringVar(&cmd_port_, "cmdport", "tcp://127.0.0.1:3232", "zmq command socket path") + flag.StringVar(&sub_port_, "pubport", "pgm://233.252.1.42:4242", "zmq subscribe/listen socket path") flag.Usage = usage flag.Parse() } @@ -34,13 +34,14 @@ func init() { func LineReader(out chan <- [][]byte, stdin * os.File) { linescanner := bufio.NewScanner(stdin) linescanner.Split(bufio.ScanLines) + defer close(out) for linescanner.Scan() { if err := linescanner.Err(); err != nil { log.Print(err) - close(out) return } - text := bytes.Fields([]byte(linescanner.Text())) + //text := bytes.Fields(linescanner.Bytes()) //this returns a slice (aka pointer, no array deep-copy here) + text := bytes.Fields([]byte(linescanner.Text())) //this allocates a string and slices it -> no race-condition with overwriting any data if len(text) == 0 { continue } @@ -48,6 +49,11 @@ func LineReader(out chan <- [][]byte, stdin * os.File) { } } +func ByteArrayToString(bb [][]byte) string { + b := bytes.Join(bb, []byte(" ")) + return string(b) +} + func main() { cmd_chans, sub_chans := ZmqsInit(cmd_port_, sub_port_) defer cmd_chans.Close() @@ -69,16 +75,21 @@ func main() { listen = true fmt.Println("Now listening") case "quit": - break + os.Exit(0) default: ignore_next = true cmd_chans.Out() <- input - fmt.Println( <- cmd_chans.In()) + log.Print("input sent") + reply := <- cmd_chans.In() + log.Print("reply received") + fmt.Println(ByteArrayToString(reply)) } } else { - break + os.Exit(0) } case pubsubstuff := <- sub_chans.In(): + log.Print("pubsubstuff",pubsubstuff) + if len(pubsubstuff) == 0 { continue} if ignore_next { ignore_next = false continue diff --git a/go/door_client_zmq/zeromq.go b/go/door_client_zmq/zeromq.go index 4b3b87d..485e3f2 100644 --- a/go/door_client_zmq/zeromq.go +++ b/go/door_client_zmq/zeromq.go @@ -4,62 +4,61 @@ package main import ( zmq "github.com/vaughan0/go-zmq" + "time" ) // ---------- ZeroMQ Code ------------- -func ZmqsInit(cmd_port, sub_port string) (cmd_chans, pub_chans *zmq.Channels) { +func ZmqsInit(cmd_port, sub_port string) (cmd_chans, sub_chans *zmq.Channels) { - cmd_ctx, err := zmq.NewContext() + ctx, err := zmq.NewContext() if err != nil { panic(err) } //close only on panic, otherwise leave open: - defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }() + defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }() - pub_ctx, err := zmq.NewContext() - if err != nil { - panic(err) - } - defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }() - - cmd_sock, err := cmd_ctx.Socket(zmq.Req) + cmd_sock, err := ctx.Socket(zmq.Req) if err != nil { panic(err) } defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }() - pub_sock, err := pub_ctx.Socket(zmq.Sub) + cmd_sock.SetRecvTimeout(2 * time.Second) + cmd_sock.SetSendTimeout(2 * time.Second) + + sub_sock, err := ctx.Socket(zmq.Sub) if err != nil { panic(err) } - defer func() { if r:= recover(); r != nil { pub_sock.Close(); panic(r) } }() + defer func() { if r:= recover(); r != nil { sub_sock.Close(); panic(r) } }() - if err = cmd_sock.Bind(cmd_port); err != nil { + if err = cmd_sock.Connect(cmd_port); err != nil { panic(err) } - if err = pub_sock.Bind(sub_port); err != nil { + if err = sub_sock.Connect(sub_port); err != nil { panic(err) } - cmd_chans = cmd_sock.Channels() - pub_chans = cmd_sock.Channels() - go zmqsHandleError(cmd_chans, pub_chans) + cmd_chans = cmd_sock.ChannelsBuffer(10) + sub_chans = cmd_sock.ChannelsBuffer(10) + + go zmqsHandleError(cmd_chans, sub_chans) return } -func zmqsHandleError(cmd_chans, pub_chans *zmq.Channels) { +func zmqsHandleError(cmd_chans, sub_chans *zmq.Channels) { for { select { case cmd_error := <- cmd_chans.Errors(): cmd_chans.Close() - pub_chans.Close() + sub_chans.Close() panic(cmd_error) - case pub_error := <- pub_chans.Errors(): + case sub_error := <- sub_chans.Errors(): cmd_chans.Close() - pub_chans.Close() - panic(pub_error) + sub_chans.Close() + panic(sub_error) } } } \ No newline at end of file diff --git a/go/door_daemon_zmq/handle_commands.go b/go/door_daemon_zmq/handle_commands.go index 2ac73ac..5e5d6e1 100644 --- a/go/door_daemon_zmq/handle_commands.go +++ b/go/door_daemon_zmq/handle_commands.go @@ -46,7 +46,7 @@ func checkCmdStatus(tokens [][]byte) (error) { return nil } -func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan string, serial_rd chan [][]byte) ([][]byte, error){ +func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan string) ([][]byte, error){ if len(tokens) < 1 { return nil, errors.New("No Command to handle") } @@ -63,6 +63,6 @@ func HandleCommand(tokens [][]byte, topub chan <- [][]byte, serial_wr chan strin topub <- tokens serial_wr <- dch.FirmwareChar - fw_reply := <- serial_rd + fw_reply := GetLastSerialLine() return fw_reply, nil } diff --git a/go/door_daemon_zmq/main.go b/go/door_daemon_zmq/main.go index 9fa4531..0ff877c 100644 --- a/go/door_daemon_zmq/main.go +++ b/go/door_daemon_zmq/main.go @@ -6,7 +6,7 @@ import ( "fmt" "os" "flag" - "log" + //~ "log" ) //~ func StringArrayToByteArray(ss []string) [][]byte { @@ -30,8 +30,8 @@ func usage() { } func init() { - flag.StringVar(&cmd_port_, "cmdport", "tcp://localhost:5555", "zmq command socket path") - flag.StringVar(&pub_port_, "pubport", "gmp://*:6666", "zmq public/listen socket path") + flag.StringVar(&cmd_port_, "cmdport", "tcp://127.0.01:3232", "zmq command socket path") + flag.StringVar(&pub_port_, "pubport", "pgm://233.252.1.42:4242", "zmq public/listen socket path") flag.Usage = usage flag.Parse() } @@ -46,23 +46,28 @@ func main() { cmd_chans, pub_chans := ZmqsInit(cmd_port_, pub_port_) - serial_wr, serial_rd, err := OpenAndHandleSerial(args[0], pub_chans.Out()) + serial_wr, err := OpenAndHandleSerial(args[0], pub_chans.Out()) + defer close(serial_wr) if err != nil { - close(serial_wr) panic(err) } - serial_wr <- "f" - firmware_version := <- serial_rd - log.Print("Firmware version:", firmware_version) + //~ serial_wr <- "f" + //~ firmware_version := <- serial_rd + //~ log.Print("Firmware version:", firmware_version) for incoming_request := range cmd_chans.In() { - reply, err := HandleCommand(incoming_request, pub_chans.Out(), serial_wr, serial_rd) + //~ log.Print(incoming_request) + reply, err := HandleCommand(incoming_request, pub_chans.Out(), serial_wr) if err != nil { - cmd_chans.Out() <- [][]byte{[]byte("ERROR"), []byte(err.Error())} - log.Print(err) + //~ log.Print(err) + out_msg := [][]byte{[]byte("ERROR"), []byte(err.Error())} + cmd_chans.Out() <- out_msg + //~ log.Print("sent error") } else { + //~ log.Print(reply) cmd_chans.Out() <- reply + //~ log.Print("sent reply") } } } diff --git a/go/door_daemon_zmq/serial_tty.go b/go/door_daemon_zmq/serial_tty.go index 4ec786a..ddd41e6 100644 --- a/go/door_daemon_zmq/serial_tty.go +++ b/go/door_daemon_zmq/serial_tty.go @@ -9,6 +9,7 @@ import ( "os" "svn.spreadspace.org/realraum/go.svn/termios" "log" + "sync" ) // ---------- Serial TTY Code ------------- @@ -24,37 +25,60 @@ func openTTY(name string) (*os.File, error) { return file, nil } -func SerialWriter(in <- chan string, serial * os.File) { +func serialWriter(in <- chan string, serial * os.File) { for totty := range(in) { serial.WriteString(totty) serial.Sync() } } -func SerialReader(out chan <- [][]byte, topub chan <- [][]byte, serial * os.File) { +var last_read_serial_input [][]byte = [][]byte{{}} +var last_read_mutex sync.Mutex + +func serialReader(topub chan <- [][]byte, serial * os.File) { linescanner := bufio.NewScanner(serial) linescanner.Split(bufio.ScanLines) for linescanner.Scan() { if err := linescanner.Err(); err != nil { panic(fmt.Sprintf("Error in read from serial: %v\n",err.Error())) } + fmt.Println("read text", linescanner.Text()) text := bytes.Fields([]byte(linescanner.Text())) if len(text) == 0 { continue } - out <- text + //~ for len(serial_read) > 5 { + //~ //drain channel before putting new line into it + //~ //thus we make sure "out" only ever holds the last line + //~ //thus the buffer never blocks and we don't need to read from out unless we need it + //~ // BUT: don't drain the chan dry, or we might have a race condition resulting in a deadlock + //~ <- serial_read + //~ } + last_read_mutex.Lock() + last_read_serial_input = text + fmt.Println("Put Text", text) + last_read_mutex.Unlock() topub <- text } } -func OpenAndHandleSerial(filename string, topub chan <- [][]byte) (chan string, chan [][]byte, error) { +//TODO: improve this, make it work for multiple open serial devices +func GetLastSerialLine() [][]byte { + var last_line_pointer [][]byte + last_read_mutex.Lock() + last_line_pointer = last_read_serial_input + last_read_mutex.Unlock() + fmt.Println("Retrieve Text", last_line_pointer) + return last_line_pointer +} + +func OpenAndHandleSerial(filename string, topub chan <- [][]byte) (chan string, error) { serial, err :=openTTY(filename) if err != nil { - return nil, nil, err + return nil, err } - var wr chan string - var rd chan [][]byte - go SerialWriter(wr, serial) - go SerialReader(rd, topub, serial) - return wr, rd, nil + wr := make(chan string) + go serialWriter(wr, serial) + go serialReader(topub, serial) + return wr, nil } diff --git a/go/door_daemon_zmq/zeromq.go b/go/door_daemon_zmq/zeromq.go index cec7089..8f20ec4 100644 --- a/go/door_daemon_zmq/zeromq.go +++ b/go/door_daemon_zmq/zeromq.go @@ -4,32 +4,30 @@ package main import ( zmq "github.com/vaughan0/go-zmq" + "time" ) // ---------- ZeroMQ Code ------------- func ZmqsInit(cmd_port, pub_port string) (cmd_chans, pub_chans *zmq.Channels) { - cmd_ctx, err := zmq.NewContext() + ctx, err := zmq.NewContext() if err != nil { panic(err) } //close only on panic, otherwise leave open: - defer func(){ if r:= recover(); r != nil { cmd_ctx.Close(); panic(r) } }() + defer func(){ if r:= recover(); r != nil { ctx.Close(); panic(r) } }() - pub_ctx, err := zmq.NewContext() - if err != nil { - panic(err) - } - defer func() { if r:= recover(); r != nil { pub_ctx.Close(); panic(r) } }() - - cmd_sock, err := cmd_ctx.Socket(zmq.Rep) + cmd_sock, err := ctx.Socket(zmq.Rep) if err != nil { panic(err) } defer func() { if r:= recover(); r != nil { cmd_sock.Close(); panic(r) } }() - pub_sock, err := pub_ctx.Socket(zmq.Pub) + cmd_sock.SetRecvTimeout(2 * time.Second) + cmd_sock.SetSendTimeout(2 * time.Second) + + pub_sock, err := ctx.Socket(zmq.Pub) if err != nil { panic(err) } @@ -43,8 +41,8 @@ func ZmqsInit(cmd_port, pub_port string) (cmd_chans, pub_chans *zmq.Channels) { panic(err) } - cmd_chans = cmd_sock.Channels() - pub_chans = cmd_sock.Channels() + cmd_chans = cmd_sock.ChannelsBuffer(10) + pub_chans = cmd_sock.ChannelsBuffer(10) go zmqsHandleError(cmd_chans, pub_chans) return } -- 1.7.10.4