We have extracted the following 48 code examples from open-source Python projects to illustrate how to use concurrent.futures.append().
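Most of the examples below share the same idiom: create an executor, call executor.submit() for each unit of work, append the returned Future to a plain list, and then collect results with concurrent.futures.wait() or concurrent.futures.as_completed(). A minimal, self-contained sketch of that pattern follows (the fetch_url worker and the URL list are hypothetical, for illustration only):

import concurrent.futures
import urllib.request

def fetch_url(url, timeout=10):
    # Hypothetical worker: download a URL and return its size in bytes.
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return url, len(response.read())

urls = ['https://example.com/', 'https://example.org/']  # placeholder URLs

futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for url in urls:
        # submit() returns a Future; appending it to a list is the
        # "futures.append(...)" pattern shown throughout the examples below.
        futures.append(executor.submit(fetch_url, url))
    for future in concurrent.futures.as_completed(futures):
        url, size = future.result()
        print(url, size)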
def upload_blocks(bucket, chunk_size, max_threads, lines):
    session = botocore.session.get_session()
    client = session.create_client('s3')
    start = time.perf_counter()
    futures = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Start the load operations and mark each future with its URL
        for line in lines:
            raw_block, key = load_json_block(line)
            futures.append(executor.submit(client.put_object,
                                           Bucket=bucket,
                                           Key=key,
                                           Body=raw_block,
                                           ContentEncoding='UTF-8',
                                           ContentType='application/json'))
        end = time.perf_counter()
    done, pending = concurrent.futures.wait(futures)
    complete = time.perf_counter()
    rate = 1 / ((complete - start) / len(done))
    return len(done), int(rate)
def get(self, request):
    alerts = []
    try:
        url = urljoin(settings.PROMGEN['alertmanager']['url'], '/api/v1/alerts')
        response = util.get(url)
    except requests.exceptions.ConnectionError:
        logger.error('Error connecting to %s', url)
        return JsonResponse({})

    data = response.json().get('data', [])
    if data is None:
        # Return an empty alert-all if there are no active alerts from AM
        return JsonResponse({})

    for alert in data:
        alert.setdefault('annotations', {})
        # Humanize dates for frontend
        for key in ['startsAt', 'endsAt']:
            if key in alert:
                alert[key] = parser.parse(alert[key])
        # Convert any links to <a> for frontend
        for k, v in alert['annotations'].items():
            alert['annotations'][k] = defaultfilters.urlize(v)
        alerts.append(alert)
    return JsonResponse(alerts, safe=False)
def get(self, request, label):
    data = set()
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        for host in models.Shard.objects.filter(proxy=True):
            futures.append(executor.submit(
                util.get,
                '{}/api/v1/label/{}/values'.format(host.url, label),
                headers=self.headers))
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                # Need to try to decode the json BEFORE we raise_for_status
                # so that we can pass back the error message from Prometheus
                _json = result.json()
                result.raise_for_status()
                logger.debug('Appending data from %s', result.request.url)
                data.update(_json['data'])
            except:
                logger.exception('Error with response')
                _json['promgen_proxy_request'] = result.request.url
                return JsonResponse(_json, status=result.status_code)

    return JsonResponse({
        'status': 'success',
        'data': sorted(data)
    })
def main(args):
    results = []
    state_machine_arn = args.state_machine_arn
    # TODO(dw): pagination for > 100 executions
    executions = SFN.list_executions(
        stateMachineArn=state_machine_arn,
        statusFilter='RUNNING'
    )
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = []
        for e in executions['executions']:
            future = executor.submit(get_execution_details, e['executionArn'])
            futures.append(future)
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    print(json.dumps(results))
def run_multiprocess(func, tasks, *args, **kwargs):
    results = []
    remains = list(enumerate(tasks))
    while remains:
        errors = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
            futures = [executor.submit(func, *tuple(chain([task], args)), **kwargs)
                       for _, task in remains]
            concurrent.futures.wait(futures)
            for future, t in zip(futures, remains):
                n, task = t
                try:
                    results.append((n, future.result()))
                except Exception as e:
                    errors.append((n, task))
        remains = errors
    return list(map(lambda x: x[1], sorted(results, key=lambda x: x[0])))
def _make_request(self, edited_title, current_datetime, diff, imdbID):
    from_date = (current_datetime - datetime.timedelta(days=7-diff)).strftime('%Y-%m-%d')
    to_date = (current_datetime - datetime.timedelta(days=6-diff)).strftime('%Y-%m-%d')
    tweets = []

    ## Make search request
    ## Request not to receive tweets that contain links or follow the RT pattern of retweets
    try:
        response = self.api.GetSearch(term='"' + edited_title + '" -filter:links -RT',
                                      since=from_date, until=to_date,
                                      lang='en', result_type='mixed')
    except Exception as e:
        print(e)

    for tweet in response:
        ## Tag movie with imdbID
        tweet.imdbID = imdbID
        ## Only append Tweets in English
        if tweet.lang == 'en' or tweet.user.lang == 'en':
            tweets.append(tweet)
    return tweets

# Make complicated title simple to improve search results
def extract(self, season, result):
    self.default_url = result

    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        # First step, we extract links to all the episodes
        episodes, base_url, pages = self.page_worker(
            self.default_url
        )

        futures = []
        for page in pages:
            self.logger.debug('Processing page {}'.format(page))
            futures.append(executor.submit(
                self.page_worker, base_url + str(page)
            ))

        results = concurrent.futures.wait(futures)
        for completed in results.done:
            episodes += completed.result()

        # Second step, we get all the available sources.
        list(executor.map(self.episode_worker, episodes))
def _register(self):
    futures = []
    for message in self._register_requests():
        self._stream.wait_for_ready()
        future = self._stream.send(
            message_type=Message.TP_REGISTER_REQUEST,
            content=message.SerializeToString())
        futures.append(future)

    for future in futures:
        resp = TpRegisterResponse()
        try:
            resp.ParseFromString(future.result().content)
            LOGGER.info("register attempt: %s",
                        TpRegisterResponse.Status.Name(resp.status))
        except ValidatorConnectionError as vce:
            LOGGER.info("during waiting for response on registration: %s", vce)
def do_load(args):
    auth_info = _get_auth_info(args.auth_user, args.auth_password)

    with open(args.filename, mode='rb') as fd:
        batches = batch_pb2.BatchList()
        batches.ParseFromString(fd.read())

    start = time.time()

    futures = []
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    for batch_list in _split_batch_list(batches):
        fut = executor.submit(post_batches, args.url, auth_info, batch_list)
        futures.append(fut)

    # Wait until all futures are complete
    wait(futures)

    stop = time.time()
    print("batches: {} batch/sec: {}".format(
        str(len(batches.batches)),
        len(batches.batches) / (stop - start)))
def run(self):
    PluginHelpers.run_method_for_each('on_start', self.global_options, self.run_log)

    max_workers = 1 if self.global_options.get('tests_sequential', False) else None
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    futures = []
    for set_item in self.settings:
        if 'url' in set_item:
            url = set_item['url']
            futures.append(executor.submit(self.run_tests, url, set_item, self.global_options))
        elif 'wait' in set_item:
            now = time.time()
            executor.shutdown(True)  # wait for all current jobs to stop
            then = time.time()
            wait_more = set_item['wait'] - (then - now)
            if wait_more > 0:
                time.sleep(wait_more)
            executor = concurrent.futures.ThreadPoolExecutor()  # replace the executor

    executor.shutdown(True)  # wait for all current jobs to stop
    for future in futures:
        self.run_log.append(future.result())

    PluginHelpers.run_method_for_each('on_end', self.global_options, self.run_log)
def detectValidExtension(self, future):
    if not self.stopThreads:
        html = future.result()[0].text
        ext = future.ext[0]
        r = self.isASuccessfulUpload(html)
        if r:
            self.validExtensions.append(ext)
            if self.shouldLog:
                self.logger.info("\033[1m\033[42mExtension %s seems valid for this form.\033[m", ext)
                if r != True:
                    self.logger.info("\033[1;32mTrue regex matched the following information : %s\033[m", r)
        return r
    else:
        return None

# detects valid extensions for this upload form (sending legit files with legit mime types)
def tiles_urllib(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                 progress_bar: bool, n_parallel, timeout: float) -> List[HipsTile]:
    """Generator function to fetch HiPS tiles from a remote URL."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_parallel) as executor:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = executor.submit(fetch_tile_urllib, url, meta, timeout)
            futures.append(future)

        futures = concurrent.futures.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(future.result())

    return tiles
async def fetch_all_tiles_aiohttp(tile_metas: List[HipsTileMeta], hips_survey: HipsSurveyProperties,
                                  progress_bar: bool, n_parallel: int, timeout: float) -> List[HipsTile]:
    """Generator function to fetch HiPS tiles from a remote URL using aiohttp."""
    import aiohttp

    connector = aiohttp.TCPConnector(limit=n_parallel)
    async with aiohttp.ClientSession(connector=connector) as session:
        futures = []
        for meta in tile_metas:
            url = hips_survey.tile_url(meta)
            future = asyncio.ensure_future(fetch_tile_aiohttp(url, meta, session, timeout))
            futures.append(future)

        futures = asyncio.as_completed(futures)
        if progress_bar:
            from tqdm import tqdm
            futures = tqdm(futures, total=len(tile_metas), desc='Fetching tiles')

        tiles = []
        for future in futures:
            tiles.append(await future)

    return tiles
def chunkify(iterable, chunksize=10000):
    i = 0
    chunk = []
    for item in iterable:
        chunk.append(item)
        i += 1
        if i == chunksize:
            yield chunk
            i = 0
            chunk = []
    if len(chunk) > 0:
        yield chunk
def populate(filename, max_workers, max_threads, chunk_size, bucket=S3_BLOCKS_BUCKET):
    with open(filename, mode='r') as f:
        chunks = chunkify(f, chunksize=chunk_size)
        start = time.perf_counter()
        func = functools.partial(upload_blocks, bucket, chunk_size, max_threads)
        counter = 0
        samples = 20
        chunk_rates = deque(maxlen=samples)
        actual_rates = deque(maxlen=samples)
        overheads = deque(maxlen=samples)
        with Pool(processes=max_workers) as pool:
            results = pool.imap_unordered(func, chunks, chunksize=1)
            for count, rate in results:
                elapsed = int(time.perf_counter() - start)
                counter += count
                chunk_rates.append(rate)
                avg_chunk_rate = int(sum(chunk_rates) / samples)
                perfect = avg_chunk_rate * max_workers
                actual = int(counter / elapsed)
                actual_rates.append(actual)
                avg_actual_rate = int(sum(actual_rates) / samples)
                overhead = int(100 - ((actual / perfect) * 100))
                overheads.append(overhead)
                avg_overhead = int(sum(overheads) / samples)
                report_progress(counter, avg_chunk_rate, avg_actual_rate, avg_overhead)
            end = time.perf_counter()
        complete = time.perf_counter()
        print('master scheduling time:%s complete_time:%s b/s: %s' % (
            end - start,
            complete - start,
            1 / ((complete - start) / chunk_size)
        ))
def get_context_data(self, **kwargs):
    context = super(HostList, self).get_context_data(**kwargs)
    context['host_groups'] = collections.defaultdict(list)
    for host in context['object_list']:
        context['host_groups'][host.name].append(host)
    context['host_groups'] = dict(context['host_groups'])
    return context
def form_valid(self, form):
    project = get_object_or_404(models.Project, id=self.kwargs['pk'])

    futures = []
    context = {
        'target': self.request.POST['target'].strip('#'),
        'results': [],
        'errors': [],
    }
    headers = {
        'referer': project.get_absolute_url()
    }

    # Default /metrics path
    if not form.cleaned_data['path']:
        form.cleaned_data['path'] = 'metrics'

    if not project.farm:
        context['errors'].append({'url': headers['referer'], 'message': 'Missing Farm'})
    else:
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            for host in project.farm.host_set.all():
                futures.append(executor.submit(util.get, 'http://{}:{}/{}'.format(
                    host.name,
                    form.cleaned_data['port'],
                    form.cleaned_data['path']
                ), headers=headers))
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    context['results'].append(result)
                except:
                    result = future.exception()
                    logger.warning('Error with response')
                    context['errors'].append({'url': result.request.url, 'message': result})

    return JsonResponse({'#' + context['target']: render_to_string('promgen/ajax_exporter.html', context)})
def post(self, request):
    silences = collections.defaultdict(list)
    try:
        url = urljoin(settings.PROMGEN['alertmanager']['url'], '/api/v1/silences')
        response = util.get(url)
    except requests.exceptions.ConnectionError:
        logger.error('Error connecting to %s', url)
        return JsonResponse({})

    data = response.json().get('data', [])
    if data is None:
        # Return an empty silence-all if there are no active silences from AM
        return JsonResponse({})

    currentAt = datetime.datetime.now(datetime.timezone.utc)

    for silence in data:
        if 'comment' in silence:
            silence['comment'] = defaultfilters.urlize(silence['comment'])
        # Since there is no status field, compare endsAt with the current time
        if 'endsAt' in silence:
            silence['endsAt'] = parser.parse(silence['endsAt'])
            if silence['endsAt'] < currentAt:
                continue

        silences['silence-all'].append(silence)
        for matcher in silence.get('matchers'):
            if matcher.get('name') in ['service', 'project']:
                silences['silence-{}-{}'.format(matcher.get('name'), matcher.get('value'))].append(silence)

    context = {'#' + slugify(key): render_to_string('promgen/ajax_silence.html',
                                                    {'silences': silences[key], 'key': key}, request).strip()
               for key in silences}
    context['#silence-load'] = render_to_string('promgen/ajax_silence_button.html',
                                                {'silences': silences['silence-all'], 'key': 'silence-all'}).strip()

    return JsonResponse(context)
def get(self, request):
    data = []
    futures = []
    resultType = None
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        for host in models.Shard.objects.filter(proxy=True):
            futures.append(executor.submit(
                util.get,
                '{}/api/v1/query_range?{}'.format(host.url, request.META['QUERY_STRING']),
                headers=self.headers))
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
                # Need to try to decode the json BEFORE we raise_for_status
                # so that we can pass back the error message from Prometheus
                _json = result.json()
                result.raise_for_status()
                logger.debug('Appending data from %s', result.request.url)
                data += _json['data']['result']
                resultType = _json['data']['resultType']
            except:
                logger.exception('Error with response')
                _json['promgen_proxy_request'] = result.request.url
                return JsonResponse(_json, status=result.status_code)

    return JsonResponse({
        'status': 'success',
        'data': {
            'resultType': resultType,
            'result': data,
        }
    })
def run_synchronize(func, tasks, *args, **kwargs):
    results = []
    for task in tasks:
        results.append(func(task, *args, **kwargs))
    return results
def run_raw_multiprocess(func, tasks, *args, **kwargs):
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), maxtasksperchild=1)
    futures = []
    for task in tasks:
        futures.append(pool.apply_async(func, tuple(chain([task], args)), kwargs))
    pool.close()
    results = list(map(lambda x: x.get(), futures))
    return results
def append_results(results, function, *args, **kwargs):
    """
    Calls *function* with the given *args* and *kwargs* then appends the result
    to *results* (which must be a list).  If we're not in the main process the
    given *function* will be called using `safe_call`.
    """
    if os.getpid() != PID:
        results.append(safe_call(function, *args, **kwargs))
    else:
        results.append(function(*args, **kwargs))
def callback_when_complete(futures, callback):
    """
    Calls *callback* after all *futures* (list) have completed running.
    """
    counter = count(1)
    io_loop = IOLoop.current()
    results = []

    def add_one(f):
        c = counter.next()
        results.append(f.result())
        if c >= len(futures):
            return callback(results)

    for future in futures:
        io_loop.add_future(future, add_one)
def call_singleton(self, function, identifier, *args, **kwargs):
    """
    Executes *function* if no other function with the given *identifier*
    is already running.  If a function is currently running with the given
    *identifier* the passed *function* will be called when the first
    function is complete.

    In other words, functions called via this method will be executed in
    sequence with each function being called after the first is complete.

    The function will be passed any given *args* and *kwargs* just like
    :meth:`AsyncRunner.call`.

    If 'callback' is passed as a keyword argument (*kwargs*) it will be
    called with the result when complete.
    """
    callback = kwargs.pop('callback', None)
    if identifier in ONE_CALLS:
        ONE_CALLS[identifier]['queue'].append(
            (function, args, kwargs, callback))
    else:
        from collections import deque
        future = self.executor.submit(safe_call, function, *args, **kwargs)
        ONE_CALLS[identifier] = {
            'future': future,
            'queue': deque()
        }
        if callback:
            done_callback(
                ONE_CALLS[identifier]['future'],
                lambda f: callback(f.result()))
        completed = partial(_call_complete, self, identifier)
        done_callback(ONE_CALLS[identifier]['future'], completed)
    #print 'ONE_CALLS', ONE_CALLS
    #print 'identifier', identifier
    return ONE_CALLS[identifier]['future']
def add_task(self, funcs):
    """
    Adds the given *funcs* to this schedule.
    """
    if not isinstance(funcs, list):
        funcs = [funcs]  # Make it a list
    self.funcs.append(funcs)
def search_movie(self, movie):
    """
    :param movie: a Movie object with valid fields
    :return tweets: A List<twitter.models.Status> containing statuses posted
        between one year before the movie was released and the current date
        if movie is a Movie object. Otherwise returns None.
    """
    if movie.Title == '' or not isinstance(movie, Movie) or not isinstance(movie.Title, str):
        return None
    edited_title = self.__clean_title(movie.Title)
    imdbID = movie.imdbID
    current_datetime = timezone.now()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=7)
    futures = []
    tweets = []
    print('Searching Twitter for ' + edited_title)
    for diff in range(0, 6):
        futures.append(executor.submit(self._make_request, edited_title, current_datetime, diff, imdbID))
    for future in futures:
        tweets += future.result()
    if tweets == []:
        return None
    return tweets

# Make the request to Twitter (is run by individual threads)
def search(self, title):
    """
    :params title: a string holding the title of the movie
    :return movie: if at least one movie with a similar title is found, this is
        a Movie object created from the most relevant result returned by OMDb,
        otherwise it is empty list
    """
    ## Search for all movies with similar titles
    try:
        matching_movies = omdb.search_movie(title)
    except:
        raise ConnectionError

    ## Return most relevant movie
    highestIMDB = 0
    if matching_movies:
        movie = matching_movies.pop(0)
        print("MOVIE: " + movie.title)
        try:
            movieObj = Movie.objects.get(imdbID=movie.imdb_id)
        except Movie.DoesNotExist:
            movieObj = None
        if not movieObj:
            response = omdb.request(i=movie.imdb_id, tomatoes=True, type='movie').json()
            movieObj = Movie().fillWithJsonObject(response)
            if not movieObj:
                return None
            self.known_omdb_titles.append(movieObj.Title)
        return movieObj
    else:
        return None
def search(self):
    html = requests.post(
        'http://animehaven.org/wp-admin/admin-ajax.php',
        data={
            'action': 'search_ajax',
            'keyword': self.media.metadata['name']
        }
    ).text

    html = html.replace('\\n', '')
    html = html.replace('\\t', '')
    html = html.replace('\\', '')

    soup = BeautifulSoup(html, 'html.parser')
    results = []
    for result in soup.find_all('div', {'class': 'sa_post'}):
        title_block = result.find('h6')
        link = title_block.find('a')  # The first one is the good one
        title, href = link.get('title'), link.get('href')
        self.logger.debug('Found block {} ({})'.format(title, href))

        versions_soup = self._get(href)
        versions = list(
            ('Sub' if 'sub' in x.text.lower() else 'Dub', x.get('href'))
            for x in versions_soup.find_all('a', {'class': 'ah_button'})
        )

        for version, url in versions:
            self.logger.debug('-> Found version {}'.format(url))
            results.append(('{} ({})'.format(title, version), url))

    return SearchResult.from_tuples(self.media, results)
def search(self):
    soup = self._get('http://www.icefilms.info/search.php', params={
        'q': self.media.metadata['name'],
        'x': 0,
        'y': 0
    })

    results = []
    for result in soup.select('.title a'):
        results.append((result.text, result.get('href')))

    return SearchResult.from_tuples(self.media, results)
def extract(self, result):
    soup = self._get(self._get_url() + result)
    sources_soup = self._get(
        self._get_url() + soup.select('iframe#videoframe')[0].get('src')
    )
    referer = self._get_url() + \
        soup.select('iframe#videoframe')[0].get('src')

    with concurrent.futures.ThreadPoolExecutor(16) as executor:
        futures = []

        for quality_div in sources_soup.select('.ripdiv'):
            self.logger.debug('=> ' + quality_div.find('b').text)

            t = re.search('t=(\d+?)"', sources_soup.prettify()).group(1)
            results = re.search(
                'var s=(\d+?),m=(\d+?);', sources_soup.prettify())
            s, m = results.group(1), results.group(2)
            sec = re.search(
                'f.lastChild.value="(.+?)"', sources_soup.prettify()
            ).group(1)

            for source in quality_div.select('a[onclick]'):
                futures.append(executor.submit(
                    self._source_worker, referer, t, sec, source
                ))

        list(concurrent.futures.as_completed(futures))
def first_pass(config):
    config = copy.deepcopy(config)
    config['result_dir_prefix'] = config['result_dir_prefix'] + '/first-pass'

    if config['debug']:
        print('Removing fixed params')
    config["fixed_params"] = {}
    config['max_iter'] = 5 if config['debug'] else 150
    if config['debug']:
        print('Overriding max_iter params to %d' % config['max_iter'])
    dry_run = True if config['debug'] else False

    get_params = get_agent_class(config).get_random_config
    results = []
    futures = []
    with concurrent.futures.ProcessPoolExecutor(min(multiprocessing.cpu_count(), config['nb_process'])) as executor:
        nb_config = 5 if config['debug'] else 1000
        for i in range(nb_config):
            params = get_params(config["fixed_params"])
            config.update(params)
            futures.append(executor.submit(exec_first_pass, i, copy.deepcopy(config), params))
        concurrent.futures.wait(futures)

    results = []
    for future in futures:
        results.append(future.result())

    return {
        'results': sorted(results, key=lambda result: result['mean_score'], reverse=True)
    }
def second_pass(config, best_agent_config):
    config = copy.deepcopy(config)
    config.update(best_agent_config)
    config['result_dir_prefix'] = config['result_dir_prefix'] + '/second-pass'
    config['max_iter'] = 5 if config['debug'] else 500

    futures = []
    with concurrent.futures.ProcessPoolExecutor(min(multiprocessing.cpu_count(), config['nb_process'])) as executor:
        if config['debug']:
            lrs = [1e-4, 1e-2, 1]
        else:
            lrs = [1e-4, 2e-4, 3e-4, 4e-4, 5e-4, 6e-4, 7e-4, 8e-4, 9e-4,
                   1e-3, 2e-3, 3e-3, 4e-3, 5e-3, 6e-3, 7e-3, 8e-3, 9e-3,
                   1e-2, 2e-2, 3e-2, 4e-2, 5e-2, 6e-2, 7e-2, 8e-2, 9e-2,
                   1e-1, 2e-1, 3e-1, 4e-1, 5e-1, 6e-1, 7e-1, 8e-1, 9e-1, 1]
        for lr in lrs:
            config['lr'] = lr
            futures.append(executor.submit(exec_second_pass, copy.deepcopy(config)))
        concurrent.futures.wait(futures)

    results = []
    for future in futures:
        results.append(future.result())

    return {
        'best_agent_config': best_agent_config,
        'results': sorted(results, key=lambda result: result['mean_score'], reverse=True)
    }
def search(config):
    get_params = get_agent_class(config).get_random_config
    params_keys = list(get_params().keys())
    nb_hp_params = len(params_keys)
    if config['debug']:
        print('*** Number of hyper-parameters: %d' % nb_hp_params)
    config['max_iter'] = 5 if config['debug'] else 500

    futures = []
    with concurrent.futures.ProcessPoolExecutor(min(multiprocessing.cpu_count(), config['nb_process'])) as executor:
        nb_config = 5 if config['debug'] else 200 * nb_hp_params
        for i in range(nb_config):
            params = get_params(config["fixed_params"])
            config.update(params)
            config['random_seed'] = 1
            futures.append(executor.submit(test_params, i, copy.deepcopy(config), copy.deepcopy(params)))
        concurrent.futures.wait(futures)

    results = [future.result() for future in futures]
    results = sorted(results, key=lambda result: result['mean_score'], reverse=True)
    best_params = results[0]['params']

    return {
        'best_params': best_params,
        'results': results
    }
def run_all(self):
    try:
        self.api.connect()
    except URLError as e:
        self.application_logger.exception('Failed to connect to the server. Salt API URLError: %s', e.args[0].strerror)
        self.test_report_logger.debug(e)
        exit(1)

    # Run async tests
    started_counter = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for test in self.test_suite.test_cases_async:
            self.application_logger.info('Start test ' + test.name)
            futures.append(executor.submit(self._start_test_async, test))
        for x in concurrent.futures.as_completed(futures):
            if not x.result():
                self.application_logger.error('Error starting async test')
                executor.shutdown(wait=False)
                exit(1)
            started_counter += 1
            self.application_logger.info('Started test %s of %s', started_counter, len(self.test_suite.test_cases_async))

    test_counter = 0
    self.application_logger.info('----------------Started all tests-----------------')
    for test in self.test_suite.test_cases_async:
        self.application_logger.info('CollectResult of Test ' + test.name)
        self._collect_result(test)
        test_counter += 1
        self.application_logger.info('Collected results from %s of %s tests', test_counter, len(self.test_suite.test_cases_async))
    self.application_logger.info('--------------Collected all results---------------')

    # Run sync tests
    for test in self.test_suite.test_cases_sync:
        self.application_logger.info('Start Test ' + test.name)
        self._start_test_sync(test)
    self.application_logger.info('\n')
def _find_filenames(directory, extensions, class_indices, follow_links=False):
    def _recursive_list(subpath):
        return sorted(os.walk(subpath, followlinks=follow_links), key=lambda tpl: tpl[0])

    classes = []
    filenames = []
    subdir = os.path.basename(directory)
    basedir = os.path.dirname(directory)
    for root, _, files in _recursive_list(directory):
        for fname in files:
            is_valid = False
            for extension in extensions:
                if fname.lower().endswith('.' + extension):
                    is_valid = True
                    break
            if is_valid:
                classes.append(class_indices[subdir])
                # add filename relative to directory
                absolute_path = os.path.join(root, fname)
                filenames.append(os.path.relpath(absolute_path, basedir))
    return classes, filenames
def add_handler(self, handler):
    """Adds a transaction family handler

    Args:
        handler (TransactionHandler): the handler to be added
    """
    self._handlers.append(handler)
def _split_batch_list(batch_list):
    new_list = []
    for batch in batch_list.batches:
        new_list.append(batch)
        if len(new_list) == 100:
            yield batch_pb2.BatchList(batches=new_list)
            new_list = []
    if new_list:
        yield batch_pb2.BatchList(batches=new_list)
def extract_source(self, source: DataWarehouseSchema,
                   relations: List[RelationDescription]) -> List[RelationDescription]:
    """
    For a given upstream source, iterate through given relations to extract the relations' data.
    """
    self.logger.info("Extracting %d relation(s) from source '%s'", len(relations), source.name)
    failed = []

    with Timer() as timer:
        for i, relation in enumerate(relations):
            try:
                def _monitored_table_extract(attempt_num):
                    with etl.monitor.Monitor(relation.identifier,
                                             "extract",
                                             options=self.options_info(),
                                             source=self.source_info(source, relation),
                                             destination={'bucket_name': relation.bucket_name,
                                                          'object_key': relation.manifest_file_name},
                                             index={"current": i + 1, "final": len(relations),
                                                    "name": source.name},
                                             dry_run=self.dry_run,
                                             attempt_num=attempt_num + 1):
                        self.extract_table(source, relation)

                retries = get_config_int("arthur_settings.extract_retries")
                retry(retries, _monitored_table_extract, self.logger)
            except ETLRuntimeError:
                self.failed_sources.add(source.name)
                failed.append(relation)
                if not relation.is_required:
                    self.logger.exception("Extract failed for non-required relation '%s':", relation.identifier)
                elif self.keep_going:
                    self.logger.exception("Ignoring failure of required relation '%s' and proceeding as requested:",
                                          relation.identifier)
                else:
                    self.logger.debug("Extract failed for required relation '%s'", relation.identifier)
                    raise

    self.logger.info("Finished extract from source '%s': %d succeeded, %d failed (%s)",
                     source.name, len(relations) - len(failed), len(failed), timer)
    return failed
def extract_sources(self) -> None:
    """
    Iterate over sources to be extracted and parallelize extraction at the source level
    """
    self.logger.info("Starting to extract %d relation(s) in %d schema(s)", len(self.relations), len(self.schemas))
    self.failed_sources.clear()
    max_workers = len(self.schemas)

    # TODO With Python 3.6, we should pass in a thread_name_prefix
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for source_name, relation_group in groupby(self.relations, attrgetter("source_name")):
            future = executor.submit(self.extract_source, self.schemas[source_name], list(relation_group))
            futures.append(future)
        if self.keep_going:
            done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
        else:
            done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)

    if self.failed_sources:
        self.logger.error("Failed to extract from these source(s): %s", join_with_quotes(self.failed_sources))

    # Note that iterating over result of futures may raise an exception (which surfaces exceptions from threads)
    missing_tables = []  # type: List
    for future in done:
        missing_tables.extend(future.result())
    for table_name in missing_tables:
        self.logger.warning("Failed to extract: '%s'", table_name.identifier)
    if not_done:
        raise DataExtractError("Extract failed to complete for {:d} source(s)".format(len(not_done)))
def sync_with_s3(relations: List[RelationDescription], bucket_name: str, prefix: str, dry_run: bool=False) -> None:
    """
    Copy (validated) table design and SQL files from local directory to S3 bucket.
    """
    logger.info("Validating %d table design(s) before upload", len(relations))
    RelationDescription.load_in_parallel(relations)

    files = []  # typing: List[Tuple[str, str]]
    for relation in relations:
        relation_files = [relation.design_file_name]
        if relation.is_transformation:
            if relation.sql_file_name:
                relation_files.append(relation.sql_file_name)
            else:
                raise MissingQueryError("Missing matching SQL file for '%s'" % relation.design_file_name)
        for file_name in relation_files:
            local_filename = relation.norm_path(file_name)
            remote_filename = os.path.join(prefix, local_filename)
            files.append((local_filename, remote_filename))

    uploader = etl.s3.S3Uploader(bucket_name, dry_run=dry_run)
    with Timer() as timer:
        futures = []  # typing: List[concurrent.futures.Future]
        # TODO With Python 3.6, we should pass in a thread_name_prefix
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            for local_filename, remote_filename in files:
                futures.append(executor.submit(uploader.__call__, local_filename, remote_filename))
        errors = 0
        for future in concurrent.futures.as_completed(futures):
            exception = future.exception()
            if exception is not None:
                logger.error("Failed to upload file: %s", exception)
                errors += 1

    if not dry_run:
        logger.info("Uploaded %d of %d file(s) to 's3://%s/%s (%s)",
                    len(files) - errors, len(files), bucket_name, prefix, timer)
    if errors:
        raise ETLRuntimeError("There were {:d} error(s) during upload".format(errors))
def replace_vars(value, *, autonumify=False):
    if isinstance(value, dict):
        materialized = {}
        for k, v in value.items():
            materialized[k] = replace_vars(v)
        return materialized
    if isinstance(value, list):
        materialized = []
        for v in value:
            materialized.append(replace_vars(v))
        return materialized
    if type(value) is str:
        for var in re.findall("<<\\w+>>", value):
            k = var.replace('<<', '').replace('>>', '')
            nv = STORED_VARS[k] if k in STORED_VARS else None
            if nv is not None:
                value = value.replace(var, str(nv))
        if autonumify:
            try:
                return float(value)
            except Exception:
                pass
    return value
def load_classes(kind: str):
    import pkgutil
    import importlib

    paths = []
    paths.append(os.path.join(os.path.dirname(__file__), "..", "plugins"))

    extra_path = os.getenv('PHAT_EXTRA_PLUGINS_DIR')
    if extra_path:
        extra_path = os.path.join(extra_path, "plugins")
        paths.append(extra_path)

    for path in paths:
        if path not in sys.path:
            sys.path.append(path)

    logger.debug("Loading {}s from {}: ".format(kind, paths))
    loaded_items = []
    for _, name, ispkg in pkgutil.walk_packages(path=paths):
        logger.debug("Loading {}: {}".format(kind, name))
        if ispkg:
            try:
                module = importlib.import_module('{}.{}'.format(name, kind))
            except ImportError as ex:
                logger.debug("Error importing module: {0} of kind: {1}".format(name, kind))
            else:
                loaded_items.append(module)

    # TODO: Return the classes instead of modules
    return loaded_items
def include_files(self, filename):
    materialized_tests = []
    f = open(filename, encoding='utf-8')
    if f:
        try:
            settings = json.load(f)
        except JSONDecodeError as e:
            data = open(filename, 'r').read()
            print("{filename}:{line}:{col}: failed to decode json: {msg}".format(
                filename=filename, line=e.lineno, col=e.colno, msg=e.msg))
            print("\tGave up here: {context} ?".format(
                context=repr(data[max(0, e.pos - 40):e.pos + 1].translate(str.maketrans("\t\n", "  ")))))
            exit(1)

        tests = settings["tests"]
        it = iter(tests)
        for item in it:
            if 'include' in item:
                inc_filename = item['include']
                if not os.path.isabs(inc_filename):
                    base_path = os.path.dirname(filename)
                    inc_filename = os.path.join(base_path, inc_filename)
                inc = self.include_files(inc_filename)
                materialized_tests.extend(inc)
            else:
                materialized_tests.append(item)
    return materialized_tests
def fail(self, message, *, url=None):
    if url is None:
        url = self.url
    url = replace_vars(url)
    self.errors.append({"url": url, "error": message})
    print("\N{BALLOT X}", end="", flush=True)
def chunkify(iterable, chunksize=3000):
    i = 0
    chunk = []
    for item in iterable:
        chunk.append(item)
        i += 1
        if i == chunksize:
            yield chunk
            i = 0
            chunk = []
    if chunk:
        yield chunk
async def get_blocks(self, block_nums):
    requests = (
        {
            'jsonrpc': '2.0',
            'id': block_num,
            'method': 'get_block',
            'params': [block_num]
        } for block_num in block_nums)
    batched_requests = chunkify(requests, self.batch_request_size)
    coros = (self.fetch(batch) for batch in batched_requests)
    first_coros = islice(coros, 0, self.concurrent_tasks_limit)
    futures = [asyncio.ensure_future(c) for c in first_coros]
    logger.debug(f'inital futures:{len(futures)}')
    start = time.perf_counter()
    while futures:
        await asyncio.sleep(0)
        for f in futures:
            try:
                if f.done():
                    self._perf_history.append(time.perf_counter() - start)
                    result = f.result()
                    futures.remove(f)
                    logger.debug(f'futures:{len(futures)}')
                    try:
                        futures.append(asyncio.ensure_future(next(coros)))
                    except StopIteration as e:
                        logger.debug('StopIteration')
                    except concurrent.futures._base.CancelledError:
                        return
                    start = time.perf_counter()
                    yield result
            except KeyboardInterrupt:
                logger.debug('client.get blocks kbi')
                for f in futures:
                    f.cancel()
                self.close()
                return
            except Exception as e:
                logger.exception(f'client.get_blocks error:{e}')
                continue
def detectValidExtensions(self, extensions, maxN, extList=None):
    self.logger.info("### Starting detection of valid extensions ...")
    n = 0
    if extList:
        tmpExtList = []
        for e in extList:
            tmpExtList.append((e, getMime(extensions, e)))
    else:
        tmpExtList = extensions
    validExtensions = []
    extensionsToTest = tmpExtList[0:maxN]
    with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
        futures = []
        try:
            for ext in extensionsToTest:
                f = executor.submit(self.uploadFile, "." + ext[0], ext[1], os.urandom(self.size))
                f.ext = ext
                f.add_done_callback(self.detectValidExtension)
                futures.append(f)
            for future in concurrent.futures.as_completed(futures):
                a = future.result()
                n += 1
        except KeyboardInterrupt:
            self.shouldLog = False
            executor.shutdown(wait=False)
            self.stopThreads = True
            executor._threads.clear()
            concurrent.futures.thread._threads_queues.clear()
    return n

# detects if code execution is gained, given an url to request and a regex supposed to match the executed code output
def detectForms(html):
    soup = BeautifulSoup(html, 'html.parser')
    detectedForms = soup.find_all("form")
    returnForms = []
    if len(detectedForms) > 0:
        for f in detectedForms:
            fileInputs = f.findChildren("input", {"type": "file"})
            if len(fileInputs) > 0:
                returnForms.append((f, fileInputs))
    return returnForms