Java 类com.mongodb.BulkWriteException 实例源码

项目:lightblue-mongo    文件:BatchUpdate.java   
/**
 * Executes the given bulk write operation and reports whether every
 * document in the batch was matched.
 *
 * @param bwo The bulk write operation to execute
 * @param writeConcern Write concern passed to {@code execute}; when
 * {@code null}, the no-argument {@code execute()} is used instead
 * @param batchSize Number of documents the caller expects the
 * operation to match; compared against the matched count of the
 * write result
 * @param results Populated during this call, keyed by the index
 * reported by each bulk write error, with a duplicate error or a
 * save error for that document
 * @param logger The logger used for debug and warning output
 *
 * @return {@code true} when the matched count equals
 * {@code batchSize}. {@code false} when the counts differ or a
 * {@link BulkWriteException} was thrown; in that case the caller
 * should run concurrent update error detection
 */
public static boolean batchUpdate(BulkWriteOperation bwo,
                                  WriteConcern writeConcern,
                                  int batchSize,
                                  Map<Integer,Error> results,
                                  Logger logger) {
    logger.debug("attemptToUpdate={}", batchSize);
    try {
        final BulkWriteResult writeResult =
            (writeConcern == null) ? bwo.execute() : bwo.execute(writeConcern);
        logger.debug("writeResult={}", writeResult);

        // Any difference between expected and matched counts means some
        // documents were not updated.
        if (batchSize != writeResult.getMatchedCount()) {
            logger.warn("notUpdated={}", batchSize - writeResult.getMatchedCount());
            return false;
        }
        logger.debug("Successful update");
        return true;
    } catch (BulkWriteException e) {
        final List<BulkWriteError> failures = e.getWriteErrors();
        if (failures != null) {
            for (BulkWriteError failure : failures) {
                // Duplicate-key failures get their own error code; everything
                // else is reported as a generic save error.
                final boolean duplicate = MongoCrudConstants.isDuplicate(failure.getCode());
                results.put(failure.getIndex(),
                            Error.get("update",
                                      duplicate ? MongoCrudConstants.ERR_DUPLICATE
                                                : MongoCrudConstants.ERR_SAVE_ERROR,
                                      failure.getMessage()));
            }
        }
        // A bulk write exception always counts as a failed batch, even
        // when it carries no per-document write errors.
        return false;
    }
}
项目:hvdf    文件:RawStorageInterceptor.java   
/**
 * Assigns an id to every document in {@code sample} and inserts the
 * documents through unordered bulk operations, starting a new
 * operation each time the target collection changes.
 *
 * On failure the exception is rethrown. Before rethrowing,
 * {@code sample} may be rewritten to hold only the documents
 * considered failed (see the catch block for the exact conditions).
 *
 * @param sample The documents to store; may be modified on failure
 * @param resultList Receives the generated id object of each
 * document as it is processed
 */
private void storeBatch(BasicDBList sample, BasicDBList resultList) {

    // A batch can straddle several collections, so track the collection
    // and bulk operation currently being filled, plus the index in
    // 'sample' at which that operation started.
    DBCollection activeColl = null;
    BulkWriteOperation activeOp = null;
    int activeOpStart = 0;
    int pos = 0;

    logger.debug("Received batch of size : {}", sample.size());

    try {
        while (pos < sample.size()) {

            // Stamp the document with its id and record the id for the caller
            BasicDBObject doc = (BasicDBObject) sample.get(pos);
            SampleId _id = this.idFactory.createId(doc);
            doc.put(Sample.ID_KEY, _id.toObject());
            resultList.add(_id.toObject());

            // The collection is chosen by the document timestamp
            long timestamp = doc.getLong(Sample.TS_KEY);
            DBCollection target = collectionAllocator.getCollection(timestamp);

            // Collection switch: flush what has been accumulated so far and
            // open a fresh bulk operation on the new collection.
            // NOTE(review): on the first iteration activeOp is still null here;
            // presumably executeBatchWrite tolerates that - confirm.
            if (!target.equals(activeColl)) {
                executeBatchWrite(activeOp, sample);
                activeColl = target;
                activeOp = target.initializeUnorderedBulkOperation();
                activeOpStart = pos;
            }

            // Queue the insert on the current operation
            activeOp.insert(doc);
            ++pos;
        }

        // Flush the trailing operation
        executeBatchWrite(activeOp, sample);

    } catch (Exception ex) {

        // Something went wrong, possibly one of the bulk writes
        BasicDBList failedDocs = new BasicDBList();
        if (ex instanceof BulkWriteException) {

            // Work out which writes of the current operation failed so the
            // ones that succeeded can be dropped from the batch
            BulkWriteException bulkFailure = (BulkWriteException) ex;
            int attempted = pos - activeOpStart;
            int errorCount = bulkFailure.getWriteErrors().size();
            if (errorCount < attempted) {

                // Error indexes are relative to the operation, so shift them
                // by the operation's starting position in 'sample'
                for (BulkWriteError we : bulkFailure.getWriteErrors()) {
                    failedDocs.add(sample.get(activeOpStart + we.getIndex()));
                }

                // The current operation's failures are now accounted for;
                // only documents from 'pos' onward remain unprocessed
                activeOpStart = pos;
            }
        }

        // When the offset is past the start, everything from the offset
        // onward is treated as failed and 'sample' is reduced to just the
        // failed documents. With an offset of zero 'sample' is left as is.
        if (activeOpStart > 0) {
            for (int rest = activeOpStart; rest < sample.size(); rest++) {
                failedDocs.add(sample.get(rest));
            }
            sample.clear();
            sample.addAll(failedDocs);
        }

        // TODO : we also need to handle the result Ids here as well,
        // the failed doc Ids must be pulled from the resultList
        throw ex;
    }
}