【2026年版】Python非同期プログラミング完全ガイド:asyncio・タスク管理・エラーハンドリング

Tech Trends AI
- 8 minutes read - 1558 wordsはじめに
Pythonの非同期プログラミングは、I/Oバウンドな処理のパフォーマンスを劇的に向上させる強力な手法です。2026年現在、AI APIの呼び出し、Webスクレイピング、マイクロサービス間通信など、非同期処理が求められるユースケースは急増しています。
しかし、asyncioの学習曲線は決して低くありません。同期処理との思考の切り替え、タスクのライフサイクル管理、適切なエラーハンドリングなど、理解すべき概念が多くあります。
本記事では、asyncioの基礎から実践的なパターンまでを体系的に解説し、本番環境で使える非同期コードを書くための知識を提供します。
同期処理と非同期処理の違い
処理モデルの比較
| 処理モデル | 仕組み | 適したユースケース | Python実装 |
|---|---|---|---|
| 同期処理 | 1つずつ順番に実行 | 単純なスクリプト | 通常のPythonコード |
| マルチスレッド | OS スレッドで並行実行 | I/Oバウンド(GIL制約あり) | threading |
| マルチプロセス | 別プロセスで並列実行 | CPUバウンド | multiprocessing |
| 非同期(asyncio) | イベントループで協調的に並行実行 | 大量のI/Oバウンド | asyncio |
パフォーマンス比較の例
100件のHTTPリクエストを処理する場合の比較です。
import asyncio
import time
import httpx
URLS = [f"https://httpbin.org/delay/1" for _ in range(100)]
# 同期処理: 約100秒
def sync_fetch():
with httpx.Client() as client:
for url in URLS:
client.get(url)
# 非同期処理: 約2〜3秒
async def async_fetch():
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in URLS]
await asyncio.gather(*tasks)
# 計測
start = time.perf_counter()
asyncio.run(async_fetch())
elapsed = time.perf_counter() - start
print(f"非同期処理: {elapsed:.2f}秒")
asyncioの基礎
コルーチンとは
async defで定義された関数はコルーチン関数であり、呼び出すとコルーチンオブジェクトを返します。awaitでコルーチンの完了を待機します。
import asyncio
# コルーチン関数の定義
async def greet(name: str) -> str:
await asyncio.sleep(1) # I/O待ちをシミュレート
return f"Hello, {name}!"
# コルーチンの実行
async def main():
result = await greet("World")
print(result)
# イベントループで実行
asyncio.run(main())
イベントループの仕組み
イベントループ
│
├── タスクA: await中(I/O待ち)→ 一時停止
│
├── タスクB: 実行中 → awaitに到達 → 一時停止
│
├── タスクC: I/O完了 → 再開・実行
│
└── タスクA: I/O完了 → 再開・実行
※ シングルスレッドで協調的に切り替え
重要な概念
| 概念 | 説明 | コード例 |
|---|---|---|
| コルーチン | async defで定義された関数 | async def foo(): ... |
| await | コルーチンの完了を待機 | result = await foo() |
| タスク | コルーチンをイベントループにスケジュール | task = asyncio.create_task(foo()) |
| Future | 将来の結果を表すオブジェクト | loop.create_future() |
| イベントループ | 非同期タスクを管理・実行するエンジン | asyncio.run(main()) |
タスク管理
タスクの作成と実行
import asyncio
async def fetch_data(url: str) -> dict:
"""データ取得のシミュレーション"""
await asyncio.sleep(1)
return {"url": url, "status": "ok"}
async def main():
# 方法1: asyncio.create_task(推奨)
task1 = asyncio.create_task(fetch_data("https://api.example.com/users"))
task2 = asyncio.create_task(fetch_data("https://api.example.com/items"))
# 両タスクの完了を待機
result1 = await task1
result2 = await task2
print(result1, result2)
# 方法2: asyncio.gather(複数タスクの一括実行)
results = await asyncio.gather(
fetch_data("https://api.example.com/users"),
fetch_data("https://api.example.com/items"),
fetch_data("https://api.example.com/orders"),
)
print(results)
asyncio.run(main())
asyncio.gatherの詳細
import asyncio
async def success_task(n: int) -> str:
await asyncio.sleep(n)
return f"Task {n} completed"
async def failing_task() -> str:
await asyncio.sleep(0.5)
raise ValueError("Something went wrong")
async def main():
# return_exceptions=False(デフォルト): 最初の例外で全体が失敗
try:
results = await asyncio.gather(
success_task(1),
failing_task(),
success_task(2),
)
except ValueError as e:
print(f"エラー発生: {e}")
# return_exceptions=True: 例外も結果として返す
results = await asyncio.gather(
success_task(1),
failing_task(),
success_task(2),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"エラー: {r}")
else:
print(f"成功: {r}")
asyncio.run(main())
TaskGroupによる構造化された並行処理(Python 3.11+)
import asyncio
async def process_item(item: str) -> str:
await asyncio.sleep(1)
if item == "bad":
raise ValueError(f"Invalid item: {item}")
return f"Processed: {item}"
async def main():
results = []
try:
async with asyncio.TaskGroup() as tg:
tasks = []
for item in ["a", "b", "c", "d"]:
task = tg.create_task(process_item(item))
tasks.append(task)
# TaskGroup内のすべてのタスクが完了
results = [t.result() for t in tasks]
print(results)
except* ValueError as eg:
# ExceptionGroupで複数のエラーをハンドリング
for exc in eg.exceptions:
print(f"エラー: {exc}")
asyncio.run(main())
並行処理パターン
パターン1:セマフォによる同時実行数の制御
import asyncio
import httpx
class RateLimitedClient:
"""同時リクエスト数を制御するHTTPクライアント"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.client = httpx.AsyncClient(timeout=30.0)
async def fetch(self, url: str) -> dict:
async with self.semaphore:
response = await self.client.get(url)
return {"url": url, "status": response.status_code}
async def fetch_all(self, urls: list[str]) -> list[dict]:
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks)
async def close(self):
await self.client.aclose()
# 使用例
async def main():
client = RateLimitedClient(max_concurrent=5)
urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]
try:
results = await client.fetch_all(urls)
print(f"取得完了: {len(results)}件")
finally:
await client.close()
asyncio.run(main())
パターン2:プロデューサー/コンシューマー
import asyncio
from dataclasses import dataclass
@dataclass
class Job:
id: int
data: str
async def producer(queue: asyncio.Queue, num_items: int):
"""ジョブを生成してキューに投入"""
for i in range(num_items):
job = Job(id=i, data=f"item_{i}")
await queue.put(job)
print(f"Produced: {job.id}")
await asyncio.sleep(0.1)
# 終了シグナル
await queue.put(None)
async def consumer(queue: asyncio.Queue, worker_id: int):
"""キューからジョブを取得して処理"""
while True:
job = await queue.get()
if job is None:
# 終了シグナルを次のワーカーに伝播
await queue.put(None)
break
# ジョブの処理
await asyncio.sleep(0.5) # 処理のシミュレーション
print(f"Worker {worker_id}: Processed job {job.id}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 1プロデューサー + 3コンシューマー
producer_task = asyncio.create_task(producer(queue, 20))
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
await producer_task
await asyncio.gather(*consumers)
asyncio.run(main())
パターン3:タイムアウト制御
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(10)
return "完了"
async def main():
# 方法1: asyncio.wait_for
try:
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("タイムアウト: 3秒以内に完了しませんでした")
# 方法2: asyncio.timeout(Python 3.11+、推奨)
try:
async with asyncio.timeout(3.0):
result = await slow_operation()
print(result)
except TimeoutError:
print("タイムアウト: 3秒以内に完了しませんでした")
# 方法3: asyncio.wait で最初の完了を待つ
tasks = {
asyncio.create_task(slow_operation()),
asyncio.create_task(asyncio.sleep(2, result="タイムアウト")),
}
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
for task in done:
print(f"結果: {task.result()}")
asyncio.run(main())
エラーハンドリング
基本的なエラーハンドリング
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def risky_operation(item_id: int) -> dict:
"""失敗する可能性のある処理"""
if item_id % 3 == 0:
raise ConnectionError(f"接続エラー: item {item_id}")
if item_id % 7 == 0:
raise ValueError(f"不正な値: item {item_id}")
await asyncio.sleep(0.1)
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> dict:
"""エラーハンドリング付きの処理"""
try:
return await risky_operation(item_id)
except ConnectionError as e:
logger.warning(f"接続エラー(リトライ可能): {e}")
return {"id": item_id, "status": "connection_error"}
except ValueError as e:
logger.error(f"バリデーションエラー: {e}")
return {"id": item_id, "status": "validation_error"}
except Exception as e:
logger.exception(f"予期しないエラー: {e}")
return {"id": item_id, "status": "unknown_error"}
async def main():
results = await asyncio.gather(
*[safe_operation(i) for i in range(20)]
)
success = [r for r in results if r["status"] == "success"]
errors = [r for r in results if r["status"] != "success"]
print(f"成功: {len(success)}件, エラー: {len(errors)}件")
asyncio.run(main())
リトライパターン
import asyncio
import random
from functools import wraps
from typing import TypeVar, Callable
T = TypeVar("T")
def async_retry(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_backoff: bool = True,
retry_on: tuple[type[Exception], ...] = (Exception,),
):
"""非同期リトライデコレータ(指数バックオフ対応)"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except retry_on as e:
last_exception = e
if attempt == max_retries:
break
if exponential_backoff:
delay = min(
base_delay * (2 ** attempt) + random.uniform(0, 1),
max_delay,
)
else:
delay = base_delay
logger.warning(
f"{func.__name__} 失敗 (試行 {attempt + 1}/{max_retries + 1}): "
f"{e}. {delay:.1f}秒後にリトライ"
)
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# 使用例
@async_retry(max_retries=3, base_delay=1.0, retry_on=(ConnectionError, TimeoutError))
async def fetch_with_retry(url: str) -> dict:
"""リトライ付きのHTTPリクエスト"""
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=5.0)
response.raise_for_status()
return response.json()
タスクのキャンセル処理
import asyncio
async def cancellable_task(name: str):
"""キャンセル可能なタスク"""
try:
print(f"{name}: 開始")
while True:
await asyncio.sleep(1)
print(f"{name}: 実行中...")
except asyncio.CancelledError:
# クリーンアップ処理
print(f"{name}: キャンセルされました。クリーンアップ中...")
await asyncio.sleep(0.1) # クリーンアップのシミュレーション
print(f"{name}: クリーンアップ完了")
raise # CancelledErrorを再送出
async def main():
task = asyncio.create_task(cancellable_task("WorkerA"))
# 3秒後にキャンセル
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("タスクがキャンセルされました")
asyncio.run(main())
実践的なユースケース
AI APIの並行呼び出し
import asyncio
import httpx
from dataclasses import dataclass
@dataclass
class LLMResponse:
model: str
content: str
tokens_used: int
latency_ms: float
class AIAPIClient:
"""複数のAI APIを並行で呼び出すクライアント"""
def __init__(self):
self.client = httpx.AsyncClient(timeout=60.0)
self.semaphore = asyncio.Semaphore(5)
async def call_openai(self, prompt: str) -> LLMResponse:
async with self.semaphore:
start = asyncio.get_event_loop().time()
response = await self.client.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": "Bearer $OPENAI_API_KEY"},
json={
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
},
)
elapsed = (asyncio.get_event_loop().time() - start) * 1000
data = response.json()
return LLMResponse(
model="gpt-4o",
content=data["choices"][0]["message"]["content"],
tokens_used=data["usage"]["total_tokens"],
latency_ms=elapsed,
)
async def call_multiple_models(
self, prompt: str, models: list[str]
) -> list[LLMResponse]:
"""複数モデルに同じプロンプトを並行送信"""
tasks = []
for model in models:
if model.startswith("gpt"):
tasks.append(self.call_openai(prompt))
# 他のモデルも同様に追加
return await asyncio.gather(*tasks, return_exceptions=True)
async def batch_process(
self, prompts: list[str], batch_size: int = 10
) -> list[LLMResponse]:
"""バッチ処理で大量のプロンプトを効率的に処理"""
results = []
for i in range(0, len(prompts), batch_size):
batch = prompts[i : i + batch_size]
batch_results = await asyncio.gather(
*[self.call_openai(p) for p in batch],
return_exceptions=True,
)
results.extend(batch_results)
# レート制限を考慮した待機
await asyncio.sleep(1.0)
return results
async def close(self):
await self.client.aclose()
非同期Webスクレイパー
import asyncio
import httpx
from dataclasses import dataclass, field
@dataclass
class ScrapeResult:
url: str
status_code: int
content_length: int
success: bool
error: str = ""
class AsyncScraper:
"""非同期Webスクレイパー"""
def __init__(self, max_concurrent: int = 10, delay: float = 0.5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.delay = delay
self.results: list[ScrapeResult] = []
async def scrape_url(self, client: httpx.AsyncClient, url: str) -> ScrapeResult:
async with self.semaphore:
try:
response = await client.get(url, follow_redirects=True)
await asyncio.sleep(self.delay) # ポリトネス遅延
return ScrapeResult(
url=url,
status_code=response.status_code,
content_length=len(response.content),
success=response.status_code == 200,
)
except httpx.TimeoutException:
return ScrapeResult(
url=url, status_code=0, content_length=0,
success=False, error="Timeout",
)
except Exception as e:
return ScrapeResult(
url=url, status_code=0, content_length=0,
success=False, error=str(e),
)
async def scrape_all(self, urls: list[str]) -> list[ScrapeResult]:
async with httpx.AsyncClient(timeout=30.0) as client:
tasks = [self.scrape_url(client, url) for url in urls]
self.results = await asyncio.gather(*tasks)
return self.results
def summary(self) -> dict:
success = sum(1 for r in self.results if r.success)
failed = len(self.results) - success
return {"total": len(self.results), "success": success, "failed": failed}
パフォーマンス最適化
asyncioのパフォーマンスTips
| Tips | 説明 | 効果 |
|---|---|---|
| uvloopの使用 | Cython実装のイベントループ | 2〜4倍の速度向上 |
| 接続プールの活用 | HTTPクライアントの再利用 | 接続確立のオーバーヘッド削減 |
| バッチ処理 | 小さなタスクをまとめて実行 | タスクスケジューリングの最適化 |
| セマフォの適切な設定 | 同時実行数の制御 | リソース枯渇の防止 |
| 不要なawaitの回避 | 同期処理をawaitしない | イベントループの負荷軽減 |
uvloopの導入
import asyncio
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
print("uvloop を使用中")
except ImportError:
print("uvloop が利用できないため、デフォルトのイベントループを使用")
async def main():
# uvloopが有効な場合、自動的に高速なイベントループが使用される
pass
asyncio.run(main())
同期処理との共存
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
async def main():
loop = asyncio.get_event_loop()
# CPU集約型の処理はProcessPoolExecutorで実行
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_task, data)
# ブロッキングI/OはThreadPoolExecutorで実行
with ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_io_task, filepath)
def cpu_heavy_task(data):
"""CPU集約型の処理(別プロセスで実行)"""
return sum(x ** 2 for x in range(10_000_000))
def blocking_io_task(filepath):
"""ブロッキングI/O(別スレッドで実行)"""
with open(filepath, "r") as f:
return f.read()
デバッグとテスト
asyncioのデバッグモード
import asyncio
# デバッグモードの有効化
asyncio.run(main(), debug=True)
# 環境変数でも有効化可能
# PYTHONASYNCIODEBUG=1 python script.py
pytestでの非同期テスト
import pytest
import asyncio
# pytest-asyncio を使用
@pytest.mark.asyncio
async def test_fetch_data():
"""非同期関数のテスト"""
result = await fetch_data("https://api.example.com/test")
assert result["status"] == "ok"
@pytest.mark.asyncio
async def test_concurrent_fetch():
"""並行処理のテスト"""
results = await asyncio.gather(
fetch_data("https://api.example.com/1"),
fetch_data("https://api.example.com/2"),
)
assert len(results) == 2
assert all(r["status"] == "ok" for r in results)
@pytest.mark.asyncio
async def test_timeout():
"""タイムアウトのテスト"""
with pytest.raises(TimeoutError):
async with asyncio.timeout(0.1):
await asyncio.sleep(10)
@pytest.mark.asyncio
async def test_cancellation():
"""キャンセルのテスト"""
task = asyncio.create_task(asyncio.sleep(10))
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
よくある落とし穴と対策
アンチパターン一覧
| アンチパターン | 問題 | 正しいアプローチ |
|---|---|---|
asyncio.run()のネスト | 既にイベントループが実行中だとエラー | awaitで呼び出す |
time.sleep()の使用 | イベントループをブロック | asyncio.sleep()を使用 |
| タスクの参照を保持しない | GCで回収される可能性 | 変数に保持するかgatherで管理 |
| 例外の握りつぶし | タスクの例外が見過ごされる | return_exceptions=Trueまたはtry/except |
| 大量タスクの一括作成 | メモリ消費が急増 | セマフォまたはバッチ処理で制御 |
具体例:よくある間違い
# 間違い: コルーチンをawaitせずに呼び出す
async def bad_example():
fetch_data("https://api.example.com") # コルーチンが実行されない!
# 正しい: awaitで実行を待機
async def good_example():
await fetch_data("https://api.example.com")
# 間違い: forループで順次await(非効率)
async def sequential_bad():
results = []
for url in urls:
result = await fetch(url) # 1つずつ順番に処理
results.append(result)
# 正しい: gatherで並行処理
async def concurrent_good():
results = await asyncio.gather(*[fetch(url) for url in urls])
まとめ
Pythonの非同期プログラミングは、I/Oバウンドな処理のパフォーマンスを劇的に改善する強力な手法です。2026年のPython開発で押さえるべきポイントを整理します。
- asyncioの基本:
async defでコルーチンを定義し、awaitで待機。asyncio.run()でイベントループを起動 - タスク管理:
asyncio.TaskGroup(Python 3.11+)が構造化された並行処理に最適。asyncio.gatherは簡易的な並行実行に便利 - セマフォで制御:
asyncio.Semaphoreで同時実行数を制限し、バックエンドやAPIへの過負荷を防止 - エラーハンドリング: 指数バックオフ付きリトライと
CancelledErrorの適切な処理が重要 - パフォーマンス: uvloopの導入、接続プールの再利用、CPU集約型処理の
ProcessPoolExecutorへの委譲
非同期プログラミングは、AI APIの並行呼び出し、大量のWebリクエスト処理、マイクロサービス間通信など、現代のPython開発で不可欠なスキルです。基礎を固めた上で、実践パターンを身につけましょう。