void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(Time.now()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { // a WrappedRpcServerException is an exception that has been sent // to the client, so the stacktrace is unnecessary; any other // exceptions are unexpected internal server errors and thus the // stacktrace should be logged LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c.getHostAddress() + " threw exception [" + e + "]", (e instanceof WrappedRpcServerException) ? null : e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { closeConnection(c); c = null; } else { c.setLastContact(Time.now()); } }
/**
 * Builds the message describing a channel that did not become ready in time.
 *
 * @param channel the channel that was being waited on; rendered via string concatenation
 * @param timeout the timeout, reported in the message as milliseconds
 * @param ops the operation waited for; {@code OP_READ}, {@code OP_WRITE} and
 *            {@code OP_CONNECT} are rendered by name, any other value as its number
 * @return the formatted timeout message
 */
private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops) {
    final String waitingFor;
    if (ops == SelectionKey.OP_READ) {
        waitingFor = "read";
    } else if (ops == SelectionKey.OP_WRITE) {
        waitingFor = "write";
    } else if (ops == SelectionKey.OP_CONNECT) {
        waitingFor = "connect";
    } else {
        waitingFor = String.valueOf(ops);
    }

    StringBuilder message = new StringBuilder();
    message.append(timeout)
            .append(" millis timeout while ")
            .append("waiting for channel to be ready for ")
            .append(waitingFor)
            .append(". ch : ")
            .append(channel);
    return message.toString();
}
public final void sendPacket(final SendablePacket<T> sp) { sp._client = _client; if (_pendingClose) return; synchronized (getSendQueue()) { _sendQueue.addLast(sp); } if (!_sendQueue.isEmpty()) { try { _selectionKey.interestOps(_selectionKey.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException e) { // ignore } } }
/**
 * Drains the selector's selected-key set, handing each valid key to the
 * {@code ChannelHandler} attached to it.
 *
 * <p>Readiness is checked in the order accept, connect, read, write, and only the first
 * matching operation is dispatched for a given key.
 */
private void process() {
    for (Iterator<SelectionKey> ready = selector.selectedKeys().iterator(); ready.hasNext(); ) {
        SelectionKey selected = ready.next();
        ready.remove();
        if (selected.isValid()) {
            ChannelHandler target = (ChannelHandler) selected.attachment();
            if (selected.isAcceptable()) {
                target.accept(selected);
            } else if (selected.isConnectable()) {
                target.connect(selected);
            } else if (selected.isReadable()) {
                target.read(selected);
            } else if (selected.isWritable()) {
                target.write(selected);
            }
        }
    }
}
/** * Iterate over the queue of accepted connections that have been * assigned to this thread but not yet placed on the selector. */ private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { key = accepted.register(selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); } catch (IOException e) { // register, createConnection cleanupSelectionKey(key); fastCloseSock(accepted); } } }
/**
 * Event loop: blocks in {@code select()} and passes every selected key to
 * {@code handleEvent}.
 *
 * <p>All keys reported by one {@code select()} call are handled before selecting again.
 * Each key is removed from the selected-key set before it is handled. The loop ends only
 * when an {@link IOException} is thrown, which is printed to standard error.
 */
@Override
public void run() {
    try {
        while (true) {
            selector.select();
            Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
            // Drain the whole set; handling just one key per select() costs an extra
            // select() round trip for every additional ready channel.
            while (ready.hasNext()) {
                SelectionKey key = ready.next();
                ready.remove();
                handleEvent(key);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Queues a {@code FastSerializable} object for writing.
 *
 * <p>Serialization is deferred: the queued task creates a {@code FastSerializer} sized
 * with {@code expectedSize} when its {@code serialize} method is invoked and writes the
 * object with {@code writeObjectForMessaging}. After queuing, the port's interests are
 * updated with {@code OP_WRITE}.
 *
 * @param f the object to serialize and send
 * @param expectedSize initial allocation size handed to the {@code FastSerializer}
 * @return {@code false} if this stream has been shut down, {@code true} once queued
 */
@Override
public boolean enqueue(final FastSerializable f, final int expectedSize) {
    final DeferredSerialization deferredWrite = new DeferredSerialization() {
        @Override
        public BBContainer serialize(final DBBPool pool) throws IOException {
            final FastSerializer serializer = new FastSerializer(pool, expectedSize);
            return serializer.writeObjectForMessaging(f);
        }

        @Override
        public void cancel() {
            // nothing to release
        }
    };

    synchronized (this) {
        if (m_isShutdown) {
            return false;
        }
        updateLastPendingWriteTimeAndQueueBackpressure();
        m_queuedWrites.offer(deferredWrite);
        m_port.setInterests(SelectionKey.OP_WRITE, 0);
    }
    return true;
}
private void connect(Selector selector) { AbstractConnection c = null; while ((c = connectQueue.poll()) != null) { try { SocketChannel channel = (SocketChannel) c.getChannel(); // 注册 OP_CONNECT(建立连接) 监听与后端连接是否真正建立 // 监听到之后是图-MySql第3步,(TCP连接建立) channel.register(selector, SelectionKey.OP_CONNECT, c); // 主动连接 阻塞或者非阻塞 // 图-MySql第1步,(TCP连接请求) channel.connect(new InetSocketAddress(c.host, c.port)); } catch (Exception e) { LOGGER.error("error:",e); c.close(e.toString()); } } }
private boolean connectMaster() throws ClosedChannelException { if (null == socketChannel) { String addr = this.masterAddress.get(); if (addr != null) { SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); if (socketAddress != null) { this.socketChannel = RemotingUtil.connect(socketAddress); if (this.socketChannel != null) { this.socketChannel.register(this.selector, SelectionKey.OP_READ); } } } // 每次连接时,要重新拿到最大的Offset this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); this.lastWriteTimestamp = System.currentTimeMillis(); } return this.socketChannel != null; }
public void onReadable(SelectionKey key){ try { ByteBuffer buffer=GL_BUFFER; buffer.clear(); int bytesRead=m_InnerChannel.read(buffer); if(bytesRead>0){ buffer.flip(); afterReceived(buffer);//先让子类处理,例如解密数据。 if(isTunnelEstablished()&&buffer.hasRemaining()){//将读到的数据,转发给兄弟。 m_BrotherTunnel.beforeSend(buffer);//发送之前,先让子类处理,例如做加密等。 if(!m_BrotherTunnel.write(buffer,true)){ key.cancel();//兄弟吃不消,就取消读取事件。 if(ProxyConfig.IS_DEBUG) System.out.printf("%s can not read more.\n", m_ServerEP); } } }else if(bytesRead<0) { this.dispose();//连接已关闭,释放资源。 } } catch (Exception e) { e.printStackTrace(); this.dispose(); } }
public void testInvokeCallbacks() throws InterruptedException{ MockSelector selector = new MockSelector(); VoltNetwork vn = new VoltNetwork(selector); // network with fake selector MockVoltPort vp = new MockVoltPort(vn, new MockInputHandler()); // implement abstract run() MockSelectionKey selectionKey = new MockSelectionKey(); // fake selection key // glue the key, the selector and the port together. selectionKey.interestOps(SelectionKey.OP_WRITE); selector.setFakeKey(selectionKey); vp.m_selectionKey = selectionKey; selectionKey.attach(vp); selectionKey.readyOps(SelectionKey.OP_WRITE); // invoke call backs and see that the volt port has the expected // selected operations. vn.invokeCallbacks(); assertEquals(SelectionKey.OP_WRITE, vp.readyOps()); // and another time through, should have the new interests selected vp.setInterests(SelectionKey.OP_ACCEPT, 0); selectionKey.readyOps(SelectionKey.OP_ACCEPT); vn.installInterests(); vn.invokeCallbacks(); vn.shutdown(); assertEquals(SelectionKey.OP_ACCEPT, vp.readyOps()); }
/** * Finishes the connection process for the given connection * * @param p_key * the selection key */ public void connect(final SelectionKey p_key) throws NetworkException { if (getPipeOut().getChannel().isConnectionPending()) { try { if (getPipeOut().getChannel().finishConnect()) { connected(p_key); } else { // #if LOGGER >= ERROR LOGGER.error("Connection could not be finished: %s", this); // #endif /* LOGGER >= ERROR */ } } catch (final IOException ignore) { abortConnectionCreation(); } } else { // #if LOGGER >= WARN LOGGER.warn("Connection is not pending, connect aborted: %s", this); // #endif /* LOGGER >= WARN */ } }
/**
 * Connects a client channel to a {@code ByteServer}, registers it for {@code OP_WRITE}
 * and selects twice, clearing the selected-key set in between.
 *
 * @param argv unused
 * @throws Exception if the second {@code select()} returns zero, or on any I/O failure
 */
public static void main(String[] argv) throws Exception {
    try (ByteServer server = new ByteServer();
         SocketChannel client = SocketChannel.open(server.address())) {

        server.acceptConnection();

        try (Selector selector = Selector.open()) {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_WRITE);

            // First selection; its result is discarded and the selected keys are cleared.
            selector.select();
            selector.selectedKeys().clear();

            // The second selection is the one whose result is checked.
            int selectedAgain = selector.select();
            if (selectedAgain == 0) {
                throw new Exception("Select returned zero");
            }
        }
    }
}
/**
 * Cancels the channel's registration with this object's selector, if there is one, then
 * disconnects and closes the channel.
 *
 * @param handle the datagram channel to shut down
 * @throws Exception if disconnecting or closing the channel fails
 */
@Override
protected void close(DatagramChannel handle) throws Exception {
    final SelectionKey registration = handle.keyFor(selector);
    final boolean registered = registration != null;
    if (registered) {
        registration.cancel();
    }

    handle.disconnect();
    handle.close();
}
public Client() throws IOException { // 同样的,注册闹钟. this.selector = Selector.open(); // 连接远程server socketChannel = SocketChannel.open(); // 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件. Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 3562)); socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); if (isConnected) { this.sendFirstMsg(); } else { // 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件. key.interestOps(SelectionKey.OP_CONNECT); } }
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; for (int i = 0; i < readThreads; i++) { Reader reader = new Reader( "Socket Reader #" + (i + 1) + " for port " + port); readers[i] = reader; reader.start(); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
/**
 * Reader loop: selects on {@code readSelector} while {@code running} is set and passes
 * every valid, readable key to {@code doRead}.
 *
 * <p>After each select the loop waits, in steps of up to one second, while {@code adding}
 * is set. An {@link InterruptedException} ends the loop; an {@link IOException} is logged
 * and the loop continues.
 */
private synchronized void doRunLoop() {
    while (running) {
        try {
            readSelector.select();
            while (adding) {
                this.wait(1000);
            }

            for (Iterator<SelectionKey> ready = readSelector.selectedKeys().iterator();
                    ready.hasNext(); ) {
                SelectionKey selected = ready.next();
                ready.remove();
                if (selected.isValid() && selected.isReadable()) {
                    doRead(selected);
                }
            }
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping");
            return;
        } catch (IOException ex) {
            LOG.info(getName() + ": IOException in Reader", ex);
        }
    }
}
/**
 * Translates an interest set into poll events and stores it on the key's selector.
 *
 * <p>Only {@code OP_ACCEPT} is recognised; it maps to {@code Net.POLLIN}. Any other bits
 * in {@code ops} are ignored.
 *
 * @param ops the interest set to translate
 * @param sk the selection key whose selector receives the translated events
 */
@Override
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    final boolean wantsAccept = (ops & SelectionKey.OP_ACCEPT) != 0;
    final int pollEvents = wantsAccept ? Net.POLLIN : 0;

    // Place the translated ops into the pollfd array.
    sk.selector.putEventOps(sk, pollEvents);
}
public static void close(SelectionKey key) { try { key.channel().close(); } catch (IOException e) { // nop } }
/** * * // 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 * @throws // IOException * @throws Exception */ @SuppressWarnings("unchecked") public void listen() throws Exception { // 轮询访问selector while (true) { // 选择一组可以进行I/O操作的事件,放在selector中,客户端的该方法不会阻塞, // 这里和服务端的方法不一样,查看api注释可以知道,当至少一个通道被选中时, // selector的wakeup方法被调用,方法返回,而对于客户端来说,通道一直是被选中的 selector.select(); // 获得selector中选中的项的迭代器 Iterator ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 删除已选的key,以防重复处理 ite.remove(); // 连接事件发生 if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key.channel(); // 如果正在连接,则完成连接 if (channel.isConnectionPending()) { channel.finishConnect(); } // 设置成非阻塞 channel.configureBlocking(false); // 在这里可以给服务端发送信息哦 channel.write(ByteBuffer.wrap(new String("hello server!").getBytes())); // 在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。 channel.register(this.selector, SelectionKey.OP_READ); // 获得了可读的事件 } else if (key.isReadable()) { read(key); } } } }
/**
 * Queues a task that removes {@code ops} from the interest set of the key's channel and
 * releases the matching latches, then wakes the selector.
 *
 * <p>When the task runs it does nothing if the attachment, its channel or its IO channel
 * is {@code null}. Otherwise:
 * <ul>
 *   <li>no key registered with the selector: only the latches for {@code ops} are counted down;</li>
 *   <li>an invalid key: the key is cancelled and its attachment cleared;</li>
 *   <li>a valid key: {@code ops} are removed, the latches are counted down, and the key is
 *       cancelled and detached once its interest set is empty.</li>
 * </ul>
 * A {@link CancelledKeyException} also leads to cancelling and detaching the key.
 *
 * @param key the attachment identifying the channel and holding the latches
 * @param ops the interest ops to remove
 */
public void remove(final KeyAttachment key, final int ops) {
    final Runnable removal = new Runnable() {
        @Override
        public void run() {
            if (key == null) {
                return;
            }
            final NioChannel nioChannel = key.getChannel();
            if (nioChannel == null) {
                return;
            }
            final SocketChannel ioChannel = nioChannel.getIOChannel();
            if (ioChannel == null) {
                return;
            }

            final SelectionKey sk = ioChannel.keyFor(selector);
            final boolean releaseWriter = (ops & SelectionKey.OP_WRITE) != 0;
            final boolean releaseReader = (ops & SelectionKey.OP_READ) != 0;
            try {
                if (sk != null && !sk.isValid()) {
                    sk.cancel();
                    sk.attach(null);
                    return;
                }
                if (sk != null) {
                    sk.interestOps(sk.interestOps() & (~ops));
                }
                if (releaseWriter) {
                    countDown(key.getWriteLatch());
                }
                if (releaseReader) {
                    countDown(key.getReadLatch());
                }
                if (sk != null && sk.interestOps() == 0) {
                    sk.cancel();
                    sk.attach(null);
                }
            } catch (CancelledKeyException cx) {
                if (sk != null) {
                    sk.cancel();
                    sk.attach(null);
                }
            }
        }
    };
    events.offer(removal);
    wakeup();
}
@Override protected boolean breakKeepAliveLoop(SocketWrapper<NioChannel> socketWrapper) { openSocket = keepAlive; // Do sendfile as needed: add socket to sendfile and end if (sendfileData != null && !getErrorState().isError()) { ((KeyAttachment) socketWrapper).setSendfileData(sendfileData); sendfileData.keepAlive = keepAlive; SelectionKey key = socketWrapper.getSocket().getIOChannel().keyFor( socketWrapper.getSocket().getPoller().getSelector()); //do the first write on this thread, might as well switch (socketWrapper.getSocket().getPoller().processSendfile( key, (KeyAttachment) socketWrapper, true)) { case DONE: // If sendfile is complete, no need to break keep-alive loop sendfileData = null; return false; case PENDING: sendfileInProgress = true; return true; case ERROR: // Write failed if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.sendfile.error")); } setErrorState(ErrorState.CLOSE_NOW, null); return true; } } return false; }
private void waitForWriteBufferToDrain() throws IOException { if (selector == null) { selector = Selector.open(); } SelectionKey key = socket.register(selector, SelectionKey.OP_WRITE); // block until ready for write operations selector.select(); // cancel OP_WRITE selection key.cancel(); // complete cancelling key selector.selectNow(); }
/**
 * Detaches and cancels {@code processKey} when it is present and still valid.
 *
 * <p>Any exception raised while doing so is logged at info level and not propagated.
 */
private void clearSelectionKey() {
    try {
        final SelectionKey current = this.processKey;
        if (current == null || !current.isValid()) {
            return;
        }
        current.attach(null);
        current.cancel();
    } catch (Exception e) {
        AbstractConnection.LOGGER.info("clear selector keys err:" + e);
    }
}
void doRead(SelectionKey key) throws InterruptedException { int count; Connection c = (Connection) key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); if (count > 0) { c.setLastContact(System.currentTimeMillis()); } } catch (InterruptedException ieo) { throw ieo; } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage()); } count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": DISCONNECTING client " + c.toString() + " because read count=" + count + ". Number of active connections: " + numConnections); } closeConnection(c); } }
/**
 * Registers a channel with the selector and creates the {@code VoltPort} that is attached
 * to the resulting key.
 *
 * <p>The channel is switched to non-blocking mode with keep-alive enabled before it is
 * registered while holding the registration lock.
 *
 * @param channel the socket channel to register
 * @param handler handler given to the new port; also supplies the expected outgoing message size
 * @param interestOps the interest set used for the registration
 * @return the port created for the channel
 * @throws IOException if configuring or registering the channel fails
 */
public Connection registerChannel(
        SocketChannel channel,
        InputHandler handler,
        int interestOps) throws IOException {
    channel.configureBlocking(false);
    channel.socket().setKeepAlive(true);

    final VoltPort newPort = new VoltPort(
            this,
            handler,
            handler.getExpectedOutgoingMessageSize(),
            channel.socket().getInetAddress().getHostName());
    newPort.registering();

    acquireRegistrationLock();
    try {
        final SelectionKey registration = channel.register(m_selector, interestOps, newPort);
        newPort.setKey(registration);
        newPort.registered();
        return newPort;
    } finally {
        // NOTE(review): this runs even when register() throws, so a port whose
        // registration failed is still added to m_ports - confirm that is intended.
        synchronized (m_ports) {
            m_ports.add(newPort);
        }
        releaseRegistrationLock();
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Returns the attachment of the handle's key for this selector, or {@code null} when
 * the handle has no such key or the key is no longer valid.
 */
@Override
protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
    final SelectionKey registration = handle.keyFor(selector);
    final boolean usable = registration != null && registration.isValid();
    return usable ? (ConnectionRequest) registration.attachment() : null;
}
void send() throws IOException { output.clear(); output.put("111".getBytes()); output.flip(); socket.write(output); System.out.println("write"); if (outputIsComplete()){ sk.interestOps(SelectionKey.OP_READ); state = READING; // sk.cancel(); // write完就结束了, 关闭select key } }
/**
 * Reports a "selector destroyed" failure to the parent for every key registered with the
 * selector, then closes the selector and logs its destruction.
 *
 * <p>A failure while closing or logging is printed and not propagated.
 */
protected void closeExistingSelector() {
    for (SelectionKey registered : selector.keys()) {
        RegistrationData registration = (RegistrationData) registered.attachment();
        parent.selectFailure(
                registration.listener,
                registration.channel,
                registration.attachment,
                new Throwable( "selector destroyed" ));
    }

    try {
        selector.close();
        AEDiagnostics.log(
                "seltrace",
                "Selector destroyed for '" + parent.getName() + "'," + selector_guard.getType());
    } catch (Throwable t) {
        t.printStackTrace();
    }
}
/**
 * Looks up the attachment of this channel's key on the poller's selector.
 *
 * @return the key's attachment, or {@code null} when there is no poller, no selector or
 *         no key for that selector
 */
public Object getAttachment() {
    final Poller poller = getPoller();
    if (poller == null) {
        return null;
    }
    final Selector pollerSelector = poller.getSelector();
    if (pollerSelector == null) {
        return null;
    }
    final SelectionKey registration = getIOChannel().keyFor(pollerSelector);
    if (registration == null) {
        return null;
    }
    return registration.attachment();
}
/**
 * Hands a selection key to this worker for servicing.
 *
 * <p>The attached {@code ObjectReader}, if any, gets its last-access time refreshed and
 * the key is stored in the {@code key} field. {@code OP_READ} and {@code OP_WRITE} are
 * then both removed from the key's interest set. The method is synchronized.
 *
 * @param key the selection key to service
 */
public synchronized void serviceChannel(SelectionKey key) {
    if (log.isTraceEnabled()) {
        log.trace("About to service key:" + key);
    }

    final ObjectReader attachedReader = (ObjectReader) key.attachment();
    if (attachedReader != null) {
        attachedReader.setLastAccess(System.currentTimeMillis());
    }
    this.key = key;

    // Clear read interest first, then write interest, as two separate updates.
    final int withoutRead = key.interestOps() & (~SelectionKey.OP_READ);
    key.interestOps(withoutRead);
    final int withoutWrite = key.interestOps() & (~SelectionKey.OP_WRITE);
    key.interestOps(withoutWrite);
}
public ConnectionHandler(StreamConnection connection, SelectionKey key, Set<ConnectionHandler> connectedHandlers) { this(checkNotNull(connection), key); // closeConnection() may have already happened because we invoked the other c'tor above, which called // connection.setWriteTarget which might have re-entered already. In this case we shouldn't add ourselves // to the connectedHandlers set. lock.lock(); try { this.connectedHandlers = connectedHandlers; if (!closeCalled) checkState(this.connectedHandlers.add(this)); } finally { lock.unlock(); } }
/**
 * Removes pending non-repeating events that match {@code interestOps} and updates the
 * channel's selection key to the interest set of the remaining events.
 *
 * <p>Repeating events are always kept. If no interest remains the key is cancelled;
 * otherwise its interest set is replaced. If the channel has no key for the selector
 * there is nothing to update and only the recorded interest set is refreshed.
 *
 * @param interestOps the operations whose non-repeating events should be dropped
 */
void resetInterestOps(int interestOps) {
    int newOps = 0;
    Iterator<AsyncEvent> itr = pending.iterator();
    while (itr.hasNext()) {
        AsyncEvent event = itr.next();
        int evops = event.interestOps();
        if (event.repeating() || (evops & interestOps) == 0) {
            // Kept events contribute to the new interest set.
            newOps |= evops;
        } else {
            itr.remove();
        }
    }
    this.interestOps = newOps;

    SelectionKey key = chan.keyFor(selector);
    if (key == null) {
        // keyFor returns null when the channel is not registered with this selector.
        return;
    }
    if (newOps == 0) {
        key.cancel();
    } else {
        key.interestOps(newOps);
    }
}
/**
 * Translates an interest set into poll events and stores it on the key's selector.
 *
 * <p>{@code OP_READ} maps to {@code Net.POLLIN} and {@code OP_WRITE} to
 * {@code Net.POLLOUT}; other bits in {@code ops} are ignored.
 *
 * @param ops the interest set to translate
 * @param sk the selection key whose selector receives the translated events
 */
@Override
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
    final int readEvents = ((ops & SelectionKey.OP_READ) != 0) ? Net.POLLIN : 0;
    final int writeEvents = ((ops & SelectionKey.OP_WRITE) != 0) ? Net.POLLOUT : 0;
    sk.selector.putEventOps(sk, readEvents | writeEvents);
}
/**
 * Re-applies the port's interest ops to its selection key, or removes the port from
 * {@code m_ports} when the key is no longer valid.
 *
 * @param port the port whose selection should resume
 */
private void resumeSelection(VoltPort port) {
    final SelectionKey registration = port.getKey();
    if (!registration.isValid()) {
        synchronized (m_ports) {
            m_ports.remove(port);
        }
        return;
    }
    registration.interestOps(port.interestOps());
}
/**
 * Masks the interest set of {@code processKey} with {@code OP_NOT_WRITE}.
 *
 * <p>Any exception raised while doing so is logged at info level and not propagated.
 */
private void disableWrite() {
    try {
        final SelectionKey current = this.processKey;
        final int maskedOps = current.interestOps() & OP_NOT_WRITE;
        current.interestOps(maskedOps);
    } catch (Exception e) {
        AbstractConnection.LOGGER.info("can't disable write " + e + " con " + con);
    }
}
private void updateInterests() { int interestOps = SelectionKey.OP_READ; // we always want to read if (!networkToClient.isEmpty()) { interestOps |= SelectionKey.OP_WRITE; } if (interests != interestOps) { // interests must be changed interests = interestOps; selectionKey.interestOps(interestOps); } }
public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) it.next()); // Reactor负责dispatch收到的事件 selected.clear(); } } catch (IOException ex) { ex.printStackTrace(); } }
/**
 * Returns the attachment of this channel's selection key on the poller's selector.
 *
 * @return the attachment, or {@code null} if the poller, its selector or the key is missing
 */
public Object getAttachment() {
    Object attachment = null;
    final Poller poller = getPoller();
    if (poller != null) {
        final Selector pollerSelector = poller.getSelector();
        if (pollerSelector != null) {
            final SelectionKey registration = getIOChannel().keyFor(pollerSelector);
            if (registration != null) {
                attachment = registration.attachment();
            }
        }
    }
    return attachment;
}