Java 类akka.actor.ActorNotFound 实例源码

项目:flink    文件:MetricRegistryTest.java   
/**
 * Tests that the query actor will be stopped when the MetricRegistry is shut down.
 */
@Test
public void testQueryActorShutdown() throws Exception {
    final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);

    MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());

    final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();

    registry.startQueryService(actorSystem, null);

    ActorRef queryServiceActor = registry.getQueryService();

    registry.shutdown();

    try {
        Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);

        fail("The query actor should be terminated resulting in a ActorNotFound exception.");
    } catch (ActorNotFound e) {
        // we expect the query actor to be shut down
    }
}
项目:flink    文件:MetricRegistryImplTest.java   
/**
 * Tests that the query actor will be stopped when the MetricRegistry is shut down.
 */
@Test
public void testQueryActorShutdown() throws Exception {
    final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);

    MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());

    final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();

    registry.startQueryService(actorSystem, null);

    ActorRef queryServiceActor = registry.getQueryService();

    registry.shutdown();

    try {
        Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);

        fail("The query actor should be terminated resulting in a ActorNotFound exception.");
    } catch (ActorNotFound e) {
        // we expect the query actor to be shut down
    }
}
项目:query-window-example    文件:QueryActor.java   
public Future<Object> queryStateFuture(final QueryState<K> queryState) {
    LOG.debug("Try to get ActorRef future for key {}.", queryState.getKey());

    Future<ActorRef> actorRefFuture = getActorRefFuture(queryState.getKey());

    @SuppressWarnings("unchecked")
    Future<Object> result =  actorRefFuture.flatMap(new Mapper<ActorRef, Future<Object>>() {
        public Future<Object> apply(ActorRef actorRef) {
            LOG.debug("Ask response actor for state for key {}.", queryState.getKey());
            return Patterns.ask(actorRef, queryState, new Timeout(askTimeout));
        }
    }, executor).recoverWith(new Recover<Future<Object>>() {
        @Override
        public Future<Object> recover(final Throwable failure) throws Throwable {
            if (failure instanceof WrongKeyPartitionException || failure instanceof ActorNotFound) {
                // wait askTimeout because we communicated with the wrong actor. This usually
                // indicates that not all actors have registered at the registry.
                return Patterns.after(
                    askTimeout,
                    getContext().system().scheduler(),
                    executor,
                    new Callable<Future<Object>>() {
                        @Override
                        public Future<Object> call() throws Exception {
                            refreshCache();
                            return Futures.failed(failure);
                        }
                    });
            } else if (failure instanceof AskTimeoutException) {
                LOG.debug("Ask timed out.", failure);
                handleAskTimeout();
                return Futures.failed(failure);
            } else {
                LOG.debug("State query failed with.", failure);
                refreshCache();
                return Futures.failed(failure);
            }
        }
    }, executor);

    return result;
}