Apache Kafka Internals

Understanding Kafka Internals

 

What is Kafka?

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design. For those coming from the NoSQL world (HBase, Cassandra, Aerospike), Kafka's architecture resembles the write-ahead log (WAL) architecture of those systems with the LSM feature taken away.

 

Basic Kafka Lingo

  1. A stream of messages of a particular type is defined as a topic.
  2. A producer can publish messages to a topic.
  3. The published messages are then stored at a set of servers called brokers.
  4. A consumer can subscribe to one or more topics and consume the published messages by pulling data from the brokers.

So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.

Communication between the clients and the servers is done with a simple, high-performance, language-agnostic TCP protocol. Kafka provides a Java client, but clients are available in many languages.


The overall architecture of Kafka is shown in Figure 1. Since Kafka is distributed in nature, a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.

Figure 1: Kafka Architecture

 

Kafka Storage – Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of approximately equal size. Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. A segment file is flushed to disk after a configurable number of messages has been published or after a configurable amount of time has elapsed. A message is exposed to consumers only after it has been flushed.

Unlike traditional messaging systems, a message stored in Kafka doesn't have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map message ids to actual message locations. In this design, message ids are increasing but not consecutive: to compute the id of the next message, add the length of the current message to its id. (This describes Kafka's original design; since version 0.8, offsets are consecutive per-partition sequence numbers.)
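As a rough illustration of the byte-offset scheme described above, the following sketch computes message ids from message lengths. The function name and the sample lengths are made up for illustration; this is not Kafka code.

```python
def message_offsets(message_lengths, start=0):
    """Return the offset of each message and the offset of the next append.

    Each message's id is the previous id plus the previous message's length,
    so ids increase but are not consecutive.
    """
    offsets = []
    offset = start
    for length in message_lengths:
        offsets.append(offset)
        offset += length
    return offsets, offset


# Three messages of 100, 250 and 80 bytes appended to an empty log.
offsets, next_offset = message_offsets([100, 250, 80])
```

Because the id doubles as a position in the log, locating a message needs no separate id-to-location index.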

A consumer always consumes messages from a particular partition sequentially, and if the consumer acknowledges a particular message offset, it implies that the consumer has consumed all messages prior to that offset. The consumer issues asynchronous pull requests to the broker to have a buffer of bytes ready to consume. Each pull request contains the offset of the message from which consumption begins. Kafka exploits the sendfile API to efficiently deliver bytes in a log segment file from a broker to a consumer.
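To give a feel for sendfile-based delivery, here is a minimal Python sketch. It is an illustration only, not how the Kafka broker (which runs on the JVM) is implemented. Python's socket.sendfile() uses os.sendfile() where the platform supports it and falls back to ordinary send() calls otherwise. The file name, the message bytes and the local socket pair standing in for a broker-to-consumer connection are all assumptions made for the demo.

```python
import os
import socket
import tempfile


def send_log_bytes(sock, path, offset, count):
    """Send `count` bytes of the file at `path`, starting at `offset`, over `sock`."""
    with open(path, "rb") as segment:
        return sock.sendfile(segment, offset=offset, count=count)


# Demo: a throwaway "segment file" and a local socket pair.
with tempfile.TemporaryDirectory() as tmp:
    segment_path = os.path.join(tmp, "segment.log")  # hypothetical file name
    with open(segment_path, "wb") as f:
        f.write(b"msg-0|msg-1|msg-2|")

    broker_side, consumer_side = socket.socketpair()
    with broker_side, consumer_side:
        # Serve the bytes starting at offset 6, i.e. skip the first message.
        sent = send_log_bytes(broker_side, segment_path, offset=6, count=12)
        received = b""
        while len(received) < sent:
            received += consumer_side.recv(4096)
```

The application never reads the file contents into its own buffer; it only tells the OS which byte range to ship.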

Figure 2: Kafka Storage Architecture

 

Kafka Broker: Unlike other messaging systems, Kafka brokers are stateless. Here, stateless means that the broker does not track how much each consumer has consumed; every consumer maintains that information itself. This design is both tricky and innovative:

  1. It is tricky to delete a message from the broker, because the broker doesn't know whether every consumer has consumed it. Kafka solves this problem by using a simple time-based SLA for the retention policy: a message is automatically deleted once it has been retained in the broker longer than a certain period.
  2. The design has a benefit too: a consumer can deliberately rewind to an old offset and re-consume data. This violates the common contract of a queue, but proves to be an essential feature for many consumers.
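The time-based retention rule from point 1 can be sketched in a few lines. This is a simplified illustration that assumes whole segment files are expired by age; the segment names, timestamps and function name are hypothetical.

```python
def retained_segments(segments, now_s, retention_s):
    """Return the segments that are still inside the retention window.

    `segments` is a list of (name, last_modified_s) pairs. Anything older
    than `retention_s` is dropped, regardless of who has consumed it.
    """
    return [(name, ts) for name, ts in segments if now_s - ts <= retention_s]


segments = [("segment-a", 0), ("segment-b", 500), ("segment-c", 900)]
kept = retained_segments(segments, now_s=1000, retention_s=600)
```

Note that the decision uses only the age of the data, which is exactly why the broker does not need to know what any consumer has read.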
 
 

Architectural choices made with the OS in mind to achieve high throughput and low latency:

 
  • Dependency on the page cache:
    To compensate for slow hard disk drive throughput, modern operating systems have become increasingly aggressive in their use of main memory for disk caching. A modern OS will happily divert all free memory to disk caching, with little performance penalty when the memory is reclaimed. All disk reads and writes go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, that data will likely be duplicated in the OS page cache, effectively storing everything twice. Furthermore, Kafka is built on top of the JVM, and anyone who has spent any time with Java memory usage knows two things:
    1. The memory overhead of objects is very high, often doubling the size of the data stored (or worse).
    2. Java garbage collection becomes increasingly fiddly and slow as the in-heap data increases.
    As a result of these factors, using the filesystem and relying on the page cache is superior to maintaining an in-memory cache or other structure: we at least double the available cache by having automatic access to all free memory, and likely double it again by storing a compact byte structure rather than individual objects. Doing so results in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore, this cache stays warm even if the service is restarted, whereas an in-process cache would need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else start completely cold (which likely means terrible initial performance).
  • Avoid redundant data copies between kernel and user space
    Modern Unix operating systems offer a highly optimized code path for transferring data out of the page cache to a socket; in Linux this is done with the sendfile system call. To understand the impact of sendfile, it is important to understand the common data path for transferring data from a file to a socket:
    1. The operating system reads data from the disk into the page cache in kernel space.
    2. The application reads the data from kernel space into a user-space buffer.
    3. The application writes the data back into kernel space, into a socket buffer.
    4. The operating system copies the data from the socket buffer to the NIC buffer, where it is sent over the network.
    This is clearly inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from the page cache to the network directly, so in the optimized path only the final copy to the NIC buffer is needed. Kafka expects a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into the page cache exactly once and reused on each consumption, instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.
  • Distribution of Data across Disks
    If you configure multiple data directories, partitions will be assigned round-robin to the data directories. Each partition lives entirely in one of the data directories. If data is not well balanced among partitions, this can lead to load imbalance between disks.
  • Batching of messages on the producer and consumer to optimize network latency and throughput
    The small I/O problem happens both between the client and the server and in the server's own persistent operations. To avoid this, Kafka's protocol is built around a "message set" abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time. This simple optimization produces orders-of-magnitude speedups. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allow Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.
  • End-to-end batch (data) compression
    Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together, compressed, and sent to the server in this form. The batch is written in compressed form, remains compressed in the log, and is only decompressed by the consumer. Kafka supports the GZIP and Snappy compression codecs.
  • Producer push and consumer pull, resulting in a loosely coupled framework
    The Kafka consumer works by issuing "fetch" requests to the brokers leading the partitions it wants to consume. The consumer specifies its position in the log with each request and receives back a chunk of log beginning at that position. The consumer thus has significant control over this position and can rewind it to re-consume data if need be.
  • Consumer position (in a pull-based world)
    A Kafka topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of a consumer in each partition is just a single integer: the offset of the next message to consume. The state describing what has been consumed is therefore very small, just one number per partition, and it can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap. There is a side benefit to this decision: a consumer can deliberately rewind to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug that is discovered after some messages have been consumed, the consumer can re-consume those messages once the bug is fixed.
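Three of the ideas above (batching, end-to-end compression, and a consumer position that is just an integer) can be tied together in a toy model. This is a sketch under stated assumptions, not Kafka's wire or storage format: PartitionLog and Consumer are hypothetical names, batches are serialized as JSON purely for convenience, and offsets here are per-message sequence numbers.

```python
import gzip
import json


class PartitionLog:
    """Toy in-memory stand-in for one partition's log (not Kafka's on-disk format)."""

    def __init__(self):
        self._batches = []    # (base_offset, message_count, compressed_payload)
        self.next_offset = 0  # offset the next appended message will receive

    def append_batch(self, messages):
        """Producer side: compress the whole batch once and append it in one go."""
        payload = gzip.compress(json.dumps(messages).encode("utf-8"))
        self._batches.append((self.next_offset, len(messages), payload))
        self.next_offset += len(messages)

    def fetch(self, offset):
        """Broker side: return every batch holding a message at or after `offset`.

        Payloads are handed back still compressed; this toy broker never unpacks them.
        """
        return [(base, payload)
                for base, count, payload in self._batches
                if base + count > offset]


class Consumer:
    """Pull-based consumer whose entire state is one integer per partition."""

    def __init__(self, log):
        self.log = log
        self.position = 0  # offset of the next message to consume

    def poll(self):
        """Fetch from the current position, decompress, and advance the position."""
        consumed = []
        for base, payload in self.log.fetch(self.position):
            messages = json.loads(gzip.decompress(payload).decode("utf-8"))
            for i, message in enumerate(messages):
                if base + i >= self.position:  # skip messages before our position
                    consumed.append(message)
        self.position += len(consumed)
        return consumed

    def seek(self, offset):
        """Rewind (or skip ahead) simply by overwriting the position."""
        self.position = offset


log = PartitionLog()
log.append_batch(["a", "b", "c"])
log.append_batch(["d", "e"])

consumer = Consumer(log)
first_pass = consumer.poll()   # everything from offset 0
consumer.seek(1)               # rewind to an old offset
second_pass = consumer.poll()  # re-consume from offset 1
```

Rewinding costs nothing on the broker side in this model: the log is untouched and only the consumer's integer changes.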

Ways to Increase Throughput

  • batch.size – This is an upper limit on how many messages the Kafka producer will attempt to batch before sending, specified in bytes (the default is 16K bytes, so 16 messages if each message is 1K in size). Kafka may send batches before this limit is reached (so latency doesn't change by modifying this parameter), but will always send when this limit is reached. Therefore, setting this limit too low will hurt throughput without improving latency. The main reason to set it low is lack of memory: Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.
  • linger.ms – How long the producer will wait before sending, in order to allow more messages to accumulate in the same batch. Normally the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress, but sometimes we are willing to wait a bit longer in order to improve overall throughput at the expense of slightly higher latency. In that case, tuning linger.ms to a higher value makes sense. Note that if batch.size is low and the batch is full before linger.ms has passed, the batch will be sent early, so it makes sense to tune batch.size and linger.ms together.
  • Increase RAM, the number of disks, and network bandwidth as much as possible.
  • Tune the ext4 filesystem mount options. It is not necessary to tune these settings, but those wanting to optimize performance have a few knobs that will help:
    • data=writeback: ext4 defaults to data=ordered, which puts a strong order on some writes. Kafka does not require this ordering, as it does very paranoid data recovery on all unflushed log data. This setting removes the ordering constraint and seems to significantly reduce latency.
    • Disabling journaling: Journaling is a tradeoff: it makes reboots faster after server crashes but it introduces a great deal of additional locking which adds variance to write performance. Those who don’t care about reboot time and want to reduce a major source of write latency spikes can turn off journaling entirely.
    • commit=num_secs: This tunes the frequency with which ext4 commits to its metadata journal. Setting this to a lower value reduces the loss of unflushed data during a crash. Setting this to a higher value will improve throughput.
    • nobh: This setting controls additional ordering guarantees when using data=writeback mode. It should be safe with Kafka, which does not depend on write ordering, and it improves throughput and latency.
    • delalloc: Delayed allocation means that the filesystem avoids allocating any blocks until the physical write occurs. This allows ext4 to allocate a large extent instead of smaller pages and helps ensure the data is written sequentially. This feature is great for throughput. It does seem to involve some locking in the filesystem, which adds a bit of latency variance.
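Returning to the producer settings, the interplay of batch.size and linger.ms described above can be sketched with a toy accumulator. This is a deliberately simplified model and not the real producer's logic: BatchAccumulator is a hypothetical class, time is passed in explicitly as milliseconds, and the numbers are chosen only to make the two triggers visible.

```python
class BatchAccumulator:
    """Toy model: a batch is sent when it is full OR when it has lingered long enough."""

    def __init__(self, batch_size, linger_ms):
        self.batch_size = batch_size  # upper limit in bytes
        self.linger_ms = linger_ms    # maximum wait for more records
        self.buffer = []
        self.buffered_bytes = 0
        self.first_append_ms = None

    def append(self, record, now_ms):
        """Add a record; return the batch to send if one is ready, else None."""
        if not self.buffer:
            self.first_append_ms = now_ms
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        return self.poll(now_ms)

    def poll(self, now_ms):
        """Return the pending batch if it is full or its linger time has expired."""
        if not self.buffer:
            return None
        full = self.buffered_bytes >= self.batch_size
        expired = now_ms - self.first_append_ms >= self.linger_ms
        if full or expired:
            batch, self.buffer, self.buffered_bytes = self.buffer, [], 0
            return batch
        return None


acc = BatchAccumulator(batch_size=3000, linger_ms=5)
record = b"x" * 1000

r1 = acc.append(record, now_ms=0)   # not full, not expired
r2 = acc.append(record, now_ms=1)   # still waiting
r3 = acc.append(record, now_ms=2)   # 3000 bytes buffered: sent early because full
r4 = acc.append(record, now_ms=3)   # a new batch starts
r5 = acc.poll(now_ms=7)             # only 4 ms have passed
r6 = acc.poll(now_ms=8)             # linger expired: a partly filled batch is sent
```

The third append shows a full batch going out before the linger time is up, and the final poll shows a partly filled batch going out once the linger time has passed, which is why the two settings are best tuned together.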