`
aids198311
  • 浏览: 58814 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

仿照jetty的nio原理例子2(7月10号改进)

    博客分类:
  • nio
阅读更多
改动点:
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.删除了阻塞的selector.select()调用,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略:selectNow()没有就绪事件时调用selector.select(400),避免不停地循环、占用过多CPU的情况。
4.每个请求到来之后,直接分出一个线程去处理。

7月10日改进点:

1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口






SimpleJettyServerPlus 这个是server
package com.daizuan.jetty.plus;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Entry point of a small NIO server modelled on Jetty's selector design.
 * <p>
 * The server owns one {@link ServerSocketChannel}, one {@link Selector} and one change queue,
 * and runs two threads: a {@link ConnectionHandler} that accepts connections and puts them on
 * the queue, and a {@link RequestHandler} that drains the queue and runs the select loop.
 *
 * @author daizuan
 */
public class SimpleJettyServerPlus {

    /** Queue shared by both handlers; carries accepted channels to the selector thread. */
    private final ConcurrentLinkedQueue<Object> _changes_con = new ConcurrentLinkedQueue<Object>();
    private ServerSocketChannel                 channel;
    private Selector                            selector;
    private int                                 port;
    private Runnable                            connectionHandler;
    private Runnable                            requestHandler;

    /**
     * Opens the server channel and the selector; nothing is bound until {@link #listen()}.
     *
     * @param port TCP port that {@link #listen()} will bind to
     * @throws IOException if the channel or the selector cannot be opened
     */
    public SimpleJettyServerPlus(int port) throws IOException{
        this.port = port;
        this.channel = ServerSocketChannel.open();
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // Do not leak the already-opened server channel when the selector cannot be created.
            try {
                channel.close();
            } catch (IOException ignored) {
                // The original failure is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Replaces the default connection handler; must be called before {@link #listen()}.
     *
     * @param connectionHandler handler that will run on the accept thread
     */
    public void setConnectionHandler(ConnectionHandler connectionHandler) {
        this.connectionHandler = connectionHandler;
    }

    /**
     * Replaces the default request handler; must be called before {@link #listen()}.
     *
     * @param requestHandler handler that will run on the selector thread
     */
    public void setRequestHandler(RequestHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    /**
     * Binds the server socket to the configured port and starts both handler threads.
     *
     * @throws IOException if the socket cannot be bound or configured
     */
    public void listen() throws IOException {
        // Bind the server socket to the configured port.
        channel.socket().bind(new InetSocketAddress(port));
        // The accept thread blocks in accept(), so the server channel stays in blocking mode.
        channel.configureBlocking(true);
        startConnectionHandler();
        startRequestHandler();
    }

    /** Starts the request handler thread, creating the default handler if none was set. */
    private void startRequestHandler() {
        if (requestHandler == null) {
            requestHandler = new RequestHandler(_changes_con, selector);
        }
        startThread(requestHandler);
    }

    /** Starts the connection handler thread, creating the default handler if none was set. */
    private void startConnectionHandler() {
        if (connectionHandler == null) {
            connectionHandler = new ConnectionHandler(_changes_con, channel, selector);
        }
        startThread(connectionHandler);
    }

    /** Runs the given task on a new thread. */
    private void startThread(Runnable run) {
        new Thread(run).start();
    }

    /**
     * Starts a server on port 6789.
     *
     * @param args ignored
     * @throws IOException if the server cannot be opened or bound
     */
    public static void main(String[] args) throws IOException {
        SimpleJettyServerPlus server = new SimpleJettyServerPlus(6789);
        server.listen(); // Start listening and serving.
    }

}



ConnectionHandler 这个是提交连接事件的
package com.daizuan.jetty.plus;

import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Accept loop: blocks on the server channel, switches every accepted connection to
 * non-blocking mode and hands it to the selector thread through the shared change queue.
 */
public class ConnectionHandler implements Runnable {

    private final ConcurrentLinkedQueue<Object> _changes_con;
    private final ServerSocketChannel           channel;
    private final Selector                      selector;

    /**
     * @param _changes_con queue that receives every accepted {@link SocketChannel}
     * @param channel server channel to accept connections from
     * @param selector selector that is woken up after a connection has been queued
     */
    public ConnectionHandler(ConcurrentLinkedQueue<Object> _changes_con, ServerSocketChannel channel, Selector selector){
        this._changes_con = _changes_con;
        this.channel = channel;
        this.selector = selector;
    }

    /**
     * Accepts connections until the server channel is closed.
     * <p>
     * A failure on a single connection is logged and the loop continues. Once the server
     * channel is closed the loop ends, instead of spinning on the exception that every
     * further {@code accept()} would throw.
     */
    @Override
    public void run() {
        System.out.println("ConnectionHander:connection Hander start......");
        while (channel.isOpen()) {
            SocketChannel sc = null;
            boolean queued = false;
            try {
                // Blocks here until a client connects.
                sc = channel.accept();
                sc.configureBlocking(false);
                _changes_con.add(sc);
                queued = true;
                // Wake the selector so that it picks up the queued registration promptly.
                selector.wakeup();
                System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]");
            } catch (Exception e) {
                e.printStackTrace();
                // A connection that never reached the queue has no other owner; close it here.
                if (sc != null && !queued) {
                    closeQuietly(sc);
                }
            }
        }
    }

    /** Closes an accepted channel, logging (not propagating) any failure. */
    private void closeQuietly(SocketChannel sc) {
        try {
            sc.close();
        } catch (Exception closeError) {
            closeError.printStackTrace();
        }
    }

}


Request 这个是附件,即放在socketChannel里的附件,包含了请求信息
package com.daizuan.jetty.plus;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Per-connection state that {@link RequestHandler} attaches to a client's {@link SelectionKey}.
 * <p>
 * When the key becomes readable the selector thread calls {@link #process()}, which starts a
 * worker thread that reads the pending bytes, echoes them back prefixed with a timestamp and
 * then asks the selector thread to re-enable read interest. Interest-op changes are never
 * applied from the worker: they are recorded here and applied by {@link #doUpdateKey()}.
 */
public class Request {

    private final ConcurrentLinkedQueue<Object> _changes_req = new ConcurrentLinkedQueue<Object>();
    private SelectionKey key;
    Selector selector;

    /** Size of the scratch buffer used for each individual socket read. */
    private static final int DEFAULT_BUFFERSIZE = 16;
    private static final String DEFAULT_CHARSET = "GBK";
    private static final String FORMAT = "yyyy-MM-dd HH:mm:ss";
    /** Request text that makes the server close the connection. */
    private static final String EXIT = "exit";
    /** Number of empty reads tolerated before a request is considered complete. */
    private static final int MAX_ZERO_COUNT = 16;
    /** Milliseconds after the last dispatch before {@link #timeOut()} closes the connection. */
    private static final long MAX_IDLE_TIME = 60000;

    private String id;
    /** True while a worker thread is handling this request; guarded by {@code this}. */
    private boolean isDispatched = false;
    private Runnable _handle = new Runnable() {

        @Override
        public void run() {
            handle();
        }
    };

    private volatile long dispatchedTime = 0;

    private SocketChannel sc;

    private RequestHandler reqHandler;

    /** Interest ops to apply on the next {@link #doUpdateKey()}; guarded by {@code this}. */
    private int interestOps;

    /**
     * @param reqHandler handler that queues key updates and tracks this request
     */
    public void setReqHandler(RequestHandler reqHandler) {
        this.reqHandler = reqHandler;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /**
     * @param selector selector the key is registered with
     * @param key key of the client connection; its channel must be a {@link SocketChannel}
     */
    public Request(Selector selector, SelectionKey key){
        this.key = key;
        this.selector = selector;
        this.sc = (SocketChannel) key.channel();
        dispatchedTime = System.currentTimeMillis();
    }

    /** Adds an object to this request's private task queue. */
    public void addTask(Object o) {
        _changes_req.add(o);
    }

    /**
     * Dispatches this request to a new worker thread.
     * <p>
     * If a worker is already running, read interest is switched off so that the selector
     * stops reporting the key until the worker finishes, and the call returns.
     */
    public void process() {
        synchronized (this) {
            if (isDispatched) {
                System.out.println("I am dispatched ,so return..");
                key.interestOps(0);
                return;
            }
            interestOps = key.interestOps();
            dispatchedTime = System.currentTimeMillis();
            isDispatched = true;
            new Thread(_handle).start();

        }
    }

    /**
     * Worker body: reads one request, echoes it back and re-arms the key. The connection is
     * closed on end-of-stream, on the {@code exit} command, and on any failure.
     */
    private void handle() {
        try {
            // Read whatever the client has sent so far.
            String request = parseRequest();

            System.out.println("read [" + request + "] from " + id);
            if (request == null || needToCanncel(request)) {
                System.out.println(id + "I am die!");
                close();
                return;
            }
            // Echo the request back to the client, prefixed with the current time.
            write("[" + getTime() + "] " + request + "\n");
            unDispatched();

        } catch (Exception e) {
            e.printStackTrace();
            // Without this the request would stay flagged as dispatched and the socket would
            // remain open until the idle timeout.
            close();
        }
    }

    /** Marks the worker as finished and schedules read interest to be restored. */
    private void unDispatched() {
        synchronized (this) {
            isDispatched = false;
            updateKey();
        }

    }

    /**
     * Records the desired interest ops without touching the key; the actual update is queued
     * for the selector thread, which applies it through {@link #doUpdateKey()}.
     */
    private void updateKey() {
        synchronized (this) {
            interestOps = !isDispatched ? SelectionKey.OP_READ : 0;
            System.out.println("interestOps:" + interestOps + ",SelectionKey.OP_READ:" + SelectionKey.OP_READ);
            if (key.interestOps() == interestOps) {
                // Nothing to change.
                return;
            }
            reqHandler.addChange(this);
            selector.wakeup();
        }

    }

    /**
     * Applies the recorded interest ops to the key, or closes the connection when the key or
     * the channel is no longer usable.
     */
    public void doUpdateKey() {
        synchronized (this) {
            if (key != null && key.isValid() && sc.isOpen()) {
                key.interestOps(interestOps);
                System.out.println("interestOps-->" + interestOps);
            } else {
                close();
            }
        }
    }

    /** Closes the connection when it has not been dispatched for longer than the idle limit. */
    public void timeOut() {
        long now = System.currentTimeMillis();
        if (now - dispatchedTime > MAX_IDLE_TIME) {
            close();
        }
    }

    /** Returns the current time formatted as {@code yyyy-MM-dd HH:mm:ss}. */
    private String getTime() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        DateFormat df = new SimpleDateFormat(FORMAT);
        return df.format(new Date());

    }

    /** Returns true when the request text is the {@code exit} command. */
    private boolean needToCanncel(String request) {
        return EXIT.equals(request);
    }

    /**
     * Reads the bytes currently available on the channel and decodes them.
     * <p>
     * Reading stops at end-of-stream or after more than {@code MAX_ZERO_COUNT} empty reads.
     *
     * @return the trimmed request text, or {@code null} when the peer closed the connection
     * @throws IOException if reading from the channel fails
     */
    private String parseRequest() throws IOException {
        ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
        byte[] data = new byte[DEFAULT_BUFFERSIZE * 10];
        int off = 0;
        int zeroCount = 0;
        int count;
        // Keep reading so that a request larger than the scratch buffer is still read in full.
        while ((count = sc.read(bbuffer)) != -1) {
            if (count == 0) {
                if (++zeroCount > MAX_ZERO_COUNT) {
                    System.out.println("read zero count:" + zeroCount + ",break");
                    break;
                }
                continue;
            }
            bbuffer.flip();
            if ((off + count) > data.length) {
                data = grow(data, DEFAULT_BUFFERSIZE * 10);
            }
            bbuffer.get(data, off, count);
            off += count;
            // Reset position and limit; otherwise the next read is capped at this read's size.
            bbuffer.clear();
        }

        if (count < 0) {
            return null;
        }

        return new String(data, 0, off, DEFAULT_CHARSET).trim();

    }

    /** Closes the socket, cancels the key and unregisters this request from its handler. */
    private void close() {
        if (sc != null && sc.socket() != null) {
            try {
                if (!sc.socket().isClosed()) {
                    sc.socket().close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (key != null) {
            key.cancel();
        }

        // The handler is injected through a setter and may not have been set.
        if (reqHandler != null) {
            reqHandler.removeReq(this);
        }

    }

    /**
     * Writes the whole string to the client; failures are logged.
     *
     * @param str text to send, encoded with {@code DEFAULT_CHARSET}
     */
    private void write(String str) {
        try {
            ByteBuffer out = ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET));
            // A non-blocking write may accept only part of the buffer; keep going until it is
            // drained so that the reply is not silently truncated.
            while (out.hasRemaining()) {
                if (sc.write(out) == 0) {
                    Thread.yield();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns a copy of the array enlarged by {@code size} bytes.
     * 
     * @param src byte[] source data
     * @param size int number of bytes to add
     * @return byte[] the enlarged copy
     */
    private byte[] grow(byte[] src, int size) {
        byte[] tmp = new byte[src.length + size];
        System.arraycopy(src, 0, tmp, 0, src.length);
        return tmp;
    }
}



RequestHandler 用来提交请求信息
package com.daizuan.jetty.plus;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
 * Selector loop: registers the connections queued by {@link ConnectionHandler}, applies the
 * interest-op changes queued by {@link Request}, dispatches readable keys to their requests
 * and periodically closes connections that have been idle for too long.
 */
public class RequestHandler implements Runnable {

    private ConcurrentLinkedQueue<Object>          _changes_con;
    private Selector                               selector;
    /** Select timeout and minimum interval between idle sweeps, in milliseconds. */
    private static final long                      MAX_IDLE = 400;
    private long                                   kickTime = 0;
    /** Live requests; used as a concurrent set, the value is always this handler. */
    private ConcurrentMap<Request, RequestHandler> requests = new ConcurrentHashMap<Request, RequestHandler>();

    /**
     * @param _changes_con queue of pending changes: new {@link SocketChannel}s and
     *            {@link Request}s whose key needs updating
     * @param selector selector that client channels are registered with
     */
    public RequestHandler(ConcurrentLinkedQueue<Object> _changes_con, Selector selector){
        this._changes_con = _changes_con;
        this.selector = selector;
    }

    /**
     * Runs the selector loop until the selector is closed. A failure in one iteration is
     * logged and the loop continues.
     */
    @Override
    public void run() {
        System.out.println("RequestHander:Request Hander start......");
        // Stop when the selector is closed; every further select() would only throw.
        while (selector.isOpen()) {
            try {
                applyPendingChanges();
                // Poll first; when nothing is ready, block for a while to avoid a busy loop.
                int count = selector.selectNow();
                if (count == 0) selector.select(MAX_IDLE);
                dispatchSelectedKeys();
                long now = System.currentTimeMillis();
                if (now - kickTime > MAX_IDLE) {
                    kickTime = now;
                    kick();
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    /** Drains the changes that were queued when this call started. */
    private void applyPendingChanges() {
        // Bound the drain by the initial size so that producers cannot starve the select.
        int changes = _changes_con.size();
        Object change = null;
        while (changes-- > 0 && (change = _changes_con.poll()) != null) {
            if (change instanceof SocketChannel) {
                processCon(change);
            } else if (change instanceof Request) {
                ((Request) change).doUpdateKey();
            } else {
                System.out.println("what's this??");
            }
        }
    }

    /** Dispatches every selected key, then clears the selected-key set. */
    private void dispatchSelectedKeys() {
        Set<SelectionKey> keys = selector.selectedKeys();
        for (SelectionKey key : keys) {
            System.out.println("find some keys " + key);
            try {
                processReq(key);
            } catch (RuntimeException e) {
                // One bad key (e.g. cancelled concurrently) must not skip the remaining keys
                // or the clear() below.
                e.printStackTrace();
            }
        }
        keys.clear();
    }

    /**
     * Registers a newly accepted channel for reads, attaches a {@link Request} to its key and
     * dispatches it once. If any step fails, the channel is closed and the request forgotten.
     *
     * @param change queued change; ignored unless it is a {@link SocketChannel}
     */
    private void processCon(Object change) {
        if (!(change instanceof SocketChannel)) {
            return;
        }
        SocketChannel sc = (SocketChannel) change;
        Request req = null;
        try {
            String id = "[" + sc.socket().getRemoteSocketAddress() + "] ";
            SelectionKey key = sc.register(selector, SelectionKey.OP_READ, null);
            req = new Request(selector, key);
            req.setReqHandler(this);
            req.setId(id);
            key.attach(req);
            requests.put(req, this);
            req.process();
            System.out.println("a client connected!" + id);
        } catch (Exception e) {
            e.printStackTrace();
            // Do not leak the socket or keep a stale entry for a connection that failed setup.
            if (req != null) {
                requests.remove(req);
            }
            try {
                sc.close();
            } catch (Exception closeError) {
                closeError.printStackTrace();
            }
        }
    }

    /**
     * Asks every live request to close itself if it has timed out.
     * <p>
     * Runs on the selector thread: the sweep only compares timestamps and closes sockets, so
     * it does not need a thread of its own.
     */
    private void kick() {
        for (Request req : requests.keySet()) {
            req.timeOut();
        }
    }

    /** Stops tracking the given request; called by the request when it closes. */
    public void removeReq(Request req) {
        System.out.println("remvoe:" + req);
        requests.remove(req);
    }

    /** Queues a request whose key interest ops must be updated on the selector thread. */
    public void addChange(Request req) {
        System.out.println("add:" + req);
        this._changes_con.add(req);
    }

    /**
     * Hands a selected key to its attached request. For an invalid key, the request's
     * {@code doUpdateKey()} is called instead, which closes the connection.
     */
    private void processReq(SelectionKey key) {
        if (!key.isValid()) {
            key.cancel();
            Request req = (Request) key.attachment();
            if (req != null) req.doUpdateKey();
            return;
        }
        Request req = (Request) key.attachment();
        req.process();
    }
}

分享到:
评论
2 楼 悲伤逆流成河 2013-01-02  
我也研究过,写了个socks5代理服务器,感觉nio真的一般般,真的好麻烦,一个版本只用了3个线程,担心读取不及时。另外一个版本用了线程池读写数据,那真的还不如用bio呢。现在思考中,是不是我的代码有问题,不会使用nio,还是nio本身就有问题,这也是这么久以来叫好不叫座的原因(java里的nio的api太少了,另外根本没办法自己写出一个selector)??
1 楼 zhhzhfya 2012-03-19  
你好,我用一个IE访问
ConnectionHandler.java

// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]

请帮忙解释下,谢谢

相关推荐

Global site tag (gtag.js) - Google Analytics