Netty


The Problem

Nowadays we use general-purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server or to invoke a remote procedure call via web services. However, a general-purpose protocol or its implementation sometimes does not scale very well.

For the same reason, we don’t use a general-purpose HTTP server to exchange huge files, e-mail messages, or near-realtime messages such as financial information and multiplayer game data. What’s required is a highly optimized protocol implementation dedicated to a special purpose.

The Solution

Netty is an NIO client-server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.

‘Quick and easy’ does not mean that the resulting application will suffer from maintainability or performance issues. Netty has been designed carefully with the experience gained from implementing many protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded in finding a way to achieve ease of development, performance, stability, and flexibility without compromise.

Now take a look at this:

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

Doesn’t that sound a bit like node.js? But node+express is basically an HTTP server, while Netty is a more fundamental framework that works directly with TCP and UDP. You can use it to build not only an HTTP server but also a socket server that speaks a text or binary protocol.

That sounds powerful! Here are the examples listed on its website.

Fundamental

  • Echo ‐ the very basic client and server
  • Discard ‐ see how to send an infinite data stream asynchronously without flooding the write buffer
  • Uptime ‐ implement automatic reconnection mechanism

Text protocols

  • Telnet ‐ a classic line-based network application
  • Quote of the Moment ‐ broadcast a UDP/IP packet
  • SecureChat ‐ a TLS-based chat server, derived from the Telnet example

Binary protocols

  • ObjectEcho ‐ exchange serializable Java objects
  • Factorial ‐ write a stateful client and server with a custom binary protocol
  • WorldClock ‐ rapid protocol prototyping with Google Protocol Buffers integration

HTTP

  • Snoop ‐ build your own extremely light-weight HTTP client and server
  • File server ‐ asynchronous large file streaming in HTTP
  • Web Sockets (Client & Server) ‐ add a two-way full-duplex communication channel to HTTP using Web Sockets
  • SPDY (Client & Server) ‐ implement SPDY protocol
  • CORS demo ‐ implement cross-origin resource sharing

Advanced

  • Proxy server ‐ write a highly efficient tunneling proxy server
  • Port unification ‐ run services with different protocols on a single TCP/IP port

UDT

  • Byte streams ‐ use UDT in TCP-like byte streaming mode
  • Message flow ‐ use UDT in UDP-like message delivery mode
  • Byte streams in symmetric peer-to-peer rendezvous connect mode
  • Message flow in symmetric peer-to-peer rendezvous connect mode

Netty is on GitHub.

I think more and more people will pay attention to it, because it’s really great.

Some Concepts

Channel

A Channel can be thought of as a tunnel that I/O requests go through. Every Channel has its own pipeline. At the API level, the most commonly used channels are io.netty.channel.socket.nio.NioServerSocketChannel for a socket server and io.netty.channel.socket.nio.NioSocketChannel for a socket client.

Pipeline

You can treat the pipeline as a bidirectional queue filled with inbound and outbound handlers. Every handler works like a servlet filter.
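To make the two-way queue idea concrete, here is a small toy model in plain Java. It is only a sketch: ToyPipeline, Direction, fireRead and write are names made up for illustration and are not Netty classes (the real types are ChannelPipeline and ChannelHandler). It assumes the ordering Netty documents: read events walk the handlers from first to last, and write events walk them from last to first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of a pipeline. These types are illustrative and are NOT Netty API.
final class ToyPipeline {
    enum Direction { INBOUND, OUTBOUND, BOTH }

    private static final class Entry {
        final Direction direction;
        final UnaryOperator<String> handler;

        Entry(Direction direction, UnaryOperator<String> handler) {
            this.direction = direction;
            this.handler = handler;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    ToyPipeline addLast(Direction direction, UnaryOperator<String> handler) {
        entries.add(new Entry(direction, handler));
        return this;
    }

    // Read events go from the first handler to the last, skipping outbound-only handlers.
    String fireRead(String msg) {
        for (Entry e : entries) {
            if (e.direction != Direction.OUTBOUND) {
                msg = e.handler.apply(msg);
            }
        }
        return msg;
    }

    // Write events go from the last handler to the first, skipping inbound-only handlers.
    String write(String msg) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            Entry e = entries.get(i);
            if (e.direction != Direction.INBOUND) {
                msg = e.handler.apply(msg);
            }
        }
        return msg;
    }

    // Builds a three-handler pipeline; each handler just tags the message it sees.
    static ToyPipeline demo() {
        return new ToyPipeline()
                .addLast(Direction.INBOUND, m -> m + ">in1")
                .addLast(Direction.OUTBOUND, m -> m + ">out1")
                .addLast(Direction.BOTH, m -> m + ">both");
    }

    public static void main(String[] args) {
        ToyPipeline p = demo();
        System.out.println(p.fireRead("read"));   // read>in1>both
        System.out.println(p.write("write"));     // write>both>out1
    }
}
```

The tags show which handlers each direction touches: the read skips the outbound-only handler, and the write runs in reverse order and skips the inbound-only one.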

Handler

“Inbound” handlers process only read-in I/O events, “Outbound” handlers process only write-out I/O events, and “InOutbound” handlers process both.

In a real project, the first inbound handler (handler 1 in the chart above) is usually a decoder, and the last outbound handler (handler 2 in the chart above) is usually an encoder. The last InOutbound handler usually does the real business: it processes the input data object and sends the reply back.

A decoder converts the read-in ByteBuf into the data structure used by the business logic, e.g. a byte stream into POJOs. If a frame has not been fully received, the decoder keeps the bytes it already has and waits for the rest, so the next handler never has to deal with a partial frame.
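The "wait for the full frame" behaviour can be sketched without Netty. The class below is a hypothetical, much simplified stand-in for what a frame decoder such as LineBasedFrameDecoder does for you. LineFramerSketch and feed are illustrative names, and the sketch assumes frames are UTF-8 lines terminated by "\n".

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical line framer: buffers partial input until a full line has arrived.
// This is an illustration of the idea, not Netty code.
final class LineFramerSketch {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed one chunk as it arrives from the socket.
    // Returns every frame completed by this chunk (possibly none).
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                // Delimiter found: emit the buffered bytes as one frame.
                frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        LineFramerSketch framer = new LineFramerSketch();
        // The message "HELLO\nWORLD\n" arrives split at arbitrary points.
        System.out.println(framer.feed("HEL".getBytes(StandardCharsets.UTF_8)));      // []
        System.out.println(framer.feed("LO\nWOR".getBytes(StandardCharsets.UTF_8)));  // [HELLO]
        System.out.println(framer.feed("LD\n".getBytes(StandardCharsets.UTF_8)));     // [WORLD]
    }
}
```

Nothing blocks here: a chunk that does not complete a frame simply produces no output, and the handler after the decoder only ever sees whole lines.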

An encoder converts the internal data structure into a ByteBuf that is finally written out to the socket.
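As a rough illustration of the encoder side, here is a plain-Java sketch that turns a small POJO into a length-prefixed byte frame and back. The Greeting class and the 4-byte length prefix are assumptions chosen for the example. In Netty the same logic would live in an encoder handler (for instance one built on MessageToByteEncoder) and would write into a ByteBuf rather than a java.nio.ByteBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: Greeting and the frame layout are assumptions, not Netty API.
final class GreetingCodecSketch {

    // The "internal data structure" used by the business logic.
    static final class Greeting {
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    // Encoder: POJO -> [4-byte big-endian length][UTF-8 body].
    static ByteBuffer encode(Greeting greeting) {
        byte[] body = greeting.text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + body.length);
        out.putInt(body.length);
        out.put(body);
        out.flip(); // switch the buffer from writing to reading
        return out;
    }

    // Decoder: the inverse step, assuming the whole frame is already available.
    static Greeting decode(ByteBuffer in) {
        byte[] body = new byte[in.getInt()];
        in.get(body);
        return new Greeting(new String(body, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode(new Greeting("Hello, Server!"));
        System.out.println(frame.remaining() + " bytes on the wire"); // 18 bytes on the wire
        System.out.println(decode(frame).text);                       // Hello, Server!
    }
}
```

The length prefix is what lets the receiving decoder know when a frame is complete, which ties the encoder back to the decoder described above.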

The last business-logic handler often executes in a different thread from the I/O thread, so that I/O is not blocked by time-consuming tasks.
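The hand-off between threads can be sketched in plain Java as below. OffloadSketch, onMessage and the "business" thread name are made up for this illustration. In Netty itself, one way to get the same effect is to register the handler with its own EventExecutorGroup through the pipeline's addLast(group, handler) overload, but the sketch does not depend on Netty.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: shows slow work leaving the calling ("I/O") thread.
final class OffloadSketch {
    // Separate pool for business logic; daemon threads so a demo JVM can always exit.
    private final ExecutorService businessPool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "business");
        t.setDaemon(true);
        return t;
    });

    // Called on the I/O thread. Returns immediately; the work runs on the pool
    // and the future completes with the reply once it is done.
    CompletableFuture<String> onMessage(String request) {
        return CompletableFuture.supplyAsync(
                () -> "handled " + request + " on " + Thread.currentThread().getName(),
                businessPool);
    }

    void shutdown() {
        businessPool.shutdown();
    }

    public static void main(String[] args) {
        OffloadSketch sketch = new OffloadSketch();
        try {
            // join() is used only so the demo can print the result.
            System.out.println(sketch.onMessage("ping").join()); // handled ping on business
        } finally {
            sketch.shutdown();
        }
    }
}
```

A real handler would not call join() on the I/O thread. It would attach a callback to the future and write the reply from there.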

It’s much clearer when you look at this; see more detail here.

Create a netty project

Create a Maven project, then add this dependency to the dependencies element in pom.xml.

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.34.Final</version>
    </dependency>

Take the Echo example from the Fundamental list above: get the source code from the link, then create the following Java classes.

EchoClient.java

package net.flywithfan;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                            }


//                            p.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
//
//                            p.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));


                            p.addLast(new LoggingHandler(LogLevel.INFO));

                            p.addLast(new EchoClientHandler());

                        }
                    });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

EchoClientHandler.java

package net.flywithfan;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private InternalLogger logger   = InternalLoggerFactory.getInstance(getClass());

    private final ByteBuf reqMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        String str = "I am Client. Hello, Server!";
        reqMessage = Unpooled.buffer(EchoClient.SIZE);
        reqMessage.writeBytes(str.getBytes());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(reqMessage);
    }


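    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        try {
            // Copy the readable bytes out of the buffer and log them as text.
            byte[] bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            logger.info("msg from server: " + new String(bytes));
        } finally {
            // This handler is the last one to see the buffer, so it must release it.
            in.release();
        }
    }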

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

EchoServer.java

package net.flywithfan;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.CharsetUtil;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }

//                            p.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
//
//                            p.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));

                            p.addLast(new LoggingHandler(LogLevel.INFO));

                            p.addLast(new EchoServerHandler());
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EchoServerHandler.java

package net.flywithfan;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.logging.LogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    private InternalLogger logger = InternalLoggerFactory.getInstance(getClass());

    private final ByteBuf respMessage;

    public EchoServerHandler() {
        String str = "I am Server, and I got your message.";
        respMessage = Unpooled.buffer(str.length());
        respMessage.writeBytes(str.getBytes());
    }

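    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The request is not passed on to another handler, so release it here.
        ((ByteBuf) msg).release();
        // Write a retained duplicate so the reply buffer can be sent again on the next read.
        ctx.write(respMessage.duplicate().retain());
    }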

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

Run the server first, then the client. Both sides log their channel events through LoggingHandler, and the client handler logs the server’s reply with the prefix “msg from server: ”.


References:

1. Netty

2. Netty tutorial – hello world example