AsyncIO nâng cao trong Python: Task, Future, Queue, Lock, Semaphore
Cùng mình tìm hiểu thêm một số khái niệm về Asynco IO nâng cao trong Python
Trong bài viết trước, chúng ta đã tìm hiểu về lập trình bất đồng bộ trong Python, thư viện asyncio
và một số ví dụ cơ bản (bạn có thể xem lại tại đây).
Tiếp nối nội dung đó, bài này sẽ đi sâu vào các khái niệm nâng cao gồm: Task, Future, Queue, Lock và Semaphore. Việc nắm vững những khái niệm này sẽ giúp bạn chủ động và tự tin hơn khi làm việc với asyncio
.
Coroutines, Task và Future
Đầu tiên, hãy cũng nhau tìm hiểu lại khái niệm coroutines. Ví dụ dưới đây khai báo một coroutine object bằng các sử dụng cú pháp async/await
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
hello
word
Nếu chỉ đơn giản gọi hàm main, một coroutine sẽ không được thực thi
main()
<coroutine object main at 0x1053bb7c8>
Để chạy một coroutine, asyncio cung cấp một vài cách sau:
Dùng
asyncio.run như ví dụ trên
Awaiting một coroutine. Ví dụ sau sử dung awaiting. Để ý chúng ta sẽ tốn 3 giây vì 2 coroutine này chạy lần lượt, hoàn tất mới xong cái tiếp theo
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) ## Output: started at 17:13:52 hello world finished at 17:13:55
asyncio.create_task: dùng để chạy các coroutine concurrently. Chúng ta sẽ modify lại ví dụ trên bằng các chạy 2 coroutine say_after concurrently. Chúng ta chú ý, output ở ví dụ này chỉ tốn khoảng 2 giây, thay vì 3 giây như ví dụ trên, vì 2 task chạy concurrently.
async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") Output: started at 17:14:32 hello world finished at 17:14:34
asyncio.TaskGroup,
đây là cách hiện đại hơn thay thế create_task(). Output sẽ tương tự như ví dụ ở create_taskasync def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task( say_after(1, 'hello')) task2 = tg.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # The await is implicit when the context manager exits. print(f"finished at {time.strftime('%X')}")
Awaitables
Một object trong Python được gọi là awaitable nếu nó được sử dụng trong “await” expression. Có ba loại object awaitable chính: coroutines, Task and Futures.
Coroutins và Task chúng ta đã tìm hiểu ở phần trước. Còn Futures là trường hợp đặc biết, một low-level awaitable objects đại diện cho một eventual result of an asynchronous operation.
Khi một Future được await
, nghĩa là coroutine sẽ tạm dừng và chờ cho đến khi Future đó được xử lý (resolve) ở một nơi khác. Trong asyncio
, Future được dùng để kết nối giữa cơ chế callback truyền thống và cú pháp async/await
. Thông thường, lập trình viên ứng dụng không cần phải tự tạo Future. Tuy nhiên, một số thư viện hoặc API của asyncio
có thể trả về Future, và những đối tượng này hoàn toàn có thể được await
.
Synchronization Primitives
Synchronization Primitives - các công cụ đồng bộ trong asyncio là những cơ chế giúp nhiều coroutine chia sẻ tài nguyên chung một các an toàn, tránh tính trạng race condition hoặc deadlock. Những cơ chế, công cụ này giống như trong threading module nhưng cần chú ý một số điểm:
asyncio primitives are not thread-safe, nếu muốn thread-safe thì dùng (https://docs.python.org/3/library/threading.html#module-threading)
Không có tham số timeout trong các synchronization primitives, nếu cần timeout thì sử dụng asyncio.wait_for() function.
Cùng tìm hiểu các công cụ cơ bản: Lock, Event, Condition, Semaphore/BoundedSemaphore và Barrier
Lock
Chi cho phép một coroutine truy cập vào shared resource. Chú ý không thread-safe.
lock = asyncio.Lock()
async with lock:
# code an toàn
Event
Dùng để notify nhiều asyncio task khi có một sự kiện xảy ra. Hoạt động như một flag có 2 trạng thái:
Chưa được set (False) → các coroutine nào
await event.wait()
sẽ bị chặnĐược set (
True
) → tất cả coroutine đang chờ sẽ tiếp tục chạy ngay lập tức.import asyncio async def waiter(event, name): print(f"{name} is waiting for the event...") await event.wait() # chờ cho đến khi event được set print(f"{name} received the event!") async def main(): event = asyncio.Event() # 3 coroutine đang chờ tín hiệu tasks = [asyncio.create_task(waiter(event, f"Task{i}")) for i in range(3)] print("Main: sleeping for 2 seconds before setting event") await asyncio.sleep(2) print("Main: setting the event") event.set() # phát tín hiệu → tất cả Task0,1,2 chạy tiếp await asyncio.gather(*tasks) asyncio.run(main()) #### Output: Task0 is waiting for the event... Task1 is waiting for the event... Task2 is waiting for the event... Main: sleeping for 2 seconds before setting event Main: setting the event Task0 received the event! Task1 received the event! Task2 received the event!
Condition
Cao cấp hơn Event
, cho phép nhiều coroutine chờ trên cùng một điều kiện và được thông báo khi điều kiện thay đổi.
Semaphore/BoundedSemaphore
Giới hạn số coroutine được phép truy cập vào tài nguyên cùng lúc (thay vì chỉ 1 như Lock
).
Ví dụ: Giới hạn 3 kết nối DB đồng thời.
sem = asyncio.Semaphore(3)
async with sem:
# tối đa 3 coroutine vào cùng lúc
Barrier
Đồng bộ nhiều coroutine, bắt buộc chúng chờ đủ số lượng trước khi cùng tiến tiếp.
Áp dụng: Web Crawler Bất Đồng Bộ
Hãy cùng nhau xem cách sử dụng các công cụ trên trong một ví dụ cụ thể. Ví dụ chúng ta đang xây dựng web crawler tải nhiều URL vì vậy có thể tận dụng asyncio
primitives như sau:
Queue: quản lý danh sách URL (Producer–Consumer).
Lock: đảm bảo ghi log không bị rối.
Semaphore: giới hạn số request đồng thời (tránh bị block).
Event: báo hiệu dừng khi hết URL hoặc người dùng ngắt crawler.
import asyncio
import aiohttp
async def fetch(session, url, sem, lock):
async with sem: # Semaphore: giới hạn số request đồng thời
async with session.get(url) as resp:
data = await resp.text()
async with lock: # Lock: đảm bảo ghi log an toàn
print(f"Fetched {url} with {len(data)} chars")
return data
async def worker(name, queue, session, sem, lock, stop_event):
while not stop_event.is_set():
try:
url = await asyncio.wait_for(queue.get(), timeout=2.0)
except asyncio.TimeoutError:
# Nếu queue trống lâu quá → có thể dừng
stop_event.set()
break
await fetch(session, url, sem, lock)
queue.task_done()
async def main():
urls = [
"https://example.com",
"https://httpbin.org/get",
"https://python.org",
# thêm nhiều url khác...
]
queue = asyncio.Queue()
for url in urls:
await queue.put(url)
sem = asyncio.Semaphore(5) # Tối đa 5 request đồng thời
lock = asyncio.Lock()
stop_event = asyncio.Event()
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(worker(f"worker-{i}", queue, session, sem, lock, stop_event))
for i in range(3)
]
await queue.join() # Chờ đến khi tất cả url được xử lý
stop_event.set() # Báo hiệu dừng
await asyncio.gather(*tasks)
asyncio.run(main())
Kết luận
Trong lập trình bất đồng bộ với Python, việc phối hợp nhiều coroutine cùng chạy là một thách thức lớn. Đó là lý do tại sao asyncio
cung cấp các synchronization primitives như Event
, Lock
, Semaphore
, và đặc biệt là Queue
.
Event
giúp phát tín hiệu giữa các coroutine.Lock
đảm bảo tài nguyên chỉ được một coroutine truy cập tại một thời điểm.Semaphore
mở rộng cơ chế Lock để giới hạn số coroutine được phép truy cập đồng thời.Queue
đóng vai trò như “đường ống dữ liệu” an toàn và tiện lợi trong mô hình Producer – Consumer.
Hiểu và sử dụng thành thạo những công cụ này sẽ giúp bạn:
Tránh race condition và deadlock.
Xây dựng pipeline xử lý dữ liệu bất đồng bộ.
Viết ứng dụng ổn định, dễ bảo trì, và có khả năng mở rộng.
Đọc thêm
https://docs.python.org/3/library/asyncio-sync.html
https://docs.python.org/3/library/asyncio-queue.html
Nếu bạn thấy bài viết hữu ích, hãy subscribe để nhận thêm những bài viết chuyên sâu về kiến trúc hệ thống và microservices.
Đừng ngần ngại để lại câu hỏi hoặc chia sẻ trải nghiệm của bạn với asyncio trong Python trong phần bình luận