我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用concurrent.futures.FIRST_EXCEPTION。
def run(self, funcs): """Run a set of functions in parallel, returning their results. Make sure any function you pass exits with a reasonable timeout. If it doesn't return within the timeout or the result is ignored due an exception in a separate thread it will continue to stick around until it finishes, including blocking process exit. Args: funcs: An iterable of functions or iterable of args to functools.partial. Returns: A list of return values with the values matching the order in funcs. Raises: Propagates the first exception encountered in one of the functions. """ funcs = [f if callable(f) else functools.partial(*f) for f in funcs] if len(funcs) == 1: # Ignore threads if it's not needed. return [funcs[0]()] if len(funcs) > self._workers: # Lazy init and grow as needed. self.shutdown() self._workers = len(funcs) self._executor = futures.ThreadPoolExecutor(self._workers) futs = [self._executor.submit(f) for f in funcs] done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION) # Make sure to propagate any exceptions. for f in done: if not f.cancelled() and f.exception() is not None: if not_done: # If there are some calls that haven't finished, cancel and recreate # the thread pool. Otherwise we may have a thread running forever # blocking parallel calls. for nd in not_done: nd.cancel() self.shutdown(False) # Don't wait, they may be deadlocked. raise f.exception() # Either done or timed out, so don't wait again. return [f.result(timeout=0) for f in futs]
def test_first_exception(self): future1 = self.executor.submit(mul, 2, 21) future2 = self.executor.submit(sleep_and_raise, 1.5) future3 = self.executor.submit(time.sleep, 3) finished, pending = futures.wait( [future1, future2, future3], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([future1, future2]), finished) self.assertEqual(set([future3]), pending)
def test_first_exception_some_already_complete(self): future1 = self.executor.submit(divmod, 21, 0) future2 = self.executor.submit(time.sleep, 1.5) finished, pending = futures.wait( [SUCCESSFUL_FUTURE, CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, future1, future2], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, future1]), finished) self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
def test_first_exception_one_already_failed(self): future1 = self.executor.submit(time.sleep, 2) finished, pending = futures.wait( [EXCEPTION_FUTURE, future1], return_when=futures.FIRST_EXCEPTION) self.assertEqual(set([EXCEPTION_FUTURE]), finished) self.assertEqual(set([future1]), pending)
def get_comments_of_top_stories(loop, session, limit, iteration): """Retrieve top stories in HN. """ fetcher = URLFetcher() # create a new fetcher for this task try: response = await fetcher.fetch(session, TOP_STORIES_URL) except BoomException as e: log.error("Error retrieving top stories: {}".format(e)) # return instead of re-raising as it will go unnoticed return except Exception as e: # catch generic exceptions log.error("Unexpected exception: {}".format(e)) return tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] # return on first exception to cancel any pending tasks done, pending = await asyncio.wait(tasks, return_when=FIRST_EXCEPTION) # cancel any pending tasks, the tuple could be empty so it's safe for pending_task in pending: pending_task.cancel() # process the done tasks for done_task in done: # one of the Tasks could raise an exception try: print("Post ??? has {} comments ({})".format( done_task.result(), iteration)) except BoomException as e: print("Error retrieving comments for top stories: {}".format(e)) return fetcher.fetch_counter
def get_comments_of_top_stories(loop, session, limit, iteration): """Retrieve top stories in HN. """ fetcher = URLFetcher() # create a new fetcher for this task try: response = await fetcher.fetch(session, TOP_STORIES_URL) except BoomException as e: log.error("Error retrieving top stories: {}".format(e)) # return instead of re-raising as it will go unnoticed return except Exception as e: # catch generic exceptions log.error("Unexpected exception: {}".format(e)) return tasks = { asyncio.ensure_future( post_number_of_comments(loop, session, fetcher, post_id) ): post_id for post_id in response[:limit]} # return on first exception to cancel any pending tasks done, pending = await asyncio.wait( tasks.keys(), return_when=FIRST_EXCEPTION) # if there are pending tasks is because there was an exception # cancel any pending tasks for pending_task in pending: pending_task.cancel() # process the done tasks for done_task in done: # if an exception is raised one of the Tasks will raise try: print("Post {} has {} comments ({})".format( tasks[done_task], done_task.result(), iteration)) except BoomException as e: print("Error retrieving comments for top stories: {}".format(e)) return fetcher.fetch_counter
def get_comments_of_top_stories(loop, session, limit, iteration): """Retrieve top stories in HN. """ fetcher = URLFetcher() # create a new fetcher for this task try: response = await fetcher.fetch(session, TOP_STORIES_URL) except BoomException as e: log.error("Error retrieving top stories: {}".format(e)) # return instead of re-raising as it will go unnoticed return except Exception as e: # catch generic exceptions log.error("Unexpected exception: {}".format(e)) return tasks = { asyncio.ensure_future( post_number_of_comments(loop, session, fetcher, post_id) ): post_id for post_id in response[:limit]} # return on first exception to cancel any pending tasks done, pending = await asyncio.shield(asyncio.wait( tasks.keys(), return_when=FIRST_EXCEPTION)) # if there are pending tasks is because there was an exception # cancel any pending tasks for pending_task in pending: pending_task.cancel() # process the done tasks for done_task in done: # if an exception is raised one of the Tasks will raise try: print("Post {} has {} comments ({})".format( tasks[done_task], done_task.result(), iteration)) except BoomException as e: print("Error retrieving comments for top stories: {}".format(e)) return fetcher.fetch_counter