Java 类com.datastax.driver.core.policies.DefaultRetryPolicy 实例源码

项目:act-platform    文件:ClusterManager.java   
/**
 * Builds the Cassandra cluster, registers the enum codecs and creates the mapping manager.
 * Does nothing when a cluster has already been created.
 */
@Override
public void startComponent() {
  // A cluster already exists: keep it and the mapping manager as they are.
  if (cluster != null) {
    return;
  }

  final String[] hosts = contactPoints.toArray(new String[contactPoints.size()]);

  // Configure and build up the Cassandra cluster.
  // TokenAware requires query has routing info (e.g. BoundStatement with all PK value bound).
  final Cluster.Builder clusterBuilder = Cluster.builder()
          .withClusterName(clusterName)
          .withPort(port)
          .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
          .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
          .addContactPoints(hosts);
  cluster = clusterBuilder.build();

  // Register the codecs for the enum types, each with its value map.
  cluster.getConfiguration().getCodecRegistry()
          .register(new CassandraEnumCodec<>(AccessMode.class, AccessMode.getValueMap()))
          .register(new CassandraEnumCodec<>(Direction.class, Direction.getValueMap()))
          .register(new CassandraEnumCodec<>(SourceEntity.Type.class, SourceEntity.Type.getValueMap()));

  // Open a session and wrap it in the mapping manager.
  manager = new MappingManager(cluster.connect());
}
项目:cassandra-reaper    文件:CassandraStorage.java   
/**
 * Decides how to react to a read timeout.
 *
 * <p>From the second retry onwards the calling thread pauses for 100 ms before a decision is
 * returned. Idempotent statements are retried at the same consistency level for retry counts
 * below 10 and rethrown afterwards; every other statement (including {@code null}) is delegated
 * to {@link DefaultRetryPolicy#INSTANCE}.
 *
 * @param stmt the statement that timed out, may be {@code null}
 * @param cl the consistency level of the timed-out read
 * @param required forwarded unchanged to the default policy
 * @param received forwarded unchanged to the default policy
 * @param retrieved forwarded unchanged to the default policy
 * @param retry the retry count reported by the driver
 * @return the retry decision
 */
@Override
public RetryDecision onReadTimeout(
    Statement stmt,
    ConsistencyLevel cl,
    int required,
    int received,
    boolean retrieved,
    int retry) {

  if (retry > 1) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Do not lose the interruption: restore the flag so code further up the stack can see it.
      Thread.currentThread().interrupt();
    }
  }

  if (null == stmt || !stmt.isIdempotent()) {
    return DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry);
  }
  return retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow();
}
项目:heroic    文件:DatastaxMetricModule.java   
@JsonCreator
public DatastaxMetricModule(
    @JsonProperty("id") Optional<String> id, @JsonProperty("groups") Optional<Groups> groups,
    @JsonProperty("seeds") Optional<Set<String>> seeds,
    @JsonProperty("schema") Optional<SchemaModule> schema,
    @JsonProperty("configure") Optional<Boolean> configure,
    @JsonProperty("fetchSize") Optional<Integer> fetchSize,
    @JsonProperty("readTimeout") Optional<Duration> readTimeout,
    @JsonProperty("consistencyLevel") Optional<ConsistencyLevel> consistencyLevel,
    @JsonProperty("retryPolicy") Optional<RetryPolicy> retryPolicy,
    @JsonProperty("authentication") Optional<DatastaxAuthentication> authentication
) {
    this.id = id;
    this.groups = groups.orElseGet(Groups::empty).or("heroic");
    this.seeds = convert(seeds.orElse(DEFAULT_SEEDS));
    this.schema = schema.orElseGet(NextGenSchemaModule.builder()::build);
    this.configure = configure.orElse(DEFAULT_CONFIGURE);
    this.fetchSize = fetchSize.orElse(DEFAULT_FETCH_SIZE);
    this.readTimeout = readTimeout.orElse(DEFAULT_READ_TIMEOUT);
    this.consistencyLevel = consistencyLevel.orElse(ConsistencyLevel.ONE);
    this.retryPolicy = retryPolicy.orElse(DefaultRetryPolicy.INSTANCE);
    this.authentication = authentication.orElseGet(DatastaxAuthentication.None::new);
}
项目:Mache    文件:MacheAbstractCassandraKafkaSamplerClient.java   
/**
 * Creates {@code cache1}: a Guava-cached store backed by Cassandra with Kafka messaging.
 *
 * @param mapParams sampler parameters; the keys read are {@code cassandra.server.ip.address},
 *     {@code keyspace.name}, {@code kafka.connection} and {@code kafka.topic}
 * @throws Exception if building the cache fails
 */
protected void createCache(Map<String, String> mapParams) throws Exception {
    final String cassandraHost = mapParams.get("cassandra.server.ip.address");
    final String keyspaceName = mapParams.get("keyspace.name");
    final String kafkaConnection = mapParams.get("kafka.connection");
    final String kafkaTopic = mapParams.get("kafka.topic");

    // Cluster blueprint: LOCAL_QUORUM, default retry policy, token-aware DC-aware balancing.
    final QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
    final Cluster.Builder bluePrint = Cluster.builder()
            .withClusterName("BluePrint")
            .withQueryOptions(queryOptions)
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
            .addContactPoint(cassandraHost)
            .withPort(9042);

    cache1 = mache(String.class, CassandraTestEntity.class)
            .cachedBy(guava())
            .storedIn(cassandra()
                    .withCluster(bluePrint)
                    .withKeyspace(keyspaceName)
                    .withSchemaOptions(SchemaOptions.CREATE_SCHEMA_IF_NEEDED)
                    .build())
            .withMessaging(kafka()
                    .withKafkaMqConfig(KafkaMqConfigBuilder.builder()
                            .withZkHost(kafkaConnection)
                            .build())
                    .withTopic(kafkaTopic)
                    .build())
            .macheUp();
}
项目:cassandra-jdbc-wrapper    文件:UtilsUnitTest.java   
/**
 * Checks that {@code Utils.parseRetryPolicy} maps each supported policy name to an instance of
 * the matching policy class, printing each name followed by a separator line.
 */
@Test
public void testRetryPolicyParsing() throws Exception
{
    final String separator = "====================";

    final String defaultName = "DefaultRetryPolicy";
    System.out.println(defaultName);
    assertTrue(Utils.parseRetryPolicy(defaultName) instanceof DefaultRetryPolicy);
    System.out.println(separator);

    final String downgradingName = "DowngradingConsistencyRetryPolicy";
    System.out.println(downgradingName);
    assertTrue(Utils.parseRetryPolicy(downgradingName) instanceof DowngradingConsistencyRetryPolicy);
    System.out.println(separator);

    final String fallthroughName = "FallthroughRetryPolicy";
    System.out.println(fallthroughName);
    assertTrue(Utils.parseRetryPolicy(fallthroughName) instanceof FallthroughRetryPolicy);
    System.out.println(separator);
}
项目:carbon-data    文件:CassandraConfig.java   
/**
 * Applies the retry policy named by the {@code DBConstants.Cassandra.RETRY_POLICY} property.
 *
 * <p>The six recognized names are the three plain policies and their {@code Logging}-prefixed
 * variants, which wrap the same policy in a {@link LoggingRetryPolicy}.
 *
 * @param properties configuration properties
 * @param builder the cluster builder to configure
 * @return the configured builder, or {@code builder} unchanged when the property is absent
 * @throws DataServiceFault if the property holds an unrecognized policy name
 */
private Builder populateRetrytPolicy(Map<String, String> properties, Builder builder) throws DataServiceFault {
    final String policyName = properties.get(DBConstants.Cassandra.RETRY_POLICY);
    if (policyName == null) {
        // Nothing configured: leave the builder untouched.
        return builder;
    }

    if (policyName.equals("DefaultRetryPolicy")) {
        return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
    }
    if (policyName.equals("DowngradingConsistencyRetryPolicy")) {
        return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
    }
    if (policyName.equals("FallthroughRetryPolicy")) {
        return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
    }
    if (policyName.equals("LoggingDefaultRetryPolicy")) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
    }
    if (policyName.equals("LoggingDowngradingConsistencyRetryPolicy")) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
    }
    if (policyName.equals("LoggingFallthroughRetryPolicy")) {
        return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
    }
    throw new DataServiceFault("Invalid Cassandra retry policy: " + policyName);
}
项目:spring-cloud-connectors    文件:CassandraClusterXmlConfigTest.java   
/**
 * Loads the {@code cassandra-full-config} bean from {@code cloud-cassandra-with-config.xml} and
 * checks its connect timeout plus the classes of its retry, load-balancing and reconnection
 * policies.
 */
@Test
public void cassandraSessionWithConfiguration() throws Exception {
    final ApplicationContext context = getTestApplicationContext(
            "cloud-cassandra-with-config.xml", createService("my-service"));
    final Cluster cluster = context.getBean("cassandra-full-config", getConnectorType());

    // Socket options.
    assertNotNull(cluster.getConfiguration().getSocketOptions());
    assertEquals(15000,
            cluster.getConfiguration().getSocketOptions().getConnectTimeoutMillis());

    // Policies: each configured policy must be of (or derived from) the expected class.
    final Class<?> retryPolicyClass =
            cluster.getConfiguration().getPolicies().getRetryPolicy().getClass();
    assertTrue(DefaultRetryPolicy.class.isAssignableFrom(retryPolicyClass));

    final Class<?> loadBalancingClass =
            cluster.getConfiguration().getPolicies().getLoadBalancingPolicy().getClass();
    assertTrue(RoundRobinPolicy.class.isAssignableFrom(loadBalancingClass));

    final Class<?> reconnectionClass =
            cluster.getConfiguration().getPolicies().getReconnectionPolicy().getClass();
    assertTrue(ConstantReconnectionPolicy.class.isAssignableFrom(reconnectionClass));
}
项目:sunbird-utils    文件:CassandraConnectionManagerImpl.java   
/**
 * Builds a cluster for a single contact point without credentials, using protocol version V3,
 * the default retry policy and an atomic monotonic timestamp generator.
 *
 * @param ip contact point address
 * @param port port number as a decimal string, parsed with {@link Integer#parseInt(String)}
 * @param poolingOptions pooling options applied to the cluster
 * @return the built cluster
 */
private static Cluster createCluster(String ip, String port, PoolingOptions poolingOptions) {
  Cluster.Builder clusterBuilder = Cluster.builder();
  clusterBuilder = clusterBuilder.addContactPoint(ip);
  clusterBuilder = clusterBuilder.withPort(Integer.parseInt(port));
  clusterBuilder = clusterBuilder.withProtocolVersion(ProtocolVersion.V3);
  clusterBuilder = clusterBuilder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
  clusterBuilder = clusterBuilder.withTimestampGenerator(new AtomicMonotonicTimestampGenerator());
  clusterBuilder = clusterBuilder.withPoolingOptions(poolingOptions);
  return clusterBuilder.build();
}
项目:cassandra-java-driver-examples    文件:TracingExample.java   
/**
 * Executes one traced INSERT against the {@code demo} keyspace on 127.0.0.1, prints how long the
 * call took and then prints the query trace.
 *
 * <p>The cluster is closed in a {@code finally} block so it is released even when connecting,
 * preparing or executing fails.
 *
 * @param args unused
 */
public static void main(String[] args){
    Cluster cluster = Cluster
            .builder()
            .addContactPoint("127.0.0.1")
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE) //Other option: DowngradingConsistencyRetryPolicy
            .withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
            .build();
    try {
        Session session = cluster.connect("demo");

        PreparedStatement statement = session.prepare("INSERT INTO user (id, name) VALUES (?, ?)");
        Statement boundStatement = statement
                .bind(1, "user 1")
                .enableTracing();

        // Wall-clock time around the synchronous execute call.
        long startTime = System.currentTimeMillis();
        ResultSet resultSet = session.execute(boundStatement);
        long duration = System.currentTimeMillis() - startTime;
        System.out.format("Time taken: %d", duration);

        ExecutionInfo executionInfo = resultSet.getExecutionInfo();
        printQueryTrace(executionInfo.getQueryTrace());
    } finally {
        cluster.close();
    }
}
项目:gcplot    文件:CassandraConnector.java   
/**
 * Builds the cluster from this connector's settings and opens a session on {@code keyspace}.
 *
 * <p>Pooling options are applied only when {@code poolingOptions} is set, and credentials only
 * when {@code username} is neither null nor empty.
 */
public void init() {
    LOG.info("Starting Cassandra connector initialization.");

    final SocketOptions socketOptions = new SocketOptions()
            .setReceiveBufferSize(receiverBufferSize)
            .setSendBufferSize(senderBufferSize);
    final Cluster.Builder clusterBuilder = Cluster.builder()
            .addContactPoints(hosts)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectionDelayMs))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withCompression(ProtocolOptions.Compression.LZ4)
            .withSocketOptions(socketOptions)
            .withPort(port);

    if (poolingOptions != null) {
        // Connection counts are derived from the number of available processors.
        final int cores = Runtime.getRuntime().availableProcessors();
        final int maxConnections = cores * 2;
        poolingOptions
                .setConnectionsPerHost(HostDistance.LOCAL, cores, maxConnections)
                .setConnectionsPerHost(HostDistance.REMOTE, (cores / 2), maxConnections)
                .setPoolTimeoutMillis(poolTimeoutMillis)
                .setMaxRequestsPerConnection(HostDistance.LOCAL, maxRequestsPerConnection)
                .setMaxRequestsPerConnection(HostDistance.REMOTE, maxRequestsPerConnection);
        clusterBuilder.withPoolingOptions(poolingOptions);
    }

    final boolean hasUsername = !Strings.isNullOrEmpty(username);
    if (hasUsername) {
        clusterBuilder.withCredentials(username, password);
    }

    cluster = clusterBuilder.build();
    session = cluster.connect(keyspace);
}
项目:cassandra-reaper    文件:CassandraStorage.java   
/**
 * Decides how to react to a write timeout: idempotent statements are retried at the same
 * consistency level, everything else (including a {@code null} statement) is delegated to
 * {@link DefaultRetryPolicy#INSTANCE}.
 *
 * <p>NOTE(review): the idempotent branch does not look at {@code retry}, so this method itself
 * puts no upper bound on the number of retries - confirm that is intended.
 */
@Override
public RetryDecision onWriteTimeout(
    Statement stmt,
    ConsistencyLevel cl,
    WriteType type,
    int required,
    int received,
    int retry) {

  if (null == stmt || !stmt.isIdempotent()) {
    return DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry);
  }
  return RetryDecision.retry(cl);
}
项目:dropwizard-cassandra    文件:DefaultRetryPolicyFactoryTest.java   
/**
 * The factory must hand out the shared {@link DefaultRetryPolicy#INSTANCE} rather than a new
 * object.
 */
@Test
public void returnsDefaultRetryPolicyInstance() throws Exception {
    // The cast also asserts the concrete type of the built policy.
    final DefaultRetryPolicy built = (DefaultRetryPolicy) new DefaultRetryPolicyFactory().build();

    assertThat(built).isSameAs(DefaultRetryPolicy.INSTANCE);
}
项目:gora    文件:CassandraClient.java   
/**
 * Applies the retry policy named by the {@code CassandraStoreParameters.RETRY_POLICY} property.
 *
 * <p>An unrecognized name is logged as an error and the builder is returned unchanged.
 *
 * @param properties store configuration
 * @param builder the cluster builder to configure
 * @return the configured builder, or {@code builder} unchanged when the property is absent or
 *     names an unsupported policy
 */
private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) {
  final String policyName = properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
  if (policyName == null) {
    return builder;
  }

  switch (policyName) {
    case "DefaultRetryPolicy":
      return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
    case "DowngradingConsistencyRetryPolicy":
      return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
    case "FallthroughRetryPolicy":
      return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
    case "LoggingDefaultRetryPolicy":
      return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
    case "LoggingDowngradingConsistencyRetryPolicy":
      return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
    case "LoggingFallthroughRetryPolicy":
      return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
    default:
      LOG.error("Unsupported retry policy : {} ", policyName);
      return builder;
  }
}
项目:ddth-id    文件:CassandraIdGenerator.java   
/**
 * Creates and initializes a {@link SessionManager} instance. Sub-classes may override this
 * method to customize their own {@link SessionManager}.
 *
 * @return a new {@link SessionManager} on which {@code init()} has already been called
 */
protected SessionManager createSessionManager() {
    final SessionManager sessionManager = new SessionManager();

    // Retries follow the default policy wrapped in a logging decorator.
    // Alternative kept for reference:
    // new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
    final LoggingRetryPolicy retryPolicy = new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE);
    sessionManager.setRetryPolicy(retryPolicy);

    // NOTE(review): the arguments are presumably (delay in ms, max speculative executions) -
    // confirm against the driver's constructor.
    sessionManager.setSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(10000, 3));

    sessionManager.init();
    return sessionManager;
}
项目:izettle-toolbox    文件:CassandraSessionFactory.java   
/**
 * Builds a cluster from this factory's settings, connects to the configured keyspace and
 * registers the resulting managed session with the environment's lifecycle.
 *
 * @param environment environment whose lifecycle will manage the session
 * @param localDc local data center for the DC-aware policy; skipped when {@code null}
 * @return the managed cluster/session pair
 */
public CassandraSessionManaged build(Environment environment, String localDc) {
    final PoolingOptions pooling = new PoolingOptions();
    pooling.setConnectionsPerHost(HostDistance.LOCAL,  3, 5)
        .setConnectionsPerHost(HostDistance.REMOTE, 1, 2);

    final DCAwareRoundRobinPolicy.Builder dcAwarePolicy = DCAwareRoundRobinPolicy.builder();
    if (localDc != null) {
        dcAwarePolicy.withLocalDc(localDc);
    }

    final QueryOptions queryDefaults = new QueryOptions();
    queryDefaults.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);

    final String[] hosts = getContactPoints().stream().toArray(String[]::new);

    final Cluster.Builder clusterBuilder = Cluster
        .builder()
        .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
        .withReconnectionPolicy(new ExponentialReconnectionPolicy(10L, 1000L))
        .withQueryOptions(queryDefaults)
        .withLoadBalancingPolicy(new TokenAwarePolicy(dcAwarePolicy.build()))
        .addContactPoints(hosts)
        .withPort(getPort())
        .withSpeculativeExecutionPolicy(new ConstantSpeculativeExecutionPolicy(1000, 2))
        .withPoolingOptions(pooling);
    final Cluster cluster = clusterBuilder.build();

    // The codec is registered before the session is opened.
    cluster.getConfiguration().getCodecRegistry()
        .register(InstantCodec.instance);

    final Session session = cluster.connect(getKeySpace());

    final CassandraSessionManaged managed = new CassandraSessionManaged(cluster, session);
    environment.lifecycle().manage(managed);
    return managed;
}
项目:elasticactors    文件:BackplaneConfiguration.java   
/**
 * Reads the {@code ea.cassandra.*} properties, builds the cluster and opens the session on the
 * configured keyspace.
 *
 * <p>Any {@code :port} suffix on a host entry is dropped; the single port from
 * {@code ea.cassandra.port} is used for the whole cluster.
 */
@PostConstruct
public void initialize() {
    final String hostsProperty = env.getProperty("ea.cassandra.hosts","localhost:9042");
    final String clusterLabel = env.getProperty("ea.cassandra.cluster","ElasticActorsCluster");
    final String keyspaceLabel = env.getProperty("ea.cassandra.keyspace","\"ElasticActors\"");
    final Integer portNumber = env.getProperty("ea.cassandra.port", Integer.class, 9042);

    // Keep only the host part of each "host[:port]" entry.
    final Set<String> hostEntries = StringUtils.commaDelimitedListToSet(hostsProperty);
    final String[] contactPoints = new String[hostEntries.size()];
    int next = 0;
    for (String entry : hostEntries) {
        final int colon = entry.indexOf(":");
        contactPoints[next++] = colon >= 0 ? entry.substring(0, colon) : entry;
    }

    final int maxLocalConnections = env.getProperty(
            "ea.cassandra.maxActive", Integer.class, Runtime.getRuntime().availableProcessors() * 3);
    final PoolingOptions pooling = new PoolingOptions();
    pooling.setHeartbeatIntervalSeconds(60);
    pooling.setConnectionsPerHost(HostDistance.LOCAL, 2, maxLocalConnections);
    pooling.setPoolTimeoutMillis(2000);

    // The property is expressed in seconds and converted to milliseconds here.
    final int reconnectDelayMillis =
            env.getProperty("ea.cassandra.retryDownedHostsDelayInSeconds",Integer.class,1) * 1000;

    final Cluster.Builder clusterBuilder = Cluster.builder()
            .withClusterName(clusterLabel)
            .addContactPoints(contactPoints)
            .withPort(portNumber)
            .withLoadBalancingPolicy(new RoundRobinPolicy())
            .withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))
            .withPoolingOptions(pooling)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectDelayMillis))
            .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM));
    final Cluster cassandraCluster = clusterBuilder.build();

    this.cassandraSession = cassandraCluster.connect(keyspaceLabel);
}
项目:copper-engine    文件:CassandraStorage.java   
/**
 * Calls {@code prepare} once for each CQL constant used by this storage.
 *
 * <p>Only the last statement ({@code CQL_SEL_WFI_ID_ALL}) is given an explicit retry policy,
 * {@link DefaultRetryPolicy#INSTANCE}; the others use the single-argument {@code prepare}.
 *
 * @throws Exception if preparing any statement fails
 */
protected void prepareStatements() throws Exception {
    // Workflow-instance statements.
    prepare(CQL_UPD_WORKFLOW_INSTANCE_NOT_WAITING);
    prepare(CQL_UPD_WORKFLOW_INSTANCE_WAITING);
    prepare(CQL_DEL_WORKFLOW_INSTANCE_WAITING);
    prepare(CQL_SEL_WORKFLOW_INSTANCE);
    prepare(CQL_UPD_WORKFLOW_INSTANCE_STATE);
    // Early-response statements.
    prepare(CQL_INS_EARLY_RESPONSE);
    prepare(CQL_DEL_EARLY_RESPONSE);
    prepare(CQL_SEL_EARLY_RESPONSE);
    prepare(CQL_UPD_WORKFLOW_INSTANCE_STATE_AND_RESPONSE_MAP);
    // Workflow-instance id statements.
    prepare(CQL_INS_WFI_ID);
    prepare(CQL_DEL_WFI_ID);
    // NOTE(review): presumably the other statements get their retry policy inside prepare(String) - confirm.
    prepare(CQL_SEL_WFI_ID_ALL, DefaultRetryPolicy.INSTANCE);
}
项目:database-transform-tool    文件:CassandraFactory.java   
/**
     * Initializes the Cassandra connection: builds the cluster, opens the session and creates
     * the mapping manager. Any failure is logged and not propagated to the caller.
     * Date: 2017-11-15 11:25:07
     * @author yi.zhang
     * @param servers   comma-separated server addresses, each {@code host} or {@code host:port}
     *                  (port defaults to 9042); surrounding whitespace and empty entries are ignored
     * @param keyspace  keyspace to connect to; when null or empty the session is opened without one
     * @param username  account name
     * @param password  password
     */
    public void init(String servers,String keyspace,String username,String password) {
        try {
            // Socket connection settings.
            SocketOptions socket = new SocketOptions();
            socket.setKeepAlive(true);
            socket.setReceiveBufferSize(1024* 1024);
            socket.setSendBufferSize(1024* 1024);
            socket.setConnectTimeoutMillis(5 * 1000);
            socket.setReadTimeoutMillis(1000);
            // Connection pool settings.
            PoolingOptions pool = new PoolingOptions();
            // pool.setMaxRequestsPerConnection(HostDistance.LOCAL, 32);
            // pool.setMaxRequestsPerConnection(HostDistance.REMOTE, 32);
            // pool.setCoreConnectionsPerHost(HostDistance.LOCAL, 2);
            // pool.setCoreConnectionsPerHost(HostDistance.REMOTE, 2);
            // pool.setMaxConnectionsPerHost(HostDistance.LOCAL, 4);
            // pool.setMaxConnectionsPerHost(HostDistance.REMOTE, 4);
            pool.setHeartbeatIntervalSeconds(60);
            pool.setIdleTimeoutSeconds(120);
            pool.setPoolTimeoutMillis(5 * 1000);
            List<InetSocketAddress> saddress = new ArrayList<InetSocketAddress>();
            if (servers != null && !"".equals(servers)) {
                for (String server : servers.split(",")) {
                    // Tolerate "host1, host2" and stray commas.
                    String entry = server.trim();
                    if (entry.isEmpty()) {
                        continue;
                    }
                    String[] address = entry.split(":");
                    String ip = address[0];
                    int port = 9042;
                    if (address.length > 1) {
                        port = Integer.parseInt(address[1].trim());
                    }
                    saddress.add(new InetSocketAddress(ip, port));
                }
            }
            InetSocketAddress[] addresses = new InetSocketAddress[saddress.size()];
            saddress.toArray(addresses);

            Builder builder = Cluster.builder();
            builder.withSocketOptions(socket);
            // Compression.
            builder.withCompression(ProtocolOptions.Compression.LZ4);
            // Load-balancing policy (disabled).
//          DCAwareRoundRobinPolicy loadBalance = DCAwareRoundRobinPolicy.builder().withLocalDc("localDc").withUsedHostsPerRemoteDc(2).allowRemoteDCsForLocalConsistencyLevel().build();
//          builder.withLoadBalancingPolicy(loadBalance);
            // Retry policy.
            builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
            builder.withPoolingOptions(pool);
            builder.addContactPointsWithPorts(addresses);
            builder.withCredentials(username, password);
            Cluster cluster = builder.build();
            try {
                if (keyspace != null && !"".equals(keyspace)) {
                    session = cluster.connect(keyspace);
                } else {
                    session = cluster.connect();
                }
            } catch (RuntimeException e) {
                // The cluster is only reachable through this local variable: release it
                // before the failure is reported, otherwise its resources would leak.
                cluster.close();
                throw e;
            }
            mapping = new MappingManager(session);
        } catch (Exception e) {
            logger.error("-----Cassandra Config init Error-----", e);
        }
    }
项目:cassandra-reaper    文件:CassandraStorage.java   
/**
 * Delegates to {@link DefaultRetryPolicy#INSTANCE}, but reports a retry count of exactly 1 to it
 * as 0; every other retry count is passed through unchanged.
 */
@Override
public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, int aliveReplica, int retry) {
  int reportedRetry = retry;
  if (retry == 1) {
    reportedRetry = 0;
  }
  return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, reportedRetry);
}
项目:cassandra-reaper    文件:CassandraStorage.java   
/**
 * Handles request errors exactly like {@link DefaultRetryPolicy#INSTANCE}: all arguments are
 * forwarded unchanged and its decision is returned.
 */
@Override
public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, int nbRetry) {
  final RetryDecision decision = DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry);
  return decision;
}
项目:presto    文件:BackoffRetryPolicy.java   
/**
 * Handles read timeouts exactly like {@link DefaultRetryPolicy#INSTANCE}: all arguments are
 * forwarded unchanged and its decision is returned.
 */
@Override
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry)
{
    final RetryDecision decision = DefaultRetryPolicy.INSTANCE.onReadTimeout(
            statement, cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry);
    return decision;
}
项目:presto    文件:BackoffRetryPolicy.java   
/**
 * Handles write timeouts exactly like {@link DefaultRetryPolicy#INSTANCE}: all arguments are
 * forwarded unchanged and its decision is returned.
 */
@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry)
{
    final RetryDecision decision = DefaultRetryPolicy.INSTANCE.onWriteTimeout(
            statement, cl, writeType, requiredAcks, receivedAcks, nbRetry);
    return decision;
}
项目:dropwizard-cassandra    文件:DefaultRetryPolicyFactory.java   
/**
 * Returns the shared {@link DefaultRetryPolicy#INSTANCE}; no new policy object is created.
 *
 * @return the default retry policy singleton
 */
@Override
public RetryPolicy build() {
    final RetryPolicy shared = DefaultRetryPolicy.INSTANCE;
    return shared;
}
项目:scassandra-example-java    文件:PersonDaoCassandra.java   
/**
 * Handles write timeouts by delegating to {@link DefaultRetryPolicy#INSTANCE}.
 *
 * <p>Every argument is forwarded in its own position; in particular {@code requiredAcks} is
 * passed as the required count rather than repeating {@code receivedAcks}.
 *
 * @param statement the statement that timed out
 * @param cl the consistency level of the write
 * @param writeType the type of the write
 * @param requiredAcks number of acknowledgements required
 * @param receivedAcks number of acknowledgements received
 * @param nbRetry the retry count reported by the driver
 * @return the decision of the default policy
 */
@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry) {
    return DefaultRetryPolicy.INSTANCE.onWriteTimeout(statement, cl, writeType, requiredAcks, receivedAcks, nbRetry);
}
项目:scassandra-example-java    文件:PersonDaoCassandra.java   
/**
 * Handles unavailable errors exactly like {@link DefaultRetryPolicy#INSTANCE}: all arguments are
 * forwarded unchanged and its decision is returned.
 */
@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, int aliveReplica, int nbRetry) {
    final RetryDecision decision = DefaultRetryPolicy.INSTANCE.onUnavailable(
            statement, cl, requiredReplica, aliveReplica, nbRetry);
    return decision;
}
项目:sunbird-utils    文件:CassandraConnectionManagerImpl.java   
/**
 * Builds a cluster for a single contact point with username/password credentials, using
 * protocol version V3, the default retry policy and an atomic monotonic timestamp generator.
 *
 * @param ip contact point address
 * @param port port number as a decimal string, parsed with {@link Integer#parseInt(String)}
 * @param userName user name passed to {@code withCredentials}
 * @param password password passed to {@code withCredentials}
 * @param poolingOptions pooling options applied to the cluster
 * @return the built cluster
 */
private static Cluster createCluster(String ip, String port, String userName, String password,
    PoolingOptions poolingOptions) {
  Cluster.Builder clusterBuilder = Cluster.builder();
  clusterBuilder = clusterBuilder.addContactPoint(ip);
  clusterBuilder = clusterBuilder.withPort(Integer.parseInt(port));
  clusterBuilder = clusterBuilder.withProtocolVersion(ProtocolVersion.V3);
  clusterBuilder = clusterBuilder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
  clusterBuilder = clusterBuilder.withTimestampGenerator(new AtomicMonotonicTimestampGenerator());
  clusterBuilder = clusterBuilder.withPoolingOptions(poolingOptions);
  clusterBuilder = clusterBuilder.withCredentials(userName, password);
  return clusterBuilder.build();
}