Kafka Introduction

Introduction

Definition

This is an introduction to Kafka, which describes itself as a messaging system.

Architecture

Cluster

It is a group of computers, each running its own instance of the Kafka broker.

Broker

It is just a meaningful name given to the Kafka server. A Kafka producer does not interact directly with the consumer; they use the Kafka broker as the agent, or broker, to interact. In a cluster there can be more than one broker.

Brokers are stateless; hence, to maintain the cluster state they use ZooKeeper.

Zookeeper

ZooKeeper is used for managing and coordinating the Kafka brokers. The ZooKeeper service mainly notifies producers and consumers about the presence of any new broker in the Kafka cluster, or the failure of a broker in the cluster. Based on these notifications, the producers and consumers make a decision and start coordinating their work with another broker.

Producers

A producer is a component which pushes data to the brokers. It does not wait for acknowledgements from the brokers, but rather sends data as fast as the brokers can handle. There can be more than one producer depending on the use case.

Consumers

Since Kafka brokers are stateless, the consumer has to track how many messages have been consumed, using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is notified by ZooKeeper.
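
Because consumption is driven entirely by the offset, rewinding is just a matter of seeking. A minimal sketch, assuming props is configured as in the consumer example later on (the topic, partition and offset are illustrative):

import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

KafkaConsumer<String,String> myConsumer = new KafkaConsumer<String,String>(props);
TopicPartition partition0 = new TopicPartition("my_topic", 0);
myConsumer.assign(Arrays.asList(partition0)); // seek requires the partition to be assigned
myConsumer.seek(partition0, 5);               // the next poll starts reading from offset 5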

LinkedIn Worked Example

Here are some stats from LinkedIn.

And here is their architecture pre-2010.

Post-2010 architecture.

This is the current scale.

Installation

Doing the install

To install I went with the instructions at https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04. To enable the stat command for ZooKeeper I added it to the whitelist by changing kafka/config/zookeeper.properties to include

4lw.commands.whitelist=stat, ruok, conf, isro
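
With those commands whitelisted, we can check that ZooKeeper is responding (this assumes netcat is installed; ruok should answer imok):

echo ruok | nc localhost 2181
echo stat | nc localhost 2181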

Creating A Topic

We do this with the command

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic

Listing Topics

We do this with the command

~/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Send Message to a Topic

We send a message with the command

~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
>Message 1
>Test Message 2

Receive Message from a Topic

We receive a message with the command

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning

Partitions

What are Partitions

A partition represents the splitting up of messages from a topic, i.e. when a message is received it is delivered to one and only one of the available partitions.

Partition Trade offs

  • The more partitions, the greater the ZooKeeper overhead
  • Message ordering can become complex
  • The more partitions, the longer the leader fail-over time

Replication Factor

This is the number of copies of each message that should be kept, i.e. the number of replicas of the data.

Replication Status

We can view the status of the replication using the describe command. In the screenshot below we killed one of the brokers and then brought it back up. By comparing the ReplicationFactor with the Isr (In-sync replica) we can see the outage and then the return to normal state.
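
The describe command used for that check looks like this (using the topic created earlier):

~/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic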

Producers

Introduction

Here is a visual representation of the Producer

Where to Direct Messages

The producer looks at the message and makes a decision on where to send it based on configurable rules.
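
One such rule is naming the partition yourself. As a sketch, the ProducerRecord overload that takes a partition number sends the message to that partition explicitly (partition 0 and the key are illustrative):

myProducer.send(
    new ProducerRecord<String,String>("my_topic", 0, "key-1", "Sent explicitly to partition 0")
);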

Message Buffering

This is starting to look like Oracle DB tuning. This shows the key players for deciding when to send records.
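
The key settings are batch.size, linger.ms and buffer.memory on the producer. A minimal sketch against the Properties used in the example below (the values are illustrative, not recommendations):

props.put("batch.size", "16384");        // max bytes per partition batch before a send is triggered
props.put("linger.ms", "10");            // wait up to 10ms for a batch to fill before sending
props.put("buffer.memory", "33554432");  // total memory the producer may use for buffering records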

Example Producer

Create a default Maven project

For VS Code we can do this with

 mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=nz.co.bibble.app -DartifactId=kafkasample

Add the Maven Client to Pom

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>

  </dependencies>

Create the Producer

We just create an instance, adding the parameters we would otherwise pass at the command line. Then we need to send the records.

package nz.co.bibble.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A simple producer that sends ten string messages to my_topic.
 */
public class App 
{
    public static void main( String[] args )
    {
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> myProducer = new KafkaProducer<String,String>(props);

        try {

            for(int i = 0; i < 10; i++) {
                myProducer.send(
                    new ProducerRecord<String,String>(
                        "my_topic", 
                        Integer.toString(i),
                        "My Message: " + Integer.toString(i)
                    )
                );
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
        finally {
            myProducer.close();
        }

        System.out.println( "Producer finished" );
    }
}

Build and Run In Maven

From the command line we can do.

mvn install
mvn exec:java -Dexec.mainClass=nz.co.bibble.app.App

Delivery Guarantees

The broker sends the producer acknowledgements that messages have been received. The possible acks values are

  • 0 Fire and forget (useful for broadcast)
  • 1 Leader acknowledged
  • all (-1) Replication quorum acknowledged (every in-sync replica)

You can then configure what should happen when an error occurs. Options are

  • retries
  • retry.backoff.ms
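
A sketch of how acks and the retry settings might be set on the producer Properties from the earlier example (the values are illustrative, not recommendations):

props.put("acks", "all");              // wait for the full set of in-sync replicas to acknowledge
props.put("retries", "3");             // retry transient send failures up to three times
props.put("retry.backoff.ms", "100");  // wait 100ms between retries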

Things to look into

  • Custom Serializers
  • Custom Partitioners
  • Asynchronous Send
  • Compression
  • Advanced Settings

Consumers

Introduction

Here is a visual representation of the Consumer

Assigning vs Subscribe

Subscribe

Introduction

For subscribing you simply subscribe to a list of topics or a regular expression. Note that a subscription is not incremental: each call to subscribe() replaces the previous one. With subscribe you are subscribing to all partitions for the topic or topics. If new partitions are added, the changes are picked up automatically.
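
For the regular expression case, a one-line sketch (the pattern is illustrative):

myConsumer.subscribe(Pattern.compile("my_.*"));  // requires import java.util.regex.Pattern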

Example Subscribe

This is very similar to the producer. We use the deserializer rather than the serializer.

// additional imports needed for the consumer (Properties is shared with the producer example)
import java.time.Duration;
import java.util.ArrayList;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

    public static void processConsumerSubscribe() {

        System.out.println("Starting Consumer Subscribe");

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id","test");

        KafkaConsumer<String,String> myConsumer = new KafkaConsumer<String,String>(props);

        ArrayList<String> topics = new ArrayList<String>();

        topics.add("my_topic");

        myConsumer.subscribe(topics);

        System.out.println("Ready to start trying");

        try {
            while(true) {
                ConsumerRecords<String,String> records = myConsumer.poll(Duration.ofSeconds(10));
                for(ConsumerRecord<String,String> record: records) {
                    System.out.println(
                        String.format("Topic: %s Partition: %d, Offset: %d, Key: %s, Value: %s", 
                        record.topic(), record.partition(), record.offset(), record.key(), record.value())
                    );
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            myConsumer.close();
        }
    }

Assign

Introduction

With assign we decide exactly which partitions to consume. If new partitions are added we have to manage the change ourselves.

Example Assign

The assign example is almost identical to the subscribe example.
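
A minimal sketch of the part that differs, reusing the setup from processConsumerSubscribe (the partition number is illustrative; note that group.id is not required for assign):

import org.apache.kafka.common.TopicPartition;

// instead of subscribing to topics, assign explicit partitions
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
partitions.add(new TopicPartition("my_topic", 0));
myConsumer.assign(partitions);
// the poll loop is unchanged from the subscribe example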

Terms

  • Log shipping is the process of automating the backup of transaction log files on a primary (production) database server, and then restoring them onto a standby server.
  • Message Broker is an intermediary computer program module that translates a message from the formal messaging protocol of the sender to the formal messaging protocol of the receiver.
  • ZooKeeper is essentially a service for distributed systems offering a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems. ZooKeeper was a sub-project of Hadoop but is now a top-level Apache project in its own right.