Библиотеки Python Часть 2. Практическое применение - стр. 3
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()
# Загрузка данных из CSV-файла
df = spark.read.csv('server_logs.csv', header=True, inferSchema=True)
# Фильтрация строк с ошибками
errors = df.filter(df['status'] == 'ERROR')
# Подсчет количества ошибок
error_count = errors.count()
print(f"Количество ошибок: {error_count}")
# Завершаем сессию Spark
spark.stop()
```
Объяснение:
– `filter` позволяет выбрать строки с определенным значением.
– `count` подсчитывает количество строк после фильтрации.
Задача 4: Средняя сумма покупок
Описание: Дан CSV-файл с данными о покупках. Ваша задача – вычислить среднюю сумму покупок для каждого клиента.
Решение:
```python
from pyspark.sql import SparkSession
# Создаем сессию Spark
spark = SparkSession.builder.appName("PurchaseAnalysis").getOrCreate()
# Загрузка данных
df = spark.read.csv('purchases.csv', header=True, inferSchema=True)
# Группировка по клиенту и расчет средней суммы покупок
avg_purchases = df.groupBy('customer_id').avg('purchase_amount')
# Показ результатов
avg_purchases.show()
# Завершаем сессию Spark
spark.stop()
```
Объяснение:
– `groupBy` позволяет сгруппировать данные по столбцу.
– `avg` вычисляет среднее значение для каждой группы.
Задача 5: Сортировка больших данных
Описание: У вас есть файл с информацией о транзакциях. Необходимо отсортировать данные по дате транзакции и сохранить результат в новый файл.
Решение:
```python
from pyspark.sql import SparkSession
# Создаем сессию Spark
spark = SparkSession.builder.appName("SortTransactions").getOrCreate()
# Загрузка данных
df = spark.read.csv('transactions_large.csv', header=True, inferSchema=True)
# Сортировка данных по дате
sorted_df = df.orderBy('transaction_date')
# Сохранение отсортированных данных в новый файл
sorted_df.write.csv('sorted_transactions', header=True, mode='overwrite')
print("Данные отсортированы и сохранены.")
# Завершаем сессию Spark
spark.stop()
```
Объяснение:
– `orderBy` сортирует данные по указанному столбцу.
– `write.csv` сохраняет результат в новом файле.
Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.
– Dask подходит для локальных задач и интеграции с Python-библиотеками.
– PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.
Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.
Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.
В основе Apache Kafka лежат несколько ключевых компонентов:
1. Брокеры – серверы, которые принимают, хранят и доставляют данные.
2. Топики – логические каналы, через которые данные передаются.
3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.
4. Консьюмеры – приложения, которые получают данные из Kafka.
Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.