sortarage
@sortarage
Я тучка-тучка-тучка, я вовсе не медведь

Как получать значения от нескольких одновременно работающих процессов на Python?

Доброго времени суток.

Есть код, который одновременно запускает несколько асинхронных процессов по списку. Нужно из каждого процесса получить результат, и обновить статус в базе. Пример кода:

from time import sleep
from multiprocessing import Process, Value
import subprocess

def worker_email(keyword, func_result):
    subprocess.Popen(["python", "mongoworker.py", str(keyword)])
    func_result.value = 1
    return True

keywords_list = ['apple', 'banana', 'orange', 'strawberry']

if __name__ == '__main__':
    for keyword in keywords_list:
        # Выполняю задачу
        func_result = Value('i', 0)
        p = Process(target=worker_email, args=(keyword,func_result))
        p.start()
        # Обновляю статус задачи
        if func_result.value == 1:
            stream.update_one({'_id': doc['_id']}, {"$set": {"status": True}}, upsert=False)

В чем проблема: если использовать p.join(), то все работает, но процессы выполняются по очереди. Если не использовать, то процессы не закрываются, и статус не обновляется. Рабочий вариант - использовать p.join(), но выполнять не код функции, а subprocess.Popen, но это выглядит как-то нецензурно.

Собственно, буду рад любому совету :)
  • Вопрос задан
  • 167 просмотров
Решения вопроса 1
sortarage
@sortarage Автор вопроса
Я тучка-тучка-тучка, я вовсе не медведь
Решил переносом соединения в базу и проверки результата в саму функцию-worker. Примерно так:

def worker_email(keyword, task_id):

    # Соединяюсь с базой
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    sleep(10)
    print('Yo:' + keyword)

    # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок)
    if True:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Отключаюсь от базы
    client.close()
    return True


UPD: Более разобранный вариант:

def update_status(task_id, func_result):
    # Соединяюсь с базой
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    stream = db.stream

    # Обновляю статус задачи на "Выполнено" (если все ок) или не меняю статус и отправляю на повторое выполнение (если не ок)
    if func_result:
        stream.update_one({'_id': task_id}, {"$set": {"status": True}}, upsert=False)

    # Отключаюсь от базы
    client.close()

def yo_func(keyword):
    sleep(10)
    print('Yo:' + keyword)
    return True

def worker_email(keyword, task_id):
    update_status(task_id, yo_func(keyword))
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы