本文主要是重新梳理了Java的IO模型,基于之前NIO的文章進行補充,為學習Netty做準備。
1、什么是IO模型:
簡單地說,就是用什么樣的通道進行數(shù)據(jù)的發(fā)送和接收。比如通道是阻塞的還是非阻塞的,是同步還是異步的。
2、Java支持的IO模型:
java支持的IO模型有:
BIO:就是JDK原生的IO,同步并且是阻塞的。在用BIO進行網(wǎng)絡(luò)通信時,服務(wù)端的實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務(wù)端就要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷。適用場景:連接數(shù)比較小且固定的架構(gòu),程序簡單易于理解。
NIO:同步非阻塞的IO,服務(wù)端的實現(xiàn)模式為一個線程處理多個請求,即客戶端發(fā)送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有IO請求就進行處理,下圖中的selector就是這個多路復用器。當然server也可以啟動多個線程,一個線程維護一個selector,一個selector維護多個client。Netty就是基于NIO。適用場景:連接數(shù)多且連接比較短,比如聊天服務(wù)器、彈幕系統(tǒng)、服務(wù)器之間的通訊等,編程比較復雜。
AIO:異步非阻塞的IO,JDK1.7開始出現(xiàn)的,目前還沒得到廣泛的應(yīng)用。適用場景:連接數(shù)多且連接比較長的重架構(gòu),編程比較復雜
1、BIO編程流程:
服務(wù)器端啟動一個serverSocket;
客戶端啟動一個socket對服務(wù)端進行通信,默認情況下服務(wù)端需要對每個客戶端連接建立一個線程與之通信;
客戶端發(fā)出連接請求后,先咨詢服務(wù)端是否有線程響應(yīng),如果沒有則會等待或者遭到拒絕;如果有線程響應(yīng),客戶端線程會等待請求結(jié)束后繼續(xù)執(zhí)行;
2、BIO的應(yīng)用實例:
需求:使用BIO編寫一個服務(wù)端,監(jiān)聽6666端口,當有客戶端連接時,就啟動一個線程與之通信。使用線程池機制進行改善,使其可以連接多個客戶端。服務(wù)端可以接收客戶端發(fā)送的數(shù)據(jù)。
代碼:
public class BioServer {
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建線程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 2. 創(chuàng)建serverSocket并監(jiān)聽端口
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服務(wù)端已啟動");
// 3. 等待客戶端連接
System.out.println("等待連接……");
while (true) {
final Socket socket = serverSocket.accept();
System.out.println("客戶端連接進來了");
// 4. 創(chuàng)建一個線程與之通信
executorService.execute(new Runnable() {
@Override
public void run() {
handler(socket);
}
});
}
}
public static void handler(Socket socket) {
try {
byte[] bys = new byte[1024];
// 通過socket獲取輸入流
InputStream inputstream = socket.getInputStream();
// 循環(huán)讀取客戶端發(fā)送的數(shù)據(jù)
while (true) {
System.out.println("線程id:" + Thread.currentThread().getId() + ",線程名:" + Thread.currentThread().getName());
System.out.println("reading……");
int read = inputstream.read(bys);
// 不等于負一表示還沒讀取完
if (read != -1) {
System.out.println(new String(bys, 0, read));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關(guān)閉socket連接
try { socket.close(); } catch (IOException e) { e.printStackTrace(); }
}
}
}
這段代碼就是按照需求編寫了一個服務(wù)端。啟動這個類,然后在CMD窗口輸入telnet 127.0.0.1 6666
回車,然后按ctrl
+ ]
,就進入了telnet,輸入send hello
,服務(wù)端控制臺就會打印出hello
以及線程信息??梢园l(fā)現(xiàn),當我們啟動服務(wù)端后,控制臺會打印出等待連接……
,然后就卡在這里不動了,當我們通過telnet連接后,會打印出reading……
,并且卡在那里,說明這是阻塞的。我們啟動兩個telnet去連接,通過控制臺打印的線程id可以發(fā)現(xiàn),處理這兩個客戶端連接的是兩個線程,這與之前的模型分析一致
1、NIO三大核心部分:
Channel:通道,可以理解為是鐵路;
Buffer:緩沖區(qū),可以理解為火車;程序不是直接通過channel讀寫數(shù)據(jù),而是通過buffer。這很好理解,火車裝著貨物跑到鐵路上,對應(yīng)了buffer裝著數(shù)據(jù)跑在channel上;
Selector:選擇器,就是上面NIO模型圖中的selector,selector發(fā)現(xiàn)這個通道有內(nèi)容要讀取,就處理這個通道,如果這個通道沒啥事兒,它不會阻塞在這里等這個通道,而是去看別的通道有沒有內(nèi)容要讀取,如果都沒有,管理selector的這個線程還可以去做別的事。
selector、buffer、channel之間的關(guān)系:
每個channel都會對應(yīng)一個buffer;一個channel可以理解為就是一個連接;
一個selector對應(yīng)一個線程;一個selector對應(yīng)多個channel;
程序切換到哪個channel是由事件決定;
selector會根據(jù)不同的事件,在各通道上切換;
buffer底層是一個數(shù)組;
數(shù)據(jù)的讀取和寫入是通過buffer來完成的;BIO的讀取和寫入是通過輸入輸出流,不能雙向,而buffer是雙向的;
2、buffer:
buffer有四個重要的屬性:
capacity:容量,該buffer能夠容納的最大數(shù)據(jù)量,緩沖區(qū)創(chuàng)建時被設(shè)定且不能修改;
limit:緩沖區(qū)當前的終點,不能對緩沖區(qū)超出終點的位置進行讀寫,limit是可變的;
position:下一個要被讀或?qū)懙脑氐乃饕?,每次讀寫都會改變該值;
mark:標記;
讀取數(shù)據(jù)的時候可以設(shè)置position和limit,表示從哪兒開始讀,讀到哪兒結(jié)束。
3、channel:
channel類似BIO的流,但是有些區(qū)別,如下:
通過buffer,可以同時進行讀寫,而流只能讀或者寫;
通道可以實現(xiàn)異步讀寫數(shù)據(jù);
通道可以從緩沖區(qū)讀數(shù)據(jù),也可以寫數(shù)據(jù)到緩沖區(qū);
channel是一個接口,用得比較多的實現(xiàn)有如下幾個:
FileChannel:對文件進行操作的;
DatagramChannel:通過 UDP 讀寫網(wǎng)絡(luò)中的數(shù)據(jù)通道;
ServerSocketChannel:類似ServerSocket;
SocketChannel:類似Socket
看幾個實操案例:
通過ByteBuffer和FileChannel將“帶你去爬山”這句話寫入到test01.txt中:
public class NioFileChannel01 {
public static void main(String[] args) throws IOException {
String str = "帶你去爬山啊";
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 1. 通過FileOutputStream獲取對應(yīng)的FileChannel
FileChannel fc = fos.getChannel();
// 2. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 3. 將str放入buffer中
buffer.put(str.getBytes());
// 4. 切換寫數(shù)據(jù)模式
buffer.flip();
// 5. 將buffer數(shù)據(jù)寫入到通道
fc.write(buffer);
// 6. 關(guān)閉資源
fos.close();
fc.close();
}
}
通過ByteBuffer和FileChanneltest01.txt文件中的內(nèi)容:
public class NioFileChannel02 {
public static void main(String[] args) throws IOException {
// 1. 讀取test01.txt文件
File file = new File("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 將file轉(zhuǎn)成FileInputStream
FileInputStream fis = new FileInputStream(file);
// 3. 獲取通道
FileChannel channel = fis.getChannel();
// 4. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
// 5. 將通道數(shù)據(jù)讀到buffer中
channel.read(buffer);
System.out.println(new String(buffer.array()));
// 6. 關(guān)閉資源
fis.close();
channel.close();
}
}
使用FileChannel的read和write方法完成對文件的拷貝:
public class NioFileChannel03 {
public static void main(String[] args) throws IOException {
// 1. 讀取源文件
FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 獲取通道
FileChannel sourceChannel = fis.getChannel();
// 3. 加載目標文件
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test02.txt");
// 4. 獲取通道
FileChannel targetChannel = fos.getChannel();
// 5. 創(chuàng)建緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
// 6. 標志位復位,一定不能漏了這步,否則死循環(huán)
buffer.clear();
// 7. 讀取數(shù)據(jù)
int read = sourceChannel.read(buffer);
if (read == -1) {
break;
}
// 8. 切換到寫數(shù)據(jù)模式,并將buffer中的數(shù)據(jù)寫入到targetChannel
buffer.flip();
targetChannel.write(buffer);
}
// 9. 關(guān)閉資源
fis.close();
sourceChannel.close();
fos.close();
targetChannel.close();
}
}
使用transferFrom完成對文件的拷貝:
public class NioFileChannel04 {
public static void main(String[] args) throws IOException {
// 1. 讀取源文件
FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
// 2. 獲取通道
FileChannel sourceChannel = fis.getChannel();
// 3. 加載目標文件
FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test03.txt");
// 4. 獲取通道
FileChannel targetChannel = fos.getChannel();
// 5. 使用transferFrom完成拷貝
targetChannel.transferFrom(sourceChannel, 0, sourceChannel.size());
// 6. 關(guān)閉資源
fis.close();
sourceChannel.close();
fos.close();
targetChannel.close();
}
}
MappedByteBuffer:可以讓文件直接在內(nèi)存中修改,操作系統(tǒng)不需要拷貝,用法如下:
public class NioFileChannel05 {
public static void main(String[] args) throws IOException {
// 1. 加載文件
RandomAccessFile file = new RandomAccessFile("C:\\Users\\14751\\Desktop\\test01.txt", "rw"); // rw表示讀寫
// 2. 獲取文件通道
FileChannel channel = file.getChannel();
// 3. 獲取MappedByteBuffer,這三個參數(shù),第一個表示讀寫模式,第二個表示直接修改的起始位置,第三個表示映射到內(nèi)存中的大小
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
// 4. 對test01.txt進行修改
buffer.put(0, (byte)'A'); // 第一個字符改成A
buffer.put(1, (byte)'B'); // 第二個字符改成B
// 5. 關(guān)閉資源
file.close();
channel.close();
}
}
分散和聚集:上面的案例,都是通過一個buffer來完成的,如果數(shù)據(jù)比較多,讀取數(shù)據(jù)的時候,可以將數(shù)據(jù)讀取到多個buffer中,同樣寫數(shù)據(jù)的時候,可以將多個buffer的數(shù)據(jù)寫入到channel中,案例如下:
/**
* scattering:將數(shù)據(jù)寫入到buffer時,可以采用buffer數(shù)組,依次寫入
* gathering:從buffer讀數(shù)據(jù)的時候,可以采用buffer數(shù)組,依次讀取
* @author zhu
*
*/
public class NioFileChannel06 {
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建channel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 2. 綁定端口并啟動
InetSocketAddress address = new InetSocketAddress(6666);
serverChannel.socket().bind(address);
// 3. 創(chuàng)建buffer數(shù)組
ByteBuffer[] buffers = new ByteBuffer[2];
buffers[0] = ByteBuffer.allocate(5);
buffers[1] = ByteBuffer.allocate(4);
// 4. 等待客戶端連接
SocketChannel channel = serverChannel.accept();
// 5. 循環(huán)讀取
// 假設(shè)客戶端會發(fā)送8個字節(jié)
int len = 8;
while (true) {
int read = 0;
while (read < len) {
long byteNum = channel.read(buffers);
read += byteNum;
System.out.println("讀取到的字節(jié)數(shù):" + read);
}
// 6. 切換模式
Arrays.asList(buffers).forEach(buffer -> buffer.flip());
// 7. 將讀取到的數(shù)據(jù)顯示到客戶端
long writeLen = 0;
while (writeLen < len) {
long byteNum = channel.write(buffers);
writeLen += byteNum;
}
// 8. 將所有buffer進行clear
Arrays.asList(buffers).forEach(buffer -> buffer.clear());
}
}
}
4、selector:
selector能夠檢測多個通道是否有事件要發(fā)生,多個channel以事件的方式可以注冊到同一個selector中。主要工作流程如下:
當客戶端連接時,會通過severSocket channel得到對應(yīng)的socketChannel,并且將socketChannel通過register方法注冊到selector中,注冊后返回一個selectionKey;
selector通過集合關(guān)聯(lián)這個selectionKey;
selector通過select方法進行監(jiān)聽(select方法是阻塞的,也可以傳入超時時間,阻塞指定的時間,還可以用selectNow方法,這個就是非阻塞的;NIO的非阻塞也就體現(xiàn)在這里),返回有事件發(fā)生的通道的個數(shù);
selector可以得到有事件發(fā)生的通道的selectionKey;
通過selectionKey,就可以得到它對應(yīng)的通道,然后就可以完成業(yè)務(wù)操作了。
看一個實操案例:用NIO實現(xiàn)服務(wù)端和客戶端的通訊:
服務(wù)端:
public class NIOServer {
public static void main(String[] args) throws IOException {
// 1. 創(chuàng)建NIOServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 得到Selector對象
Selector selector = Selector.open();
// 3. 綁定端口,進行監(jiān)聽
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 4. 設(shè)置為非阻塞
serverSocketChannel.configureBlocking(false);
// 5. 把serverSocketChannel注冊到selector中,設(shè)置關(guān)心事件為 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 循環(huán)等待客戶端連接
while (true) {
if (selector.select(1000) == 0) { // 沒有事件
System.out.println("服務(wù)器等待了1秒鐘,沒有事件發(fā)生");
continue;
} else { // 有事件
// 7. 有事件發(fā)生,就拿到selectionKey的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 8. 通過selectionKeys得到channel
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 9. 根據(jù)key的不同事件,做對應(yīng)的處理
if (key.isAcceptable()) { // 如果是OP_ACCEPT連接事件
// 10. 為該客戶端生成一個SocketChannel并設(shè)置成非阻塞
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 11. 將當前socketChannel也注冊到selector中,關(guān)注事件為OP_READ,并且關(guān)聯(lián)一個Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) { // 如果是OP_READ讀取事件
// 12. 通過key得到channel
SocketChannel channel = (SocketChannel) key.channel();
// 13. 獲取到該channel關(guān)聯(lián)的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 14. 將channel中的數(shù)據(jù)讀到buffer中去
channel.read(buffer);
System.out.println("客戶端發(fā)送的數(shù)據(jù):" + new String(buffer.array()));
}
// 15. 移除當前的selectionKey,防止重復操作
keyIterator.remove();
}
}
}
}
}
客戶端:
public class NIOClient {
public static void main(String[] args) throws IOException {
// 1. 設(shè)置ip和端口
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 6666);
// 2. 創(chuàng)建SocketChannel并設(shè)置成非阻塞
SocketChannel socketChannel = SocketChannel.open(address);
socketChannel.configureBlocking(false);
// 3. 連接服務(wù)器
String str = "hello world";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
// 4. 將數(shù)據(jù)寫入channel
socketChannel.write(buffer);
System.in.read();
}