Пропустити навігацію EPAM

Від Pandas до PySpark: наш шлях масштабування Python-рішень для роботи з великими даними

Думка експерта
  • Python

Працюючи з понад 300GB структурованих та напівструктурованих даних, ми зіткнулися з класичним запитанням: коли варто використовувати Pandas, а коли необхідний PySpark? І як їх ефективно поєднувати?

Загальновідоме обмеження, з яким стикаються фахівці які працюють з даними, полягає в тому, що Pandas не справляється з великими наборами даних, які перевищують обмеження пам'яті.

Наприклад, уявіть, що ми допомагаємо природоохоронній групі аналізувати GPS-відстеження понад 500 слонів протягом 3 років. Для таких проєктів з великими даними Python потребує потужніших інструментів. PySpark спеціалізується на розподілених обчисленнях, обробляючи масивні набори даних на кількох машинах.

У цій статті ми поділимося нашим досвідом переходу від Pandas до PySpark, розглянемо архітектурні відмінності між ними та надамо практичні поради щодо масштабування Python-рішень для роботи з великими даними. Також ми дослідимо компромісний підхід — Pandas API on Spark, який використовує всі доступні ядра процесора для паралелізації обчислень, що суттєво прискорює запити порівняно зі стандартним pandas.

ПРИЄДНУЙСЯ ДО НАШОЇ КОМАНДИ

Порівняння архітектур Pandas і PySpark

Фундаментальні архітектурні відмінності між Pandas і PySpark визначають їхню здатність обробляти дані різного обсягу. Розуміння цих відмінностей допоможе обрати правильний інструмент для конкретних потреб.

Однопоточна модель Pandas

Бібліотека Pandas реалізує свій API DataFrame поверх базових структур даних і зазвичай виконує більшість операцій послідовно. На практиці це означає, що операції як read_csv, groupby чи apply у багатьох випадках використовують одне ядро процесора.

Варто зазначити, що базові типи даних у Pandas насправді побудовані на основі типів NumPy, а не є окремими вбудованими типами. Це підтверджується документацією та іншими джерелами. NumPy, своєю чергою, підтримує виконання операцій декількома потоками, що може надавати додаткові можливості для оптимізації при роботі з даними у певних сценаріях.

Попри простоту у використанні, Pandas завантажує всі дані в пам'ять, що стає критичним обмеженням для великих наборів даних. Коли дані не вміщуються в оперативну пам'ять машини, Pandas стає неефективним або взагалі непридатним для використання.

Розподілена обробка в PySpark через JVM

На відміну від Pandas, PySpark створений для розподіленого обчислення. Він поєднує масштабованість Apache Spark (на базі JVM) із простотою Python. PySpark використовує модель «майстер-робітник»:

  • Driver Program керує виконанням задач і взаємодіє з SparkSession;
  • Cluster Manager розподіляє ресурси (CPU, пам'ять) між виконавцями;
  • Executors запускають JVM-процеси для обробки даних.

Міст між Python і JVM забезпечує бібліотека Py4J, яка дозволяє Python динамічно взаємодіяти з об'єктами JVM. Це дає можливість PySpark обробляти дані паралельно, розподіляючи навантаження між кількома машинами кластера.

Лінива vs нетерпляча обробка даних

Одна з ключових відмінностей між Pandas і PySpark — модель виконання запитів:

  • Pandas використовує нетерплячу (eager) оцінку — завантажує всі дані в пам'ять і виконує операції негайно при їх виклику, не застосовує оптимізацію запитів, і всі дані мають бути завантажені в пам'ять до виконання запиту;
  • PySpark застосовує ліниву (lazy) оцінку — перетворення не виконуються, доки не буде викликана дія (як .collect() або .show()), це дозволяє Spark оптимізувати план виконання, створюючи направлений ациклічний граф (DAG) обчислень.

Така відмінність у підходах до обробки даних пояснює, чому PySpark ефективніше використовує пам'ять. Він отримує дані з диска лише за необхідності, що суттєво зменшує споживання пам'яті порівняно з Pandas.

Масштабування Python-рішень: наш практичний досвід

Наш досвід роботи з масштабуванням Python-рішень розпочався з практичних викликів, які постали перед нами у реальних проєктах. Ось що ми дізналися під час цього шляху.

Проблеми з обробкою 300GB даних у Pandas

У процесі аналізу великого набору даних обсягом 300GB ми швидко дійшли до обмежень Pandas. Спроби завантаження даних призводили до критичних проблем із пам'яттю — pandas вимагав завантаження всього набору даних в оперативну пам'ять одночасно. Крім того, однопоточна модель pandas спричиняла значне уповільнення при операціях з великими даними.

Навіть після оптимізації типів даних (конвертація float64 у float16, використання категоріальних типів замість об'єктів) ми змогли скоротити використання пам'яті лише на 96%, проте цього все одно було недостатньо для обробки всього обсягу даних.

Спроби використання chunking (розбиття на частини) виявилися ефективними лише для простих операцій, але не працювали належним чином для складніших агрегацій і groupby-запитів.

Розгортання PySpark-кластера на AWS

Зрештою, ми перейшли до PySpark на AWS EMR (Elastic MapReduce).

Процес розгортання включав кілька етапів:

  1. Створення S3-бакета для зберігання даних і скриптів
  2. Налаштування кластера EMR з вибором відповідних додатків (Apache Spark)
  3. Налаштування передачі логів кластера до S3 для моніторингу

Для автоматизації розгортання ми використали EMR CLI — інструмент, який спрощує розробку та запуск завдань Spark на Amazon EMR. Ця утиліта допомогла нам усунути необхідність ручного пакування, розгортання та запуску завдань. Використання EMR Serverless та EMR на EC2 дало нам гнучкість у виборі інфраструктури залежно від типу завдання.

Оптимізація зчитування з S3 у Spark

Після розгортання ми зіткнулися з викликами продуктивності при зчитуванні даних із S3. Відповідно до наших тестів, значна частина часу виконання Spark-завдань (близько половини) витрачалася саме на зчитування даних.

Для оптимізації ми впровадили такі підходи:

  • налаштування параметрів parquet.block.size (512 MB) та parquet.read.allocation.size (128 MB) для підвищення пропускної здатності на 77%;
  • оптимізація ресурсів Kubernetes Pod, що підвищило CPU-утилізацію з 50% до 75%;
  • Використання S3 Magic Committer, що прискорило запис результатів до S3 на 15-50%.

Таким чином, завдяки переходу на PySpark і оптимізації процесів читання та запису даних, ми змогли скоротити час виконання наших завдань обробки даних на 60% порівняно з початковою реалізацією.

Використання Pandas API on Spark як компроміс

Для розробників, які звикли до Pandas, але потребують масштабованості, існує елегантне рішення — Pandas API on Spark. Цей інструмент заповнює прогалину між простотою Pandas та потужністю Spark, пропонуючи знайомий синтаксис для обробки великих наборів даних.

Синтаксис Pandas, але масштабованість Spark

Pandas API on Spark допомагає уникнути необхідності вивчати новий фреймворк, дозволяючи використовувати знайомий API Pandas у розподіленому середовищі Apache Spark. Це особливо корисно як для користувачів Pandas, так і для розробників PySpark, оскільки бібліотека підтримує багато операцій, які складно реалізувати в PySpark, наприклад, пряму візуалізацію даних з DataFrame.

Зокрема, використання цього API дозволяє запускати операції pandas до 10 разів швидше для великих наборів даних. Для початку роботи з Pandas API on Spark у середовищі Apache Spark 3.2+ достатньо простого імпорту:

import pyspark.pandas as ps

Приклади: describe(), groupby(), plot() у Pandas API on Spark

Pandas API on Spark підтримує більшість популярних функцій pandas.

Наприклад, для обчислення описових статистик використовується метод describe():

psdf.describe()

Групування даних з подальшим застосуванням функцій агрегації також повністю підтримується:

psdf.groupby('A').sum()

psdf.groupby(['A', 'B']).sum()  # групування за кількома стовпцями

Натомість від стандартного pandas, функції візуалізації у Pandas API on Spark за замовчуванням створюють інтерактивні графіки, використовуючи plotly. Побудова графіків здійснюється аналогічно:

psdf.plot()  # лінійний графік

psdf.plot.hist(bins=12, alpha=0.5# гістограма

psdf.plot.scatter(x='length', y='width' # діаграма розсіювання

Конвертація між Pandas і Pandas-on-Spark

Важливою перевагою Pandas API on Spark є можливість легкого конвертування між різними типами DataFrame. Для перетворення pandas DataFrame у Pandas-on-Spark використовується функція from_pandas():

psdf = ps.from_pandas(pdf)

Крім того, можна конвертувати Pandas-on-Spark DataFrame назад у pandas DataFrame за допомогою to_pandas(). Однак варто пам'ятати, що ця операція завантажує всі дані з кількох машин у пам'ять драйвера Spark, тому її слід використовувати лише для невеликих наборів даних.

Також доступна конвертація між Pandas-on-Spark і Spark DataFrame:

sdf = psdf.to_spark()  # pandas-on-Spark -> Spark

psdf = sdf.pandas_api()  # Spark -> pandas-on-Spark

Таким чином, Pandas API on Spark пропонує гнучкість вибору оптимального інструменту залежно від конкретних потреб аналізу даних.

Поради щодо ефективного переходу на PySpark

Переходячи до практичних аспектів міграції з pandas на PySpark, ми виділили кілька ключових оптимізацій, які суттєво підвищують ефективність обробки великих даних.

Розбиття даних на партиції

Правильне партиціонування — основа продуктивності PySpark. Дані діляться на частини, які обробляються паралельно різними виконавцями.

Оптимальний розмір партиції становить від 256MB до 1GB. Для перерозподілу даних використовуйте:

  • repartition() — для збільшення або зменшення кількості партицій (повний перерозподіл);
  • coalesce() — для зменшення кількості партицій (без повного перерозподілу).

Особливо ефективно партиціонувати дані за стовпцями, які часто використовуються у фільтрах та операціях групування.

Використання Pandas UDF для SHAP-аналізу

Pandas UDF (User-Defined Functions) значно прискорюють складні обчислення, як-от розрахунок SHAP-значень. Вони використовують Apache Arrow для оптимізації передачі даних між JVM та Python-процесом.

Наш досвід показав, що для великих наборів даних (понад 165K записів) найкращі результати дає:

  1. Збільшення кількості партицій до 2-3 разів від кількості ядер.
  2. Використання mapInPandas для паралельного застосування SHAP Explainer.

Вимірювання продуктивності: час, памʼять, масштаб

Моніторинг продуктивності — критичний етап оптимізації.

PySpark пропонує вбудовані інструменти профілювання:

  • Performance Profiler відстежує кількість викликів функцій та час виконання;
  • Memory Profiler виявляє рядки коду з найбільшим споживанням пам'яті.

Для активації профілювання встановіть конфігурацію spark.sql.pyspark.udf.profiler у значення "perf" або "memory". Важливо також контролювати спілловер даних — коли недостатньо пам'яті, дані записуються на диск, що значно сповільнює обробку.

Висновок

Підсумовуючи наш досвід переходу від Pandas до PySpark, ми можемо впевнено сказати, що вибір правильного інструменту критично залежить від обсягу даних та обчислювальних потреб. Однопоточна архітектура Pandas чудово працює для наборів даних, які вміщуються в оперативну пам'ять, проте стає непридатною при роботі з сотнями гігабайтів інформації.

PySpark, завдяки своїй розподіленій архітектурі, значно розширює можливості обробки великих даних, дозволяючи паралельно виконувати операції на кількох машинах. Наші тести показали скорочення часу обробки на 60% після міграції на PySpark та оптимізації процесів.

Важливо зазначити, що Pandas API on Spark пропонує оптимальний компроміс — знайомий синтаксис pandas із масштабованістю Spark. Цей підхід особливо корисний для команд, які не хочуть повністю перенавчатися, але потребують обробки більших обсягів даних.

Правильне партиціонування даних, використання Pandas UDF та ретельний моніторинг продуктивності становлять основу ефективної роботи з PySpark. Наш досвід свідчить, що оптимальний розмір партиції знаходиться між 256MB та 1GB, а збільшення кількості партицій до 2-3 разів від кількості ядер значно прискорює складні обчислення.

Отже, шлях масштабування Python-рішень для роботи з великими даними потребує розуміння архітектурних відмінностей між інструментами та готовності адаптувати підходи відповідно до наростальних потреб. Перехід від pandas до PySpark — не просто зміна бібліотеки, а фундаментальна трансформація способу мислення про обробку даних, яка відкриває нові можливості для аналізу та прийняття рішень на основі великих масивів інформації.

ДАЛІ МОЖНА ПОЧИТАТИ

Підписатися на новини

Чудово! Ми вже готуємо добірку актуальних новин для вас :)

Вибачте, щось пішло не так. Будь ласка, спробуйте ще раз.

* Обов'язкові поля

*Будь ласка, заповніть обов’язкові поля