PostgreSQL 13.1 源码编译
jdk-8u281-linux-x64.tar.gz
下载地址:
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html#license-lightbox
kafka_2.13-2.7.0.tgz
下载地址:
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz
4. Debezium-connector-postgres
debezium-connector-postgres-1.4.1.Final-plugin.tar.gz
下载地址:
https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.4.1.Final/debezium-connector-postgres-1.4.1.Final-plugin.tar.gz
# tar zxvf jdk-8u281-linux-x64.tar.gz -C /opt/
# vi /etc/profile
#### jdk ####
export JAVA_HOME=/opt/jdk1.8.0_281
export CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib
export PATH=$JAVA_HOME/bin:$PATH
# source /etc/profile
# java -version
java version "1.8.0_281"
Java(TM) SE Runtime Environment (build 1.8.0_281-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.281-b09, mixed mode)
#
# tar zxvf kafka_2.13-2.7.0.tgz -C /opt/
# cd /opt/kafka_2.13-2.7.0/config
# vi /etc/profile
#### jdk & kafka ####
export JAVA_HOME=/opt/jdk1.8.0_281
export KAFKA_HOME=/opt/kafka_2.13-2.7.0
export CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$KAFKA_HOME/lib
export PATH=$KAFKA_HOME/bin:$JAVA_HOME/bin:$PATH
#### 启动zookeeper #####
# cd /opt/kafka_2.13-2.7.0/
# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#### 启动kafka #####
# nohup bin/kafka-server-start.sh config/server.properties &
### 创建topic ###
# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
#
### 显示topic ###
# kafka-topics.sh --list --zookeeper localhost:2181
test
#
创建kafka生产者:
# kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>test_msg
>test_topic
>
创建kafka消费者:
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
test_msg
test_topic
解释:
--broker-list:Kafka broker 的地址列表(host:port),默认端口为 9092,可自行更改。
postgres=# create user sync_user with replication password 'sync_user';
postgres=# alter user sync_user with superuser ;
postgres=# create database tdb01;
CREATE DATABASE
postgres=# \c tdb01
tdb01=# CREATE TABLE customers (
id SERIAL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
pg_hba.conf
# TYPE DATABASE USER ADDRESS METHOD
# "local" is for Unix domain socket connections only
local all all trust
# IPv4 local connections:
host all all 127.0.0.1/32 trust
host all all 0.0.0.0/0 md5
# IPv6 local connections:
host all all ::1/128 trust
# Allow replication connections from localhost, by a user with the
# replication privilege.
local replication all trust
host replication all 127.0.0.1/32 trust
host replication all 192.168.1.0/24 md5
$
### 配置文件修改 ####
$ vi postgresql.conf
wal_level = logical
#max_wal_senders = 10
#max_replication_slots = 10
#### reboot database ####
$ pg_ctl stop -D /pgdata
$ pg_ctl start -D /pgdata
# tar zxvf debezium-connector-postgres-1.4.1.Final-plugin.tar.gz -C /opt/
# cd /opt/debezium-connector-postgres/
# cp *.jar /opt/kafka_2.13-2.7.0/libs/
# cd /opt/kafka_2.13-2.7.0/config
# vi kafka-postgres.properties
name=postgres-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=192.168.1.141
database.port=5432
database.user=sync_user
database.password=sync_user
database.dbname=tdb01
database.server.name=tdb01
plugin.name=pgoutput
table.whitelist=
errors.log.enable=true
errors.log.include.messages=true
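#### 启动 connector(standalone 模式) ####
# cd /opt/kafka_2.13-2.7.0/
# nohup bin/connect-standalone.sh config/connect-standalone.properties config/kafka-postgres.properties &
#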
#### 数据库端 ####
postgres=# \c tdb01
tdb01=# insert into customers(first_name,last_name,email) values ('he','mia','mia@qq.com');
INSERT 0 1
tdb01=#
#### kafka端 #####
# kafka-topics.sh --list --zookeeper localhost:2181
tdb01.public.customers
# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tdb01.public.customers --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"tdb01.public.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"tdb01.public.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"tdb01.public.customers.Envelope"},"payload":{"before":null,"after":{"id":1,"first_name":"he","last_name":"mia","email":"mia@qq.com"},"source":{"version":"1.4.1.Final","connector":"postgresql","name":"tdb01","ts_ms":
1612544437937,"snapshot":"false","db":"tdb01","schema":"public","table":"customers","txId":493,"lsn":23092184,"xmin":null},"op":"c","ts_ms":1612515639176,"transaction":null}}