How do I use asyncio.gather() with exception handling to prevent one failing task from cancelling all others?

Asked 3 years, 2 months ago Modified 4 months ago Viewed 48,312 times
47

I have a list of coroutines that I want to run concurrently using asyncio.gather(). The problem is that when one task raises an exception, it seems to cancel the other tasks and propagate the error immediately. I want to collect all results (including errors) and handle them individually.

Here is my current code:

import asyncio

async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    if delay == 2:
        raise ValueError(f"Failed to fetch: {url}")
    return f"data from {url}"

async def main():
    urls = ["http://api1.example.com", "http://api2.example.com", "http://api3.example.com"]
    delays = [1, 2, 0.5]

    results = await asyncio.gather(
        *[fetch_data(url, d) for url, d in zip(urls, delays)]
    )
    print(results)

asyncio.run(main())

When api2 fails, the entire gather raises and I never get the results from api1 or api3. How can I get all results and handle errors per-task without them cancelling each other?

12 Have you checked the return_exceptions parameter in the docs? It's exactly what you need here. – asyncio_guide Mar 15, 2022 at 10:12
4 Also worth looking at asyncio.TaskGroup if you're on Python 3.11+, it gives you better structured concurrency. – structured_concurrency_fan Mar 15, 2022 at 11:04
5 Answers
127

Use the return_exceptions=True keyword argument on asyncio.gather(). With this flag, instead of raising on the first exception, gather returns all results including exceptions as objects in the result list.

import asyncio

async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    if delay == 2:
        raise ValueError(f"Failed to fetch: {url}")
    return f"data from {url}"

async def main():
    urls = ["http://api1.example.com", "http://api2.example.com", "http://api3.example.com"]
    delays = [1, 2, 0.5]

    results = await asyncio.gather(
        *[fetch_data(url, d) for url, d in zip(urls, delays)],
        return_exceptions=True   # ← key change
    )

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"ERROR {url}: {result}")
        else:
            print(f"OK    {url}: {result}")

Output:

OK    http://api1.example.com: data from http://api1.example.com
ERROR http://api2.example.com: Failed to fetch: http://api2.example.com
OK    http://api3.example.com: data from http://api3.example.com

The tasks still run concurrently; the only difference is that exceptions are caught and returned as values rather than re-raised. You then inspect each result yourself.

Why this works: by default, gather() propagates the first exception immediately. The remaining awaitables are not cancelled, but their results are lost because gather has already raised. return_exceptions=True treats exceptions like regular return values, so every task runs to completion and you get every outcome back.
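A minimal contrast of the two behaviours, using toy coroutines (not the ones from the question):

```python
import asyncio

async def ok():
    return "ok"

async def bad():
    raise ValueError("bad")

async def main():
    # Default behaviour: gather re-raises the first exception.
    try:
        await asyncio.gather(ok(), bad())
    except ValueError as e:
        print(f"raised: {e}")  # raised: bad

    # With the flag: the exception becomes an element of the result list.
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)
    print(results)  # ['ok', ValueError('bad')]

asyncio.run(main())
```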

8 Clean explanation. Worth noting that return_exceptions=True will also catch CancelledError in Python 3.8+, so you need to re-raise it explicitly if you want proper cancellation propagation. – concurrent_py Mar 15, 2022 at 11:33
3 @concurrent_py Good point — in Python 3.11+ CancelledError is excluded from the catch again when using TaskGroup, which makes this cleaner. – asyncio_expert Mar 15, 2022 at 12:01
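Following up on the comments: with return_exceptions=True, a cancelled task shows up as a CancelledError instance in the result list, so one way to keep cancellation propagating is a small post-processing helper (a sketch; process_all is a made-up name):

```python
import asyncio

async def process_all(coros):
    results = await asyncio.gather(*coros, return_exceptions=True)
    for r in results:
        # A cancelled task appears here as a CancelledError instance,
        # because return_exceptions=True captures it like any other error.
        if isinstance(r, asyncio.CancelledError):
            raise r  # re-raise so cancellation still propagates
    return results
```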
45

An alternative pattern if you need full control and want to wrap each coroutine individually is to write a thin helper that catches exceptions:

import asyncio

# fetch_data(url, delay) as defined in the question
async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    if delay == 2:
        raise ValueError(f"Failed to fetch: {url}")
    return f"data from {url}"

async def safe_call(coro):
    """Await a coroutine, catching and returning exceptions instead of raising."""
    try:
        return await coro
    except Exception as e:
        return e

async def main():
    urls = ["http://api1.example.com", "http://api2.example.com", "http://api3.example.com"]
    delays = [1, 2, 0.5]

    results = await asyncio.gather(
        *[safe_call(fetch_data(url, d)) for url, d in zip(urls, delays)]
    )

    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"FAIL {url}: {result}")
        else:
            print(f"OK   {url}: {result}")

This gives you the same semantics as return_exceptions=True but lets you add custom logging, retry logic, or different exception types inside safe_call.
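For instance, a sketch of a retry-enabled variant (the retries/delay parameters and the factory-based API are illustrative, not part of the answer above):

```python
import asyncio

async def safe_call_with_retry(coro_factory, retries=3, delay=0.1):
    """Retry a coroutine a few times before giving up.

    Takes a zero-argument callable that returns a fresh coroutine,
    because a coroutine object can only be awaited once.
    """
    for attempt in range(retries):
        try:
            return await coro_factory()
        except Exception as e:
            if attempt == retries - 1:
                return e               # retries exhausted: return the error
            await asyncio.sleep(delay)  # back off before the next attempt
```

You would call it as safe_call_with_retry(lambda: fetch_data(url, d)) rather than passing the coroutine object directly.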

23

If you're on Python 3.11+, asyncio.TaskGroup is the modern structured-concurrency approach and handles errors differently. It collects all exceptions into an ExceptionGroup:

import asyncio

# fetch_data(url, delay) as defined in the question

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch_data("http://api1.example.com", 1))
            t2 = tg.create_task(fetch_data("http://api2.example.com", 2))
            t3 = tg.create_task(fetch_data("http://api3.example.com", 0.5))
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

    # Access completed task results
    for t in [t1, t3]:
        if not t.cancelled():
            print(t.result())

Note: TaskGroup does cancel sibling tasks on failure (by design), so it's not a drop-in replacement for return_exceptions=True. Use it when you want structured cancellation.
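If you want TaskGroup's structure but gather-like collect-everything semantics, one option is to wrap each coroutine so it never raises into the group (a sketch; shielded and run_all are made-up names, and TaskGroup requires Python 3.11+):

```python
import asyncio

async def shielded(coro):
    """Catch exceptions so a single failure doesn't trip the TaskGroup."""
    try:
        return await coro
    except Exception as e:
        return e

async def run_all(coros):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(shielded(c)) for c in coros]
    # Every wrapped task completes normally, so result() never raises here.
    return [t.result() for t in tasks]
```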

8

You can also use asyncio.ensure_future (or asyncio.create_task on Python 3.7+) combined with per-task done callbacks to handle errors as each task finishes:

import asyncio

# fetch_data(url, delay) as defined in the question

def handle_result(task):
    try:
        result = task.result()
        print(f"Success: {result}")
    except Exception as e:
        print(f"Task failed: {e}")

async def main():
    urls = ["http://api1.example.com", "http://api2.example.com", "http://api3.example.com"]
    delays = [1, 2, 0.5]
    tasks = [
        asyncio.ensure_future(fetch_data(url, d))
        for url, d in zip(urls, delays)
    ]
    for task in tasks:
        task.add_done_callback(handle_result)
    # return_exceptions=True stops gather itself from raising;
    # each task's error is already handled in its callback.
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())

This is more verbose but useful if you need side-effects (e.g., logging each completion in real time rather than waiting for all tasks to finish).
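In the same spirit, asyncio.as_completed lets you handle each result the moment it finishes, without callbacks (a sketch with toy coroutines, not the question's fetch_data):

```python
import asyncio

async def work(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} done"

async def main():
    coros = [work("a", 0.2), work("b", 0.1, fail=True), work("c", 0.05)]
    # as_completed yields awaitables in completion order, so each
    # result (or error) is handled as soon as it is ready.
    for fut in asyncio.as_completed(coros):
        try:
            print(await fut)
        except Exception as e:
            print(f"error: {e}")

asyncio.run(main())
```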

3

A quick filtering approach if you just want to split successes from failures after gathering:

results = await asyncio.gather(*coros, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
successes = [r for r in results if not isinstance(r, Exception)]

if errors:
    print(f"{len(errors)} task(s) failed: {errors}")
