[TOC]

ServerSocketChannel

阻塞方式的ServerSocketChannel

 /**
  * Demo server that uses a {@link ServerSocketChannel} in blocking mode.
  *
  * <p>Binds to 127.0.0.1:8889 and then serves clients strictly one at a time:
  * accept a connection, perform a single read, print whatever was received,
  * reply with "I am Server", and close the connection.
  *
  * @param args unused
  * @throws IOException if the server channel cannot be opened, bound or
  *                     configured, or if accepting a connection fails
  */
 public static void main(String[] args) throws IOException {
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8889));
    ssc.configureBlocking(true);

    System.out.println("server started, listening on :" + ssc.getLocalAddress());

    while (true) {
        // With ssc.configureBlocking(true), accept() blocks until a connection arrives.
        SocketChannel accepted = ssc.accept();

        // try-with-resources closes the connection on every path. This includes a
        // failure of getRemoteAddress(), which is now handled per connection
        // instead of leaking the socket and terminating the accept loop.
        try (SocketChannel sc = accepted) {
            System.out.println("accept SocketChannel:" + sc.getRemoteAddress());

            // A freshly allocated buffer already has position 0 and limit == capacity.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int len = sc.read(buffer);

            // -1 means the peer closed the stream before sending anything.
            if (len != -1) {
                // Decoded with the platform default charset.
                System.out.println("receive client data:" + new String(buffer.array(), 0, len));
            }

            ByteBuffer bufferToWrite = ByteBuffer.wrap("I am Server".getBytes());
            sc.write(bufferToWrite);
        } catch (IOException e) {
            // A failure on one connection (including on close) must not stop the server.
            e.printStackTrace();
        }
    }
}
  • accept方法一直阻塞等待连接的到来

阻塞方式下向Selector注册(register)会报错

Selector selector = Selector.open();
// Register the event of interest with the selector: connection (accept) events.
// NOTE: ssc is presumably still the blocking-mode channel from the previous
// example; in that state this call throws IllegalBlockingModeException.
ssc.register(selector, SelectionKey.OP_ACCEPT);
  • 报错如下
Exception in thread "main" java.nio.channels.IllegalBlockingModeException
    at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:201)
    at java.nio.channels.SelectableChannel.register(SelectableChannel.java:280)
    at io.Server.main(Server.java:28)
  • 源码
/**
    * Registers this channel with the given selector, returning a selection key.
    *
    * <p>  This method first verifies that this channel is open and that the
    * given initial interest set is valid.
    *
    * <p> If this channel is already registered with the given selector then
    * the selection key representing that registration is returned after
    * setting its interest set to the given value.
    *
    * <p> Otherwise this channel has not yet been registered with the given
    * selector, so the {@link AbstractSelector#register register} method of
    * the selector is invoked while holding the appropriate locks.  The
    * resulting key is added to this channel's key set before being returned.
    * </p>
    *
    * @param  sel  the selector to register this channel with
    * @param  ops  the interest set for the resulting key
    * @param  att  the object attached to the resulting key
    *
    * @return  the key representing this channel's registration with
    *          {@code sel}
    *
    * @throws  ClosedChannelException if this channel is not open
    *
    * @throws  ClosedSelectorException {@inheritDoc}
    *
    * @throws  IllegalBlockingModeException {@inheritDoc}
    *
    * @throws  IllegalSelectorException {@inheritDoc}
    *
    * @throws  CancelledKeyException {@inheritDoc}
    *
    * @throws  IllegalArgumentException {@inheritDoc}
    */
public final SelectionKey register(Selector sel, int ops,
                                    Object att)
    throws ClosedChannelException
{
    // The whole registration runs while holding regLock.
    synchronized (regLock) {
        if (!isOpen())
            throw new ClosedChannelException();
        // Reject any requested operation bit that is not in validOps().
        if ((ops & ~validOps()) != 0)
            throw new IllegalArgumentException();
        // A channel in blocking mode cannot be registered with a selector;
        // it must be switched to non-blocking mode first.
        if (blocking)
            throw new IllegalBlockingModeException();
        // Look for an existing registration of this channel with sel.
        SelectionKey k = findKey(sel);
        if (k != null) {
            // Already registered: just update the interest set and attachment.
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                // Re-check under keyLock before creating the key.
                if (!isOpen())
                    throw new ClosedChannelException();
                // Delegate key creation to the selector, then record the key
                // on this channel.
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

附:linux socket accept方法

#include <sys/types.h>

#include <sys/socket.h>

int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);

accept()系统调用主要用在基于连接的套接字类型,比如SOCK_STREAM和SOCK_SEQPACKET。它提取出所监听套接字的等待连接队列中第一个连接请求,创建一个新的套接字,并返回指向该套接字的文件描述符。

一般情况下accept()是阻塞函数:当监听socket调用accept()时,它先查看自己的等待连接队列(receive_buf)中是否有连接请求;若有,则取出该请求并将其从队列中删除,然后创建一个新的socket与发起请求的客户端建立连接;若没有,就阻塞等待,直到有新的连接请求到来。

非阻塞方式

/**
 * Demo server that uses a {@link ServerSocketChannel} in non-blocking mode
 * without a selector.
 *
 * <p>Binds to 127.0.0.1:8889 and polls {@code accept()} in a loop. For each
 * accepted connection it performs a single read, prints whatever was
 * received, replies with "I am Server", and closes the connection.
 *
 * @param args unused
 * @throws IOException if the server channel cannot be opened, bound or
 *                     configured, or if accepting a connection fails
 */
public static void main(String[] args) throws IOException {
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8889));
    ssc.configureBlocking(false);

    System.out.println("server started, listening on :" + ssc.getLocalAddress());

    while (true) {
        // Non-blocking accept: returns null immediately when no connection is
        // pending, so this loop polls continuously.
        SocketChannel accepted = ssc.accept();
        if (accepted == null) {
            continue;
        }

        // try-with-resources closes the connection on every path. This includes a
        // failure of getRemoteAddress(), which is now handled per connection
        // instead of leaking the socket and terminating the accept loop.
        try (SocketChannel sc = accepted) {
            System.out.println("accept SocketChannel:" + sc.getRemoteAddress());

            // A freshly allocated buffer already has position 0 and limit == capacity.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // The accepted channel is in blocking mode even though ssc is not,
            // so this read waits for data or end-of-stream.
            int len = sc.read(buffer);

            // -1 means the peer closed the stream before sending anything.
            if (len != -1) {
                // Decoded with the platform default charset.
                System.out.println("receive client data:" + new String(buffer.array(), 0, len));
            }

            ByteBuffer bufferToWrite = ByteBuffer.wrap("I am Server".getBytes());
            sc.write(bufferToWrite);
        } catch (IOException e) {
            // A failure on one connection (including on close) must not stop the server.
            e.printStackTrace();
        }
    }
}

Selector处理事件

/**
 * Demo server driven by a {@link Selector}.
 *
 * <p>Opens a non-blocking server channel on 127.0.0.1:8889, registers it for
 * accept events, and then loops forever: wait for ready keys, take each key
 * out of the selected set, and hand it to {@code handleConnect}.
 *
 * @param args unused
 * @throws IOException if the channel or selector cannot be set up, or if a
 *                     select operation fails
 */
public static void main(String[] args) throws IOException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8889));
    serverChannel.configureBlocking(false);

    System.out.println("server started, listening on :" + serverChannel.getLocalAddress());

    // The selector watches registered channels for events; here the server
    // channel is registered for connection (accept) events.
    Selector selector = Selector.open();
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    for (;;) {
        // Blocks until at least one registered channel has a ready event.
        selector.select();

        for (Iterator<SelectionKey> ready = selector.selectedKeys().iterator(); ready.hasNext(); ) {
            SelectionKey readyKey = ready.next();
            // Remove the key from the selected set before handling it, so it
            // is not processed again on the next pass.
            ready.remove();
            handleConnect(readyKey);
        }
    }
}

/**
 * Handles one ready selection key.
 *
 * <p>For an acceptable key, accepts the pending connection, switches it to
 * non-blocking mode and registers it with the same selector for read events.
 * For a readable key, performs a single read, prints whatever was received,
 * replies with "I am Server" and closes the connection.
 *
 * <p>I/O failures are printed and never propagated to the caller.
 *
 * @param key a key taken from the selector's selected-key set
 */
private static void handleConnect(SelectionKey key) {
    if (key.isAcceptable()) {
        SocketChannel sc = null;
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            sc = ssc.accept();
            if (sc == null) {
                // A non-blocking accept returns null when no connection is
                // pending; there is nothing to register.
                return;
            }
            System.out.println("accept a client:" + sc.getRemoteAddress());
            // Register the new connection for read events; a channel must be
            // non-blocking before it can be registered with a selector.
            sc.configureBlocking(false);
            sc.register(key.selector(), SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            // The connection was accepted but could not be registered: close
            // it so the socket does not leak.
            if (sc != null) {
                try {
                    sc.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    } else if (key.isReadable()) {
        // try-with-resources closes the connection after this single exchange.
        try (SocketChannel sc = (SocketChannel) key.channel()) {
            // A freshly allocated buffer already has position 0 and limit == capacity.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int len = sc.read(buffer);
            // -1 means the peer closed the stream.
            if (len != -1) {
                // Decoded with the platform default charset.
                System.out.println(new String(buffer.array(), 0, len));
            }
            ByteBuffer bufferToWrite = ByteBuffer.wrap("I am Server".getBytes());
            // The channel is non-blocking, so one write may send only part of
            // the buffer; keep writing until the whole reply has been sent.
            while (bufferToWrite.hasRemaining()) {
                sc.write(bufferToWrite);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Copyright @doctording all rights reserved, powered by Gitbook. 该文件修改时间: 2023-08-06 15:35:01

results matching ""

    No results matching ""