@NogerbekNurzhan

Как отфильтровать данные за определенный период в Spark?

Здравствуйте, товарищи! Помогите пожалуйста разобраться.

Раньше со Spark не работал. Пытаюсь разобраться с ним на простом примере. Предположим есть большой файл со следующей структурой (см. ниже). В ней хранится дата, мобильный номер и его статус в это время.

| CREATE_DATE         | MOBILE_KEY | STATUS |
|---------------------|------------|--------|
| 2018-11-28 00:00:00 | 8792548575 | IN     |
| 2018-11-29 20:00:00 | 7052548575 | OUT    |
| 2018-11-30 07:30:00 | 7772548575 | IN     |


Как правильно отфильтровать все данные за указанный период для определенных мобильных номеров? К примеру в качестве входящих данных я получаю такие данные:

val dateFrom = "2018-10-01"
val dateTo = "2018-11-05"
val numbers = "7778529636,745128598,7777533575"

val arr = numbers.split(",") // Создать массив из мобильных номеров

spark.read.parquet("fs://path/file.parquet").filter(???)
  • Вопрос задан
  • 75 просмотров
Пригласить эксперта
Ответы на вопрос 2
angrySCV
@angrySCV
machine learning, programming, startuping
можно просто попытаться как вы пишете отфильтровать, для этого в начале получить определенную структуру и тип данных:

источникДанных
  .мап(созданиеСтруктуры)
  .фильтр(текущаяЗапись => СписокТребуемыхНомеров.содержит(текущаяЗапись.телефон) 
    && текущаяЗапись.дата<>требуемыйИнтервал)


так будет работать, но очень долго, медленно и сожрет кучу ресурсов на одной машине - это не то ради чего спарк используют, спарк - это движек для распределенных вычислений. А чтоб запустить распределенные вычисления, нужно в начале создать пару "ключ"->"значение" (где ключ номер телефона, а значение все остальные данные), эти пары распределятся по узлам, где будут параллельно обрабатываться, а потом результат паралельной обработки агрегировать в один общий результат, и для этого не фильтр использовать а reduceByKey с aggregate, для паралельного сбора ключей и значений для этих ключей.
Ответ написан
@potan
Дату в формате ISO можно сравнивать как строки. Список телефонов оформить как множество.
Будет что-то типа

val arr = numbers.split(",").toSet

spark.read.parquet("fs://path/file.parquet").filter(t => t("CREATE_DATE") < dateTo && t("CREATE_DATE") > dateFrom && arr(t("MOBILE_KEY")))

Точно не знаю как к полям записи в SPARC обращаться, может быть надо будет немного переделать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
Первый ОФД Нижний Новгород
от 60 000 до 110 000 руб.
HFLabs Москва
от 195 000 до 250 000 руб.
Denim Москва
от 140 000 до 220 000 руб.
16 июл. 2019, в 23:23
5000 руб./за проект
16 июл. 2019, в 22:43
10000 руб./за проект