Если вы видите что-то необычное, просто сообщите мне. Skip to main content

Streaming Data with PostgreSQL + Kafka + Debezium: Part 1

В этой инструкции мы будем использовать Postgres, Kafka< Kafka Connect, Debezium и Zookeeper для создание маленького api, который отслеживает магазины и крипто попуки во времени.

Введение

Платформа потоковой передачи данных, как Kafka позволяет вам строить ситемы которые обрабатывают данные в реальном времени. Эти системы имеют мириады случаем использования, проекты могут ранжироваться от простой обработки данных для ETL систем до проектов требующих высокую скорость координации микросервисов требуемое все виды Kafka как приемлимое решение.

Один из моих любимых примеров использования Kafka происходит от New Relic инженерный блог. New Relic помогает разработчикам отслеживать производительность их приложений. Их свойства работают в реальном времени, что может быть важно так как множество разработчиков полагаются на него в качестве системы опвещение, когда что-то идет не так. New Relic сереьзно использует Kafka для координирования микросервисов и связывать их в реальном времени друг с другом.

В то время как Kafka слишком сложна в качестве простого инструменка, который мы собираемся построить сегодня, эта инструкция покажет как настроить Kafka. Мы не будем делить наш API на множество сервисов, но мы будем использовать Kafka внутри нашего сервиса что того, чтобы просчитать и создать допольнительные данные доступные через API.

Что такое Kafka?

Kafka isочень aмощный veryплатформа powerfulпотока eventсобытий, streamingкоторая platformпозволяет thatобрабатывать isмассинвый capableнабор ofданных processingв massiveреальном amountsвремени. ofВ real-timeдобавок, data.можно Inсказать, addition,kafka it'sмасштабируемы scalableи andотказоустойчива, fault-tolerant,делает makingеё itпопулярным aвыбором popularдля choiceпроектов forкоторые projectsтребуют thatскорость requireобработки speedy data pipelines.данных.

Что такое Debezium?

ConsiderРеляционная KafkaSQL aбаза seriousданных tool.в Aсердце colleagueбесчетного ofколичество mineпрограммный notedпроектов. thatДля Kafkaпримена, wasесли "aptlyвы named."хотите His comment was comparing the difficulty of setting up and configuring Kafka to fully understanding the written works of Franz Kafka (after whomиспользовать Kafka, theно software tool, was named). What is Debezium?

A relational SQL database is at the heart of countless software projects. For example, if you want to use Kafka, but partчасть (orили all)всё) ofваших yourданных dataсуществует exists in aв Postgres database,базе данных, Debezium is- aэто toolинструмент thatкоторый plugsподключается intoк Postgres andи streamsпотоковым allобразом orпередает someданные of your data intoв Kafka. ItЗапускается runsна onсервере yourс databaseбазой server.данных.

What

Что isтакое ZooKeeper?

Zookeper?

ZooKeeper is- anotherеще pieceодин ofкусок softwareпрограмного fromобеспечения Apacheот thatApache, который использует Kafka usesдля toхранения storeи andуправления coordinateконфигурацией. configuration.Для Theбазовой basicнастройки, setupкоторую weмы willбудем useиспользовать inне thisтребуется tutorialглубокое doesпонимание not require a deep understanding of ZooKeeper.Zookeeper.

IfЕсли youвы endуже upзакончили deployingустановку aпроекта projectкак likeэтот thisв inбоевом aокружении, productionвы environment,захотите you'llузнать wantгораздо toбольше learnо moreтом, aboutкак howоно itработает worksи andкак howего toнастроить. setВ it up. At some point in the future,будущем, Kafka willне notпотребует requireZookeeper.

ZooKeeper.

Что Learn more about ZooKeeper here. What isтакое Kafka Connect?

Kafka Connect actsработает asкак aмост bridgeдля forвходящих streamingи dataисходищх inпотоковых andданных. outВы ofможете Kafka.подключить You can use it to connect yourвашу Kafka databaseк withразличным dataисточникам sources.баз Inданных. thisВ guide,этой we'llиснтрукции, useмы itбудем toиспользовать connectдля withподключения Debezium—andDebezium, ultimatelyPostgres, Postgres—butно SQLэто isбудет farне fromединственный theисточник onlyданных dataдля source for whichкоторых Connect isможет useful.быть Thereполезен. areЕсть countlessбесконечное connectorsколичество writtenконнекторов toнаписанных pullдля differentтого, typesчтобы ofманипулировать dataразличными inданными and out ofв Kafka.

AlthoughТак theже экосистема Kafka ecosystemможет canбыть beполезна, aвы handful,сможете youполучить willбольшую beотдачу rewardedотдачу with a world-class platform with capabilities you're unlikely to outrun if you invest in Kafka. The fast and easy-to-use SQL client for developers and teams Theот Kafka setupв weпоследствии willесли build.вложитесь Useв Kafka:

Использвоание Docker toдля Set Upнастройки Postgres, Kafka andи Debezium

Эта инструкция будет состоять из несколких частей. Первая, мы настроим маленкий API сервер, который позволит вам хранить записи. Затем, используя данные цен, покупок/продаж, данные будут проходить через Kafka и расчитывать различные общие метрики. Мы так же поэкспериментируем используя Debezium sink для потока данных из Kafka обратно в SQL базу даннхы.

ThisВ guideэтой willчасти ultimatelyмы consistподнимем ofи several parts. First, we will set up a small API that allows you to keep a record of your crypto and stock trades. Then, as price quotes come in and instruments are purchased or sold, the data will stream throughзапустим Kafka andи calculateDebezium. runningВ totalsконце andинструкции, otherу metrics.вас Weбудет willпроект alsoкоторый experimentпередает withпотоковым usingобразом aсобытия Debeziumиз sinkтаблицы toв streamтопик dataKafka.

from

Мы будем исполльзовать Docker и docker-compose чтобы помочь нам запустить Postgres, Kafka backи toDebezium. ourЕсли SQLвы database.не знакомы с этими инструментами, возможно будет полезно прочитать про инструменты прежде чем продолжить.

In this first part, we will get Kafka and Debezium up and running. By the end of this guide, you will have a project that streams events from a table to a Kafka topic.

We'll be using Docker and Docker Compose to help us get Postgres, Kafka, and Debezium set up. If you aren't familiar with those tools, it may be helpful to read up on them before continuing. Create a

Созадим Postgres Containerконтейнера withс помощь Docker

First,Первое, let'sнастроим get a basicбазвый Postgres container set up.контейнер.

version: '3.9'

services: db: image: postgres:latest ports: - "5432:5432" environment: - POSTGRES_PASSWORD=arctype

После запуска docker-compose, мы должны иметь рабочую базу данных

After running docker-compose up, we should have a functioning Postgres database.

db_1  | 2021-05-22 03:03:59.860 UTC [47] LOG:  database system is ready to accept connections

Теперь, проеверим, что она работает.

Now, let's verify it's working.

$ psql -h 127.0.0.1 -U postgres
Password for user postgres:

postgres@postgres=# 

После подключения нас приветствует psql консоль.

127Добавим postgres@postgres=#Debezium Kafka, Kafka Connect, и Zookeeper образы

And,Теперь afterдобавим connecting,другие weобразы areнеобходимые greetedдля withKafka. aDebezium psqlпредлагет prompt. Add Debezium'sобразы Kafka, Kafka Connect andи ZooKeeperZookeeper, Imagesкоторые предназначены специально для работы с Debezuim. Поэтому использовать мы будем их.

Now let's add the other images we'll need for Kafka. Debezium happens to offer images of Kafka, Kafka Connect, and ZooKeeper that are designed specifically to work with Debezium. So we'll go ahead and use their images.

version: '3.9'

services: db: image: postgres:latest ports: - "5432:5432" environment: - POSTGRES_PASSWORD=arctype

zookeeper: image: debezium/zookeeper ports: - "2181:2181" - "2888:2888" - "3888:3888"

kafka: image: debezium/kafka ports: - "9092:9092" - "29092:29092" depends_on: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092 - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT

connect: image: debezium/connect ports: - "8083:8083" environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses depends_on: - zookeeper - kafka

TheНастройки environmentпеременного variablesокружения for theдля Kafka setupпозволяют letнам youуказать setразличные upсети differentи networkпротоколы andбезопаснотсти securityесли protocolsу ifвашей yourсети networkесть setupразличные hasправила differentдля rulesвнутреннего forброкера intra-brokerподключения communicationв vs.отличии externalот clientsвнешних connectingклиентов toподключающихся к Kafka. OurНаша simpleпростая setupнастройка isсамостоятельна allс self-containedсозданное withinсетью theвнутри network Docker creates for us.Docker.

Kafka Connect createsсоздает topicsтопик inв Kafka andи usesиспользует themих toдля storeхранения configurations.настроек. YouВы canможете specifyуказать theимя, nameкоторое itон willбудет useиспользовать forдля theтопик topicsс withпеременными environmentокружением. variables.Если Ifу youвас haveесть multipleмножетсво KafkaKofka Connect nodes,нод, theyони canмогут parallelizeвыполнять theirработу workloadпаралельно whenкогда theyони haveимеют theодну sameи ту же GROUP_ID andи *_STORAGE_TOPIC configurations.потоковые More details on Connect configuration are available here. Streaming Events toсобытия PostgreSQL

Let'sСоздадим createтаблицу aчтобы tableпроверить toпотоковые test event streaming.события.

create table test (
id serial primary key,
name varchar
);

Set Up AНастроим Debezium Connector forдля PostgreSQLPostgreSQL.

IfЕсли weмы startзапустим ourнаш Docker project,проект, Kafka, Kafka Connect, ZooKeeper,Zookeeper andи Postgres willон runпрекрасно justработает. fine. However,Однако, Debezium requiresтребует usконкретной toнастройки explicitlyконнектора setдля upзапуска aпотоковых connectorданных to start streaming data fromот Postgres.

The Collaborative

Совместный SQL Editorредактор

BeforeПрежде weчем activateмы активируем Debezium, weнам needнужно to prepareподготовить Postgres byсделав makingнеобольшие someконфигурационные configuration changes.изменения. Debezium utilizesиспользует somethingнечто builtвстроенное intoв POstgres, под названием WAL, или упреждающую журнализацию. Postgres calledиспользует aэтот WAL,лог orчтобы write-aheadпроверить log.целостность данных и управлять версиями ячеек и транзакций. WAL в Postgres usesимеет thisнесколько logрежимов, toкоторые ensureможно dataнастроить, integrityи andдля manageработы row versions and transactions. Postgres'Debezium WAL hasрежим severalдолжен modesбыть youуказан canкак configurereplica. itДавайте to,это and for Debezium to work, the WAL level must be set to replica. Let's change that now.настроим.

psql> alter system set wal_level to 'replica';

Возможно понадобится рестарт Postgres контейнера для применения настройки.

YouЕсть mayеще needодин to restart theплагин Postgres containerне forвключенный thisв changeобраз toкоторый takeмы effect.

используем,

Thereпоэтому isнам one Postgres plugin not included with the image we used that we will need:понадобится wal2json. Debezium canможет workработать withи eitherс wal2json orи с protobuf. ForДля thisэтой tutorial,инструкции, weмы willбудем useиспользовать wal2json. AsТак itsкак nameон implies,согласно itимени convertsпереводит Postgres'Postgres write-aheadWAL logsлог toв JSON format.формат.

WithС ourпомощью Dockerзапущенного appDocker, running,в let'sручном manuallyрежиме installустановим wal2json usingисполльзуя aptitude. ToЧтобы getдобраться toдо the shell of theкосноли Postgres container,контейнера, firstдля findначала the containerнайдем ID andконтейнера thenи runвыполним theследующий followingнабор command to open bash:команд:


$ docker ps

CONTAINER ID IMAGE
c429f6d35017 debezium/connect
7d908378d1cf debezium/kafka
cc3b1f05e552 debezium/zookeeper
4a10f43aad19 postgres:latest

$ docker exec -ti 4a10f43aad19 bash

NowТеперь, thatкогда we'reмы insideвнутри theконтейнера container,давайте let's installпоставим wal2json:

$ apt-get update && apt-get install postgresql-13-wal2json

Активируем Debezium

ActivateМы Debezium

можем

We'reобщаться ready to activate Debezium!

We can communicate withс Debezium by makingделая HTTP requestsзапросы. toДля it.этого We need to make aнужен POST requestзапрос whoseданные dataкоторого isотформатированны a configuration inв JSON format. Thisформате. JSON definesопределяет theпараметры parametersконнектора ofкоторый theмы connectorпытаемся we'reсоздать. attemptingПоместим toданные create.в We'llфайл putи theбудем configurationего JSONиспользовать intoс a file and then use cURL to send it to Debezium.

You have several configuration options at this point. This is where you can use a whitelist or blacklist if you only want Debezium to stream certain tables (or avoid certain tables).

У нас есть несколько конфигурационных опций на данный момент. Тут можно использовать белый или черный списки если вы хотите чтобы Debezium отображал только определенные таблицы(или для избежания определенных таблиц)

$ echo '
{
    "name": "arctype-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "wal2json",
        "database.hostname": "db",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "arctype",
        "database.dbname": "postgres",
        "database.server.name": "ARCTYPE",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "snapshot.mode": "always"
    }
}
' > debezium.json

Теперь можно отправить эту конфигурацию в Debezium

Now we can send this configuration to Debezium.

$ curl -i -X POST 
\ -H "Accept:application/json"
\ -H "Content-Type:application/json"
\ 127.0.0.1:8083/connectors/
\ --data "@debezium.json"

TheОтвет responseдолжен willбыть beсо aследующим содержанием JSON representationесли ofэто theуже newlyне initiatedнастроенный connector.коннектор.

{
  "name": "arctype-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "arctype",
    "database.dbname": "postgres",
    "database.server.name": "ARCTYPE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "always",
    "name": "arctype-connector"
  },
  "tasks": [],
  "type": "source"
}

Проверим настройку потоковой передачи Kafka

TestТеперь после вставки обновления или удаления записей мы будем использовать изменения как новое сообщение в Kafka Streamingтопике Setup

связанной

Nowс we are streaming! After inserting, updating, or deleting a record, we will see the changes as a new message in the Kafka topic associated with the table.таблицей. Kafka Connect will createсоздаст 1 topicтопик perдля SQL table.таблицы. ToЧтобы verifyпроверить thatчто thisвсё isработает workingверно, correctly,нам we'llнужно need to monitor theмониторить Kafka topic.топик.

Kafka comesидет with someс shell scriptsскриптами thatкоторые helpпомогают youвам pokeвставлять aroundваши yourнастройки Kafka. Это удобно когда вы хотите проверить вашу конфигурацию и её удобно включать в Docker образ который мы используем. Первый, который мы будем использовать список всех топиков в нашем Kafka configuration.кластере. TheyДавайте areзапустим handyи whenпроверим youчто wantмы toвидим testтопик yourдля configuration and are conveniently included in the Docker image we are using. The first one we'll use lists all of the topics in your Kafka cluster. Let's run it and verify that we see a topic for ourнашей test table.таблицы.

$ docker exec -it 
\ $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }')
\ /kafka/bin/kafka-topics.sh
\ --bootstrap-server localhost:9092 --list

ARCTYPE.public.test __consumer_offsets my_connect_configs my_connect_offsets my_connect_statuses

Встроенный в инструмент Kafka требует указания --bootstrap-server. Он ссылается на bootstrap потому, что вы обычно запускаете Kafka как кластер с несколькими нодами, и вам нужно один из них, который "выставлен наружу" чтобы зайти в кластер. Kafka обрабатывает все остальное самостоятельно.

Вы можете увидеть нашу test таблицу в списке ARCTYPE.public.test. Первая часть, ARCTYPE - это префикс который мы настроили для database.server.name поле в настройках JSON. Вторая часть отражает схему Postgres таблицы в ней, в последней части название таблицы. При добавлении Kafka производителей и приложений с потоковыми данными, количество топиков будет увеличиваться, поэтому удобно указывать префиксы, чтобы проще идентифицировать какой из топиков относится к таблице в бд.

Теперь монжо использовать другой инструмент называемым консольый потребитель для слежения за топиками в реальном времени. Называется он "console consumer" потому, что это типа потребителя kafka - утилита которая постребляет сообщения из топика и что-нибудь делает с ним. Потребитель может делать что угодно с данными которые он потребяет и консоль потребителя ничего не делает кроме как выодит эти сообщения в консоль.

$ docker exec allows you to execute a command inside of a container without having to enter into its shell. Docker requires you to specify the container id when you use docker exec. When you re-create docker containers, the ID will change, making -it futile to memorize that ID.\
  $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }') finds the correct container ID by listing all active docker containers (docker ps), running them through grep to find the one that is running pure Kafka and then using awk to cherry-pick just the first column of the output, which will be the container id. the $() syntax runs a command and inserts its output in place.

The built-in Kafka tools require you to specify --bootstrap-server. They refer to it as bootstrap because you'll usually run Kafka as a cluster with several nodes, and you need one of them that is public-facing for your consumer to "enter the mix." Kafka handles the rest on its own.

You can see our test table is listed as ARCTYPE.public.test. The first part, ARCTYPE, is a prefix that we set with the database.server.name field in the JSON configuration. The second part represents which Postgres schema the table is in, and the last part is the table name. Once you write more Kafka producers and stream applications, you'll have many more topics, so it's helpful to set the prefix to make it easy to identify which topics are pure SQL tables.

Now we can use another tool called the console consumer to watch the topic in real-time. It's called "console consumer" because it is a type of Kafka "Consumer"— a utility that consumes messages from a topic and does something with them. A consumer can do anything with the data it ingests, and the console consumer does nothing besides print it out to the console.

$ docker exec -it
$(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }')
\ /kafka/bin/kafka-console-consumer.sh
\ --bootstrap-server localhost:9092
\ --topic ARCTYPE.public.test

ByПо default,умолчанию, theконсольный consoleпотребитель, consumerпотребляет onlyтолько consumesсообщения messagesу itнего hasn'tуже already.не Ifбыло. youЕсли wantвы toхотите seeувидеть everyвсе messageсообщения inв aтопике topic,нужно youдобавить canключ add --from-beginning toв theкоманду consoleзапуска command..

NowТеперь thatнаш ourпотребитель consumerследить isза watchingновыми theсообщенямии topicв forтопике, newа messages,мы weзапустим runINSERT anи INSERTпосмотрим and watch for output.вывод.

postgres=# insert into test (name) values ('Arctype Kafka Test!');
INSERT 0 1

BackВернемся onк ourнашему Kafka consumer:потребителю:

$ docker exec -it $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }') /kafka/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic ARCTYPE.public.test
...
{
  "before": null,
  "after": {
    "id": 8,
    "name": "Arctype Kafka Test!"
  },
  "source": {
    "version": "1.5.0.Final",
    "connector": "postgresql",
    "name": "ARCTYPE",
    "ts_ms": 1621913280954,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22995096"22995096\",\"22995096"22995096\"]",
    "schema": "public",
    "table": "test",
    "txId": 500,
    "lsn": 22995288,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1621913280982,
  "transaction": null
}

AlongВ withместе someс metadata,мета youданными canвы seeможете theувидеть primaryглавный keyключ and theполя name fieldзаписки ofкоторую theвы record we inserted! Conclusionдобавили.

Выводы

Let'sДавайте congratulateскоординируемся, ourselvesтак asкак weмы have set upимеем Postgres toдля streamпередачи itsданных data to aв Kafka cluster!кластер. StayВо tunedвторой forчасти, Partмы 2, where we'll build up someпостроим SQL schemaсхему forчтобы ourулучшить financialнаше appприложение, andдля beginвычисления to leverage Kafka to run calculations on our data.данных.