Java 类com.mongodb.client.model.BulkWriteOptions 实例源码

项目:para-dao-mongodb    文件:MongoDBDAO.java   
@Override
public <P extends ParaObject> void updateAll(String appid, List<P> objects) {
    if (StringUtils.isBlank(appid) || objects == null) {
        return;
    }
    try {
        ArrayList<WriteModel<Document>> updates = new ArrayList<WriteModel<Document>>();
        List<String> ids = new ArrayList<String>(objects.size());
        for (P object : objects) {
            if (object != null) {
                object.setUpdated(Utils.timestamp());
                Document id = new Document(ID, object.getId());
                Document data = new Document("$set", toRow(object, Locked.class, true));
                UpdateOneModel<Document> um = new UpdateOneModel<Document>(id, data);
                updates.add(um);
                ids.add(object.getId());
            }
        }
        BulkWriteResult res = getTable(appid).bulkWrite(updates, new BulkWriteOptions().ordered(true));
        logger.debug("Updated: " + res.getModifiedCount() + ", keys: " + ids);
    } catch (Exception e) {
        logger.error(null, e);
    }
    logger.debug("DAO.updateAll() {}", objects.size());
}
项目:core-ng-project    文件:MongoCollectionImpl.java   
@Override
public long bulkDelete(List<?> ids) {
    StopWatch watch = new StopWatch();
    int deletedRows = 0;
    try {
        List<DeleteOneModel<T>> models = new ArrayList<>(ids.size());
        for (Object id : ids) {
            models.add(new DeleteOneModel<>(Filters.eq("_id", id)));
        }
        BulkWriteResult result = collection().bulkWrite(models, new BulkWriteOptions().ordered(false));
        deletedRows = result.getDeletedCount();
        return deletedRows;
    } finally {
        long elapsedTime = watch.elapsedTime();
        ActionLogContext.track("mongoDB", elapsedTime, 0, deletedRows);
        logger.debug("bulkDelete, collection={}, size={}, elapsedTime={}", collectionName, ids.size(), elapsedTime);
        checkSlowOperation(elapsedTime);
    }
}
项目:testcontainers-hazelcast    文件:MongoMapStore.java   
@Override
public void storeAll(Map<String, Supplement> map) {
    log.info("storeAll");
    List<InsertOneModel> batch = new LinkedList<InsertOneModel>();
    for (Map.Entry<String, Supplement> entry : map.entrySet()) {
        String key = entry.getKey();
        Supplement value = entry.getValue();
        batch.add(new InsertOneModel(
                new Document("name", value.getName()).append("price", value.getPrice())
                        .append("_id", key)));
    }
    this.collection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
}
项目:nifi-nars    文件:StoreInMongo.java   
@OnScheduled
public void onScheduled(final ProcessContext context) {
    batchSize = context.getProperty(MongoProps.BATCH_SIZE).asInteger();
    boolean ordered = context.getProperty(MongoProps.ORDERED).asBoolean();
    writeOptions = new BulkWriteOptions().ordered(ordered);

    createMongoConnection(context);
    ensureIndexes(context, collection);
}
项目:ibm-performance-monitor    文件:ProfiledMongoClientTest.java   
@Test
public void bulkWrite()
{
    List<WriteModel<Document>> list = insertOneWithBulk();

    coll.deleteOne(Filters.eq("name", "DELETEME"));

    coll.bulkWrite(list, new BulkWriteOptions());

    coll.deleteMany(Filters.eq("name", "DELETEME"));
}
项目:AbacusUtil    文件:AsyncMongoDBExecutor.java   
public CompletableFuture<Integer> bulkInsert(final String collectionName, final Collection<?> entities, final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return dbExecutor.bulkInsert(collectionName, entities, options);
        }
    });
}
项目:AbacusUtil    文件:AsyncMongoDBExecutor.java   
public CompletableFuture<BulkWriteResult> bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return dbExecutor.bulkWrite(collectionName, requests, options);
        }
    });
}
项目:AbacusUtil    文件:MongoCollectionExecutor.java   
public int bulkInsert(final Collection<?> entities, final BulkWriteOptions options) {
    final List<InsertOneModel<Document>> list = new ArrayList<>(entities.size());

    for (Object entity : entities) {
        if (entity instanceof Document) {
            list.add(new InsertOneModel<Document>((Document) entity));
        } else {
            list.add(new InsertOneModel<Document>(MongoDBExecutor.toDocument(entity)));
        }
    }

    return bulkWrite(list, options).getInsertedCount();
}
项目:AbacusUtil    文件:MongoCollectionExecutor.java   
public BulkWriteResult bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    if (options == null) {
        return coll.bulkWrite(requests);
    } else {
        return coll.bulkWrite(requests, options);
    }
}
项目:AbacusUtil    文件:AsyncMongoCollectionExecutor.java   
public CompletableFuture<Integer> bulkInsert(final Collection<?> entities, final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return collExecutor.bulkInsert(entities, options);
        }
    });
}
项目:AbacusUtil    文件:AsyncMongoCollectionExecutor.java   
public CompletableFuture<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return collExecutor.bulkWrite(requests, options);
        }
    });
}
项目:df_data_service    文件:MongoAdminClient.java   
public MongoAdminClient importJsonFile(String fileNamePath) {
    int count = 0;
    int batch = 100;

    List<InsertOneModel<Document>> docs = new ArrayList<>();

    try (BufferedReader br = new BufferedReader(new FileReader(fileNamePath))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException fnfe) {
        fnfe.printStackTrace();
    }

    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }

    return this;
}
项目:df_data_service    文件:MongoAdminClient.java   
public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
    int count = 0;
    int batch = 100;

    List<InsertOneModel<Document>> docs = new ArrayList<>();

    try (BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException fnfe) {
        fnfe.printStackTrace();
    }

    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }

    return this;
}
项目:mongo-java-driver-rx    文件:MongoCollectionImpl.java   
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return RxObservables.create(Observables.observe(new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    }), observableAdapter);
}
项目:mongo-java-driver-reactivestreams    文件:MongoCollectionImpl.java   
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(observe(new Block<SingleResultCallback<BulkWriteResult>>(){
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    }));
}
项目:mongo-java-driver-reactivestreams    文件:MongoCollectionImpl.java   
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(observe(new Block<SingleResultCallback<BulkWriteResult>>(){
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(clientSession, requests, options, callback);
        }
    }));
}
项目:AbacusUtil    文件:MongoDBExecutor.java   
public int bulkInsert(final String collectionName, final Collection<?> entities, final BulkWriteOptions options) {
    return collExecutor(collectionName).bulkInsert(entities, options);
}
项目:AbacusUtil    文件:MongoDBExecutor.java   
public BulkWriteResult bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    return collExecutor(collectionName).bulkWrite(requests, options);
}
项目:mongo-java-driver-rx    文件:MongoCollectionImpl.java   
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
项目:mongo-java-driver-reactivestreams    文件:MongoCollectionImpl.java   
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
项目:mongo-java-driver-reactivestreams    文件:MongoCollectionImpl.java   
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(clientSession, requests, new BulkWriteOptions());
}
项目:mongo-java-driver-rx    文件:MongoCollection.java   
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return an Observable with a single element the BulkWriteResult
 */
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
项目:mongo-java-driver-reactivestreams    文件:MongoCollection.java   
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 */
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
项目:mongo-java-driver-reactivestreams    文件:MongoCollection.java   
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests,
                                     BulkWriteOptions options);