shenk
@shenk
t.me/kshnkvn

Почему не происходит запись в MongoDB при нескольких потоках?

При изучении MongoDB наткнулся на возможность использования вместо сгенерированного значения _id упорядоченный (1, 2, 3...):
def get_next_sequence(collection, name):
    return collection.find_and_modify({'_id': name}, update={'$inc': {'seq': 1}}, new=True).get('seq')


def insert_in_db():
    client = MongoClient(mongo_url)
    db = client['']
    collection = db['']
    print(collection.insert_one({'_id': get_next_sequence(collection, 'userid'), 'value': f'{random.randint(10000, 2147483647)}'}))
    client.close()


Вариант работает, но меня смутила возможность его использования в несколько потоков. Если я правильно понимаю, то скрипт выполняет 2 запроса в БД:
1. Узнает значение последнего элемента
2. Присваивает новый элемент со значением +1

Если к нему обратятся одновременно несколько запросов? Начал проверять:
with Pool(processes=200) as pool:
    for _ in range(100000):
        pool.apply_async(insert_in_db)
    pool.close()
    pool.join()

Начал наращивать кол-во потоков по чуть-чуть - со значений в 50 и до 1000.
Вплоть до кол-во потоков в 200-250 проблем не было вообще никаких - все записывалось, после - начались пропуски записи. Особенно заметно после 400 потоков. При 1000 потоков из 100000 запросов на запись в базе оказалось только чуть больше 90000, при этом ошибок никаких нет. Сейчас у меня 2 предположения:
1. Не выдерживает сервер с MongoDB (3 ядра, 4гб ОЗУ). При работе скрипта были видны моменты, когда он просто останавливался на несколько секунд, иногда на 10-30 секунд. Видимо не мог подключиться. Соответственно некоторые потоки просто не могли подключиться и запись не происходила.
2. Обращаются несколько потоков, все получают информацию, что на данный момент, к примеру, 1389 записей в базе и пытаются записать запись под номером 1389. Смущает то, что ошибок записи нет в выводе, хотя по идеи должна быть.
  • Вопрос задан
  • 96 просмотров
Пригласить эксперта
Ответы на вопрос 1
@Taus
Вы не получаете результат выполнения в созданных процессах из pool.apply_async. Это плохая практика, потому что при выполнении кода в дочерних процессах могут быть исключения, которые стоит обрабатывать в основном процессе. Почитайте дополнительно документацию. Пример:
import multiprocessing

def f():
    raise ValueError()

with multiprocessing.Pool() as pool:
    for _ in range(10):
        pool.apply_async(f) # no errors

with multiprocessing.Pool() as pool:
    for _ in range(10):
        result = pool.apply_async(f)
        result.get(timeout=1) # raise ValueError

Поскольку у вас такой обработки, то можно предположить следующее. При некотором числе создаваемых процессов создание MongoClient или запросы .insert_one|.find_and_modify бросает исключение, связанное с превышением какого-то таймаута (посмотрите необязательные аргументы mongo_client и исключения)
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
Protoplan Краснодар
от 80 000 до 150 000 руб.
YLab Тольятти
от 90 000 до 150 000 руб.
от 60 000 до 120 000 руб.