netty 实战
java 

异步和事件驱动

Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端

高性能的系统不仅要求超一流的编程技巧,还需要网络编程、多线程处理和并发的专业知识。

一个既是异步的又是事件驱动的系统会表现出一种特殊的、对我们来说极具价值的行为:它可以以任意的顺序响应在任意的时间点产生的事件。

Netty 的设计底蕴:

  • 非阻塞网络调用使得我们可以不必等待一个操作的完成。完全异步的I/O正是基于这个特性构建的,并且更进一步:异步方法会立即返回,并且在它完成时,会直接或者在稍后的某个时间点通知用户。
  • 选择器使得我们能够通过较少的线程便可监视许多连接上的事件。

Netty通过触发事件将Selector从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个Channel分配一个EventLoop,用以处理所有事件,包括:

  • 注册感兴趣的事件;
  • 将事件派发给ChannelHandler;
  • 安排进一步的动作

netty 源码

// 是 2 的指数
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

MultithreadEventExecutorGroup 类中存贮着如下内容:

  • EventExecutor[] children: EventExecutor
  • Set<EventExecutor> readonlyChildren
  • AtomicInteger terminatedChildren
  • Promise<?> terminationFuture
  • EventExecutorChooserFactory.EventExecutorChooser chooser

NioEventLoopGroup

以下代码初始化了 NioEventLoopGroup 类,netty 做了很多工作

EventLoopGroup bossGroup = new NioEventLoopGroup();

跟踪 new NioEventLoopGroup():

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

发现其只是调用了父类的初始化方法,chooserFactory 使用的是 DefaultEventExecutorChooserFactory:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 线程数默认使用的是 CPU 核的2倍
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        // 使用默认的 线程工厂 初始化执行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 初始化执行器,返回的是 NioEventLoop 实例
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 工作线程选择器
    chooser = chooserFactory.newChooser(children);

    // 关闭监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

newChild 方法创建 NioEventLoop extends SingleThreadEventLoop 对象

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

ServerBootstrap()

ServerBootstrap b = new ServerBootstrap();
// bossGroup 是 parentGroup, workerGroup 是子group
b.group(bossGroup, workerGroup)
	// 初始化 channelFactory,新建 channel 时使用 NioServerSocketChannel 类
	.channel(NioServerSocketChannel.class)
	.childHandler(handler)
	// option 提供给NioServerSocketChannel用来接收进来的连接
	.option(ChannelOption.SO_BACKLOG, 128)
	// childOption 是对父管道ServerChannel接收到的连接的配置
	.childOption(ChannelOption.SO_KEEPALIVE, true);

// 绑定端口并开始接受请求
ChannelFuture f = b.bind(port).sync();

channel

channel(NioServerSocketChannel.class) 定义了使用哪个 channel,主要内容是初始化了一个 channelFactory:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

ReflectiveChannelFactory 主要有两个内容:

// 构造函数,定义了该工厂初始化哪个类
public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}
// 使用默认构造器生成 channel 的实例
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

childHandleroptionchildOption 只是将参数保存到 ServerBootstrap

bind(port)

bind(port) 的主要代码在 AbstractBootstrap.doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
	// 使用 ReflectiveChannelFactory 初始化 channel,
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

初始化 channel

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
    	// 使用工厂方法初始化 channel,也即是初始化 `NioServerSocketChannel`
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 在 NioEventLoop (MultithreadEventLoopGroup) 中注册 channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

channelFactory.newChannel() 使用默认构造器创建了 NioServerSocketChannel 对象 初始化 NioServerSocketChannel,打开 serverSocketChannel,并在其父类中初始化 pipeline 以及 初始化 NioServerSocketChannelConfig:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // 不同子类实现各自的 AbstractUnsafe,是消息收发的实现。nio 中在 `AbstractNioMessageChannel` 类中
    unsafe = newUnsafe();
    // 初始化 pipeline
    pipeline = newChannelPipeline();
}

初始化 ChannelPipeline:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // pipeline 的头和尾
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

init(channel) 中主要在 pipeline 中添加了如下的 channel:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
// SingleThreadEventLoop 注册 channel
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }
    // AbstractNioMessageChannel.NioMessageUnsafe
    channel.unsafe().register(this, promise);
    return promise;
}

// AbstractChannel.AbstractUnsafe
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // pipeline 中传播 注册的消息
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

channel.unsafe().register(this, promise); 是在 AbstractNioMessageChannel.NioMessageUnsafe 中注册了 channel

// AbstractNioChannel 注册 channel
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                ((NioEventLoop) eventLoop().unwrap()).selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

ServerBootstrap 主要的逻辑在 AbstractBootstrap 类中

AbstractBootstrap.doBind() 完成了绑定端口的操作

并通过映射新建了 NioServerSocketChannel 类,初始化的过程中监听 socket,使用 new DefaultChannelPipeline(this) 初始化 pipeline,并将自己添加到 pipeline 中。

一个 channel 对应一个 pipeline

DefaultChannelPipeline.addLast0() 将 channelHandler 加到 pipeline 中

local_offer #java 
navigate_before navigate_next