Как передать watchdog событие в asyncio?

Использую библиотеку watchdog для python следит за изменениями в каталоге, при изменения порождает событие, как мне при появлении события поймать его в asyncio и дальше уже, что то сделать с ним.
  • Вопрос задан
  • 746 просмотров
Решения вопроса 1
sergey-gornostaev
@sergey-gornostaev Куратор тега Python
Седой и строгий
Постановка задачи класса "пойди туда не знаю куда", но если в общих чертах, то:
import asyncio
from contextlib import suppress
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


async def some_async_handler(event):
    # дальше уже, что то сделать с ним


class SomeEventHandler(FileSystemEventHandler):
    def __init__(self, loop, *args, **kwargs):
        self._loop = loop
        super().__init__(*args, **kwargs)

    def on_any_event(self, event):
        asyncio.run_coroutine_threadsafe(some_async_handler(event), self._loop)


loop = asyncio.get_event_loop()

event_handler = SomeEventHandler(loop)
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()

try:
    loop.run_forever()
except KeyboardInterrupt:
    observer.stop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)
finally:
    loop.close()
    observer.join()
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы