Jim Cheung

zookeeper

$ cp conf/zoo_sample.cfg conf/zoo.cfg

change dataDir

$ ./bin/zkServer.sh start-foreground

test

$ echo srvr | nc localhost 2181

kafka

$ ./bin/kafka-server-start.sh ./config/server.properties

test

$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>test message 1
>test message 2
^D

$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
test message 1
test message 2
^CProcessed a total of 2 messages

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
test message 1
test message 2
^CProcessed a total of 2 messages

broker configuration

The format for the zookeeper.connect parameter is a comma-separated list of hostname:port entries, optionally followed by a /path.

/path, an optional Zookeeper path to use as a chroot environment for the Kafka cluster. If it is omitted, the root path is used.

It is generally considered to be good practice to use a chroot path for the Kafka cluster.
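
For example (hostnames are placeholders), a server.properties entry with a chroot path might look like:

zookeeper.connect=zoo1.example.com:2181,zoo2.example.com:2181,zoo3.example.com:2181/kafka-cluster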

Kafka persists all messages to disk, and these log segments are stored in the directories specified in the log.dirs configuration

If you are managing topic creation explicitly, whether manually or through a provisioning system, you can set the auto.create.topics.enable configuration to false.
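
A sketch of the corresponding server.properties entries (paths and values are illustrative):

log.dirs=/var/lib/kafka/data1,/var/lib/kafka/data2
auto.create.topics.enable=false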

WHY NOT SET SWAPPINESS TO ZERO?

Previously, the recommendation for vm.swappiness was always to set it to 0. This value used to have the meaning “do not swap unless there is an out-of-memory condition.” However, the meaning of this value changed as of Linux kernel version 3.5-rc1, and that change was backported into many distributions, including Red Hat Enterprise Linux kernels as of version 2.6.32-303. This changed the meaning of the value 0 to “never swap under any circumstances.” It is for this reason that a value of 1 is now recommended.
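
To apply the setting (standard sysctl usage, nothing Kafka-specific):

$ echo 'vm.swappiness=1' | sudo tee -a /etc/sysctl.conf
$ sudo sysctl -p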

Producer

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");

kafkaProps.put("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
  "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products",
	  "France");
try {
  producer.send(record);
} catch (Exception e) {
	e.printStackTrace();
}

send message synchronously:

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
	producer.send(record).get();
} catch (Exception e) {
		e.printStackTrace();
}

Here, we are using Future.get() to wait for a reply from Kafka.

send message asynchronously

private class DemoProducerCallback implements Callback {
	@Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    	if (e != null) {
        	e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

configuring producers

The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful.
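
For example, extending the producer kafkaProps above (choose the value to match your durability needs):

kafkaProps.put("acks", "all"); // "0" = don't wait for the broker, "1" = leader only, "all" = all in-sync replicas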

ORDERING GUARANTEES

Apache Kafka preserves the order of messages within a partition.

Usually, setting the number of retries to zero is not an option in a reliable system, so if guaranteeing order is critical, we recommend setting max.in.flight.requests.per.connection=1 to make sure that while a batch of messages is retrying, additional messages will not be sent (because this has the potential to reverse the correct order). This will severely limit the throughput of the producer, so only use this when order is important.
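
A sketch of that combination in the producer configuration (the retry count is illustrative):

kafkaProps.put("retries", "3");                               // retry transient send failures
kafkaProps.put("max.in.flight.requests.per.connection", "1"); // don't send new batches while one is retrying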

Consumer

Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.

A single consumer often can't keep up with the rate at which data flows into a topic; adding more consumers that share the load, with each consumer owning just a subset of the partitions and messages, is our main method of scaling.

This is a good reason to create topics with a large number of partitions—it allows adding more consumers when the load increases.

There is no point in adding more consumers than you have partitions in a topic—some of the consumers will just be idle.

To make sure an application gets all the messages in a topic, ensure the application has its own consumer group.

To summarize, you create a new consumer group for each application that needs all the messages from one or more topics. You add consumers to an existing consumer group to scale the reading and processing of messages from the topics, so each additional consumer in a group will only get a subset of the messages.

Moving partition ownership from one consumer to another is called a rebalance.

During a rebalance, consumers can’t consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group.

In addition, when partitions are moved from one consumer to another, the consumer loses its current state.

If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance.

During those seconds, no messages will be processed from the partitions owned by the dead consumer.

When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing.

With newer versions of Kafka, you can configure how long the application can go without polling before it will leave the group and trigger a rebalance. (max.poll.interval.ms)
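
A sketch using the consumer Properties object shown below (values are illustrative; max.poll.interval.ms exists only in newer clients):

props.put("session.timeout.ms", "10000");    // how long without heartbeats before the consumer is considered dead
props.put("max.poll.interval.ms", "300000"); // how long between poll() calls before the consumer leaves the group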

When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader.

The following code snippet shows how to create a KafkaConsumer:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer.subscribe(Collections.singletonList("customerCountries")); // topic name

To subscribe to all test topics, we can pass a regular expression (older client versions also require a ConsumerRebalanceListener argument):

consumer.subscribe(Pattern.compile("test.*"));

At the heart of the consumer API is a simple loop for polling the server for more data.

try {
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
             record.topic(), record.partition(), record.offset(),
             record.key(), record.value());

          int updatedCount = 1;
          if (custCountryMap.containsKey(record.value())) {
              updatedCount = custCountryMap.get(record.value()) + 1;
          }
          custCountryMap.put(record.value(), updatedCount);

          JSONObject json = new JSONObject(custCountryMap);
          System.out.println(json.toString(4));
      }
  }
} finally {
  consumer.close();
}

The poll loop does a lot more than just get data. The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group, and receiving a partition assignment. If a rebalance is triggered, it will be handled inside the poll loop as well.

The KafkaConsumer API provides multiple ways of committing offsets:

The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms.

With autocommit enabled, a call to poll will always commit the last offset returned by the previous poll. It doesn’t know which events were actually processed, so it is critical to always process all the events returned by poll() before calling poll() again.

By setting enable.auto.commit=false, offsets will only be committed when the application explicitly chooses to do so. The simplest and most reliable of the commit APIs is commitSync(). This API will commit the latest offset returned by poll() and return once the offset is committed, throwing an exception if commit fails for some reason.

It is important to remember that commitSync() will commit the latest offset returned by poll(), so make sure you call commitSync() after you are done processing all the records in the collection.

Here is how we would use commitSync to commit offsets after we finished processing the latest batch of messages:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset =
          %d, customer = %s, country = %s\n",
             record.topic(), record.partition(),
               record.offset(), record.key(), record.value());
    }
    try {
      consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

One drawback of manual commit is that the application is blocked until the broker responds to the commit request. This will limit the throughput of the application. Throughput can be improved by committing less frequently, but then we are increasing the number of potential duplicates that a rebalance will create.

Another option is the asynchronous commit API. Instead of waiting for the broker to respond to a commit, we just send the request and continue on:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync();
}

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry.

It is common to use the callback to log commit errors or to count them in a metric, but if you want to use the callback for retries, you need to be aware of the problem with commit order:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
        OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null)
                log.error("Commit failed for offsets {}", offsets, exception);
        }
      });
}

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback.
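
A minimal sketch of this pattern (uses java.util.concurrent.atomic.AtomicLong; the commitSeq field, commitWithRetry method, and lambda callback are illustrative, not from the book):

private final AtomicLong commitSeq = new AtomicLong(0);  // bumped on every commit attempt

private void commitWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
    final long seq = commitSeq.incrementAndGet();
    consumer.commitAsync(offsets, (committedOffsets, exception) -> {
        // retry only if no newer commit attempt was made in the meantime,
        // so a retried (older) commit can never overwrite a newer offset
        if (exception != null && seq == commitSeq.get()) {
            commitWithRetry(offsets);
        }
    });
}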

a common pattern is to combine commitAsync() with commitSync() just before shutdown. Here is how it works:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
            customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

the consumer API allows you to call commitSync() and commitAsync() and pass a map of partitions and offsets that you wish to commit.

Here is what a commit of specific offsets looks like:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d,
        customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
        currentOffsets.put(new TopicPartition(record.topic(),
        record.partition()), new
        OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
    }
}

The consumer API allows you to run your own code when partitions are added or removed from the consumer. You do this by passing a ConsumerRebalanceListener when calling the subscribe() method

If you want to start reading all messages from the beginning of the partition, or you want to skip all the way to the end of the partition and start consuming only new messages, there are APIs specifically for that: seekToBeginning(TopicPartition tp) and seekToEnd(TopicPartition tp).

how will our consumer know where to start reading when it is assigned a partition? This is exactly what seek() can be used for. When the consumer starts or when new partitions are assigned, it can look up the offset in the database and seek() to that location.

public class SaveOffsetsOnRebalance implements
  ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) {
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    }
}


  consumer.subscribe(topics, new SaveOffsetsOnRebalance(consumer));
  consumer.poll(0);

  for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));

  while (true) {
      ConsumerRecords<String, String> records =
        consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          processRecord(record);
          storeRecordInDB(record);
          storeOffsetInDB(record.topic(), record.partition(),
            record.offset());
      }
      commitDBTransaction();
  }

When you decide to exit the poll loop, you will need another thread to call consumer.wakeup().

Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread.

Calling wakeup will cause poll() to exit with WakeupException, the WakeupException doesn’t need to be handled, but before exiting the thread, you must call consumer.close().

Here is what the exit code will look like if the consumer is running in the main application thread.

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

...

try {
            // looping until ctrl-c, the shutdown hook will
            // clean up on exit
            while (true) {
                ConsumerRecords<String, String> records =
                  consumer.poll(1000);
                System.out.println(System.currentTimeMillis() +
                   " --  waiting for data...");
                for (ConsumerRecord<String, String> record :
                  records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                      record.offset(), record.key(),
                      record.value());
                }
                for (TopicPartition tp: consumer.assignment())
                    System.out.println("Committing offset at position:" +
                      consumer.position(tp));
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }

Standalone Consumer: Why and How to Use a Consumer Without a Group

Sometimes you know you have a single consumer that always needs to read data from all the partitions in a topic, or from a specific partition in a topic. In this case, there is no reason for groups or rebalances—just assign the consumer-specific topic and/or partitions, consume messages, and commit offsets on occasion.

When you know exactly which partitions the consumer should read, you don’t subscribe to a topic—instead, you assign yourself a few partitions. A consumer can either subscribe to topics (and be part of a consumer group), or assign itself partitions, but not both at the same time.

Here is an example of how a consumer can assign itself all partitions of a specific topic and consume from them:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(),
          partition.partition()));
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records =
          consumer.poll(1000);

        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
        }
        consumer.commitSync();
    }
}

Keep in mind that if new partitions are added to the topic, the consumer will not be notified; you will need to handle this by checking consumer.partitionsFor() periodically or simply by bouncing the application whenever partitions are added.

Internals

Kafka uses Zookeeper’s ephemeral node feature to elect a controller and to notify the controller when nodes join and leave the cluster. The controller is responsible for electing leaders among the partitions and replicas whenever it notices nodes join and leave the cluster. The controller uses the epoch number to prevent a “split brain” scenario where two nodes believe each is the current controller.

data in Kafka is organized by topics. Each topic is partitioned, and each partition can have multiple replicas

There are two types of replicas:

Leader replica

Each partition has a single replica designated as the leader. All produce and consume requests go through the leader, in order to guarantee consistency.

Follower replica

All replicas for a partition that are not leaders are called followers. Followers don't serve client requests; their only job is to replicate messages from the leader and stay up-to-date with the most recent messages the leader has.

Another task the leader is responsible for is knowing which of the follower replicas is up-to-date with the leader.

The best way to identify the current preferred leader is by looking at the list of replicas for a partition. The first replica in the list is always the preferred leader.

produce requests

Producers can be configured to consider messages as “written successfully” when the message was accepted by just the leader (acks=1), all in-sync replicas (acks=all), or the moment the message was sent without waiting for the broker to accept it at all (acks=0).

fetch requests

Kafka famously uses a zero-copy method to send the messages to the clients—this means that Kafka sends messages from the file (or more likely, the Linux filesystem cache) directly to the network channel without any intermediate buffers.

Instead of the clients sending requests to the brokers every few milliseconds asking for data and getting very few or no messages in return, the clients send a request, the broker waits until there is a decent amount of data and returns the data, and only then will the client ask for more.

clients can also define a timeout to tell the broker “If you didn’t satisfy the minimum amount of data to send within x milliseconds, just send what you got.”
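
These two behaviors map to the consumer settings fetch.min.bytes and fetch.max.wait.ms; a sketch (values are illustrative):

props.put("fetch.min.bytes", "1024");  // broker replies once at least 1 KB of data is available...
props.put("fetch.max.wait.ms", "500"); // ...or after 500 ms, whichever comes first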

physical storage

The basic storage unit of Kafka is a partition replica. Partitions cannot be split between multiple brokers, or even between multiple disks on the same broker.

Because finding the messages that need purging in a large file and then deleting a portion of the file is both time-consuming and error-prone, we instead split each partition into segments. By default, each segment contains either 1 GB of data or a week of data, whichever is smaller. As a Kafka broker is writing to a partition, if the segment limit is reached, we close the file and start a new one.
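
The limits come from broker configuration; for example (the values shown are the usual defaults):

# roll a new segment at 1 GB or after a week, whichever comes first
log.segment.bytes=1073741824
log.roll.hours=168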

The segment we are currently writing to is called an active segment. The active segment is never deleted, so if you set log retention to only store a day of data but each segment contains five days of data, you will really keep data for five days because we can’t delete the data before the segment is closed.

A Kafka broker will keep an open file handle to every segment in every partition—even inactive segments. This leads to an unusually high number of open file handles, and the OS must be tuned accordingly.

Each segment is stored in a single data file. Inside the file, we store Kafka messages and their offsets.

The format of the data on the disk is identical to the format of the messages that we send from the producer to the broker and later from the broker to the consumers. Using the same message format on disk and over the wire is what allows Kafka to use zero-copy optimization when sending messages to consumers and also avoid decompressing and recompressing messages that the producer already compressed.

Kafka brokers ship with the DumpLogSegments tool, which allows you to look at a partition segment in the filesystem and examine its contents.

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments

indexes

In order to help brokers quickly locate the message for a given offset, Kafka maintains an index for each partition. The index maps offsets to segment files and positions within the file.

It is also completely safe for an administrator to delete index segments if needed—they will be regenerated automatically.

compaction

Kafka supports such use cases by allowing the retention policy on a topic to be changed from delete, which deletes events older than the retention time, to compact, which only stores the most recent value for each key in the topic.

setting the policy to compact only makes sense on topics for which applications produce events that contain both a key and a value. If the topic contains null keys, compaction will fail.
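
For example, compaction can be enabled per topic at creation time (the topic name here is illustrative):

$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create --topic customer-profiles --replication-factor 2 --partitions 8 --config cleanup.policy=compact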

In the same way that the delete policy never deletes the current active segment, the compact policy never compacts the current segment. Messages are eligible for compaction only on inactive segments.

Reliability

what does Apache Kafka guarantee?

Each Kafka topic is broken down into partitions, which are the basic data building blocks. A partition is stored on a single disk. Kafka guarantees order of events within a partition and a partition can be either online (available) or offline (unavailable). Each partition can have multiple replicas, one of which is a designated leader. All events are produced to and consumed from the leader replica. Other replicas just need to stay in sync with the leader and replicate all the recent events on time. If the leader becomes unavailable, one of the in-sync replicas becomes the new leader.

Misconfigured garbage collection can cause the broker to pause for a few seconds, during which it will lose connectivity to Zookeeper (the default Zookeeper session timeout is 6 seconds).

An in-sync replica that is slightly behind can slow down producers and consumers—since they wait for all the in-sync replicas to get the message before it is committed. Once a replica falls out of sync, we no longer wait for it to get messages. It is still behind, but now there is no performance impact.

Replication Factor

At the broker level, you control the default.replication.factor for automatically created topics.

If you are totally OK with a specific topic being unavailable when a single broker is restarted (which is part of the normal operations of a cluster), then a replication factor of 1 may be enough.

recommend a replication factor of 3 for any topic where availability is an issue. In rare cases, this is considered not safe enough—we’ve seen banks run critical topics with five replicas, just in case.

By default, Kafka will make sure each replica for a partition is on a separate broker. However, in some cases, this is not safe enough. If all replicas for a partition are placed on brokers that are on the same rack and the top-of-rack switch misbehaves, you will lose availability of the partition regardless of the replication factor. To protect against rack-level misfortune, we recommend placing brokers in multiple racks and using the broker.rack broker configuration parameter to configure the rack name for each broker. If rack names are configured, Kafka will make sure replicas for a partition are spread across multiple racks in order to guarantee even higher availability.
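
A sketch of the per-broker setting in server.properties (the rack name is a placeholder):

broker.rack=us-east-1a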

Unclean Leader Election

Setting unclean.leader.election.enable to true means we allow out-of-sync replicas to become leaders (known as unclean leader election), knowing that we will lose messages when this occurs. If we set it to false, we choose to wait for the original leader to come back online, resulting in lower availability.

In systems where availability is more important, such as real-time clickstream analysis, unclean leader election is often enabled.

Minimum In-Sync Replicas

Both the topic and the broker-level configuration are called min.insync.replicas.
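
For example, to require at least two in-sync replicas before an acks=all write is accepted (set in server.properties for the cluster default, or as a per-topic override):

min.insync.replicas=2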

Using Producers in a Reliable System

there are two important things that everyone who writes applications that produce to Kafka must pay attention to: using the acks configuration that matches the reliability requirements, and handling errors correctly, both in configuration (retries) and in code.

Using Consumers in a Reliable System

There are four consumer configuration properties that are important to understand in order to configure your consumer for a desired reliability behavior:

  1. The first is group.id. If you need a consumer to see, on its own, every single message in the topics it is subscribed to—it will need a unique group.id.
  2. The second relevant configuration is auto.offset.reset. This parameter controls what the consumer will do when no offsets were committed. If you choose earliest, the consumer will start from the beginning of the partition whenever it doesn’t have a valid offset. This can lead to the consumer processing a lot of messages twice, but it guarantees to minimize data loss. If you choose latest, the consumer will start at the end of the partition. This minimizes duplicate processing by the consumer but almost certainly leads to some messages getting missed by the consumer.
  3. The third relevant configuration is enable.auto.commit.
  4. The fourth relevant configuration is tied to the third, and is auto.commit.interval.ms. The default is every five seconds.
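
Putting the four together, a sketch of a reliability-oriented consumer configuration (values are illustrative):

props.put("group.id", "CountryCounter");
props.put("auto.offset.reset", "earliest"); // prefer reprocessing over missing messages
props.put("enable.auto.commit", "false");   // commit explicitly, after records are processed
// with auto commit enabled, auto.commit.interval.ms (default 5000) would control how often offsets are committed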

EXACTLY-ONCE DELIVERY

While Kafka does not provide full exactly-once support at this time, consumers have a few tricks available that allow them to guarantee that each message in Kafka will be written to an external system exactly once.

The easiest and probably most common way to do exactly-once is by writing results to a system that has some support for unique keys. This includes all key-value stores, all relational databases, Elasticsearch, and probably many more data stores.

Another option is available when writing to a system that has transactions. Relational databases are the easiest example, but HDFS has atomic renames that are often used for the same purpose.

Kafka Connect

Kafka Connect is a part of Apache Kafka and provides a scalable and reliable way to move data between Kafka and other datastores.

Kafka Connect runs as a cluster of worker processes.

Starting a Connect worker is very similar to starting a broker—you call the start script with a properties file:

$ bin/connect-distributed.sh config/connect-distributed.properties

Connector Example: File Source and File Sink

# start connector
$ bin/connect-distributed.sh config/connect-distributed.properties &

# file source
$ echo '{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

# consumer
$ bin/kafka-console-consumer.sh --new-consumer --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning

# file sink
$ echo '{"name":"dump-kafka-config", "config":{"connector.class":"FileStreamSink","file":"copy-of-server-properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

# delete connector
$ curl -X DELETE http://localhost:8083/connectors/dump-kafka-config

for file sink, instead of specifying a topic, you specify topics. Note the plurality—you can write multiple topics into one file with the sink, while the source only allows writing into one topic.

Connector Example: MySQL to Elasticsearch

(read the book)

A Deeper Look at Connect

The connector is responsible for three important things:

Kafka Connect’s worker processes are the “container” processes that execute the connectors and tasks.

Administering Kafka

topics:

# create topic
$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create --topic my-topic --replication-factor 2 --partitions 8

# increase partitions
$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --topic my-topic --partitions 16

# delete topic
$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --topic my-topic

# list topics
$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list

# describe topic
$ kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe

consumer groups:

# list old consumer groups
$ kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list

# list new consumer groups
$ kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka1.example.com:9092 --list

# delete group
$ kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup

# delete offset for a topic
$ kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup --topic my-topic

offset management:

# export offset
$ kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup --output-file offsets

Before performing this step, it is important that all consumers in the group are stopped. They will not read the new offsets if they are written while the consumer group is active. The consumers will just overwrite the imported offsets.

# import offset
$ kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --input-file offsets