Saturday, November 9, 2019

How to configure Kafka connectors

In this post I will show the very basics of connecting a database to a Kafka topic in both directions:

Source: Database to topic.
Sink: Topic to database.

I assume that the reader is familiar with databases. I started the database server and created a database called project3. As you can see, it has no tables yet:

mysql> create database project3;
Query OK, 1 row affected (0,00 sec)

mysql> use project3;
Database changed
mysql> show tables;

Empty set (0,01 sec)

I will connect this database to two Kafka topics. To run Kafka and to start a producer and a consumer, it is best to follow the official Apache Kafka quickstart; I assume that you are familiar with these operations. Let me first do the source. I will create a table of countries and add a few rows to it:

create table countries(
    country_id INT NOT NULL AUTO_INCREMENT,
    country_name VARCHAR(40) NOT NULL,
    PRIMARY KEY ( country_id )
);



INSERT INTO countries (country_name) VALUES ("Germany");
INSERT INTO countries (country_name) VALUES ("Poland");
INSERT INTO countries (country_name) VALUES ("United Kingdom");
INSERT INTO countries (country_name) VALUES ("Italy");
INSERT INTO countries (country_name) VALUES ("Portugal");


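A quick sanity check that the rows are in place; note the auto-generated IDs, which will show up again in the connector output below:

mysql> select * from countries;
+------------+----------------+
| country_id | country_name   |
+------------+----------------+
|          1 | Germany        |
|          2 | Poland         |
|          3 | United Kingdom |
|          4 | Italy          |
|          5 | Portugal       |
+------------+----------------+
5 rows in set (0,00 sec)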

Source

The source connector will read this data and write it to a topic. For this, we need a configuration file for the connector, which looks like this:


name=jdbc-source-filipe
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/project3
connection.user=user
connection.password=pass
dialect.name=MySqlDatabaseDialect
mode=bulk
query=SELECT * FROM countries;
poll.interval.ms=60000
topic.prefix=project3fromDB
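
A note on mode=bulk: it re-runs the query and republishes every row on each poll (here, every 60 seconds, as set by poll.interval.ms). The connector also supports incremental modes; below is a minimal sketch of an alternative configuration, assuming the auto-increment country_id column serves as the marker. I have not tested this variant here, and I stick with bulk mode for the rest of this post:

name=jdbc-source-filipe-incrementing
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/project3
connection.user=user
connection.password=pass
dialect.name=MySqlDatabaseDialect
mode=incrementing
incrementing.column.name=country_id
table.whitelist=countries
poll.interval.ms=60000
# with table.whitelist, the topic name becomes prefix + table name,
# e.g. project3fromDB-countries
topic.prefix=project3fromDB-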

I added the bulk configuration file to the config directory of Kafka, under the name connect-jdbc-source-filipe.properties. Before starting the connector, I needed to add two library files to the libs directory, as shown below the list (a word of gratitude to João Soares, who reminded me of this):

  • the Kafka Connect JDBC connector, which I downloaded from Confluent.
  • the JDBC driver for MySQL (Connector/J), which I downloaded from MySQL.
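
In practice this means copying the two jars into libs before starting Connect; something like the following, where the file names are only examples and depend on the versions you downloaded:

# run from the Kafka installation directory; versions are illustrative
cp ~/Downloads/kafka-connect-jdbc-5.3.1.jar libs/
cp ~/Downloads/mysql-connector-java-8.0.18.jar libs/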

Now I can start the connector and expect it to work:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc-source-filipe.properties
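
Connect's converters, not the JDBC connector, decide how records are serialized to the topic. The schema-plus-payload envelope shown below comes from the defaults in config/connect-standalone.properties, which ship with Kafka roughly as follows (worth verifying in your copy):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets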

To see the result, I start a consumer for the topic I mentioned in the configuration file:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic project3fromDB --from-beginning

I then got the following result:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":1,"country_name":"Germany"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":2,"country_name":"Poland"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":3,"country_name":"United Kingdom"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":4,"country_name":"Italy"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"country_id"},{"type":"string","optional":false,"field":"country_name"}],"optional":false},"payload":{"country_id":5,"country_name":"Portugal"}}


Sink

For the sink, the configuration file looks like this:

name=jdbc-sink-filipe
tasks.max=1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:mysql://localhost:3306/project3
connection.user=user
connection.password=pass
dialect.name=MySqlDatabaseDialect
topics=project3toDB
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

I learnt the hard way that we might need a key converter, in this case a StringConverter. On a different occasion I lost a couple of hours before realizing that a key ("1" in that case) was being written to the topic as a plain String, as opposed to a complete JSON schema (like the example further below), thus making the sink crash. Hence we need to explicitly declare the key a String (or, alternatively, write a null key to the topic).
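
To see how such a key ends up in the topic, the console producer can write keyed messages (parse.key and key.separator are standard kafka-console-producer.sh properties). A sketch; the key "1" is just an illustrative value:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic project3toDB \
    --property parse.key=true --property "key.separator=|"

Each input line, such as 1|{...}, is then split on the first "|": everything before it becomes the String key and everything after it the JSON value. With key.converter set to the StringConverter, the sink accepts this; with the JsonConverter it would crash exactly as described above.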


I can run the source and the sink together:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-jdbc-source-filipe.properties config/connect-jdbc-sink-filipe.properties

If I open a producer, as follows:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic project3toDB

and write the following line, which is valid JSON with a schema and a payload:

{"schema":{"type":"struct","fields":[{"type":"double","optional":false,"field":"revenue"},{"type":"double","optional":false,"field":"expenses"},{"type":"double","optional":false,"field":"profit"}],"optional":false,"name":"total data"},"payload":{"revenue":988500.0, "expenses":731430.0,"profit":257070.0}}

This is the "complete JSON schema" I mentioned. We want it in the value, not in the key, which is exactly what the kafka-console-producer.sh will do.

Now I get the following result in the database:

mysql> show tables;
+--------------------+
| Tables_in_project3 |
+--------------------+
| countries          |
| project3toDB       |
+--------------------+

2 rows in set (0,00 sec)

and the following entry:

mysql> select * from project3toDB;
+---------+--------+----------+
| revenue | profit | expenses |
+---------+--------+----------+
|  988500 | 257070 |   731430 |
+---------+--------+----------+

1 row in set (0,00 sec)

Note that the connector created the table for me when I sent the well-formed JSON data to the topic. If I send new data, this particular configuration of the connector will insert more rows.
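
For example, writing a second line with the same schema but different (made-up) figures to the producer appends another row, because the sink connector's default insert mode performs plain inserts:

{"schema":{"type":"struct","fields":[{"type":"double","optional":false,"field":"revenue"},{"type":"double","optional":false,"field":"expenses"},{"type":"double","optional":false,"field":"profit"}],"optional":false,"name":"total data"},"payload":{"revenue":1000000.0,"expenses":600000.0,"profit":400000.0}}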

1 comment:

  1. I would recommend adding a paragraph just to alert readers who might have downloaded Kafka from the Apache website that they possibly need to include the Confluent library for a JDBC connection, and also the specific DB connector library. I've put the jars in the kafka/lib folder. The Confluent library can be downloaded from here: https://gofile.io/?c=TLQet0
