/**
 * Starts a map-reduce on every collection of a database and returns the
 * adapted results as a single stream.
 *
 * <p>The returned stream owns the MongoDB client: the client is closed by the
 * stream's close handler, so callers must close the stream (for example with
 * try-with-resources). If anything fails before the stream is handed out, the
 * client is closed here so the connection is not leaked.
 *
 * @param dbIP      host of the MongoDB server; port 27017 is always used
 * @param tableName name passed to {@code getDatabase} (i.e. the database name)
 * @param mapRedDir directory given to {@code MapReduceSources.fromDir} to load
 *                  the map and reduce JavaScript code
 * @return the stream produced by {@code adapter.adaptStream}, with a close
 *         handler that closes the client
 */
private Stream<JsonObject> performMapReduce(String dbIP, String tableName, String mapRedDir) {
    MongoClient mClient = new MongoClient(dbIP, 27017);
    try {
        MapReduceSources mrs = MapReduceSources.fromDir(mapRedDir);
        MongoDatabase database = mClient.getDatabase(tableName);

        // One map-reduce iterable per collection, keyed by collection name.
        Map<String, MapReduceIterable<Document>> mapRedMap = new HashMap<>();
        for (String collName : database.listCollectionNames()) {
            MongoCollection<Document> collection = database.getCollection(collName);
            mapRedMap.put(collName, collection.mapReduce(mrs.getMapJSCode(), mrs.getReduceJSCode()));
        }

        // From here on the stream is responsible for releasing the client.
        return adapter.adaptStream(mapRedMap).onClose(mClient::close);
    } catch (RuntimeException | Error failure) {
        // The caller never receives a stream to close, so release the client now.
        try {
            mClient.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        throw failure;
    }
}
/**
 * Flattens per-collection map-reduce results into one stream of JSON objects.
 *
 * <p>For every document of every iterable, the string form of the document's
 * {@code "_id"} value is parsed as JSON and cast to a {@link JsonObject}; the
 * map key of the iterable is then stored on that object under {@code "_type"}.
 *
 * @param mapRedMap map-reduce iterables keyed by the name to record as {@code "_type"}
 * @return a sequential stream over the adapted objects of all entries
 */
public Stream<JsonObject> adaptStream(Map<String, MapReduceIterable<Document>> mapRedMap) {
    final JsonParser jsonParser = new JsonParser();
    return mapRedMap.entrySet().stream().flatMap(entry -> {
        final String typeName = entry.getKey();
        Stream<JsonObject> adaptedDocs = StreamSupport
                .stream(entry.getValue().spliterator(), false)
                .map(document -> {
                    JsonObject adapted = (JsonObject) jsonParser.parse(document.get("_id").toString());
                    adapted.addProperty("_type", typeName);
                    return adapted;
                });
        return adaptedDocs;
    });
}
private void applyPropertiesToCursor( MongoIterable<Document> mongoIterable, QueryProperties queryProps, boolean includeMetaDataSearchLimit, boolean includeSortExpr ) { if( includeMetaDataSearchLimit ) { Integer searchLimit = getModel().getEffectiveMDSearchLimit( queryProps ); if( searchLimit > 0 ) { // Apply to FindIterable or MapReduceIterable if ( mongoIterable instanceof FindIterable ) { FindIterable<Document> findIterable = (FindIterable<Document>) mongoIterable; findIterable.limit( searchLimit.intValue( ) ); } else if ( mongoIterable instanceof MapReduceIterable ) { MapReduceIterable<Document> mapReduceIterable = (MapReduceIterable<Document>) mongoIterable; mapReduceIterable.limit( searchLimit.intValue( ) ); } } } applyPropertiesToCursor( mongoIterable, queryProps, includeSortExpr ); }
/**
 * Creates an instance that keeps the given map-reduce iterable together with
 * the function sources and the collection it was created for.
 *
 * @param mapFunction    map function source, stored as given
 * @param reduceFunction reduce function source, stored as given
 * @param mapReduce      underlying iterable, stored as given
 * @param collection     collection reference, stored as given
 */
public ProfiledMapReduceIterable(String mapFunction, String reduceFunction, MapReduceIterable<TResult> mapReduce, ProfiledMongoCollection<TDocument> collection) {
    // The no-arg superclass constructor is invoked implicitly.
    this.collection = collection;
    this.mapReduceIterable = mapReduce;
    this.mapFunction = mapFunction;
    this.reduceFunction = reduceFunction;
}
/**
 * Returns the map-reduce iterable held by this instance.
 *
 * @return the value of the {@code mapReduceIterable} field
 */
public MapReduceIterable<TResult> getMapReduceIterable() {
    return this.mapReduceIterable;
}
public cfData execute(cfSession _session, cfArgStructData argStruct ) throws cfmRunTimeException { MongoDatabase db = getMongoDatabase( _session, argStruct ); String collection = getNamedStringParam(argStruct, "collection", null); if ( collection == null ) throwException(_session, "please specify a collection"); String map = getNamedStringParam(argStruct, "map", null ); if ( map == null ) throwException(_session, "please specify a map"); String reduce = getNamedStringParam(argStruct, "reduce", null ); if ( reduce == null ) throwException(_session, "please specify a reduce"); String outputcollection = getNamedStringParam(argStruct, "outputcollection", null ); if ( outputcollection == null ) throwException(_session, "please specify a outputcollection"); String action = getNamedStringParam(argStruct, "type", "replace" ).toLowerCase(); String finalize = getNamedStringParam(argStruct, "finalize", null ); cfData query = getNamedParam(argStruct, "query", null ); try{ MapReduceIterable<Document> mi = db.getCollection( collection ).mapReduce( map, reduce ); if ( query != null ) mi.filter( getDocument( query ) ); if ( finalize != null ) mi.finalizeFunction( finalize ); mi.collectionName( outputcollection ); mi.action( MapReduceAction.valueOf( action ) ); // Kick start the map reduce mi.first(); return cfBooleanData.TRUE; } catch (MongoException me){ throwException(_session, me.getMessage()); return null; } }
/**
 * Declares a map-reduce operation for an entity type whose results are typed
 * by {@code resultClass}.
 *
 * <p>NOTE(review): presumably the implementation resolves the target
 * collection from {@code entity} and decodes results into {@code resultClass}
 * - confirm against the implementing class.
 *
 * @param <T>            entity type, bounded by {@code IEntity}
 * @param <RESULT>       element type of the returned iterable
 * @param entity         entity class the operation is requested for
 * @param resultClass    class token for the result element type
 * @param mapFunction    map function source (presumably JavaScript - confirm)
 * @param reduceFunction reduce function source (presumably JavaScript - confirm)
 * @return a map-reduce iterable over {@code RESULT} elements
 * @throws Exception declared by the contract; the concrete failure cases are
 *                   defined by the implementation
 */
<T extends IEntity, RESULT> MapReduceIterable<RESULT> mapReduce(Class<T> entity, Class<RESULT> resultClass, String mapFunction, String reduceFunction) throws Exception;
/**
 * Declares a map-reduce operation for an entity type whose results are
 * returned as {@code Document} elements.
 *
 * <p>NOTE(review): presumably the implementation resolves the target
 * collection from {@code entity} - confirm against the implementing class.
 *
 * @param <T>            entity type, bounded by {@code IEntity}
 * @param entity         entity class the operation is requested for
 * @param mapFunction    map function source (presumably JavaScript - confirm)
 * @param reduceFunction reduce function source (presumably JavaScript - confirm)
 * @return a map-reduce iterable over {@code Document} elements
 * @throws Exception declared by the contract; the concrete failure cases are
 *                   defined by the implementation
 */
<T extends IEntity> MapReduceIterable<Document> mapReduce(Class<T> entity, String mapFunction, String reduceFunction) throws Exception;