Custom Dialers

Why do you need them.

Things like net.Dial are amazing , you can very simply create a socket and write to them , and the same for any abstraction really , but the problem begins when you want to do something not covered by the abstraction , this happened to me this week.

Socket Options

So I’ve been doing a lot of work with go and rabbitmq , and mostly interacting with this library :https://github.com/streadway/amqp which i think is the most used library when you trying to work with any amqp protocol implementations like RabbitMQ or PubSub. The problem mainly was that that i wanted to set SO_REUSEPORT to 1 to enable it , if you look at some examples for streadway/aqmp connections you will see code like:

func main() {
  conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  ...

So as you can see it is pretty simple , there’s not really a place where i could set socket options , so i went and check how amqp.Dial() was implemented.

Before that setting socket options is done with the syscall package and it looks something like this

err = syscall.SetsockoptInt(int(ff.Fd()), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)

You can see the main thing we’re after is for amqp.Dial() to somehow return a file descriptor so we can run SetsockOptInt().

Tracing the Dial Call.

Ok so amqp.Dial() looks like this: amqp.Dial:

func Dial(url string) (*Connection, error) {
  return DialConfig(url, Config{
    Heartbeat: defaultHeartbeat,
    Locale:    defaultLocale,
  })
}

That is calling amqp.DialConfig which looks like amqp.DialConfig , which is a long function but the important bit is:

dialer := config.Dial
if dialer == nil {
  dialer = DefaultDial(defaultConnectionTimeout)
}

If you don’t feed DialConfig a dialer it will use DefaultDial let’s go see it!: amqp.DefaultDial

func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
  return func(network, addr string) (net.Conn, error) {
    conn, err := net.DialTimeout(network, addr, connectionTimeout)
    if err != nil {
      return nil, err
    }
    if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
      return nil, err
    }
    return conn, nil
  }
}

So we are getting closer to the bone , now we find a call for net.DialTimeout , which is from the net package obviously but i don’t want to get that far.

Trying to find the file descriptor

Ok so the amqp.DefaultDial function is where the magic happen , i could go ahead and re-write the function and change net.DialTimeout for something that actually returns the fd , but that would mean i would be changing a library not really good. Fortunately streadway/amqp provides you with amqp.DialConfig:

func DialConfig(url string, config Config) (*Connection, error) {

It’s the same than amqp.Dial but you can pass a config type , let’s look at the struct config Config:

type Config struct {
  SASL []Authentication
  Vhost string
  ChannelMax int           // 0 max channels means 2^16 - 1
  FrameSize  int           // 0 max bytes means unlimited
  Heartbeat  time.Duration // less than 1s uses the server's interval
  TLSClientConfig *tls.Config
  Properties Table
  Locale string
  ....
  // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
  // then an AMQP connection handshake.
  // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
  // used during TLS and AMQP handshaking.
  Dial func(network, addr string) (net.Conn, error)
}

Aha! you can pass your own dial function to it , and amqp.DialConfig will implement that to connect to rabbit , so you would be in full control, the only requirement is that it needs to return net.Conn and an error , net.Conn looks like net.Conn :

type Conn interface {
  Read(b []byte) (n int, err error)
  Write(b []byte) (n int, err error)
  Close() error
  LocalAddr() Addr
  RemoteAddr() Addr
  SetDeadline(t time.Time) error
  SetReadDeadline(t time.Time) error
  SetWriteDeadline(t time.Time) error
}

So as long as we return something that implement that interface (and all its method signatures we would be good)

Trying to find a net function that gives you access to the fd and returns net.Conn

The net package for go has a ton of ways to create sockets , but the one i like is net.DialTCP (don’t mistake it with net.dialTCP, which its got something interesting , it returns a pointer to net.TCPConn -> which is very similar to net.TCPListener net.TCPListener , most of these structs implicitly implement a bigger interface anyways so they all mostly do the same the special thing about net.TCPListener is that it’s got a File() method , which returns the file descriptor i so very much need!

The end product:

So i feed net.DialConfig a Dialer function which i implement in the way i want as long as it returns the type it needs to:

config.Dial = func(network, addr string) (net.Conn, error) {
  raddr, err := net.ResolveIPAddr("ip", strings.Split(addr, ":")[0])
  if err != nil {
    panic(err)
  }
  port, err := strconv.Atoi(strings.Split(addr, ":")[1])
  if err != nil {
    panic(err)
  }
  tcpaddr := net.TCPAddr{raddr.IP, port, ""}
  tcp, err := net.DialTCP("tcp", nil, &tcpaddr)
  if err != nil {
    panic(err)
  }
  ff, _ := tcp.File()
  err = syscall.SetsockoptInt(int(ff.Fd()), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
  return tcp, nil
}

Ignore the error handling please , but you can see how i finally got access to the file descriptor to run setSockOptions in it , and now i can see the results.

Note: Im using knetstat to see the sock options on already established sockets , you can do it with strace if you’re able to run the process first go , but here it is , remeber the whole idea was setting SO_REUSEPORT to 1: This is before

>> cat /proc/net/tcpstat | grep 5672
     0      0 127.0.0.1:44874         127.0.0.1:5672          ESTB      SO_REUSEADDR=0,SO_REUSEPORT=0,SO_KEEPALIVE=0,TCP_NODELAY=1,TCP_DEFER_ACCEPT=0

And after:

>> cat /proc/net/tcpstat | grep 5672
     0      0 127.0.0.1:44998         127.0.0.1:5672          ESTB      SO_REUSEADDR=0,SO_REUSEPORT=1,SO_KEEPALIVE=0,TCP_NODELAY=1,TCP_DEFER_ACCEPT=0

Well this was a hell of a learning but the most important thing to me was how nice is to have interfaces all over the code in golang , the fact that types sometimes are so “compatible” with each-other and some of them implement methods that you sometimes need.