asyncio-programming
Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks
When & Why to Use This Skill
This Claude skill is a guide to asynchronous programming in Python with the asyncio library. It covers the event loop and coroutines, task management, and practical use in web development and I/O-heavy services. It is aimed at developers who want to improve scalability by writing efficient, non-blocking code with async/await.
Use Cases
- Developing high-performance web scrapers that can handle thousands of concurrent network requests efficiently without blocking execution.
- Building real-time communication platforms, such as WebSocket-based chat servers or live notification systems, using aiohttp and FastAPI.
- Optimizing backend services by implementing non-blocking database queries and asynchronous file I/O to significantly improve system throughput.
- Refactoring synchronous Python scripts into modern asynchronous patterns to reduce latency and improve resource utilization in data-intensive tasks.
| name | Asyncio Programming |
|---|---|
| description | Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks |
| version | "2.1.0" |
| sasmp_version | "1.3.0" |
| bonded_agent | 05-async-concurrency |
| bond_type | PRIMARY_BOND |
| retry_strategy | exponential_backoff |
| logging | true |
| metrics | task_completion_rate |
Asyncio Programming
Overview
Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.
Learning Objectives
- Understand asynchronous programming concepts
- Write async functions with async/await syntax
- Manage concurrent operations with asyncio
- Build async web applications
- Handle async I/O operations efficiently
- Debug and test async code
Core Topics
1. Async/Await Basics
- Understanding coroutines
- async/await syntax
- Event loop fundamentals
- Running async functions
- Async vs sync execution
- Common pitfalls
Code Example:
import asyncio
import time

# Synchronous version (slow)
def fetch_data_sync(url):
    print(f"Fetching {url}...")
    time.sleep(2)  # Simulating network delay (blocks the whole thread)
    return f"Data from {url}"

def main_sync():
    urls = ['url1', 'url2', 'url3']
    results = []
    for url in urls:
        data = fetch_data_sync(url)
        results.append(data)
    return results

# Takes about 6 seconds (3 sequential calls * 2 s each)
start = time.time()
main_sync()
print(f"Sync took: {time.time() - start:.2f}s")

# Asynchronous version (fast)
async def fetch_data_async(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Non-blocking sleep
    return f"Data from {url}"

async def main_async():
    urls = ['url1', 'url2', 'url3']
    # Build the coroutines; gather() schedules them as tasks and runs them concurrently
    coros = [fetch_data_async(url) for url in urls]
    results = await asyncio.gather(*coros)
    return results

# Takes about 2 seconds (the three sleeps overlap)
start = time.time()
asyncio.run(main_async())
print(f"Async took: {time.time() - start:.2f}s")
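One of the common pitfalls listed above is calling a blocking function from inside a coroutine, which stalls every other task on the event loop. The following is a minimal sketch of one way around it, assuming Python 3.9+ for `asyncio.to_thread`; `blocking_io` and `run_blocking_concurrently` are hypothetical names used only for illustration.

```python
import asyncio
import time


def blocking_io(delay: float) -> str:
    # Hypothetical stand-in for a synchronous library call (e.g. a legacy client).
    time.sleep(delay)
    return "done"


async def run_blocking_concurrently(n: int, delay: float) -> tuple[list[str], float]:
    start = time.perf_counter()
    # asyncio.to_thread (Python 3.9+) runs each call in a worker thread,
    # so the event loop stays free while the calls block.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, delay) for _ in range(n))
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_blocking_concurrently(3, 0.3))
    print(results, f"{elapsed:.2f}s")
```

Calling `blocking_io` directly inside the coroutine would instead run the three calls one after another and freeze the loop for the whole duration.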
2. Asyncio Tasks & Coroutines
- Creating and managing tasks
- asyncio.gather() vs asyncio.wait()
- Task cancellation
- Task groups (Python 3.11+)
- Exception handling in tasks
- Timeouts
Code Example:
import asyncio

async def process_item(item_id, delay):
    print(f"Processing item {item_id}")
    await asyncio.sleep(delay)
    if item_id == 3:
        raise ValueError(f"Item {item_id} failed!")
    return f"Result {item_id}"

async def main():
    # Method 1: gather (returns results in input order)
    coros = [
        process_item(1, 1),
        process_item(2, 2),
        process_item(3, 1),
    ]
    # With return_exceptions=True, failures come back as values instead of being raised
    results = await asyncio.gather(*coros, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} result: {result}")

    # Method 2: wait (returns done/pending sets)
    tasks = [
        asyncio.create_task(process_item(i, i))
        for i in range(1, 4)
    ]
    done, pending = await asyncio.wait(tasks, timeout=2.5)
    print(f"Completed: {len(done)}, Pending: {len(pending)}")
    # Cancel pending tasks, then wait for the cancellations to finish
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Method 3: Task groups (Python 3.11+)
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(1, 4):
                tg.create_task(process_item(i, 1))
        # Reached only if every task succeeded
    except* ValueError as eg:
        # A failing task cancels its siblings; errors arrive as an ExceptionGroup
        for exc in eg.exceptions:
            print(f"Task group error: {exc}")

asyncio.run(main())
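The topic list above also mentions timeouts and task cancellation. As a minimal sketch, `asyncio.wait_for` can bound how long a single awaitable may run, and a cancelled task should be awaited so its cleanup completes. `slow_operation`, `with_timeout`, and `cancel_and_wait` are hypothetical names for illustration.

```python
import asyncio


async def slow_operation(delay: float) -> str:
    # Hypothetical stand-in for a network call.
    await asyncio.sleep(delay)
    return "finished"


async def with_timeout(delay: float, limit: float) -> str:
    try:
        # wait_for cancels the inner coroutine when the limit expires.
        return await asyncio.wait_for(slow_operation(delay), timeout=limit)
    except asyncio.TimeoutError:
        return "timed out"


async def cancel_and_wait() -> bool:
    task = asyncio.create_task(slow_operation(10))
    await asyncio.sleep(0)  # Yield once so the task starts running
    task.cancel()
    try:
        await task  # Wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(with_timeout(0.01, 1.0)))
    print(asyncio.run(with_timeout(1.0, 0.01)))
    print(asyncio.run(cancel_and_wait()))
```

On Python 3.11+, the `asyncio.timeout()` context manager is an alternative when several awaits should share one deadline.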
3. Async I/O Operations
- Async file operations (aiofiles)
- Async HTTP requests (aiohttp)
- Async database operations (asyncpg, motor)
- Async messaging (aio-pika)
- Streams and protocols
Code Example:
import asyncio
import aiohttp
import aiofiles
import asyncpg
from typing import List

# Async HTTP requests
async def fetch_url(session, url):
    async with session.get(url) as response:
        response.raise_for_status()  # Raise on 4xx/5xx responses
        return await response.text()

async def fetch_multiple_urls(urls: List[str]):
    # Reuse one session (and its connection pool) for all requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Async file operations
async def read_file_async(filepath):
    async with aiofiles.open(filepath, 'r') as f:
        content = await f.read()
        return content

async def write_file_async(filepath, content):
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)

# Async database operations (example with asyncpg)
async def fetch_users():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()  # Always release the connection

# Usage
async def main():
    # Fetch URLs concurrently
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    results = await fetch_multiple_urls(urls)

    # Read/write files
    content = await read_file_async('input.txt')
    await write_file_async('output.txt', content.upper())

    # Database operations
    users = await fetch_users()
    print(f"Found {len(users)} users")

asyncio.run(main())
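The "Streams and protocols" topic listed above can be tried with the standard library alone. The following is a minimal sketch of a line-based echo server and client built on `asyncio.start_server` and `asyncio.open_connection`; `handle_echo` and `echo_roundtrip` are hypothetical names, and binding to `127.0.0.1` on an OS-assigned port is an assumption made to keep the demo local.

```python
import asyncio


async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Read one line from the client and send it back unchanged.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_roundtrip(message: str) -> str:
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:  # Closes the server when the block exits
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(message.encode() + b"\n")
        await writer.drain()  # Apply backpressure before reading the reply
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply.decode().rstrip("\n")


if __name__ == "__main__":
    print(asyncio.run(echo_roundtrip("hello")))
```

Streams are the high-level API; the lower-level `asyncio.Protocol` classes are callback-based and mostly relevant when writing libraries.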
4. Async Web Frameworks
- FastAPI async routes
- aiohttp web server
- WebSocket handling
- Background tasks
- Middleware and dependencies
Code Example:
# FastAPI async example
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# Placeholder helpers (hypothetical; replace with real persistence code)
async def fetch_user_from_db(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate an async database query
    return {"id": user_id, "name": "example"}

def save_order(order_data: dict) -> int:
    return 1  # Simulate saving and returning the new order ID

# Async route
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Async database call
    user = await fetch_user_from_db(user_id)
    return user

# Background task
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Sent email to {email}: {message}")

@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    # Save the order (a blocking call; use an async driver or a thread for slow I/O)
    order_id = save_order(order_data)
    # Send notification in background, after the response is returned
    background_tasks.add_task(
        send_notification,
        order_data['customer_email'],
        f"Order #{order_id} created"
    )
    return {"order_id": order_id}

# aiohttp web server (a separate application; named aiohttp_app so it does
# not overwrite the FastAPI app above)
from aiohttp import web

async def handle_request(request):
    name = request.match_info.get('name', 'Anonymous')
    await asyncio.sleep(1)  # Async operation
    return web.json_response({'message': f'Hello {name}'})

# WebSocket example
async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'Error: {ws.exception()}')
    return ws

aiohttp_app = web.Application()
# Register '/ws' first; otherwise the '/{name}' pattern would match it
aiohttp_app.add_routes([
    web.get('/ws', websocket_handler),
    web.get('/{name}', handle_request),
])
Hands-On Practice
Project 1: Async Web Scraper
Build a concurrent web scraper with rate limiting.
Requirements:
- Scrape multiple websites concurrently
- Implement rate limiting
- Handle errors gracefully
- Save results to a database through an async driver
- Progress tracking
- Retry failed requests
Key Skills: aiohttp, async I/O, error handling
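The rate-limiting and retry requirements above can be prototyped without any network access. This is a minimal sketch, not a full scraper: it caps the number of in-flight requests with `asyncio.Semaphore` (a concurrency limit, which is only one simple form of rate limiting) and retries with exponential backoff. `flaky_fetch`, `fetch_with_retry`, and `scrape` are hypothetical names, and `flaky_fetch` stands in for a real aiohttp request.

```python
import asyncio


async def flaky_fetch(url: str, fail_times: dict[str, int]) -> str:
    # Hypothetical stand-in for an HTTP request: fails a set number of times per URL.
    await asyncio.sleep(0.01)
    if fail_times.get(url, 0) > 0:
        fail_times[url] -= 1
        raise ConnectionError(f"temporary failure for {url}")
    return f"content of {url}"


async def fetch_with_retry(url, semaphore, fail_times, retries=3, base_delay=0.01):
    for attempt in range(retries + 1):
        try:
            async with semaphore:  # At most max_concurrency requests in flight
                return await flaky_fetch(url, fail_times)
        except ConnectionError:
            if attempt == retries:
                raise  # Out of retries: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** attempt)  # Exponential backoff


async def scrape(urls, max_concurrency=2, fail_times=None):
    semaphore = asyncio.Semaphore(max_concurrency)
    fail_times = dict(fail_times or {})
    # return_exceptions=True keeps one failed URL from discarding the others.
    results = await asyncio.gather(
        *(fetch_with_retry(u, semaphore, fail_times) for u in urls),
        return_exceptions=True,
    )
    return dict(zip(urls, results))


if __name__ == "__main__":
    print(asyncio.run(scrape(["a", "b", "c"], fail_times={"b": 2, "c": 10})))
```

The semaphore is released while a request waits to retry, so a backing-off URL does not hold a slot that another request could use.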
Project 2: Real-time Chat Server
Create a WebSocket-based chat application.
Requirements:
- WebSocket server with aiohttp
- Multiple chat rooms
- User authentication
- Message broadcasting
- Connection management
- Message history persistence
Key Skills: WebSockets, async server, state management
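For the room and broadcasting requirements above, one possible starting point is an in-memory registry that gives each connected client its own `asyncio.Queue`. This is a sketch under that assumption; `ChatRooms` and `demo` are hypothetical names, and the WebSocket layer is left out so the state management can be exercised on its own.

```python
import asyncio
from collections import defaultdict


class ChatRooms:
    """In-memory room registry; each member is represented by an asyncio.Queue."""

    def __init__(self) -> None:
        self._rooms: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def join(self, room: str) -> asyncio.Queue:
        inbox: asyncio.Queue = asyncio.Queue()
        self._rooms[room].add(inbox)
        return inbox

    def leave(self, room: str, inbox: asyncio.Queue) -> None:
        self._rooms[room].discard(inbox)
        if not self._rooms[room]:
            del self._rooms[room]  # Drop empty rooms so the registry does not grow

    def broadcast(self, room: str, message: str, sender=None) -> int:
        delivered = 0
        for inbox in self._rooms.get(room, ()):
            if inbox is not sender:  # Do not echo the message back to its sender
                inbox.put_nowait(message)
                delivered += 1
        return delivered


async def demo():
    rooms = ChatRooms()
    alice = rooms.join("general")
    bob = rooms.join("general")
    carol = rooms.join("random")
    rooms.broadcast("general", "hi all", sender=alice)
    # Only bob receives it: alice sent it and carol is in another room.
    return await bob.get(), alice.empty(), carol.empty()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a WebSocket server, each connection handler would typically call `join` on connect, forward items from its queue to the socket, and call `leave` in a `finally` block on disconnect.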
Project 3: Async Task Queue
Build a distributed task processing system.
Requirements:
- Task queue with Redis/RabbitMQ
- Worker pool management
- Task prioritization
- Result caching
- Progress monitoring
- Graceful shutdown
Key Skills: Message queues, concurrent workers, cleanup
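Before wiring in Redis or RabbitMQ, the worker pool, prioritization, and graceful shutdown requirements can be sketched in a single process. The example below assumes an in-process `asyncio.PriorityQueue` as a stand-in for the external broker; `worker` and `run_pool` are hypothetical names.

```python
import asyncio


async def worker(name: str, queue: asyncio.PriorityQueue, results: list) -> None:
    while True:
        priority, task_id = await queue.get()  # Lowest priority number first
        try:
            await asyncio.sleep(0.01)  # Hypothetical stand-in for real work
            results.append((priority, task_id))
        finally:
            queue.task_done()  # Always mark the job finished, even on error


async def run_pool(jobs: list[tuple[int, str]], num_workers: int = 1) -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    results: list = []
    for job in jobs:
        queue.put_nowait(job)
    workers = [
        asyncio.create_task(worker(f"w{i}", queue, results))
        for i in range(num_workers)
    ]
    await queue.join()  # Wait until every queued job has been processed
    for w in workers:  # Graceful shutdown: cancel the now-idle workers...
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)  # ...and wait for them to exit
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pool([(2, "b"), (1, "a"), (3, "c")])))
```

With a single worker the jobs complete strictly in priority order; with several workers the start order follows priority but completion order can vary.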
Assessment Criteria
- Understand async/await semantics
- Write efficient concurrent code
- Handle async exceptions properly
- Use asyncio tasks effectively
- Build async web applications
- Debug async code
- Manage async resources (cleanup)
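To practice the debugging and testing criteria, coroutines can be tested with the standard library's `unittest.IsolatedAsyncioTestCase` (Python 3.8+), which runs each `async def` test method in its own event loop. This is a minimal sketch; `double_after_delay`, `DoubleTests`, and `run_tests` are hypothetical names.

```python
import asyncio
import unittest


async def double_after_delay(value: int, delay: float = 0.01) -> int:
    # Hypothetical coroutine under test.
    await asyncio.sleep(delay)
    return value * 2


class DoubleTests(unittest.IsolatedAsyncioTestCase):
    async def test_returns_doubled_value(self):
        self.assertEqual(await double_after_delay(21), 42)

    async def test_times_out_when_too_slow(self):
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(double_after_delay(1, delay=1.0), timeout=0.01)


def run_tests() -> unittest.TestResult:
    # Helper for running the suite programmatically.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(DoubleTests)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

When saved in a test module, the same class is discovered by `python -m unittest`. For debugging, `asyncio.run(main(), debug=True)` enables asyncio's debug mode, which reports slow callbacks and coroutines that were never awaited.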
Resources
Official Documentation
- asyncio Docs - Official documentation
- aiohttp - Async HTTP client/server
- FastAPI - Modern async web framework
Learning Platforms
- Using Asyncio in Python - Real Python guide
- Python Concurrency - O'Reilly book
- Asyncio Recipes - Practical examples
Tools
- aiohttp - Async HTTP
- aiofiles - Async file I/O
- asyncpg - Async PostgreSQL
- aiodebug - Async debugging
Next Steps
After mastering asyncio, explore:
- Multiprocessing - CPU-bound parallelism
- Celery - Distributed task queue
- gRPC - RPC framework with asyncio support (grpc.aio)
- Kafka - Async event streaming