KAFKA Producer / Consumer in JAVA

apache-kafka

Salam,

Bu postda sizə Javada yazılmış simple Producer ve Consumer API -ları bölüşəcəyəm.

Producer API:

package kafka.example.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.Properties;

public class ProducerAPI {
public static void main(String[] args) {

Properties properties = new Properties();

// can be manual but the best practice is to use Kafka Config
//properties.put("bootstrap.servers","localhost:9092");
//properties.put("key.serializer", IntegerSerializer.class.getName());
//properties.put("value.serializer", IntegerSerializer.class.getName());

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class.getName());


KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);

for(int i=2; i<=100; i++){

ProducerRecord<Integer, String> record = new ProducerRecord<>("my-first-topic",i,"Msg from Java #"+i);

producer.send(record);

}

producer.close();

}
}

Consumer API

package kafka.example.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerAPI {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-6681");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(properties);

        consumer.subscribe(Arrays.asList("my-first-topic"));

        try {
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord record: records) {
                    String message = record.value().toString();
                    String topic = record.topic();
                    int partition = record.partition();

                    System.out.println("DATA: " + message + " TOPIC: " + topic + " PARTITION: " + partition);
                }
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            consumer.close();
        }

    }
}