Python elasticsearch.helpers module: parallel_bulk() code examples

The following 9 code examples, extracted from open-source Python projects, illustrate how to use elasticsearch.helpers.parallel_bulk().

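Before the project snippets, here is a minimal usage sketch of parallel_bulk() itself (the index name, document payload, and tuning values below are illustrative, not taken from any of the projects): the helper takes a client and an iterable of actions, splits them into chunks, sends the chunks from a thread pool, and lazily yields one (ok, item) tuple per action.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch()

def generate_actions():
    # Illustrative action dicts; "my-index" and the payload are made up.
    for i in range(1000):
        yield {
            "_op_type": "index",
            "_index": "my-index",
            "_type": "doc",
            "_id": i,
            "_source": {"value": i},
        }

# parallel_bulk() is lazy: nothing is sent until the results are consumed.
for ok, item in parallel_bulk(es, generate_actions(), thread_count=4, chunk_size=500):
    if not ok:
        print("Failed action:", item)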

Project: scrapy-cdr    Author: TeamHG-Memex    | Project source | File source
def parallel_bulk(client, actions, thread_count=4, chunk_size=500,
                  max_chunk_bytes=100 * 1024 * 1024,
                  expand_action_callback=es_helpers.expand_action,
                  **kwargs):
    """ es_helpers.parallel_bulk rewritten with imap_fixed_output_buffer
    instead of Pool.imap, which consumed unbounded memory if the generator
    outruns the upload (which usually happens).
    """
    actions = map(expand_action_callback, actions)
    for result in imap_fixed_output_buffer(
            lambda chunk: list(
                es_helpers._process_bulk_chunk(client, chunk, **kwargs)),
            es_helpers._chunk_actions(actions, chunk_size, max_chunk_bytes,
                                      client.transport.serializer),
            threads=thread_count,
        ):
        for item in result:
            yield item
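
The snippet above relies on an imap_fixed_output_buffer() helper that is not shown. A minimal sketch of what such a bounded-buffer threaded map could look like, written here as an assumption based on the docstring rather than the actual scrapy-cdr implementation:

from collections import deque
from concurrent.futures import ThreadPoolExecutor

def imap_fixed_output_buffer(fn, iterable, threads=4):
    # Map fn over iterable with a thread pool, keeping at most `threads`
    # results in flight, so the input generator is pulled only as fast as
    # results are consumed (unlike Pool.imap, which buffers without bound).
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = deque()
        for item in iterable:
            futures.append(executor.submit(fn, item))
            if len(futures) >= threads:
                yield futures.popleft().result()
        while futures:
            yield futures.popleft().result()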

Project: pulsar    Author: mcholste    | Project source | File source
def bulk(self, client, actions, stats_only=False, **kwargs):
        success, failed = 0, 0

        # list of errors to be collected if not stats_only
        errors = []

        for ok, item in parallel_bulk(client, actions, **kwargs):
            # go through request-response pairs and detect failures
            if not ok:
                if not stats_only:
                    errors.append(item)
                failed += 1
            else:
                success += 1

        return success, failed if stats_only else errors
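
Because of how the return expression groups, the second value is either a count or a list depending on stats_only. A hypothetical call (the indexer, client, and actions names are illustrative, not from the pulsar source):

success, errors = indexer.bulk(client, actions)                    # errors: list of failed items
success, failed = indexer.bulk(client, actions, stats_only=True)   # failed: number of failures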

Project: mediachain-indexer    Author: mediachain    | Project source | File source
def parallel_bulk(self,
                      the_iter,
                      *args,
                      **kw):

        if self.use_custom_parallel_bulk:
            return self._non_parallel_bulk(the_iter,
                                           *args,
                                           **kw)

        else:
            return es_parallel_bulk(self.es,
                                    the_iter,
                                    *args,
                                    **kw)

Project: web    Author: pyjobs    | Project source | File source
def _perform_index_sync(self, sql_table_cls, es_doc_cls, id_logger):
        es_doc = es_doc_cls()

        elasticsearch_conn = connections.get_connection()

        sync_timestamp = current_server_timestamp()

        pending_insertions = self._compute_dirty_documents(
            sql_table_cls, es_doc.doc_type)

        bulk_op = self._synchronisation_op(es_doc, pending_insertions)

        self._logging(logging.INFO, 'Performing synchronization.')

        for ok, info in parallel_bulk(elasticsearch_conn, bulk_op):
            obj_id = info['index']['_id'] \
                if 'index' in info else info['update']['_id']

            if ok:
                # Mark the task as handled so we don't process it again next time
                self._logging(logging.INFO,
                              'Document %s has been synced successfully.'
                              % obj_id)

                sql_table_cls.update_last_sync(obj_id, sync_timestamp)
            else:
                id_logger(obj_id, logging.ERROR,
                          'Error while syncing document %s index.' % obj_id)

        # Refresh indices to speed up searches
        elasticsearch_dsl.Index(es_doc.index).refresh()
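
Each info item yielded by parallel_bulk() is a dict keyed by the operation type, which is why the code above looks for an 'index' key before falling back to 'update'. Roughly, with illustrative values (not taken from the pyjobs project):

# Approximate shape of one result item; index, type, and id values are made up.
info = {
    "update": {
        "_index": "jobs",
        "_type": "job-offer",
        "_id": "42",
        "status": 200,
        # on failure, an "error" key describes what went wrong
    }
}
obj_id = info['index']['_id'] if 'index' in info else info['update']['_id']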

Project: web    Author: pyjobs    | Project source | File source
def _geocomplete_index_batch(self, elasticsearch_conn, to_index):
        log_msg = 'Indexing documents.'
        self._logging(logging.INFO, log_msg)

        for ok, info in parallel_bulk(elasticsearch_conn, to_index):
            if not ok:
                doc_id = info['create']['_id']
                doc_type = info['create']['_type']
                doc_index = info['create']['_index']

                logging_level = logging.ERROR
                err_msg = "Couldn't index document: '%s', of type: %s, " \
                          "under index: %s." % (doc_id, doc_type, doc_index)

                self._logging(logging_level, err_msg)

Project: series-tiempo-ar-api    Author: datosgobar    | Project source | File source
def run(self):
        # Check whether the index exists; create it if it does not
        if not self.elastic.indices.exists(settings.TEST_INDEX):
            self.elastic.indices.create(settings.TEST_INDEX,
                                        body=INDEX_CREATION_BODY)

        for interval in COLLAPSE_INTERVALS:
            self.init_series(interval)

        for success, info in parallel_bulk(self.elastic, self.bulk_items):
            if not success:
                print("ERROR:", info)

Project: series-tiempo-ar-api    Author: datosgobar    | Project source | File source
def run(self, distributions=None):
        """Index into Elasticsearch all the data of the distributions
        stored in the database, or of those specified by the iterable
        'distributions'.
        """
        self.init_index()

        # Optimization: disable data refresh while indexing
        self.elastic.indices.put_settings(
            index=self.index,
            body=constants.DEACTIVATE_REFRESH_BODY
        )

        logger.info(strings.INDEX_START)

        for distribution in distributions:
            fields = distribution.field_set.all()
            fields = {field.title: field.series_id for field in fields}
            df = self.init_df(distribution, fields)

            self.generate_properties(df, fields)

        logger.info(strings.BULK_REQUEST_START)

        for success, info in parallel_bulk(self.elastic, self.bulk_actions):
            if not success:
                logger.warn(strings.BULK_REQUEST_ERROR, info)

        logger.info(strings.BULK_REQUEST_END)

        # Re-enable the refresh process once indexing is finished
        self.elastic.indices.put_settings(
            index=self.index,
            body=constants.REACTIVATE_REFRESH_BODY
        )
        segments = constants.FORCE_MERGE_SEGMENTS
        self.elastic.indices.forcemerge(index=self.index,
                                        max_num_segments=segments)

        logger.info(strings.INDEX_END)
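
Disabling refresh while bulk indexing and re-enabling it afterwards is a standard Elasticsearch optimization. The DEACTIVATE_REFRESH_BODY and REACTIVATE_REFRESH_BODY constants are not shown in the snippet; they presumably look something like the following (the '1s' restore value is an assumption, not taken from the project):

DEACTIVATE_REFRESH_BODY = {"index": {"refresh_interval": "-1"}}  # no automatic refresh
REACTIVATE_REFRESH_BODY = {"index": {"refresh_interval": "1s"}}  # back to the usual interval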

Project: dragnet    Author: excieve    | Project source | File source
def pump_it(es, bulk_accumulator):
    rows_pumped = 0
    # TODO: make threads and chunks configurable
    for success, info in parallel_bulk(es, bulk_accumulator, thread_count=16, chunk_size=300):
        if success:
            rows_pumped += 1
        else:
            logger.warning('Pumping documents failed: {}'.format(info))

    return rows_pumped
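
Regarding the TODO above, the thread and chunk settings can simply be exposed as keyword arguments; parallel_bulk() also accepts max_chunk_bytes. A possible parametrized variant, keeping the snippet's current values as defaults (max_chunk_bytes shown with the library's default; this is a sketch, not the dragnet implementation):

def pump_it(es, bulk_accumulator, thread_count=16, chunk_size=300,
            max_chunk_bytes=100 * 1024 * 1024):
    rows_pumped = 0
    for success, info in parallel_bulk(es, bulk_accumulator,
                                       thread_count=thread_count,
                                       chunk_size=chunk_size,
                                       max_chunk_bytes=max_chunk_bytes):
        if success:
            rows_pumped += 1
        else:
            logger.warning('Pumping documents failed: {}'.format(info))
    return rows_pumped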

Project: browbeat    Author: openstack    | Project source | File source
def flush_cache(self):
        if len(self.cache) == 0:
            return True
        retry = 2
        for i in range(retry):
            try:
                to_upload = helpers.parallel_bulk(self.es,
                                                  self.cache_insertable_iterable())
                counter = 0
                num_items = len(self.cache)
                for item in to_upload:
                    counter = counter + 1
                    self.logger.debug(
                        "{} of {} Elastic objects uploaded".format(counter, num_items))
                output = "Pushed {} items to Elasticsearch to index {}".format(num_items,
                                                                               self.index)
                output += " and browbeat UUID {}".format(str(browbeat_uuid))
                self.logger.info(output)
                self.cache = deque()
                self.last_upload = datetime.datetime.utcnow()
                return True
            except Exception as Err:
                self.logger.error(
                    "Error pushing data to Elasticsearch, going to retry"
                    " in 10 seconds")
                self.logger.error("Exception: {}".format(Err))
                time.sleep(10)
                if i == (retry - 1):
                    self.logger.error("Pushing Data to Elasticsearch failed in spite of retry,"
                                      " dumping JSON for {} cached items".format(len(self.cache)))
                    for item in self.cache:
                        filename = item['test_name'] + '-' + item['identifier']
                        filename += '-elastic' + '.' + 'json'
                        elastic_file = os.path.join(item['result_dir'],
                                                    filename)

                        with open(elastic_file, 'w') as result_file:
                            json.dump(item['result'],
                                      result_file,
                                      indent=4,
                                      sort_keys=True)

                            self.logger.info("Saved Elasticsearch consumable result JSON to {}".
                                             format(elastic_file))
                    self.cache = deque()
                    self.last_upload = datetime.datetime.utcnow()
                    return False
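
One reason the broad try/except above is needed: with the default raise_on_error=True, document-level failures surface from parallel_bulk() as a BulkIndexError instead of (False, item) tuples. A small sketch of catching it explicitly (the es and actions names are illustrative):

from elasticsearch.helpers import parallel_bulk, BulkIndexError

try:
    for ok, item in parallel_bulk(es, actions):
        pass  # with raise_on_error=True, ok is always True here
except BulkIndexError as err:
    # err.errors holds the per-document error items
    print("{} document(s) failed".format(len(err.errors)))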