Goal
This article is a reproduction and reorganization of [1].
Environment
| Component | Version |
| --- | --- |
| ZooKeeper | 3.6.0 |
| Kafka | 2.5.0 |
| MySQL | 8.0.21-0ubuntu0.20.04.4 |
Preparation
Create two databases, A and B, then create a new table in each:
mysql> create database A;
Query OK, 1 row affected (0.12 sec)
mysql> create database B;
Query OK, 1 row affected (0.08 sec)
mysql> use A;
Database changed
mysql> CREATE TABLE `person` (
-> `pid` int(11) NOT NULL AUTO_INCREMENT,
-> `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
-> `age` int(11) DEFAULT NULL,
-> PRIMARY KEY (`pid`)
-> ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Query OK, 0 rows affected, 3 warnings (0.82 sec)
mysql> use B;
Database changed
mysql> CREATE TABLE `kafkaperson` (
-> `pid` int(11) NOT NULL AUTO_INCREMENT,
-> `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
-> `age` int(11) DEFAULT NULL,
-> PRIMARY KEY (`pid`)
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Query OK, 0 rows affected, 5 warnings (0.49 sec)
Cluster startup
Start Hadoop, ZooKeeper and Kafka
Test
① Create the Kafka topic:
$KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person
② Start Kafka Connect in standalone mode with the source and sink configurations:
$KAFKA/bin/connect-standalone.sh $KAFKA/config/connect-standalone.properties $KAFKA/config/quickstart-mysql.properties $KAFKA/config/quickstart-mysql-sink.properties
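The worker file passed as the first argument above is the stock standalone configuration shipped with Kafka. Its exact contents depend on the installation; a minimal sketch, assuming JSON converters and an illustrative plugin path for the Confluent JDBC connector jars:

```properties
bootstrap.servers=Desktop:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# standalone mode stores source offsets (e.g. the last pid seen) in a local file
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# the JDBC source/sink connector must be on the plugin path (path is illustrative)
plugin.path=/opt/confluent/share/java/kafka-connect-jdbc
```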
③ Insert a row into the person table in database A:
mysql> INSERT INTO person (pid,firstname,age) VALUES (1, 'zs', 66);
Query OK, 1 row affected (0.07 sec)
mysql> select * from person;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)
④ mysql> use B;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+-------------+
| Tables_in_B |
+-------------+
| kafkaperson |
+-------------+
1 row in set (0.00 sec)
mysql> select * from kafkaperson;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)
You can see that the row inserted into table person in database A has been synchronized through Kafka into table kafkaperson in database B; the corresponding records can also be seen at the Kafka terminal.
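One way to inspect those records directly is the console consumer. This assumes the broker listens on Desktop:9092, which the article does not state explicitly:

```shell
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9092 \
  --topic mysql-kafka-person --from-beginning
```

Each message is one row captured from the person table, serialized by the worker's configured converter.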
Appendix
quickstart-mysql.properties
name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://Desktop:3306/A?user=AppleYuchi&password=AppleYuchi
# incrementing mode: detect new rows via an auto-increment column
mode=incrementing
# the auto-increment column is pid
incrementing.column.name=pid
# only capture the person table
table.whitelist=person
# topic name becomes <prefix><table>, i.e. mysql-kafka-person
topic.prefix=mysql-kafka-
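The behavior of mode=incrementing can be sketched as follows. This is a minimal simulation, not Confluent's actual implementation: the connector remembers the largest pid it has seen (its "offset") and each poll fetches only rows with a larger pid. sqlite3 stands in for MySQL here.

```python
import sqlite3

# Stand-in for the source table in database A.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (pid INTEGER PRIMARY KEY, firstname TEXT, age INTEGER)")
conn.execute("INSERT INTO person VALUES (1, 'zs', 66)")

def poll(conn, last_pid):
    """Fetch only rows newer than the stored offset, as incrementing mode does."""
    rows = conn.execute(
        "SELECT pid, firstname, age FROM person WHERE pid > ? ORDER BY pid",
        (last_pid,),
    ).fetchall()
    new_last = rows[-1][0] if rows else last_pid
    return rows, new_last

last_pid = 0                                # the connector persists this offset between polls
rows, last_pid = poll(conn, last_pid)       # first poll: sees the existing row, pid 1
conn.execute("INSERT INTO person VALUES (2, 'ls', 30)")
new_rows, last_pid = poll(conn, last_pid)   # second poll: sees only the new row, pid 2
```

This is also why deletes and in-place updates are invisible to incrementing mode: only rows with a pid above the stored offset are ever read.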
quickstart-mysql-sink.properties
name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# the Kafka topic to consume
topics=mysql-kafka-person
# JDBC connection to database B
connection.url=jdbc:mysql://Desktop:3306/B?user=AppleYuchi&password=AppleYuchi
# do not auto-create the table; if true, a table named after the topic is created automatically
auto.create=false
# upsert mode: update existing rows, insert new ones
insert.mode=upsert
# the two settings below make pid from the record value the primary key
pk.mode=record_value
pk.fields=pid
# target table name is kafkaperson
table.name.format=kafkaperson
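The effect of insert.mode=upsert with pk.mode=record_value and pk.fields=pid can be sketched like this. It is a simulation only: the sink writes each record keyed on its pid, so a re-delivered or updated record overwrites the existing row instead of duplicating it. sqlite3's ON CONFLICT clause stands in for MySQL's INSERT ... ON DUPLICATE KEY UPDATE.

```python
import sqlite3

# Stand-in for the target table in database B.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kafkaperson (pid INTEGER PRIMARY KEY, firstname TEXT, age INTEGER)")

def sink_write(conn, record):
    """Upsert one record keyed on pid, as insert.mode=upsert does."""
    conn.execute(
        "INSERT INTO kafkaperson (pid, firstname, age) VALUES (:pid, :firstname, :age) "
        "ON CONFLICT(pid) DO UPDATE SET firstname = excluded.firstname, age = excluded.age",
        record,
    )

sink_write(conn, {"pid": 1, "firstname": "zs", "age": 66})
sink_write(conn, {"pid": 1, "firstname": "zs", "age": 67})  # redelivery: updates, no duplicate
rows = conn.execute("SELECT * FROM kafkaperson").fetchall()
```

With insert.mode=insert instead, the second write would fail on the duplicate primary key, which is why upsert is the safer choice when Kafka may redeliver records.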
References:
[1] Kafka Connect implementation of MySQL incremental synchronization