Библиотеки Python Часть 2. Практическое применение - стр. 4
Пример потока данных
Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.
Установка и настройка Apache Kafka
Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).
1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.
2. Запустите Kafka-брокер.
3. Создайте топик с помощью команды:
```bash
bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1
```
Отправка данных в Kafka
Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:
```bash
pip install confluent-kafka
```
Пример кода, который отправляет сообщения в топик:
```python
from confluent_kafka import Producer
import json
import time
# Настройки продюсера
producer_config = {
'bootstrap.servers': 'localhost:9092' # Адрес Kafka-брокера
}
# Создание продюсера
producer = Producer(producer_config)
# Функция для обратного вызова при успешной отправке сообщения
def delivery_report(err, msg):
if err is not None:
print(f'Ошибка доставки сообщения: {err}')
else:
print(f'Сообщение отправлено: {msg.topic()} [{msg.partition()}]')
# Отправка данных в Kafka
orders = [
{'order_id': 1, 'product': 'Laptop', 'price': 1000},
{'order_id': 2, 'product': 'Phone', 'price': 500},
{'order_id': 3, 'product': 'Headphones', 'price': 150}
]
for order in orders:
producer.produce(
'orders',
key=str(order['order_id']),
value=json.dumps(order),
callback=delivery_report
)
producer.flush() # Отправка сообщений в брокер
time.sleep(1)
```
В этом примере продюсер отправляет JSON-объекты в топик `orders`. Каждое сообщение содержит данные о заказе.
Чтение данных из Kafka
Теперь создадим консьюмера, который будет читать сообщения из топика `orders`.
```python
from confluent_kafka import Consumer, KafkaException
# Настройки консьюмера
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-group', # Группа консьюмеров
'auto.offset.reset': 'earliest' # Начало чтения с первой записи
}
# Создание консьюмера
consumer = Consumer(consumer_config)
# Подписка на топик
consumer.subscribe(['orders'])
# Чтение сообщений из Kafka
try:
while True:
msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
# Конец партиции
continue
else:
print(f"Ошибка: {msg.error()}")
break
# Обработка сообщения
print(f"Получено сообщение: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
print("Завершение работы…")
finally:
# Закрытие консьюмера
consumer.close()
```
В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.
Потоковая обработка данных
Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.