Source: Database to topic.
Sink: Topic to database.
I assume that the reader is familiar with databases. I started the MySQL server and created one database called project3 which, as shown below, contains no tables yet:
mysql> create database project3;
Query OK, 1 row affected (0,00 sec)
mysql> use project3;
Database changed
mysql> show tables;
Empty set (0,01 sec)
create table countries(
country_id INT NOT NULL AUTO_INCREMENT,
country_name VARCHAR(40) NOT NULL,
PRIMARY KEY ( country_id )
);
INSERT INTO countries (country_name) VALUES ("Germany");
INSERT INTO countries (country_name) VALUES ("Poland");
INSERT INTO countries (country_name) VALUES ("United Kingdom");
INSERT INTO countries (country_name) VALUES ("Italy");
INSERT INTO countries (country_name) VALUES ("Portugal");
Source
The source connector will read this data and publish it to a topic. For this, we need a configuration file for the connector, which looks like this:
name=jdbc-source-filipe
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/project3
connection.user=user
connection.password=pass
dialect.name=MySqlDatabaseDialect
mode=bulk
query=SELECT * FROM countries;
poll.interval.ms=60000
topic.prefix=project3fromDB
I added this file to the config directory of Kafka, under the name connect-jdbc-source-filipe.properties. Note that, because the connector uses a custom query, topic.prefix is used as the full topic name (project3fromDB) rather than as a prefix. Before starting the connector, I needed to add two library files to the libs directory, as sketched after this list (a word of gratitude to João Soares, who reminded me of this):
- a Kafka JDBC connector, which I downloaded from Confluent.
- and the JDBC connector for MySQL (Connector/J), which I downloaded from MySQL.
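The exact jar names depend on the versions you download; the copy step is roughly this (the file names and version numbers below are only illustrative):
cp ~/Downloads/kafka-connect-jdbc-10.7.4.jar libs/
cp ~/Downloads/mysql-connector-j-8.0.33.jar libs/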
Now I can start Connect in standalone mode and expect it to work:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc-source-filipe.properties
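To confirm the connector is actually running, we can ask Connect's REST API (this assumes the default standalone REST port, 8083):
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/jdbc-source-filipe/status
The first call lists the loaded connector names; the second reports the state of the connector and its task.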
To see the result, I start a console consumer on the topic named in the configuration file:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic project3fromDB --from-beginning
I then got the following result, with each record wrapped in Connect's JSON envelope: a schema describing the columns plus a payload carrying the row values:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":1,"country_name":"Germany"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":2,"country_name":"Poland"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":3,"country_name":"United Kingdom"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":4,"country_name":"Italy"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":5,"country_name":"Portugal"}}Sink
For the sink, the configuration is the following. The records will land in a table named after the topic (project3toDB), which is the connector's default table naming:
name=jdbc-sink-filipe
tasks.max=1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://localhost:3306/project3
connection.user=user
connection.password=pass
dialect.name=MySqlDatabaseDialect
topics=project3toDB
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
I learnt the hard way that we might need a key converter, in this case a StringConverter. In a different situation I lost a couple of hours before realizing that a key ("1" in that case) was being written to the topic as a plain string rather than as a complete JSON envelope with a schema, which made the sink crash. Hence we need to explicitly declare the key as a string (or, alternatively, produce records with a null key).
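For illustration, the first line below is what the console producer writes as a key (a bare string), while the second is the kind of envelope that JsonConverter with schemas enabled would expect instead (the key value "1" is just an example):
"1"
{"schema":{"type":"string","optional":false},"payload":"1"}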
I can run the source and the sink together:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc-source-filipe.properties config/connect-jdbc-sink-filipe.properties
If I open a console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic project3toDB
and send it the following message (the same envelope format: a schema plus a payload):
{"schema":{"type":"struct","fields":[{"type":"double","optional":false,"field":"revenue"},{"type":"double","optional":false,"field":"expenses"},{"type":"double","optional":false,"field":"profit"}],"optional":false,"name":"total data"},"payload":{"revenue":988500.0, "expenses":731430.0,"profit":257070.0}}
I now get the following result in the database:
mysql> show tables;
+--------------------+
| Tables_in_project3 |
+--------------------+
| countries |
| project3toDB |
+--------------------+
2 rows in set (0,00 sec)
mysql> select * from project3toDB;
+---------+--------+----------+
| revenue | profit | expenses |
+---------+--------+----------+
| 988500 | 257070 | 731430 |
+---------+--------+----------+
1 row in set (0,00 sec)
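Since auto.create is enabled, the sink created the table from the record schema. Describing it should show three non-nullable DOUBLE columns, roughly like this (a sketch; exact output may vary with the connector version):
mysql> describe project3toDB;
+----------+--------+------+-----+---------+-------+
| Field    | Type   | Null | Key | Default | Extra |
+----------+--------+------+-----+---------+-------+
| revenue  | double | NO   |     | NULL    |       |
| profit   | double | NO   |     | NULL    |       |
| expenses | double | NO   |     | NULL    |       |
+----------+--------+------+-----+---------+-------+
3 rows in set (0,00 sec)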