@Override protected void doStop() { try { ChannelGroupFuture f = allChannels.close(); f.addListener(new ChannelGroupFutureListener() { @Override public void operationComplete(ChannelGroupFuture future) throws Exception { if (future.isSuccess()) { notifyStopped(); } else { notifyFailed(future.cause()); } } }); } catch (Throwable t) { notifyFailed(t); Throwables.propagate(t); } }
@Override public ChannelGroupFuture close() { this.lock.writeLock().lock(); try { if (!this.closed.getAndSet(true)) { // First time close() is called. return super.close(); } else { // FIXME DefaultChannelGroupFuture is package protected // Collection<ChannelFuture> futures = new ArrayList<>(); // logger.debug("CleanupChannelGroup already closed"); // return new DefaultChannelGroupFuture(ChannelGroup.class.cast(this), futures, // GlobalEventExecutor.INSTANCE); throw new UnsupportedOperationException("CleanupChannelGroup already closed"); } } finally { this.lock.writeLock().unlock(); } }
/** * Closes all channels opened by this proxy server. * * @param graceful when false, attempts to shutdown all channels immediately and ignores any channel-closing exceptions */ protected void closeAllChannels(boolean graceful) { LOG.info("Closing all channels " + (graceful ? "(graceful)" : "(non-graceful)")); ChannelGroupFuture future = allChannels.close(); // if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them. if (graceful) { try { future.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.warn("Interrupted while waiting for channels to shut down gracefully."); } if (!future.isSuccess()) { for (ChannelFuture cf : future) { if (!cf.isSuccess()) { LOG.info("Unable to close channel. Cause of failure for {} is {}", cf.channel(), cf.cause()); } } } } }
/** * Stop proxy server * */ public void stop() { ChannelGroupFuture future = _allChannels.close().awaitUninterruptibly(); if (!future.isSuccess()) { final Iterator<ChannelFuture> iter = future.iterator(); while (iter.hasNext()) { final ChannelFuture cf = iter.next(); if (!cf.isSuccess()) { LOG.warn(String.format("Failed to close channel %s because %s", cf.channel(), cf.cause())); } } } _acceptorGroup.shutdownGracefully(); _upstreamWorkerGroup.shutdownGracefully(); _downstreamWorkerGroup.shutdownGracefully(); }
public ChannelGroupFuture close() { EventLoopGroup eventLoopGroup = null; for (Channel channel : openChannels) { if (channel instanceof ServerChannel) { eventLoopGroup = channel.eventLoop().parent(); break; } } ChannelGroupFuture future; try { future = openChannels.close(); } finally { assert eventLoopGroup != null; eventLoopGroup.shutdownGracefully(0, 15, TimeUnit.SECONDS); } return future; }
@Override protected void doStop() throws Exception { LOG.debug("Stopping producer at address: {}", configuration.getAddress()); // close all channels LOG.trace("Closing {} channels", allChannels.size()); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); // and then shutdown the thread pools if (workerGroup != null) { workerGroup.shutdownGracefully(); workerGroup = null; } if (pool != null) { if (LOG.isDebugEnabled()) { LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle()); } pool.close(); pool = null; } super.doStop(); }
/** * Send to group. * * @param groupName * the group key * @param message * the message * @param matcher * the matcher * @param listeners * the listeners */ public void sendToGroup(String groupName, byte[] message, ChannelMatcher matcher, ChannelFutureListener... listeners) { groupCheck(groupName); checkMessage(message); if (!groups.containsKey(groupName)) { log.warn("No group {} to send message {}", groupName, message); return; } ChannelGroup group = groups.get(groupName); ChannelFutureListener[] all = utils.prependArray(f -> log((ChannelGroupFuture) f, groupName), listeners); ChannelGroupFuture cf = group.writeAndFlush(message, matcher); cf.addListeners(all); }
@Override public synchronized void pause() { if (paused) { return; } if (channelClazz == null) { return; } // We *pause* the acceptor so no new connections are made if (serverChannelGroup != null) { ChannelGroupFuture future = serverChannelGroup.close().awaitUninterruptibly(); if (!future.isSuccess()) { ActiveMQServerLogger.LOGGER.nettyChannelGroupBindError(); for (Channel channel : future.group()) { if (channel.isActive()) { ActiveMQServerLogger.LOGGER.nettyChannelStillBound(channel, channel.remoteAddress()); } } } } paused = true; }
public static CompletableFuture<Void> completable(ChannelGroupFuture future) { CompletableFuture<Void> cf = new CompletableFuture<>(); future.addListener( (ChannelGroupFutureListener) future1 -> { if (future1.isSuccess()) { cf.complete(null); } else { cf.completeExceptionally(future1.cause()); } }); return cf; }
public ChannelGroupFuture shutdownAsync() { for (Channel channel : channels) { RedisConnection connection = RedisConnection.getFrom(channel); if (connection != null) { connection.setClosed(true); } } return channels.close(); }
/** * Send the message to a specific group. * * @param groupName * the group key * @param message * the message * @param listeners * the listeners */ public void sendToGroup(String groupName, String message, ChannelFutureListener... listeners) { groupCheck(groupName); if (!groups.containsKey(groupName)) { log.warn("No group {} to send message {}", groupName, message); return; } ChannelGroup group = groups.get(groupName); ChannelFutureListener[] all = utils.prependArray(f -> log((ChannelGroupFuture) f, groupName), listeners); ChannelGroupFuture cf = group.writeAndFlush(message); cf.addListeners(all); }
/** * Send the message to a specific group. * * @param groupName * the group key * @param message * the message * @param listeners * the listeners */ public void sendToGroup(String groupName, byte[] message, ChannelFutureListener... listeners) { groupCheck(groupName); if (!groups.containsKey(groupName)) { log.warn("No group {} to send message {}", groupName, message); return; } ChannelGroup group = groups.get(groupName); ChannelFutureListener[] all = utils.prependArray(f -> log((ChannelGroupFuture) f, groupName), listeners); ChannelGroupFuture cf = group.writeAndFlush(message); cf.addListeners(all); }
/** * Send to group. * * @param groupName * the group key * @param message * the message * @param matcher * the matcher * @param listeners * the listeners */ public void sendToGroup(String groupName, String message, ChannelMatcher matcher, ChannelFutureListener... listeners) { groupCheck(groupName); if (!groups.containsKey(groupName)) { log.warn("No group {} to send message {}", groupName, message); return; } ChannelGroup group = groups.get(groupName); ChannelFutureListener[] all = utils.prependArray(f -> log((ChannelGroupFuture) f, groupName), listeners); ChannelGroupFuture cf = group.writeAndFlush(message, matcher); cf.addListeners(all); }
/** * 关闭所有channel * * @return */ public ChannelGroupFuture closeAllChannels() { if (channels != null && channels.size() > 0) { return channels.close(); } return null; }
public static void main(String[] args) throws Exception { final Bootstrap bootstrap = new Bootstrap(); bootstrap.option(ChannelOption.TCP_NODELAY,true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("handshaker", new ServerHandshakeHandler()); p.addLast("decoder", new RtmpDecoder()); p.addLast("encoder", new RtmpEncoder()); p.addLast("handler", new ServerHandler()); } }); final InetSocketAddress socketAddress = new InetSocketAddress(RtmpConfig.SERVER_PORT); bootstrap.bind(socketAddress); logger.info("server started, listening on: {}", socketAddress); final Thread monitor = new StopMonitor(RtmpConfig.SERVER_STOP_PORT); monitor.start(); monitor.join(); TIMER.stop(); final ChannelGroupFuture future = CHANNELS.close(); logger.info("closing channels"); future.awaitUninterruptibly(); logger.info("releasing resources"); logger.info("server stopped"); }
/** * Closes all channels and releases all resources. */ @Override public void close() { LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress); final ChannelGroupFuture clientChannelGroupFuture = this.clientChannelGroup.close(); final ChannelGroupFuture serverChannelGroupFuture = this.serverChannelGroup.close(); final ChannelFuture acceptorFuture = this.acceptor.close(); final ArrayList<Future> eventLoopGroupFutures = new ArrayList<>(3); eventLoopGroupFutures.add(this.clientWorkerGroup.shutdownGracefully()); eventLoopGroupFutures.add(this.serverBossGroup.shutdownGracefully()); eventLoopGroupFutures.add(this.serverWorkerGroup.shutdownGracefully()); clientChannelGroupFuture.awaitUninterruptibly(); serverChannelGroupFuture.awaitUninterruptibly(); try { acceptorFuture.sync(); } catch (final Exception ex) { LOG.log(Level.SEVERE, "Error closing the acceptor channel for " + this.localAddress, ex); } for (final Future eventLoopGroupFuture : eventLoopGroupFutures) { eventLoopGroupFuture.awaitUninterruptibly(); } LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress); }
/** * Shutdown this channel creator. This means that no TCP or UDP connection can be established. * * @return The shutdown future. */ public FutureDone<Void> shutdown() { // set shutdown flag for UDP and TCP, if we acquire a write lock, all read locks are blocked as well writeUDP.lock(); writeTCP.lock(); try { if (shutdownTCP || shutdownUDP) { shutdownFuture().setFailed("already shutting down"); return shutdownFuture(); } shutdownUDP = true; shutdownTCP = true; } finally { writeTCP.unlock(); writeUDP.unlock(); } recipients.close().addListener(new GenericFutureListener<ChannelGroupFuture>() { @Override public void operationComplete(final ChannelGroupFuture future) throws Exception { if (!semaphoreUPD.tryAcquire(maxPermitsUDP)) { LOG.error("Cannot shutdown, as connections (UDP) are still alive"); shutdownFuture().setFailed("Cannot shutdown, as connections (UDP) are still alive"); throw new RuntimeException("Cannot shutdown, as connections (UDP) are still alive"); } if (!semaphoreTCP.tryAcquire(maxPermitsTCP)) { LOG.error("Cannot shutdown, as connections (TCP) are still alive"); shutdownFuture().setFailed("Cannot shutdown, as connections (TCP) are still alive"); throw new RuntimeException("Cannot shutdown, as connections (TCP) are still alive"); } shutdownFuture().setDone(); } }); return shutdownFuture(); }
public ChannelGroupFuture writeGroup(OutMessage message) { message.setH(handlerName); if (chanelGroup != null) { return chanelGroup.write(message); } return null; }
public ChannelGroupFuture shutdownAsync() { return channels.close(); }
private void log(ChannelGroupFuture e, String groupName) { e.iterator().forEachRemaining(cf -> log(cf, groupName)); }
@Override public synchronized void stop() { if (channelClazz == null) { return; } if (protocolHandler != null) { protocolHandler.close(); } if (batchFlusherFuture != null) { batchFlusherFuture.cancel(false); flusher.cancel(); flusher = null; batchFlusherFuture = null; } // serverChannelGroup has been unbound in pause() if (serverChannelGroup != null) { serverChannelGroup.close().awaitUninterruptibly(); } if (channelGroup != null) { ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly(); if (!future.isSuccess()) { ActiveMQServerLogger.LOGGER.nettyChannelGroupError(); for (Channel channel : future.group()) { if (channel.isActive()) { ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, channel.remoteAddress()); } } } } // Shutdown the EventLoopGroup if no new task was added for 100ms or if // 3000ms elapsed. eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS); eventLoopGroup = null; channelClazz = null; for (Connection connection : connections.values()) { listener.connectionDestroyed(connection.getID()); } connections.clear(); if (notificationService != null) { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(NettyAcceptorFactory.class.getName())); props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host)); props.putIntProperty(new SimpleString("port"), port); Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props); try { notificationService.sendNotification(notification); } catch (Exception e) { ActiveMQServerLogger.LOGGER.failedToSendNotification(e); } } paused = false; }