124. The biggest path and

2023-01-23   ES  

Goal

This article reproduces and organizes the procedure described in [1]: using Kafka Connect to incrementally synchronize one MySQL table into another.

Environment

Component   Version
ZooKeeper   3.6.0
Kafka       2.5.0
MySQL       8.0.21-0ubuntu0.20.04.4

 

Preparation

Create two databases, A and B, and then create one table in each of them:

mysql> create database A;
Query OK, 1 row affected (0.12 sec)

mysql> create database B;
Query OK, 1 row affected (0.08 sec)

mysql> use A;
Database changed
mysql> CREATE TABLE `person` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Query OK, 0 rows affected, 3 warnings (0.82 sec)

mysql> use B;
Database changed
mysql> CREATE TABLE `kafkaperson` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Query OK, 0 rows affected, 5 warnings (0.49 sec)
 

Cluster startup

Start Hadoop, ZooKeeper, and Kafka.

Test

① Create the topic:

$KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person

② Start Kafka Connect in standalone mode with the source and sink connector configurations:

$KAFKA/bin/connect-standalone.sh $KAFKA/config/connect-standalone.properties $KAFKA/config/quickstart-mysql.properties $KAFKA/config/quickstart-mysql-sink.properties

③ Insert a row into table person in database A:

mysql> INSERT INTO person (pid,firstname,age) VALUES ( 1, 'zs',66);
Query OK, 1 row affected (0.07 sec)

mysql> select * from person;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)

④ Check table kafkaperson in database B:

mysql> use B;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------+
| Tables_in_B |
+-------------+
| kafkaperson |
+-------------+
1 row in set (0.00 sec)

mysql> select * from kafkaperson
    -> ;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)
 

The row inserted into table person in database A has been delivered through Kafka to table kafkaperson in database B. The Kafka Connect terminal also logs the corresponding activity.
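Besides reading the terminal output, the state of both connectors can be checked through the Kafka Connect REST interface. The following is a minimal sketch, assuming the worker listens on its default REST port 8083 on localhost; the helper names (`connectors_url`, `status_url`, `print_connector_states`) are made up for this illustration.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the standalone worker exposes its REST API on the default port 8083.
CONNECT_URL = "http://localhost:8083"


def connectors_url(base=CONNECT_URL):
    """URL that lists the names of all connectors on the worker."""
    return base.rstrip("/") + "/connectors"


def status_url(name, base=CONNECT_URL):
    """URL that reports the state of one connector and its tasks."""
    return f"{connectors_url(base)}/{urllib.parse.quote(name, safe='')}/status"


def get_json(url, timeout=5):
    """Fetch a URL and decode the JSON response body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def print_connector_states(base=CONNECT_URL):
    """Print each connector name with its reported state (needs a running worker)."""
    for name in get_json(connectors_url(base)):
        state = get_json(status_url(name, base))["connector"]["state"]
        print(name, state)
```

Calling `print_connector_states()` while connect-standalone.sh is running should list the source and the sink connector; a state other than RUNNING is a hint to look at the worker log.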

Appendix

quickstart-mysql.properties

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://Desktop:3306/A?user=AppleYuchi&password=AppleYuchi
# incrementing mode: detect new rows through an auto-increment column
mode=incrementing
# the auto-increment column is pid
incrementing.column.name=pid
# the table to read is person
table.whitelist=person
# topic prefix is mysql-kafka-, so the topic becomes mysql-kafka-person
topic.prefix=mysql-kafka-
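To make the incrementing mode more concrete, here is a rough sketch of the idea behind it: the source keeps the largest pid it has seen and, on each poll, fetches only rows with a larger pid, publishing them to the topic named by the prefix plus the table name. This is an illustration only, not the connector's actual implementation; an in-memory SQLite database stands in for MySQL database A, and `topic_for` and `poll_new_rows` are hypothetical helpers.

```python
import sqlite3

TOPIC_PREFIX = "mysql-kafka-"   # corresponds to topic.prefix
TABLE = "person"                # corresponds to table.whitelist
INCREMENTING_COLUMN = "pid"     # corresponds to incrementing.column.name


def topic_for(table, prefix=TOPIC_PREFIX):
    """Topic name is the configured prefix followed by the table name."""
    return prefix + table


def poll_new_rows(conn, last_seen):
    """Return rows whose pid is greater than last_seen, plus the updated offset."""
    rows = conn.execute(
        f"SELECT pid, firstname, age FROM {TABLE} "
        f"WHERE {INCREMENTING_COLUMN} > ? ORDER BY {INCREMENTING_COLUMN} ASC",
        (last_seen,),
    ).fetchall()
    if rows:
        last_seen = rows[-1][0]  # remember the largest pid delivered so far
    return rows, last_seen


# Assumption: in-memory SQLite replaces MySQL database A for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (pid INTEGER PRIMARY KEY, firstname TEXT, age INTEGER)")
conn.execute("INSERT INTO person (pid, firstname, age) VALUES (1, 'zs', 66)")

offset = -1
rows, offset = poll_new_rows(conn, offset)        # first poll picks up pid 1
print(topic_for(TABLE), rows)
rows_again, offset = poll_new_rows(conn, offset)  # second poll finds nothing new
print(rows_again)
```

Because only rows with a larger pid are picked up, this mode captures inserts but not later updates or deletes of existing rows.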

quickstart-mysql-sink.properties

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# Kafka topic to read from
topics=mysql-kafka-person
# JDBC connection to database B
connection.url=jdbc:mysql://Desktop:3306/B?user=AppleYuchi&password=AppleYuchi
# Do not create the table automatically; if true, a table named after the topic is created
auto.create=false
# upsert mode: update existing rows, insert new ones
insert.mode=upsert
# the two parameters below use pid from the record value as the primary key
pk.mode=record_value
pk.fields=pid
# target table name is kafkaperson
table.name.format=kafkaperson
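The effect of the upsert mode with pid as the key can be sketched as follows: a record whose pid is new is inserted, and a record whose pid already exists overwrites that row. This is only an illustration with an in-memory SQLite database standing in for MySQL database B (it needs SQLite 3.24 or newer for `ON CONFLICT`); on MySQL the equivalent statement would typically be `INSERT ... ON DUPLICATE KEY UPDATE`. The `upsert` helper is hypothetical.

```python
import sqlite3


def upsert(conn, record):
    """Insert the record, or update the existing row that has the same pid."""
    conn.execute(
        "INSERT INTO kafkaperson (pid, firstname, age) "
        "VALUES (:pid, :firstname, :age) "
        "ON CONFLICT(pid) DO UPDATE SET "
        "firstname = excluded.firstname, age = excluded.age",
        record,
    )


# Assumption: in-memory SQLite replaces MySQL database B for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE kafkaperson (pid INTEGER PRIMARY KEY, firstname TEXT, age INTEGER)"
)

upsert(conn, {"pid": 1, "firstname": "zs", "age": 66})  # new pid: inserted
upsert(conn, {"pid": 1, "firstname": "zs", "age": 67})  # same pid: row updated
result = conn.execute("SELECT pid, firstname, age FROM kafkaperson").fetchall()
print(result)
```

Delivering the same record twice therefore leaves a single row, which keeps the sink table free of duplicates if a message is redelivered.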

References:

[1] Kafka Connect Implement MYSQL incremental synchronization

[2] Kafka Connect quickly builds a data ETL channel

[3] kafka connect logging
