Kafka Introduction
Introduction
Definition
This is an introduction to Kafka, which describes itself as a messaging system.
Architecture
Cluster
A cluster is a group of computers, each running an instance of the Kafka broker.
Broker
Broker is just the name given to the Kafka server: producers do not interact directly with consumers; they use the Kafka broker as the agent, or broker, through which they interact. In a cluster there can be more than one broker.
Brokers are stateless, so they use ZooKeeper to maintain the cluster state.
Zookeeper
ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers when a new broker joins the Kafka cluster or when an existing broker fails. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker.
Producers
A producer is a component that pushes data to the brokers. It does not wait for acknowledgements from the brokers; rather it sends data as fast as the brokers can handle it. There can be more than one producer, depending on the use case.
Consumers
Since Kafka brokers are stateless, the consumer has to keep track of how many messages it has consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can also rewind or skip to any point in a partition simply by supplying an offset value, as in the sketch below. The consumer offset value is notified by ZooKeeper.
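As a sketch of rewinding with the Java client (the client is introduced properly in the Producers and Consumers sections below; the topic name, partition 0, and offset 5 are just illustrative):

public static void rewindExample() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
    // Take direct control of partition 0 of my_topic, then rewind to offset 5
    TopicPartition partition = new TopicPartition("my_topic", 0);
    consumer.assign(Arrays.asList(partition));
    consumer.seek(partition, 5);
    // The next poll delivers records starting from offset 5
    ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(10));
    consumer.close();
}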
LinkedIn Worked Example
Here are some stats from LinkedIn.
And here is their architecture pre-2010.
Post-2010 architecture.
This is the current scale.
Installation
Doing the install
To install I went with the instructions at https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04. To enable the stat command for ZooKeeper I added it to the whitelist by changing kafka/config/zookeeper.properties to include
4lw.commands.whitelist=stat, ruok, conf, isro
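A quick way to check the whitelist took effect (assuming netcat is installed, ZooKeeper is on its default port, and ZooKeeper has been restarted with the new setting) is to send the ruok command and look for the imok reply:
echo ruok | nc localhost 2181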
Creating A Topic
We do this with the command
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic
Listing Topics
We do this with the command
~/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Send Message to a Topic
We send a message with the command
~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
>Message 1
>Test Message 2
Receive Message from a Topic
We receive a message with the command
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
Partitions
What are Partitions
A partition represents the splitting up of the messages in a topic, i.e. when a message is received it is delivered to one and only one of the available partitions.
Partition Trade offs
- The more partitions, the greater the ZooKeeper overhead
- Message ordering can become complex
- The more partitions, the longer the leader fail-over time
Replication Factor
This is the number of copies of each message that should be kept, i.e. the number of replicas of the data.
Replication Status
We can view the status of the replication using the describe command. In the screenshot below we killed one of the brokers and then brought it back up. By comparing the ReplicationFactor with the Isr (In-sync replica) we can see the outage and then the return to normal state.
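For the topic created earlier, the describe command looks like this (using the same --zookeeper style as the other commands in this install):
~/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic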
Producers
Introduction
Here is a visual representation of the Producer
Where to Direct Messages
The producer looks at the message and makes a decision on where to send it based on configurable rules, as sketched below.
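A sketch of the three common cases with ProducerRecord (the partition number and key are illustrative):

// 1. Explicit partition: the record goes straight to partition 0
new ProducerRecord<String,String>("my_topic", 0, "key", "value");
// 2. Key but no partition: the partition is chosen by hashing the key
new ProducerRecord<String,String>("my_topic", "key", "value");
// 3. Value only: the partitioner balances records across partitions
new ProducerRecord<String,String>("my_topic", "value");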
Message Buffering
This is starting to look like Oracle DB tuning. This shows the key players for deciding when to send records.
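The main producer settings involved (real configuration names; the values shown are the defaults) look something like:

// Total memory available for buffering unsent records (default 32 MB)
props.put("buffer.memory", "33554432");
// Maximum bytes batched per partition before a send is triggered (default 16 KB)
props.put("batch.size", "16384");
// How long to wait for a batch to fill before sending anyway (default 0)
props.put("linger.ms", "0");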
Example Producer
Create a default Maven project
For VS Code we can do this with
mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=nz.co.bibble.app -DartifactId=kafkasample
Add the Kafka Client to the Pom
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
Create the Producer
We just create an instance, adding the parameters we would otherwise pass at the command line, then send the records.
package nz.co.bibble.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka producer example.
 */
public class App
{
    public static void main( String[] args )
    {
        // The same parameters we would pass to the console producer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> myProducer = new KafkaProducer<String,String>(props);
        try {
            // Send ten records to my_topic, keyed by the loop counter
            for(int i = 0; i < 10; i++) {
                myProducer.send(
                    new ProducerRecord<String,String>(
                        "my_topic",
                        Integer.toString(i),
                        "My Message: " + Integer.toString(i)
                    )
                );
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
        finally {
            // close() flushes any buffered records and releases resources
            myProducer.close();
        }
    }
}
Producer Performance Test
We can generate records with the test script to test consumers. Here is an example.
./kafka/bin/kafka-producer-perf-test.sh --topic my_topic --throughput 10 --num-records 50 --record-size 1 --producer-props bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
Delivery Guarantees
The broker acknowledges to the producer that messages have been received; the acks setting controls the guarantee. The values are
- 0 Fire and forget (useful for broadcast)
- 1 Leader acknowledged
- all (-1) Full in-sync replica set acknowledged
You can then configure what to do when a send fails (see the sketch after this list). The options are
- retries
- retry.backoff.ms
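A minimal sketch of setting these on the producer from the example above (the values are illustrative):

// Wait for the full in-sync replica set to acknowledge each record
props.put("acks", "all");
// Retry a failed send up to three times, waiting 100 ms between attempts
props.put("retries", "3");
props.put("retry.backoff.ms", "100");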
Things to look into
- Custom Serializers
- Custom Partitioners
- Asynchronous Send
- Compression
- Advanced Settings
Consumers
Introduction
Here is a visual representation of the Consumer
Assigning vs Subscribe
Subscribe
Introduction
For subscribing you simply subscribe to a list of topics or a regular expression. A subscription is not incremental, however: calling subscribe() again replaces the previous subscription rather than adding to it. With subscribe you are subscribing to all partitions of the topic or topics, and if new partitions are added the changes are picked up automatically. A sketch follows.
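For example (the pattern and topic names are illustrative; myConsumer is the consumer built in the example below):

// Subscribe to every topic matching a regular expression
myConsumer.subscribe(Pattern.compile("my_.*"));
// A later call replaces the previous subscription entirely
myConsumer.subscribe(Arrays.asList("other_topic"));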
Build and Run In Maven
From the command line we can do.
mvn install
mvn exec:java -Dexec.mainClass=nz.co.bibble.app.App
Example Subscribe
This is very similar to the producer. We use the deserializer rather than the serializer.
public static void processConsumerSubscribe() {
System.out.println("Starting Consumer Subscribe");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id","test");
KafkaConsumer<String,String> myConsumer = new KafkaConsumer<String,String>(props);
ArrayList<String> topics = new ArrayList<String>();
topics.add("my_topic");
myConsumer.subscribe(topics);
System.out.println("Ready to start trying");
try {
while(true) {
ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord<String,String> record: records) {
System.out.println(
String.format("Topic: %s Partition: %d, Offset: %d, Key: %s, Value: %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())
);
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
myConsumer.close();
}
}
Assign
Introduction
For assigning we are deciding which partitions to consume. If new partitions are added we have to manage the change ourselves, as in the sketch below.
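One way to pick up new partitions (a sketch using the standard partitionsFor() call and the myConsumer instance from the example below; when to re-run this lookup is left to the application):

// Look up the current partitions for the topic and assign them all
List<PartitionInfo> infos = myConsumer.partitionsFor("my_topic");
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
for (PartitionInfo info : infos) {
    partitions.add(new TopicPartition(info.topic(), info.partition()));
}
// assign() must be called again whenever new partitions appear
myConsumer.assign(partitions);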
Example Assign
For assign this is almost identical to the subscribe example.
public static void processConsumerAssign() {
System.out.println("Starting Consumer Assign");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
var myConsumer = new KafkaConsumer<String,String>(props);
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
TopicPartition myTopicPartition = new TopicPartition("my_topic", 0);
partitions.add(myTopicPartition);
myConsumer.assign(partitions);
try {
while(true) {
ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord<String,String> record: records) {
System.out.println(
String.format("Topic: %s Partition: %d, Offset: %d, Key: %s, Value: %s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())
);
}
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
myConsumer.close();
}
}
Kafka Consumer Polling
- SubscriptionState – holds the contents of the collection passed to subscribe(); it is the source of truth for the topics the consumer is subscribed to, and it manages offsets.
- ConsumerCoordinator – uses metadata about the cluster to coordinate the consumer (that is, to coordinate the SubscriptionState). It has two main responsibilities: staying aware of dynamic/automatic partition reassignment and notifying the SubscriptionState object of assignment changes, and committing offsets to the cluster, which causes the subscription state to be updated.
- Fetcher – the object responsible for most of the communication between the consumer and the cluster. To start fetching it needs to know which topics or partitions to ask for, and it gets this information from the SubscriptionState object.
- ConsumerNetworkClient – handles the TCP/IP communication with the cluster.
- The polling timeout is the number of milliseconds the network client will spend polling the cluster for messages before returning; it is the maximum amount of time poll() will block when no records are available. A manual offset-commit sketch follows this list.
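Since the ConsumerCoordinator is what commits offsets, a common pattern is to disable auto-commit and commit manually once records have been processed. A minimal sketch, based on the subscribe example above:

// Disable the automatic offset commits done during poll()
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
    for (ConsumerRecord<String,String> record : records) {
        // process the record here
    }
    // Synchronously commit the offsets just consumed; the SubscriptionState is updated
    myConsumer.commitSync();
}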
Terms
- Log shipping is the process of automating the backup of transaction log files on a primary (production) database server, and then restoring them onto a standby server.
- Message Broker is an intermediary computer program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver.
- ZooKeeper is essentially a service for distributed systems offering a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems. ZooKeeper was a sub-project of Hadoop but is now a top-level Apache project in its own right.