Размер шрифта
-
+

Библиотеки Python Часть 2. Практическое применение - стр. 2

filtered_df = df.filter(df['amount'] > 0)

# 2. Группировка по клиенту и вычисление среднего значения

average_transactions = filtered_df.groupBy('customer_id').avg('amount')

# Показ результатов

average_transactions.show()

# Останавливаем Spark-сессию

spark.stop()

```

Объяснение кода:

1. Создание SparkSession: Это точка входа для работы с PySpark.

2. `spark.read.csv()`: Загружаем данные в формате DataFrame, который поддерживает SQL-подобные операции.

3. Трансформации: Операции, такие как фильтрация и группировка, выполняются параллельно на всех узлах кластера.

4. Результат: PySpark возвращает распределенные данные, которые можно сохранить или преобразовать.

Когда использовать PySpark:

– Когда вы работаете с кластерами и хотите обрабатывать данные на нескольких машинах.

– Когда данные хранятся в распределенных системах, таких как HDFS или Amazon S3.

– Когда нужно интегрировать обработку данных с экосистемой Hadoop.

Сравнение Dask и PySpark



И Dask, и PySpark являются эффективными инструментами для распределенной обработки данных. Выбор между ними зависит от ваших требований. Если вы работаете с данными, которые не помещаются в оперативную память, но ваши вычисления выполняются на одном компьютере, Dask будет лучшим выбором. Если же вы имеете дело с огромными объемами данных, распределенными по нескольким машинам, то PySpark станет незаменимым инструментом.

Обе библиотеки позволяют решать задачи, которые ранее казались невозможными из-за ограничений памяти или производительности, и они помогут вам эффективно работать с данными любого масштаба.

Задачи для практики

Задачи для Dask

Задача 1: Обработка большого CSV-файла

Описание: У вас есть CSV-файл размером 10 ГБ с данными о продажах. Вам нужно вычислить общую сумму продаж по регионам, но файл слишком большой для работы в Pandas.

Решение:

```python

import dask.dataframe as dd

# Загрузка большого CSV-файла

df = dd.read_csv('sales_data_large.csv')

# Проверка структуры данных

print(df.head()) # Показываем первые строки

# Группировка по регионам и подсчет общей суммы продаж

sales_by_region = df.groupby('region')['sales'].sum()

# Выполнение вычислений

result = sales_by_region.compute()

print(result)

```

Объяснение:

– `dd.read_csv` позволяет загружать файлы большего объема, чем объем оперативной памяти.

– `compute` выполняет ленивые вычисления.


Задача 2: Преобразование данных в формате JSON

Описание: Дан файл в формате JSON, содержащий информацию о транзакциях. Необходимо отфильтровать транзакции с суммой менее 1000 и сохранить отфильтрованные данные в новый CSV-файл.

Решение:

```python

import dask.dataframe as dd

# Загрузка JSON-файла

df = dd.read_json('transactions_large.json')

# Фильтрация данных

filtered_df = df[df['amount'] >= 1000]

# Сохранение результатов в новый CSV-файл

filtered_df.to_csv('filtered_transactions_*.csv', index=False)

print("Данные сохранены в файлы CSV.")

```

Объяснение:

– Dask автоматически разбивает данные на части, сохраняя их в несколько CSV-файлов.

– Фильтрация выполняется параллельно.


Задачи для PySpark

Задача 3: Анализ логов

Описание: Имеется файл логов сервера (формат CSV). Ваша задача – подсчитать количество ошибок (строки с `status = "ERROR"`) и вывести их общее количество.

Решение:

```python

from pyspark.sql import SparkSession

# Создаем сессию Spark

Страница 2