@Wade2k

Как реализовать парсер на aiohttp с динамической очередью и ограничением на одновременные подключения по прокси?

Приветствую.
Делаю парсер страниц на python. Нужно постоянно обходить большое ко-во страниц. Например, 1М в день.
Для этого раньше использовал многопоточность TheadPool. Но когда возросло кол-во требуемых одновременных подключений до 80-100 скрипт начинал падать по памяти.

Почитал, что пишут надо переходить на asyncio.
Накидал простенький скрипт, где беру прокси из файлика, беру url и обхожу их. Для ограничения одновременности испольщую semaphore.

Но столкнулся с тем, что если у меня для прокси доступно, например 50 потоков, то при выборке 200 урлов, первые 50 отработаются правильно, а остальные уйдут в ошибку. Такое ощущение, что семафор не работает или я как то не так использую это.

Хотелось бы понять как нужно делать. Или пример кода подобного.

Мой код для примера:
#!/usr/bin/python3.6
# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession
from random import shuffle
from bs4 import BeautifulSoup

count = 0

with open('proxy2.txt') as f:
    proxy = f.readlines()

proxy = [x.strip() for x in proxy]
shuffle(proxy)


def geturls(num):

    urls = []
    f = open('sitemap.xml', 'r')
    soup = BeautifulSoup(f, "lxml")

    arr = soup.findAll('loc')
    i = 0
    for url in arr:
        if i < num:
            urls.append(url.contents[0])
            i = i + 1
        else:
            break
            
    return urls


async def fetch(sem,url, session):
    global proxy
    global count
    try:
      async with session.get(url,proxy="http://"+random.choice(proxy)) as response:
          count = count + 1
          body = await response.read()
          print(str(count) + "   " + url)
          return body
    except Exception as e:
      print(e)


async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        return await fetch(sem, url, session)


async def run(r):
    urls=geturls(r)

    tasks = []

    #одноврменно можно до 50 коннектов
    sem = asyncio.Semaphore(50)


    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(bound_fetch(sem, url, session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses



#200 - выбираем 200 урлов из sitemap для теста
number = 200
loop = asyncio.get_event_loop()

future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
  • Вопрос задан
  • 2696 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы