KAFKA Producer / Consumer in JAVA
Salam,
Bu postda sizinlə Javada yazılmış sadə Producer və Consumer API-larını bölüşəcəyəm.
Producer API:
package kafka.example.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer example: sends a numbered series of string messages,
 * keyed by their integer index, to the topic {@code my-first-topic} on a broker
 * at {@code localhost:9092}.
 */
public class ProducerAPI {

    /**
     * Configures a {@link KafkaProducer}, sends one record per loop index and
     * closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ProducerConfig constants are used instead of raw string keys
        // ("bootstrap.servers", "key.serializer", "value.serializer") so that
        // a typo in a key is caught at compile time.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        // Values are Strings, so they need StringSerializer; configuring
        // IntegerSerializer here makes every send() fail with a SerializationException.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer even if send() throws.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 2; i <= 100; i++) {
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<>("my-first-topic", i, "Msg from Java #" + i);
                producer.send(record);
            }
        }
    }
}
Consumer API:
package kafka.example.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to the topic {@code my-first-topic}
 * on a broker at {@code localhost:9092} and prints every record it receives.
 */
public class ConsumerAPI {

    /**
     * Configures a {@link KafkaConsumer}, subscribes to the topic and polls in an
     * endless loop, printing the value, topic and partition of each record.
     * Any exception is printed to standard error and ends the loop; the consumer
     * is closed in every case.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-6681");

        // try-with-resources also covers subscribe(), so the consumer is closed
        // even when the subscription itself fails.
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("my-first-topic"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(2));
                // Typed record instead of the raw ConsumerRecord: no unchecked access,
                // and value() is already a String.
                for (ConsumerRecord<Integer, String> record : records) {
                    // No toString() call: a record with a null value is printed as
                    // "null" instead of throwing and terminating the loop.
                    String message = record.value();
                    String topic = record.topic();
                    int partition = record.partition();
                    System.out.println("DATA: " + message + " TOPIC: " + topic + " PARTITION: " + partition);
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}