মডিউল ৫: কাফকা কনফিগারেশন এবং ফ্রেমওয়ার্ক ইন্টিগ্রেশন
কাফকা ক্লাস্টার এবং টপিকগুলোকে সঠিকভাবে পরিচালনা করার জন্য কনফিগারেশন বোঝা অত্যন্ত জরুরি। এই মডিউলে আমরা ব্রোকার এবং টপিক লেভেলের কনফিগারেশন এবং বিভিন্ন জনপ্রিয় ফ্রেমওয়ার্কে কাফকা ব্যবহারের পূর্ণাঙ্গ প্রসেস দেখব।
১. ব্রোকার লেভেল কনফিগারেশন (Broker Level)
এই কনফিগারেশনগুলো পুরো কাফকা ক্লাস্টারের ওপর প্রভাব ফেলে এবং এগুলো config/server.properties ফাইলে সেট করা হয়।
- log.retention.hours: কাফকা কতক্ষণ মেসেজগুলো স্টোর করে রাখবে। ডিফল্টভাবে এটি ১৬৮ ঘণ্টা (৭ দিন)।
- message.max.bytes: একটি সিঙ্গেল মেসেজের সর্বোচ্চ সাইজ কত হতে পারবে। ডিফল্ট ১ মেগাবাইট।
- auto.create.topics.enable: যদি কোনো প্রডিউসার এমন টপিকে মেসেজ পাঠায় যা এখনো তৈরি হয়নি, তবে কি কাফকা নিজে থেকেই সেটি তৈরি করবে? প্রোডাকশনে এটি
falseরাখা ভালো।
২. টপিক লেভেল কনফিগারেশন (Topic Level)
আপনি প্রতিটি টপিকের জন্য আলাদা আলাদা কনফিগারেশন সেট করতে পারেন।
টপিক কনফিগারেশন পরিবর্তন করা (Changing a topic configuration):
সার্ভার চালু থাকা অবস্থায় আপনি নির্দিষ্ট টপিকের কনফিগারেশন পরিবর্তন করতে পারেন:
- Retention time: একটি টপিক কতক্ষণ ডেটা রাখবে (যেমন:
retention.ms=86400000)। - Retention bytes: একটি টপিক সর্বোচ্চ কত সাইজের ডেটা রাখবে (যেমন:
retention.bytes=1073741824)।
উদাহরণ (কনফিগারেশন পরিবর্তন):
# Retention সময় ৭ দিন থেকে কমিয়ে ১ দিন করা
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config retention.ms=86400000লগ ক্লিনআপ পলিসি (Log Cleanup policies):
১. Delete: পুরনো ডেটা নির্দিষ্ট সময় বা সাইজ অতিক্রম করলে ডিলিট করে দেয় (ডিফল্ট)। ২. Compact: একই কী (Key) এর মেসেজগুলোর মধ্যে শুধুমাত্র লেটেস্ট মেসেজটি রাখে (Log Compaction)।
৩. ফ্রেমওয়ার্ক ইন্টিগ্রেশন (Full Process)
বিভিন্ন ল্যাঙ্গুয়েজ এবং ফ্রেমওয়ার্কে কাফকা কিভাবে কনফিগার এবং ব্যবহার করবেন তার উদাহরণ নিচে দেওয়া হলো।
Java (Spring Boot)
Spring Boot-এ spring-kafka ডিপেনডেন্সি ব্যবহার করা হয়।
// application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
// Producer Service
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}Python (FastAPI & Django/DRF)
A. FastAPI (using AIOCafka)
# app.py
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
import asyncio
app = FastAPI()
@app.on_event("startup")
async def startup():
app.producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
await app.producer.start()
@app.post("/notify")
async def notify(msg: str):
await app.producer.send_and_wait("my-topic", msg.encode())
return {"status": "sent"}B. Django / DRF (using confluent-kafka)
# settings.py
KAFKA_CONFIG = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'django-group',
'auto.offset.reset': 'earliest'
}
# producer.py
from confluent_kafka import Producer
import json
def send_to_kafka(data):
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('user-events', json.dumps(data).encode('utf-8'))
p.flush()Node.js (Express.js & NestJS)
A. Express.js (using KafkaJS)
const { Kafka } = require("kafkajs");
const kafka = new Kafka({ clientId: "my-app", brokers: ["localhost:9092"] });
const producer = kafka.producer();
const startProducer = async () => {
await producer.connect();
await producer.send({
topic: "test-topic",
messages: [{ value: "Hello Kafka from Express!" }],
});
};B. NestJS
NestJS-এ বিল্ট-ইন মাইক্রোসার্ভিস মডিউল আছে।
// main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: { brokers: ["localhost:9092"] },
consumer: { groupId: "nestjs-consumer" },
},
},
);Laravel (PHP)
Laravel-এ mateusjunges/laravel-kafka প্যাকেজটি জনপ্রিয়।
// config/kafka.php তে ব্রোকার সেট করুন
use Junges\Kafka\Facades\Kafka;
// Producer
Kafka::publishOn('my-topic')
->withBody(['message' => 'Hello from Laravel'])
->send();
// Consumer (Command)
Kafka::createConsumer(['my-topic'])
->withHandler(new Handler())
->build()
->consume();IMPORTANT
প্রতিটি ফ্রেমওয়ার্কে কাফকা ক্লায়েন্ট ব্যবহারের আগে সংশ্লিষ্ট লাইব্রেরি বা প্যাকেজগুলো ইন্সটল করে নিতে হবে (যেমন: npm install kafkajs, pip install aiokafka, ইত্যাদি)।
TIP
রিয়েল-টাইম অ্যাপ্লিকেশনে কানেকশন পুলিং এবং এরর হ্যান্ডলিং (Retry mechanism) কনফিগার করা খুব জরুরি।