Kafka Introduction


Introduction

Definition

This is an introduction to Kafka, which describes itself as a messaging system.
Kafka Intro.png

Architecture

Kafka Arch.png

Cluster

A cluster is a group of machines, each running an instance of the Kafka broker.

Broker

A broker is just the name given to the Kafka server. Producers do not interact with consumers directly; they use the Kafka broker as the agent, or broker, through which they interact. In a cluster there can be more than one broker.

Brokers are stateless, so they use ZooKeeper to maintain the cluster state.

Zookeeper

ZooKeeper is used for managing and coordinating the Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka cluster, or about the failure of a broker. Based on the notifications received from ZooKeeper, producers and consumers decide how to proceed and start coordinating their work with another broker.

Producers

A producer is a component which pushes data to the brokers. It does not wait for acknowledgements from the brokers but sends data as fast as the brokers can handle. There can be more than one producer depending on the use case.

Consumers

Kafka brokers are stateless, so the consumer has to keep track of how many messages it has consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that it has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is notified by ZooKeeper.
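
As a sketch of that rewind behaviour, and assuming a broker on localhost:9092, a topic called my_topic with a single partition, and the kafka-clients dependency added later on this page, a consumer can be pointed at an arbitrary offset with seek() (illustrative only, not the original wiki code):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindExample
{
    public static void main( String[] args )
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        TopicPartition partition = new TopicPartition("my_topic", 0);

        // Assign the partition explicitly, then rewind to the start of the log
        consumer.assign(Collections.singletonList(partition));
        consumer.seek(partition, 0);

        // Everything from offset 0 onwards is re-read
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(5));
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));

        consumer.close();
    }
}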

LinkedIn Worked Example

Here are some stats from LinkedIn
LinkedInStats.png
And here is their architecture pre-2010.
LinkedInPre2010.png
Post 2010 architecture.
LinkedInPos2010.png
This is the current scale.
LinkedInCurrentScale.png

Installation

Doing the install

To install I followed the instructions at https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04. To enable the stat command for ZooKeeper I added it to the whitelist by changing kafka/config/zookeeper.properties to include

4lw.commands.whitelist=stat, ruok, conf, isro
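
To check the whitelist change has taken effect (assuming netcat is installed and ZooKeeper is listening on its default port 2181), the stat command can be sent by hand and should print the server state:

echo stat | nc localhost 2181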

Creating A Topic

We do this with the command

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic

Listing Topics

We do this with the command

~/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Send Message to a Topic

We send a message with the command

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
>Message 1
>Test Message 2

Receive Message from a Topic

We receive a message with the command

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning

Partitions

What are Partitions

A partition represents the splitting up of the messages in a topic, i.e. when a message is received it is delivered to one, and only one, of the available partitions.
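
For example, a topic spread over three partitions can be created with the same command used earlier, only changing the --partitions value (my_partitioned_topic is just an illustrative name):

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my_partitioned_topic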

Partition Trade offs

  • The more partitions, the greater the ZooKeeper overhead
  • Message ordering can become complex
  • The more partitions, the longer the leader fail-over time

Replication Factor

This is the number of copies of each message that should be kept, i.e. the number of replicas of the data.

Replication Status

We can view the status of the replication using the describe command. In the screenshot below we killed one of the brokers and then brought it back up. By comparing the ReplicationFactor with the Isr (in-sync replicas) we can see the outage and then the return to the normal state.
Kafka Status.png
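
For reference, the describe output in the screenshot comes from a command along these lines (assuming the broker is running on localhost:9092):

~/kafka/bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my_topic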

Producers

Introduction

Here is a visual representation of the Producer
Kafka Producer Internals.png

Where to Direct Messages

The producer looks at the message and makes a decision on where to send it based on configurable rules.
Kafka Producer Strategy.png
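
As a rough sketch of those rules (illustrative values only, reusing the ProducerRecord class from the producer example further down): a record can name an explicit partition, supply a key that the default partitioner hashes to pick a partition, or supply neither, in which case records are spread across the available partitions.

// Explicit partition: always goes to partition 0 of my_topic
ProducerRecord<String,String> explicit =
    new ProducerRecord<String,String>("my_topic", 0, "key-1", "sent to partition 0");

// Keyed record: the default partitioner hashes the key, so records with the
// same key always land in the same partition
ProducerRecord<String,String> keyed =
    new ProducerRecord<String,String>("my_topic", "customer-42", "keyed message");

// No key and no partition: records are spread across the available partitions
ProducerRecord<String,String> unkeyed =
    new ProducerRecord<String,String>("my_topic", "unkeyed message");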

Message Buffering

This is starting to look like Oracle DB tuning. The diagram below shows the key players in deciding when records are actually sent.
Kafka Message buffering.png
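
The configuration properties usually involved here are batch.size, linger.ms and buffer.memory (an assumption about what the diagram covers; the values below are illustrative only and would be added to the producer Properties object shown in the example further down):

// Producer batching/buffering settings (illustrative values)
props.put("batch.size", "16384");       // max bytes per partition batch before it is sent
props.put("linger.ms", "10");           // how long to wait for a batch to fill before sending anyway
props.put("buffer.memory", "33554432"); // total memory the producer may use to buffer unsent records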

Example Producer

Create a default Maven project

For VS Code we can do this with

 mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=nz.co.bibble.app -DartifactId=kafkasample

Add the Kafka Client to the Pom

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>

  </dependencies>

Create the Producer

We just create an instance, adding the parameters we would otherwise pass at the command line, and then send the records.

package nz.co.bibble.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        // Producer configuration: which broker to connect to and how to serialize keys and values
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> myProducer = new KafkaProducer<String,String>(props);

        try {

            // Send ten keyed string records to my_topic
            for(int i = 0; i < 10; i++) {
                myProducer.send(
                    new ProducerRecord<String,String>(
                        "my_topic", 
                        Integer.toString(i),
                        "My Message: " + Integer.toString(i)
                    )
                );
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
        finally {
            // close() flushes any buffered records before releasing resources
            myProducer.close();
        }

        System.out.println( "Hello World!" );
    }
}

Producer Performance Test

We can generate records with the test script to test consumers. Here is an example.

 ./kafka/bin/kafka-producer-perf-test.sh  --topic my_topic --throughput 10  --num-records 50 --record-size 1 --producer-props bootstrap.servers=localhost:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

Delivery Guarantees

The broker acknowledges to the producer that messages have been received. The values for acks are

  • 0 Fire and forget (useful for broadcast)
  • 1 Leader acknowledged
  • all (-1) Replication quorum acknowledged

You can then configure what to do when an error occurs (a sketch of these settings follows the list). Options are

  • retries
  • retry.backoff.ms
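
A sketch of these settings, added to the Properties object from the producer example (the values are illustrative; acks also accepts the string "all"):

// Delivery guarantee and retry settings (illustrative values)
props.put("acks", "1");               // 0 = fire and forget, 1 = leader ack, all/-1 = quorum ack
props.put("retries", "3");            // how many times to retry a failed send
props.put("retry.backoff.ms", "100"); // milliseconds to wait between retries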

Things to look into

  • Custom Serializers
  • Custom Partitioners
  • Asynchronous Send
  • Compression
  • Advanced Settings

Consumers

Introduction

Here is a visual representation of the Consumer
Kafka Consumer Internals.png

Assign vs Subscribe

Subscribe

Introduction

For subscribing you simply subscribe to a list of topics or a regular expression. Note that subscriptions are not incremental: a new call to subscribe() replaces the previous one. With subscribe you are subscribing to all partitions of the topic or topics, and if new partitions are added the change is picked up automatically.
Kafka Consumer Subscribe.png

Build and Run In Maven

From the command line we can do.

mvn install
mvn exec:java -Dexec.mainClass=nz.co.bibble.app.App

Example Subscribe

This is very similar to the producer. We use deserializers rather than serializers, and we also set a group.id.

    // Requires java.time.Duration, java.util.ArrayList and the
    // org.apache.kafka.clients.consumer.* classes in addition to java.util.Properties
    public static void processConsumerSubscribe() {

        System.out.println("Starting Consumer Subscribe");

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id","test");

        KafkaConsumer<String,String> myConsumer = new KafkaConsumer<String,String>(props);

        ArrayList<String> topics = new ArrayList<String>();

        topics.add("my_topic");

        myConsumer.subscribe(topics);

        System.out.println("Ready to start trying");

        try {
            while(true) {
                ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
                for(ConsumerRecord<String,String> record: records) {
                    System.out.println(
                        String.format("Topic: %s Partition: %d, Offset: %d, Key: %s, Value: %s", 
                        record.topic(), record.partition(), record.offset(), record.key(), record.value())
                    );
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            myConsumer.close();
        }
    }

Assign

Introduction

For assigning we are deciding which partitions to consume. If new partitions are added we have to manage the change.
Kafka Consumer Assign.png

Example Assign

For assign, the code is almost identical to the subscribe example.

    // Requires org.apache.kafka.common.TopicPartition in addition to the consumer imports above
    public static void processConsumerAssign() {

        System.out.println("Starting Consumer Assign");

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        var myConsumer = new KafkaConsumer<String,String>(props);

        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        TopicPartition myTopicPartition = new TopicPartition("my_topic", 0);

        partitions.add(myTopicPartition);
        myConsumer.assign(partitions);

        try {
            while(true) {
                ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
                for(ConsumerRecord<String,String> record: records) {
                    System.out.println(
                        String.format("Topic: %s Partition: %d, Offset: %d, Key: %s, Value: %s", 
                        record.topic(), record.partition(), record.offset(), record.key(), record.value())
                    );
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            myConsumer.close();
        }

    }

Kafka Consumer Polling

This did seem to be important and I looked at https://chrzaszcz.dev/2019/06/16/kafka-consumer-poll/ for more information.
Kafka Consumer Polling.png

  • SubscriptionState – holds the content of the collection passed to subscribe(); it is the source of truth for the topics the consumer is subscribed to, and it manages offsets.
  • ConsumerCoordinator – using metadata about the cluster, takes responsibility for coordinating the consumer (i.e. coordinating the SubscriptionState). It has two main responsibilities:
    • being aware of dynamic/automatic partition reassignment and notifying the SubscriptionState object of assignment changes,
    • committing offsets to the cluster, which causes the subscription state to be updated.
  • Fetcher – the object responsible for most of the communication between the consumer and the cluster. To start fetching it needs to know which partitions or topics it should be asking for, and it gets this information from the SubscriptionState object.
  • ConsumerNetworkClient – handles the TCP/IP communication with the cluster.
  • Polling timeout – the number of milliseconds the network client will spend polling the cluster for messages before returning; it is the minimum amount of time a retrieval will take.

Kafka Consumer Offset in Detail

The offset is the critical detail that allows consumers to operate independently. How Kafka manages the consumer offset is an important thing to understand.

There are different categories of offsets; when an individual consumer is reading from a partition it has to establish what it has already read:

  • last committed offset – the last offset the consumer has committed, seen from the partition's point of view.
  • current position – the position of the next record to read; it advances as the consumer reads messages.
  • log-end offset – the end of the message log for the partition.
  • un-committed offsets – the records between the last committed offset and the current position. It is important to understand what creates this gap and how to close it (a sketch follows the diagram below).

Kafka offset gap.png
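
A minimal sketch of inspecting and closing that gap by committing manually, assuming auto-commit has been turned off and reusing the myConsumer and props variables from the subscribe example (requires org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition and java.util.Collections):

        // Set before the KafkaConsumer is created: take manual control of committing
        props.put("enable.auto.commit", "false");

        TopicPartition partition = new TopicPartition("my_topic", 0);

        // Where the next read will come from (only valid once the partition is assigned to this consumer)
        long currentPosition = myConsumer.position(partition);

        // What has been committed so far (null if nothing has been committed yet)
        OffsetAndMetadata lastCommitted =
            myConsumer.committed(Collections.singleton(partition)).get(partition);

        // Committing synchronously moves the last committed offset up to the current position
        myConsumer.commitSync();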

Storing of Offsets

These are stored in Kafka in a topic called __consumer_offsets.

./kafka/bin/kafka-topics.sh  --bootstrap-server localhost:9092 --describe --topic __consumer_offsets

Consumer Groups

In order to scale the load horizontally you can create a consumer group. There are various parameters which control this.
Kafka Consumer Groups.png
For multiple applications you can create multiple consumer groups, e.g. a second group for a second application.
Kafka Multi ConsumerGroups.png
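
To see how the partitions are shared out and how far each member of a group has read, something like the following can be used (the group name test comes from the group.id set in the subscribe example):

./kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test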

Key Consumer Configuration

The following were highlighted as key consumer configuration parameters (a sketch of setting them follows the list).

  • fetch.min.bytes
    • Sets a minimum threshold for size-based batching.
  • fetch.max.wait.ms
    • Sets how long the broker will wait before answering a fetch request when fetch.min.bytes has not yet been satisfied.
  • max.partition.fetch.bytes
    • Sets a maximum limit in bytes on how much data is returned for each partition, which must always be larger than the number of bytes set in the broker or topic configuration for max.message.bytes
  • max.poll.records
    • Sets the maximum number of records returned from the consumer in a single poll. You can use the max.poll.records property to set an upper limit on the number of records returned from the consumer buffer, allowing your application to process fewer records within the max.poll.interval.ms limit
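
A sketch of setting these on the consumer Properties object from the earlier examples (the values are illustrative only, not recommendations):

// Key consumer fetch/poll settings (illustrative values)
props.put("fetch.min.bytes", "1024");              // wait until at least this many bytes are available...
props.put("fetch.max.wait.ms", "500");             // ...or until this much time has passed
props.put("max.partition.fetch.bytes", "1048576"); // cap on bytes returned per partition
props.put("max.poll.records", "100");              // cap on records returned by a single poll()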

Kafka Ecosystem

Use Cases

Key use cases for Kafka are

  • Connecting disparate sources of data
  • Large-scale data movement pipelines (Replacing ETL solutions)
  • "Big Data" Integration

Challenges

Governance And Evolution

With large-scale movement of data there are problems with governance: the serializers used by producers and consumers need to be managed consistently. This is where Confluent comes in with the Kafka Schema Registry, which uses the Apache Avro serialization format and provides RESTful service discovery and compatibility brokering.

Consistency and Productivity

To solve integration issues Kafka provides Kafka Connect. This provides a common framework for integration, standardizing common approaches along with producers and consumers. Some of the larger companies, such as HP and Oracle, now provide platform connectors.

Fast Data

All of the distributed solutions come with their own APIs and cluster-based management approach, and Kafka is no exception.
To help reduce this, Kafka built a client library called Kafka Streams to provide stream-based processing.
Kafka FastData.png
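
A minimal Kafka Streams sketch (this assumes the org.apache.kafka:kafka-streams dependency is added to the pom alongside kafka-clients; the topic names are illustrative): it reads my_topic, upper-cases each value and writes the result to a second topic.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsExample
{
    public static void main( String[] args )
    {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bibble-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read my_topic, upper-case each value and write the result to my_topic_upper
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> stream = builder.stream("my_topic");
        stream.mapValues(value -> value.toUpperCase()).to("my_topic_upper");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}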

Terms

  • Log shipping is the process of automating the backup of transaction log files on a primary (production) database server, and then restoring them onto a standby server.
  • Message Broker is an intermediary computer program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver.
  • ZooKeeper is essentially a service for distributed systems offering a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems. ZooKeeper was a sub-project of Hadoop but is now a top-level Apache project in its own right.