/**
 * If the environment variable ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT is true,
 * this method inserts local notes into MongoDB on startup.
 * Notes that already exist in MongoDB are skipped.
 */
private void insertFileSystemNotes() throws IOException {
    LinkedList<Document> docs = new LinkedList<>(); // docs to be imported
    NotebookRepo vfsRepo = new VFSNotebookRepo(this.conf);
    List<NoteInfo> infos = vfsRepo.list(null);

    // collect notes to be imported
    for (NoteInfo info : infos) {
        Note note = vfsRepo.get(info.getId(), null);
        Document doc = noteToDocument(note);
        docs.add(doc);
    }

    /*
     * The 'ordered(false)' option lets the bulk insert proceed even when
     * duplicate documents are encountered: the duplicates are skipped and a
     * WARN log is printed for each of them.
     */
    try {
        coll.insertMany(docs, new InsertManyOptions().ordered(false));
    } catch (MongoBulkWriteException e) {
        printDuplicatedException(e); // log a warning per duplicated document
    }

    vfsRepo.close(); // currently a no-op, but may matter in the future
}
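/*
 * The method above relies on a noteToDocument helper that is not shown. A
 * minimal sketch of what that conversion could look like, assuming Gson is
 * available and that the note ID is stored as the document's _id, which is
 * what makes the duplicate detection above work on re-import:
 */
import com.google.gson.Gson;
import org.bson.Document;

// Hypothetical sketch, not necessarily the actual implementation: serialize
// the note to JSON, parse it into a BSON Document, and key it by the note ID
// so that re-importing an existing note raises a duplicate-key error on _id.
private Document noteToDocument(Note note) {
    Document doc = Document.parse(new Gson().toJson(note));
    doc.put("_id", note.getId()); // assumes Note exposes getId()
    return doc;
}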
/**
 * Inserts edges in bulk.
 * <p>
 * Note: outV|label|inV is unique in ChronoGraph, so this operation does not
 * permit duplicated edges; duplicates are skipped.
 *
 * @param edgeArray edge documents, built with Converter.getBsonDocumentEdge
 */
public void addEdges(List<BsonDocument> edgeArray) {
    while (true) {
        try {
            edges.insertMany(edgeArray);
            return;
        } catch (MongoBulkWriteException e) {
            if (e.getCode() == -3) {
                // With an ordered insert, the first cnt documents were
                // inserted and the document at index cnt is the duplicate:
                // skip it and retry with the remainder of the list.
                int cnt = e.getWriteResult().getInsertedCount();
                edgeArray = edgeArray.subList(cnt + 1, edgeArray.size());
            } else {
                throw e;
            }
        }
    }
}
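/*
 * A hedged usage sketch for addEdges. The graph handle and the argument
 * order of Converter.getBsonDocumentEdge are assumptions; only the method
 * name comes from the Javadoc above.
 */
import java.util.ArrayList;
import java.util.List;
import org.bson.BsonDocument;

// Hypothetical caller: build edge documents and insert them in bulk. A
// second edge with the same outV|label|inV as an existing one is skipped.
List<BsonDocument> edgeArray = new ArrayList<>();
edgeArray.add(Converter.getBsonDocumentEdge("v1", "knows", "v2"));
edgeArray.add(Converter.getBsonDocumentEdge("v1", "knows", "v2")); // duplicate, skipped
edgeArray.add(Converter.getBsonDocumentEdge("v2", "knows", "v3"));
graph.addEdges(edgeArray); // 'graph' is an assumed ChronoGraph handle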
public HashMap<String, Object> capture(List<BsonDocument> bsonDocumentList) {
    HashMap<String, Object> retMsg = new HashMap<>();
    MongoCollection<BsonDocument> collection =
            Configuration.mongoDatabase.getCollection("EventData", BsonDocument.class);

    try {
        // unordered insert: keep inserting the remaining documents even if
        // some of them fail (e.g. on duplicate keys)
        InsertManyOptions option = new InsertManyOptions();
        option.ordered(false);
        collection.insertMany(bsonDocumentList, option);
    } catch (MongoBulkWriteException e) {
        retMsg.put("error", e.getMessage());
        return retMsg;
    }

    retMsg.put("eventCaptured", bsonDocumentList.size());
    return retMsg;
}
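/*
 * A minimal caller sketch for capture. The event documents are hypothetical;
 * BsonDocument.parse is the standard org.bson JSON parser.
 */
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.bson.BsonDocument;

// Hypothetical caller: capture two events and report the outcome.
List<BsonDocument> events = new ArrayList<>();
events.add(BsonDocument.parse("{\"_id\": \"event-1\", \"type\": \"ObjectEvent\"}"));
events.add(BsonDocument.parse("{\"_id\": \"event-2\", \"type\": \"ObjectEvent\"}"));

HashMap<String, Object> result = capture(events);
if (result.containsKey("error")) {
    System.err.println("capture failed: " + result.get("error"));
} else {
    System.out.println("captured " + result.get("eventCaptured") + " events");
}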
/**
 * Ensures the current exception was generated due to a duplicate (primary) key.
 * Differentiates between Fongo and Mongo exceptions, since the behaviour of
 * these databases differs.
 */
public static void assertDuplicateKeyException(Throwable exception) {
    Preconditions.checkNotNull(exception, "exception");

    // unwrap, if necessary
    exception = exception instanceof MongoException ? exception : exception.getCause();

    // Fongo throws DuplicateKeyException directly
    if (exception instanceof DuplicateKeyException) return;

    // MongoDB throws a custom exception
    if (exception instanceof MongoCommandException) {
        String codeName = ((MongoCommandException) exception).getResponse().get("codeName").asString().getValue();
        int errorCode = ((MongoCommandException) exception).getErrorCode();
        check(codeName).is("DuplicateKey");
        check(errorCode).is(11000);
        // all good here (can return)
        return;
    }

    // for bulk writes as well
    if (exception instanceof MongoBulkWriteException) {
        List<BulkWriteError> errors = ((MongoBulkWriteException) exception).getWriteErrors();
        check(errors).hasSize(1);
        check(errors.get(0).getCode()).is(11000);
        check(errors.get(0).getMessage()).contains("duplicate key");
        return;
    }

    // if we got here, there is a problem (no duplicate key exception)
    fail("Should get duplicate key exception after " + exception);
}
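/*
 * A hedged sketch of how the assertion helper might be exercised from a
 * test. The 'collection' handle and the document are hypothetical; a bulk
 * insert is used because it produces one of the three exception shapes the
 * helper accepts (MongoBulkWriteException).
 */
import static org.junit.Assert.fail;
import java.util.Arrays;
import org.bson.Document;
import org.junit.Test;
import com.mongodb.MongoBulkWriteException;

// Hypothetical JUnit usage: bulk-insert the same document twice and assert
// that the resulting failure is a duplicate-key error on _id.
@Test
public void duplicateInsertIsRejected() {
    Document doc = new Document("_id", "user-1").append("name", "jdoe");
    try {
        collection.insertMany(Arrays.asList(doc, doc)); // second doc duplicates _id
        fail("expected a duplicate key exception");
    } catch (MongoBulkWriteException e) {
        assertDuplicateKeyException(e);
    }
}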
protected Map<Integer, BulkWriteError> executeBulkUpdate(List<UpdateManyModel<Document>> documentsToUpdate) {
    // mapping of array indices for flow file errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();

    try {
        collection.bulkWrite(documentsToUpdate);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Error occurred during bulk write", e);
    }

    return writeErrors;
}
protected Map<Integer, BulkWriteError> executeBulkInsert(List<InsertOneModel<Document>> documentsToInsert) {
    // mapping of array indices for flow file errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();

    try {
        collection.bulkWrite(documentsToInsert, writeOptions);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Unable to perform bulk inserts", e);
    }

    return writeErrors;
}
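/*
 * Both helpers above key the returned map by BulkWriteError.getIndex(), i.e.
 * the position of the failed model in the submitted list. A hedged caller
 * sketch for executeBulkInsert; 'records' and the failure handling are
 * assumptions, and an unordered write (configured via the processor's
 * writeOptions) is what lets documents after a failure still be attempted.
 */
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.bson.Document;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.client.model.InsertOneModel;

// Hypothetical caller: one InsertOneModel per record; failed indices map
// straight back to the records that need to be routed to failure.
List<InsertOneModel<Document>> models = new ArrayList<>();
for (Document record : records) { // 'records' is assumed to exist
    models.add(new InsertOneModel<>(record));
}

Map<Integer, BulkWriteError> failures = executeBulkInsert(models);
for (Map.Entry<Integer, BulkWriteError> entry : failures.entrySet()) {
    Document failed = models.get(entry.getKey()).getDocument();
    // route 'failed' to a failure relationship; entry.getValue() has the cause
}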
/**
 * MongoBulkWriteException contains error messages that identify which
 * documents were duplicated. This method extracts those note IDs and logs them.
 * @param e the bulk write exception raised by the duplicated documents
 */
private void printDuplicatedException(MongoBulkWriteException e) {
    List<BulkWriteError> errors = e.getWriteErrors();

    for (BulkWriteError error : errors) {
        String msg = error.getMessage();
        Pattern pattern = Pattern.compile("[A-Z0-9]{9}"); // regex for a note ID
        Matcher matcher = pattern.matcher(msg);
        if (matcher.find()) { // if a note ID was found
            String noteId = matcher.group();
            LOG.warn("Note " + noteId + " not inserted since it already exists in MongoDB");
        }
    }
}
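/*
 * A standalone check of the note-ID regex against a representative
 * duplicate-key message. The exact server wording varies across MongoDB
 * versions, so the message below is an assumption, not a captured log.
 */
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NoteIdRegexDemo {
    public static void main(String[] args) {
        String msg = "E11000 duplicate key error collection: zeppelin.notes "
                + "index: _id_ dup key: { : \"2A94M5J1Z\" }";
        Matcher matcher = Pattern.compile("[A-Z0-9]{9}").matcher(msg);
        if (matcher.find()) {
            System.out.println("duplicated note: " + matcher.group()); // prints 2A94M5J1Z
        }
    }
}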
@Test(expected = MongoBulkWriteException.class)
@UsingDataSet(loadStrategy = LoadStrategyEnum.DELETE_ALL)
public void shouldNotInsertOneFooWithIdTwice() throws IOException {
    // given
    final Foo foo = new FooBuilder().withId(new ObjectId("54c28b0b0f2dacc85ede5286"))
            .withStringField("jdoe").withPrimitiveIntField(42).withEnumFoo(EnumFoo.FOO).build();

    // when: inserting the same Foo twice, the second insert should fail
    fooCollection.add(foo);
    fooCollection.add(foo);
}
public Representation getRepresentation(HttpServerExchange exchange, MongoBulkWriteException mbwe)
        throws IllegalQueryParamenterException {
    final String requestPath = buildRequestPath(exchange);

    final Representation rep = createRepresentation(exchange, null, exchange.getRequestPath());

    addWriteResult(mbwe.getWriteResult(), rep, requestPath);
    addWriteErrors(mbwe.getWriteErrors(), rep);

    return rep;
}