X-Git-Url: https://git.realraum.at/?a=blobdiff_plain;f=go%2Fuc_sensor_node_zmq%2Fmain.go;h=fa917e88daa81109b0efc6a19a46feed557ec9e0;hb=a87c91b7d4544703879b7ef6e472b863765e3949;hp=52a8aad6feb2c8d73880b2665fef3ab931bb99e4;hpb=59b5b0d11e44853e93a5a66dc66133254f844753;p=svn42.git diff --git a/go/uc_sensor_node_zmq/main.go b/go/uc_sensor_node_zmq/main.go index 52a8aad..fa917e8 100644 --- a/go/uc_sensor_node_zmq/main.go +++ b/go/uc_sensor_node_zmq/main.go @@ -3,41 +3,91 @@ package main import ( - "flag" -) + "flag" + "time" + zmq "github.com/vaughan0/go-zmq" +) // ---------- Main Code ------------- var ( - tty_dev_ string - pub_addr string - use_syslog_ bool + tty_dev_ string + pub_addr string + use_syslog_ bool + enable_debug_ bool + serial_speed_ uint ) +const exponential_backof_activation_threshold int64 = 4 + func init() { - flag.StringVar(&pub_addr, "brokeraddr", "tcp://torwaechter.realraum.at:4244", "zmq address to send stuff to") - flag.StringVar(&tty_dev_, "ttydev", "/dev/ttyACM0", "path do tty uc device") - flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local1 facility") - flag.Parse() + flag.StringVar(&pub_addr, "brokeraddr", "tcp://torwaechter.realraum.at:4243", "zmq address to send stuff to") + flag.StringVar(&tty_dev_, "ttydev", "/dev/ttyACM0", "path do tty uc device") + flag.UintVar(&serial_speed_, "serspeed", 0, "tty baudrate (0 to disable setting a baudrate e.g. in case of ttyACM)") + flag.BoolVar(&use_syslog_, "syslog", false, "log to syslog local1 facility") + flag.BoolVar(&enable_debug_, "debug", false, "debugging messages on") + flag.Parse() +} + +func ConnectSerialToZMQ(pub_sock *zmq.Socket, timeout time.Duration) { + defer func() { + if x := recover(); x != nil { + Syslog_.Println(x) + } + }() + + serial_wr, serial_rd, err := OpenAndHandleSerial(tty_dev_, serial_speed_) + if err != nil { + panic(err) + } + defer close(serial_wr) + + t := time.NewTimer(timeout) + for { + select { + case incoming_ser_line, seropen := <-serial_rd: + if !seropen { + return + } + t.Reset(timeout) + Syslog_.Printf("%s", incoming_ser_line) + if err := pub_sock.Send(incoming_ser_line); err != nil { + Syslog_.Println(err.Error()) + } + + case <-t.C: + Syslog_.Print("Timeout, no message for 120 seconds") + } + } } func main() { - zmqctx, pub_sock := ZmqsInit(pub_addr) - defer zmqctx.Close() - defer pub_sock.Close() - - if use_syslog_ { - LogEnableSyslog() - Syslog_.Print("started") - } - - serial_wr, serial_rd, err := OpenAndHandleSerial(tty_dev_) - if err != nil { panic(err) } - defer close(serial_wr) - - for incoming_ser_line := range(serial_rd) { - Syslog_.Printf("%s",incoming_ser_line) - if err := pub_sock.Send(incoming_ser_line); err != nil { panic(err) } - } + zmqctx, pub_sock := ZmqsInit(pub_addr) + if pub_sock == nil { + panic("zmq socket creation failed") + } + defer zmqctx.Close() + defer pub_sock.Close() + + if enable_debug_ { + LogEnableDebuglog() + } else if use_syslog_ { + LogEnableSyslog() + Syslog_.Print("started") + } + + var backoff_exp uint32 = 0 + for { + start_time := time.Now().Unix() + ConnectSerialToZMQ(pub_sock, time.Second*120) + run_time := time.Now().Unix() - start_time + if run_time > exponential_backof_activation_threshold { + backoff_exp = 0 + } + time.Sleep(150 * (1 << backoff_exp) * time.Millisecond) + if backoff_exp < 12 { + backoff_exp++ + } + } }