Java NIO与反应器模式

Java NIO是1.4中出现的重要更新,其中几个重要概念简单理解以下,并且学习其中用到的Reactor模式,即反应器模式。

一、缓冲区 Buffer

缓冲区是NIO中最重要的一个概念,最上面的是Buffer接口,下面有:

  • CharBuffer
  • IntBuffer
  • DoubleBuffer
  • ShortBuffer
  • LongBuffer
  • FloatBuffer
  • ByteBuffer MappedBuffer

有7个主要的Buffer类,每个都对应一种Java语言中非布尔类型的原始类型数据

1、基本属性与方法

Buffer是包在一个对象内的基本数据元素数组。还有四个属性来提供包含的信息:

  1. 容量 Capacity
  2. 上届 Limit
  3. 位置 Position
  4. 标价 Mark

这四个属性分别指向数组的某个位置,从而方便实现不同的方法,方法有很多不一一列举,用的多了自然就熟悉了。

2、Buffer的创建

上面说的7个Buffer类都是抽象类,无法直接实例化对象,包含静态工厂方法来创建相应的实例。主要有几下两种方法来创建实例:

2.1、分配 allocate

分配就是在堆中分配一个指定的大小的缓存:

1
CharBuffer charbuffer=CharBuffer.allocate(100);

意义为从堆中创建一个char型数组来存储100个char型变量。

2.2、包装 wrap

还可以将已经存在的数组包装为Buffer:

1
2
3
char[] charArray=new char[1000];
CharBuffer charbuffer=CharBuffer.wrap(charArray);
CharBuffer charbuffer2=CharBuffer.wrap(charArray, 12, 42);

重载方法可以指定offset和length

2.3、直接缓冲区

直接缓冲区是IO的最佳选择,因为它使用的内存是通过调用本地操作系统的代码分配的,可能建立的成本会更高,但是效率也更高。

这部分内存是独立于JVM之外的。

1
ByteBuffer byteBuffer=ByteBuffer.allocateDirect(7);

3、ByteBuffer

在7中Buffer中,ByteBuffer与其他6个最明显的区别就是:它可以成为通道执行IO的源头或目标

因为操作系统上的IO操作本质上也是对连续字节的操作。

字节顺序

除了布尔不管是什么数据类型,它在内存中肯定都是连续的字节。只不过不同的数据类型由不同的字节表示。

比如int型有四个字节,而这四个字节的排列顺序就是字节顺序,有大端字节顺序小端字节顺序

视图缓冲区

视图缓冲区通过已经存在的缓冲区对象实例的工厂方法来创建。这种视图:

  • 有自己的属性,容量、位置上届和标记
  • 和原来的缓冲区共享数据
1
2
ByteBuffer byteBuffer=ByteBuffer.allocate(7).order(ByteOrder.BIG_ENDIAN);
CharBuffer charBuffer=byteBuffer.asCharBuffer();

4、MappedBuffer

映射缓冲区是带有存储在文件,通过内存映射来存取数据元素的字节缓冲区。在后面讨论。

二、通道 Channel

通道是访问IO服务的导管。IO分为文件IO和套接字IO两大类,对应的通道也分为两大类

  • 文件IO
    • FileChannel
  • 套接字IO
    • SocketChannel
    • ServerSocketChannel
    • DatagramChannel

1、Channel的创建

1.1、FileChannel的创建

FileChannel对象只能通过在一个打开的:

  • RandomAccessFile
  • FileInputStream
  • FileOutputStream

对象上调用getChannel()方法来获取。

1
2
RandomAccessFile raf=new RandomAccessFile("somefile", "r");
FileChannel fc=raf.getChannel();

1.2、Socket通道的创建

Socket通道可以直接调用工厂方法

1
2
3
SocketChannel sc=SocketChannel.open();        
ServerSocketChannel ssc=ServerSocketChannel.open();
DatagramChannel dc=DatagramChannel.open();

2、Channel的使用

2.1、read与write

两个接口分别定义了读写的方法,可以看出,都是只能对ByteBuffer进行读写的。

1
2
public int read(ByteBuffer dst) throws IOException;
public int write(ByteBuffer src) throws IOException;

2.2、Scatter与Gather

该功能可以支持Channel在多个Buffer上顺序抽取进入通道。

3、文件通道

关于文件通道:

  • FileChannel总是阻塞式的,不能被置于非阻塞模式。
  • 文件锁定,锁与文件关联而不是与通道关联。文件锁不适用于判断同意虚拟机上多个线程发起的访问。也就是说,同一Java虚拟机的不同线程公用同一个文件锁。

内存映射文件

定义

传统的文件IO的通过用户进程发布read()和write()系统调用来传输数据的,为了在内核空间与用户空间的内存区之间移动数据,一次以上的拷贝操作是避免不了的。

为了解决这一问题,有了内存映射文件。用户可以把文件数据当做内存,无需发布read或write的系统调用。

FileChannel中的内存映射文件

上面提到过MappedByteBuffer,就是FileChannel中对内存映射文件的支持。

FileChannel类提供一个map()方法,改方法可以在一个打开的文件和一个特殊类型的ByteBuffer之间建立一个虚拟内存映射。

该方法会创建一个有磁盘文件支持的虚拟内存映射并在那块内存空间外部封装MappedByteBuffer对象。

使用方法

创建映射的时候可以指定参数mode、position和size。mode代表映射模式。文件的映射模式有三种:

  • READ_ONLY
  • WRITE_ONLY
  • PRIVATE

前面两个很好理解,而第三个PRIVATE表示要建立一个写时拷贝的映射。

这意味着通过put()方法所做的任何修改都会导致产生一个私有的数据拷贝并且该拷贝中的数据只有MappedByteBuffer实例可以看到。该过程不会对底层文件做任何修改,而且一旦缓冲区被垃圾回收,那些修改都会丢失。

假设一个文件被多个MappedByteBuffer对象映射并且每个映射都是PRIVATE模式,你们这个文件的大部分内容都可以被所有映射共享

4、Socket通道

Socket通道最重要的就是可以设置为非阻塞模式。相关操作在一个公有超类SelectableChannel中定义,这个类也与后面的Selector类是实现反应器模式的关键,后面会有更多介绍。

4.1、ServerSocketChannel

ServerSocketChannel对应的是java.net.ServerSocket类。并在其基础上增加了通道语义,能够在非阻塞模式下运行。

创建

使用静态工厂方法可以新建ServerSocketChannel对象,并有一个与之关联的未绑定的ServerSocket。该关联的ServerSocket实例可以通过ServerSocketChannel的socket()方法获取。

1
2
ServerSocketChannel ssc=ServerSocketChannel.open();
ServerSocket sc=ssc.socket();

绑定

ServerSocketChannel上并没有bind()方法,因此必须取出关联的ServerSocket并用它来绑定到一个端口以开始监听。

1
sc.bind(new InetSocketAddress(1234));

accept()

ServerSocketChannel与ServerSocket都用accept()方法。

  • 调用ServerSocket的accept()方法会和其他的ServerSocket一样,阻塞地返回一个java.net.Socket对象。
  • 调用ServerSocketChannel的accept()方法则会返回一个SocketChannel对象,并且能够在非阻塞模式下运行。

4.2、SocketChannel

与ServerSocketChannel的逻辑很类似,SocketChannel对应的是java.net.Socket类

创建

调用open()方法可以创建SocketChannel对象,在SocketChannel对象上调用socket()方法可以返回对等的Socket对象。

1
2
SocketChannel sc=SocketChannel.open();
Socket s=sc.socket();

连接

新创建的SocketChannel是未连接的,直接进行IO操作会抛出异常。

连接也是可以通过SocketChannel上直接调用connect()方法或这在关联的Socket对象调用connect()方法。

  • 在Socket对象上调用connect()方法,与传统语义一样,在连接建立好之前都将阻塞
  • 在SocketChannel上调用connect()方法,如果默认的通道处于阻塞模式,那么会和上面的情况一样。
  • Channel在非阻塞模式下调用connect()方法会提供并发连接:发起对请求地址的连接并且立即返回:
    • 如果是true说明连接已经建立
    • 如果没有建立连接会立即返回false并继续连接过程

与连接有关的方法使得我们可以对一个通道进行轮询并在连接过程中判断通道所处的状态。而在下面的选择器中,则可以使用选择器来避免轮询并在异步建立连接之后收到通知。

三、选择器 Selector

在传统的Java解决方案中,会为每个Socket都创建一个线程使得线程在read()的时候阻塞,直到数据可用,但是这样的效率并不高,而且线程的上下文切换也需要消耗资源。

有了非阻塞模式之后,有了一种方案就是使用非阻塞功能检测就绪功能。但是也有一个问题,当非阻塞方法执行成功后,就必须要读进这些数据并进行处理了。这使得检查就绪的代码和处理数据的代码不能分离开。

而非阻塞方法最优雅的使用就是这里要介绍的选择器。

  • 将之前创建的一个或多个可选择的通道注册到选择器对象中,一个表示通道和选择器的选择键将会被返回
  • 选择键会记住关心的通道,追踪通道是否已经就绪
  • 调用选择器的select()方法时,相关的键会被更新,用来检查注册的通道
  • 可以选择一个键的集合,从而找到已经就绪的通道,遍历这个集合,就可以选择出每个从上次调用select()方法之后到现在已经准备就绪的通道。

1、关键类

1.1、选择器 Selector

管理着一个被注册的通道集合的信息和它们的就绪状态

1.2、可选择通道 SelectableChannel

  • 继承自这个类的通道都是可选择通道
  • 它可以SelectableChannel.register()方法被注册到Slector对象上,并且可以初测到多个通道。
  • 被注册到一个选择器之前,必须设置为非阻塞模式

1.3、选择键 SelectionKey

选择器键封装了特定通道与特定选择器的注册关系。该对象被SelectableChannel.register()方法返回。

包含两个集合(以整数形式编码):

  • interest集:表示注册关系所关心的通道操作,它是在调用register()方法的时候指定的,有四种可选操作

    • read
    • write
    • connect
    • accept

    通过调用interestOps()方法可以可以获取集合。

  • ready集:表示已经准备好的操作,interest集合的子集。通过readyOps()方法可以获取,比较方便的是直接调用相应的

    • isReadable()
    • isWritable()
    • isConnectable()
    • isAcceptable()

    方法来直接测试

2、使用选择器

2.1、Selector类

选择器内部有三个集合:

已注册的键的集合 Registered key set

  • 与选择器关联的已注册的键的集合
  • 并不是所有注册过的键都有效
  • 通过keys()方法返回,可能为空

已选择的键的集合 Selected key set

  • 上面集合的子集
  • 该集合中每个键对应的通道都是已经准备好的
  • 通过selectdKeys()方法返回
  • 与ready集合不一样,该集合是一个选择键的集合,其中的每个选择键都关联一个已经准别好至少一种操作的通道

已取消的键的集合 Cancelled key set

  • 也是已注册的键的子集
  • 包含cancel()方法被调用过的键
  • 私有成员,无法直接访问

2.2、选择过程

Selector类的select()方法有三种形式

无限阻塞:

1
2
Selector selector=Selector.open();
int n=selector.select();
  • 这种调用在没有通道就绪时将无限阻塞
  • 一旦至少有一个通道就绪,选择器的选择键集合就会被更新,并且每个就绪通道的ready集合也被更新。
  • 返回值将会是已经确定就绪通道的数量

限制等待时间:

1
2
Selector selector=Selector.open();
int n=selector.select(1000);
  • 可以指定一个参数,以毫秒计算
  • 如果在指定时间内没有通道就绪,将返回0值
  • 如果在时间内有至少一个通道就绪,将更新键表并立即返回。
  • 如果时间设置为0说明无限制等待,与上面方法相同。

非阻塞:

1
2
Selector selector=Selector.open();
int n=selector.selectNow();
  • 方法不会阻塞
  • 如果没有通道就绪,立即返回0

2.3、停止过程

有三种方法可以唤醒在select()方法中睡眠的线程

wakeup()方法

close()方法

interrupt()方法

四、Reactor模式

1、概念

Reactor设计模式,是一种基于事件驱动的设计模式。Reactor框架是ACE各个框架中最基础的一个框架,其他框架都或多或少地用到了Reactor框架。

在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。

Reactor模式与观察者模式有点像。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。当一个主体发生改变时,所有依属体都得到通知。

2、特点

优点

  1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;

  2. 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;

  3. 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

  4. 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

缺点

  1. 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
  2. Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
  3. Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。

3、架构模式

4、构成

  • Handles :即操作系统中的句柄,是对资源在操作系统层面上的一种抽象,它可以是打开的文件、一个连接(Socket)、Timer等。由于Reactor模式一般使用在网络编程中,因而这里一般指Socket Handle,即一个网络连接(Connection,在Java NIO中的Channel)。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事件,对ServerSocketChannnel可以是CONNECT事件,对SocketChannel可以是READ、WRITE、CLOSE事件等。

  • Synchronous Event Demultiplexer:阻塞等待一系列的Handle中的事件到来,如果阻塞等待返回,即表示在返回的Handle中可以不阻塞的执行返回的事件类型。这个模块一般使用操作系统的select来实现。在Java NIO中用Selector来封装,当Selector.select()返回时,可以调用Selector的selectedKeys()方法获取Set,一个SelectionKey表达一个有事件发生的Channel以及该Channel上的事件类型。

  • Initiation Dispatcher:用于管理Event Handler,即EventHandler的容器,用以注册、移除EventHandler等;另外,它还作为Reactor模式的入口调用Synchronous Event Demultiplexer的select方法以阻塞等待事件返回,当阻塞等待返回时,根据事件发生的Handle将其分发给对应的Event Handler处理,即回调EventHandler中的handle_event()方法。

  • Event Handler:定义事件处理方法:handle_event(),以供InitiationDispatcher回调使用。

  • Concrete Event Handler:事件EventHandler接口,实现特定事件处理逻辑。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package designpattern.reactor;

import java.io.IOException;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable{
private Reactor reactor;
public Acceptor(Reactor reactor){
this.reactor=reactor;
}

@Override
public void run() {
try {
SocketChannel socketChannel=reactor.serverSocketChannel.accept();
if(socketChannel!=null)//调用Handler来处理channel
new SocketReadHandler(reactor.selector, socketChannel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package designpattern.reactor;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;


/**
*
* 反应器模式
*
* Java NIO中
* 可注册的ServerSocketChannnel相当于Handles
* Slector就相当于Synchronous Event Demultiplexer
* Reactor类,相当于一个Initiation Dispatcher,负责最开始的注册
* Acceptor类相当于Event Handler
*
*/



/*
*
*/
public class Reactor implements Runnable{

public final Selector selector;
public final ServerSocketChannel serverSocketChannel;


public Reactor(int port) throws IOException{
selector=Selector.open();
serverSocketChannel=ServerSocketChannel.open();
InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);
serverSocketChannel.socket().bind(inetSocketAddress);
serverSocketChannel.configureBlocking(false);

//向selector注册该channel
SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
selectionKey.attach(new Acceptor(this));
}


@Override
public void run() {
try {
while(!Thread.interrupted()){
selector.select();
Set<SelectionKey> selectionKeys= selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while(it.hasNext()){
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
SelectionKey selectionKey=it.next();
//这相当于Event_handlerd的回调
dispatch(selectionKey);
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

/*
* 当阻塞等待返回时,根据事件发生的Handle将其分发给对应的Event Handler处理
* 这里的Event Handler被定义为一个线程绑定到对于的SelectionKey上面
*
* 这里的attachment第一次是accepter线程,后面是SocketReadHandler线程
*/
private void dispatch(SelectionKey selectionKey) {
Runnable r = (Runnable)(selectionKey.attachment());
if (r != null){
r.run();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package designpattern.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;



public class SocketReadHandler implements Runnable{
private SocketChannel socketChannel;
public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{
this.socketChannel=socketChannel;
socketChannel.configureBlocking(false);

SelectionKey selectionKey=socketChannel.register(selector, 0);

//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
//参看dispatch(SelectionKey key)
selectionKey.attach(this);

//同时将SelectionKey标记为可读,以便读取。
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}


/**
* 处理读取数据
*/
@Override
public void run() {
ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
inputBuffer.clear();
try {
socketChannel.read(inputBuffer);
//激活线程池 处理这些request
//requestHandle(new Request(socket,btt));
} catch (IOException e) {
e.printStackTrace();
}
}
}

五、参考地址

http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html

http://blog.csdn.net/u010168160/article/details/53019039