圈子推荐
查看所有吧>>
活跃用户
    PostgreSQL CDC Kafka With Debezium

    一、软件列表


    1.    PostgreSQL

    PostgreSQL 13.1 源码编译

    2.    JDK

    jdk-8u281-linux-x64.rpm

     

    下载地址:

    https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html#license-lightbox

     

    3.    Apache Kafka

    kafka_2.13-2.7.0.tgz

    下载地址:

    https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz

     

    4.    Debezium-connector-postgres

    debezium-connector-postgres-1.4.1.Final-plugin.tar.gz

    下载地址:

    https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.4.1.Final/debezium-connector-postgres-1.4.1.Final-plugin.tar.gz

     

    二、 软件安装


    1.    安装JDK

    # tar zxvf jdk-8u281-linux-x64.tar.gz -C /opt/

    # vi /etc/profile

    #### zookeeper ####

    export JAVA_HOME=/opt/jdk1.8.0_281

    export CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib

    export PATH=$JAVA_HOME/bin:$PATH

     

    # source /etc/profile

    # java -version

    java version "1.8.0_281"

    Java(TM) SE Runtime Environment (build 1.8.0_281-b09)

    Java HotSpot(TM) 64-Bit Server VM (build 25.281-b09, mixed mode)

    #

     

    2.     安装Kafka

     

    # tar zxvf kafka_2.13-2.7.0.tgz -C /opt/

    # cd /opt/kafka_2.13-2.7.0/config

     

    # vi /etc/profile

     

    #### zookeeper ####

    export JAVA_HOME=/opt/jdk1.8.0_281

    export KAFKA_HOME=/opt/kafka_2.13-2.7.0

    export CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$KAFKA_HOME/lib

    export PATH=$KAFKA_HOME/bin:$JAVA_HOME/bin:$PATH

     

    #### 启动zookeeper #####

    # cd /opt/kafka_2.13-2.7.0/

    # nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

     

    #### 启动kafka #####

    # nohup bin/kafka-server-start.sh config/server.properties &

     

    ### 创建topic ###

    # kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    Created topic test.

    #

     

    ### 显示topic ###

    # kafka-topics.sh -list -zookeeper localhost:2181

    test

    #

     

    创建kafka生产者:

    # kafka-console-producer.sh --broker-list localhost:9092 --topic test

    >hello

    >test_msg

    >test_topic

     

    创建kafka消费者:

    # kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    hello

    test_msg

    test_topic 

     

    解释:

    --broker-list:默认端口为9092.可自行更改

     

     

     

    3.    数据库准备

     

    postgres=# create user sync_user with replication password 'sync_user';

    postgres=# alter user sync_user with superuser ;

    postgres=# create database tdb01;

    CREATE DATABASE

    postgres=# CREATE TABLE customers (

      id SERIAL,

      first_name VARCHAR(255) NOT NULL,

      last_name VARCHAR(255) NOT NULL,

      email VARCHAR(255) NOT NULL,

      PRIMARY KEY(id)

    );

     

    pg_hba.conf

     

    # TYPE  DATABASE        USER            ADDRESS                 METHOD

     

    # "local" is for Unix domain socket connections only

    local   all             all                                     trust

    # IPv4 local connections:

    host    all             all             127.0.0.1/32            trust

    host    all             all             0.0.0.0/0               md5

    # IPv6 local connections:

    host    all             all             ::1/128                 trust

    # Allow replication connections from localhost, by a user with the

    # replication privilege.

    local   replication     all                                     trust

    host    replication     all             127.0.0.1/32            trust

    host    replication     all             192.168.1.0/24          md5

    $

     

    ### 配置文件修改 ####

    $ vi postgresql.conf

    wal_level = logical

    #max_wal_senders = 10

    #max_replication_slots = 10

     

    #### reboot database ####

    $ pg_ctl stop –D /pgdata

    $ pg_ctl start –D /pgdata

     

     

    4.    配置安装connect

     

     

    # tar zxvf debezium-connector-postgres-1.4.1.Final-plugin.tar.gz -C /opt/

    # cd /opt/debezium-connector-postgres/

    # cp *.jar /opt/kafka_2.13-2.7.0/libs/

    # cd /opt/kafka_2.13-2.7.0/config

    # vi kafka-postgres.properties

    name=postgres-connector

    connector.class=io.debezium.connector.postgresql.PostgresConnector

    database.hostname=192.168.1.141

    database.port=5432

    database.user=sync_user

    database.password=sync_user

    database.dbname=tdb01

    database.server.name=tdb01

    plugin.name=pgoutput

    table.whitelist=

    errors.log.enable=true

    errors.logs.include.messages=true

    #

     

     

    5.     数据库插入数据测试

     

    #### 数据库端 ####

    postgres=# c tdb01

    tdb01=# insert into customers(first_name,last_name,email) values ('he','mia','mia@qq.com');

    INSERT 0 1

    tdb01=#

     

    #### kafka端 #####

     

    # kafka-topics.sh -list -zookeeper localhost:2181

    tdb01.public.customers

    # kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tdb01.public.customers --from-beginning

    {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"tdb01.public.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"tdb01.public.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"tdb01.public.customers.Envelope"},"payload":{"before":null,"after":{"id":1,"first_name":"he","last_name":"mia","email":"mia@qq.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"tdb01","ts_ms":1612544437937,"snapshot":"false","db":"tdb01","schema":"public","table":"customers","txId":493,"lsn":23092184,"xmin":null},"op":"c","ts_ms":1612515639176,"transaction":null}}



    • 分享到:
    排序方式:回复时间 共有0条评论