@artem78

Почему поток зависает после ошибки загрузки файла на FTP?

Создал модуль для загрузки файлов на FTP в несколько потоков. Модуль используется в демоне, который регулярно проверяет наличие новых файлов и скидывает их на сервер. Демон запакован с помощью pyinstaller и работает на Windows-сервере. Иногда загрузка файла завершается с ошибкой '[WinError 10054] Удаленный хост принудительно разорвал существующее подключение' и в этом случае где-то после выполнения upload_callback поток намертво зависает. Пробовал на своём компьютере воспроизвести ситуацию, но зависание не происходит. В чём может быть причина?

from threading import Thread, RLock#, Lock
from queue import Queue
from ftplib import FTP
import logging
from os.path import basename


class _FTPMultiUploaderWorker(Thread):

    def __init__(self, host, port, login, password, directory, upload_callback, files_queue, lock):
        super().__init__()

        self._host = host
        self._port = port
        self._login = login
        self._password = password
        self._directory = directory
        self._upload_callback = upload_callback
        self._files_queue = files_queue
        self._lock = lock

        self._ftp = FTP()

    def run(self):
        logging.debug('Поток %s запущен', self.name)

        self._ftp_connect()

        try:
            while not self._files_queue.empty():
                file = self._files_queue.get()
                try:
                    self._upload_file(file)
                except Exception as e:
                    logging.exception('Ошибка при загрузке файла %s', file)
                    success = False
                    #raise e
                else:
                    success = True
                finally:
                    with self._lock:
                        self._upload_callback(file, success)

                    self._files_queue.task_done()

        finally:
            self._ftp_disconnect()

        logging.debug('Поток %s завершён', self.name)

    def _upload_file(self, file):
        filename = basename(file)
        logging.debug('Начинаем загружать файл %s', filename)
        with open(file, 'rb') as fobj:
            self._ftp.storbinary('STOR ' + filename, fobj)
        logging.info('Файл %s загружен', filename)

    def _ftp_connect(self):
        self._ftp.connect(self._host, self._port)
        self._ftp.login(self._login, self._password)
        logging.debug('Соединение с FTP установлено')
        self._ftp.cwd(self._directory)

    def _ftp_disconnect(self):
        self._ftp.quit()
        logging.debug('Соединение с FTP закрыто')


class FTPMultiUploader():

    def __init__(self, host, port, login, password, directory='/', threads_count=3, upload_callback=lambda file, success: None):
        self._host = host
        self._port = port
        self._login = login
        self._password = password
        self._directory = directory

        self._threads_count = threads_count
        self._upload_callback = upload_callback

        self._files_queue = Queue()
        self._lock = RLock()

    def add_file(self, file):
        self._files_queue.put(file)

    def run(self):
        if self._files_queue.empty():
            return

        threads = []
        for i in range(self._threads_count):
            thr = _FTPMultiUploaderWorker(
                host=self._host,
                port=self._port,
                login=self._login,
                password=self._password,
                directory=self._directory,
                upload_callback=self._upload_callback,
                files_queue=self._files_queue,
                lock=self._lock
            )
            thr.start()
            threads.append(thr)

        for thr in threads:
            thr.join()

Код, которые производит отправку:
ftp_uploader = FTPMultiUploader(
    host=self.config['ftp']['host'],
    port=int(self.config['ftp']['port']),
    login=self.config['ftp']['login'],
    password=self.config['ftp']['password'],
    directory=self.config['ftp']['dir'],
    upload_callback=self.on_uploaded,
    threads_count=int(self.config['main']['threads'])
)

for file_id in new_files_ids:
    stock = 'shutterstock'
    file = self.get_file_by_id(stock, file_id)
    if file is None:
        logging.warning('Файл #%s не найден', file_id)
        self.set_status(file_id, self.STATUS_ERROR)
        continue
    logging.info('Найден файл %s', file)

    # Помещаем в ZIP
    zip_file = os.path.join('temp/', '{}_{}.zip'.format(stock, file_id))
    logging.debug('Начинаем архивирование')

    with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(file, os.path.basename(file))
    logging.debug('Архив создан: %s', zip_file)

    ftp_uploader.add_file(zip_file)

logging.debug('Начинаем загрузку файлов на FTP')
ftp_uploader.run()
logging.debug('Загрузка файлов завершена')
  • Вопрос задан
  • 195 просмотров
Пригласить эксперта
Ответы на вопрос 1
dimonchik2013
@dimonchik2013
non progredi est regredi
число коннектов к FTP ограничено

коннект это и листинг директории тоже

замени ftplib на pycurl
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
17 апр. 2024, в 00:13
800 руб./за проект
17 апр. 2024, в 00:06
240000 руб./за проект
17 апр. 2024, в 00:02
1000 руб./за проект