Optimizing RocksDB Memory Usage in Kafka Streams Applications

This blog explores how RocksDB, the database engine underlying Kafka Streams' state stores, manages memory usage through various components like block cache, index/filter blocks, and memtables. It highlights the default configurations used in Kafka Streams, explains the impact of memory settings on performance, and provides guidance on customizing these settings to optimize resource usage in streaming applications.

Key takeaway #1

Kafka Streams creates a separate, memory-intensive RocksDB instance for each state store partition, quickly multiplying total memory usage.

Key takeaway #2

The statestore.cache.max.bytes setting is misleading; it doesn't cap total memory, which is actually consumed by multiple components within each RocksDB instance.

Key takeaway #3

To control memory, developers must implement a custom RocksDBConfigSetter to override the high default values for components like the block cache.

Optimizing RocksDB Memory Usage in Kafka Streams Applications

This blog explores how RocksDB, the database engine underlying Kafka Streams' state stores, manages memory usage through various components like block cache, index/filter blocks, and memtables. It highlights the default configurations used in Kafka Streams, explains the impact of memory settings on performance, and provides guidance on customizing these settings to optimize resource usage in streaming applications.

Key takeaway #1

Kafka Streams creates a separate, memory-intensive RocksDB instance for each state store partition, quickly multiplying total memory usage.

Key takeaway #2

The statestore.cache.max.bytes setting is misleading; it doesn't cap total memory, which is actually consumed by multiple components within each RocksDB instance.

Key takeaway #3

To control memory, developers must implement a custom RocksDBConfigSetter to override the high default values for components like the block cache.

Optimizing RocksDB Memory Usage in Kafka Streams Applications

This blog explores how RocksDB, the database engine underlying Kafka Streams' state stores, manages memory usage through various components like block cache, index/filter blocks, and memtables. It highlights the default configurations used in Kafka Streams, explains the impact of memory settings on performance, and provides guidance on customizing these settings to optimize resource usage in streaming applications.

Key takeaway #1

Kafka Streams creates a separate, memory-intensive RocksDB instance for each state store partition, quickly multiplying total memory usage.

Key takeaway #2

The statestore.cache.max.bytes setting is misleading; it doesn't cap total memory, which is actually consumed by multiple components within each RocksDB instance.

Key takeaway #3

To control memory, developers must implement a custom RocksDBConfigSetter to override the high default values for components like the block cache.

When I initially started working with Kafka streams, I knew state stores were backed by RocksDB, but that was the extent of my knowledge. A recent issue I had with a streaming application going out of memory required me to demystify some concepts of RocksDB and how Kafka streams configures it.

Going through the stream configurations, you might have seen the statestore.cache.max.bytes property, which states:

Maximum number of bytes in memory to be used for statestore cache across all threads.

Initially, I thought this setting simply governed the amount of memory allocated to all RocksDB instances combined. However, this assumption proved to be incorrect. Instead, statestore.cache.max.bytes defines how many bytes will be kept in memory before flushing it to the state store’s underlying implementation. In the default use case, each RocksDB will have its own configuration regarding how much memory it requires. Let’s have a look at how RocksDB uses memory.

Memory usage in RocksDB

There are multiple components that contribute to memory usage in RocksDB:

Block Cache

This is the place where RocksDB keeps all its cached data: the key-values inserted in the store, and optionally, the index and filter blocks, though this is not the default setting. Inside the block cache all data is stored as uncompressed data blocks. Whenever there is a miss in the block cache, the data is read from the file system using I/O, so in many use cases that means the OS’s page cache is used.

Because of this, decreasing the block cache size doesn’t necessarily mean that I/O will increase, as the page cache will be used when available. Instead the CPU usage will increase, because data from the page cache needs to be decompressed.

By default, each RocksDB has its own block cache, but it’s possible to define a single block cache for multiple databases. This makes it easier to set an absolute max cache size for your entire application.

Index/Filter Blocks

These blocks can be a major memory user and by default they are not stored in the memory that is assigned to the block cache.

For each data block three pieces of information are stored: the key, offset and size. This means there are 2 main factors in deciding how much memory is allocated for index and filter blocks:

  • Block size: Increasing the block size reduces the number of blocks and decreases the index size linearly
  • Key size: Reducing key size results in smaller index blocks.

For filter blocks using the default BloomFilter with 10 bits per key, the size would result in number-of-keys * 10 bits.

The default behavior for these blocks is that they are stored outside of the block cache, which means that number is controlled by max_open_files. The default is set to -1, which means infinite. This is the recommended setting for best performance at the cost of increased memory consumption.

Memtable

This component is an in-memory write buffer where key-value pairs are stored before being flushed to the underlying database. For a single RocksDB instance, this won’t result in a major consumption of memory.

Similar to block cache, a WriteBufferManager can be used over multiple databases to define an absolute limit for your process.

Blocks pinned by iterators

Each open iterator’s memory allocation equals block_size * ((num_levels — 1) * num_l0_files). In RocksDB, the concept of levels is a key component of its Log-Structured Merge-Tree (LSM-Tree) architecture. It organizes its data into multiple levels to manage storage and retrieval efficiently, but that is an entirely different topic.

As long as read transactions are limited, this shouldn’t contribute much to memory strain.

RocksDB in Kafka streams

When we have a look at the implementation of RocksDBStore, we can see that some default values are set when using RocksDB in Kafka streams. Considering that a RocksDB is opened for every partition, for every changelog topic assigned to a stream application, we have to multiply these values depending on how many RocksDBs are opened for a single process.

Defaults

The following defaults are configured in RocksDBStore:

private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final int MAX_WRITE_BUFFERS = 3;

Right off the bat we can see that the block cache size is 50Mb and the write buffer size is 16Mb, but that there can be up to 3 write buffers. That means that up to 100Mb will already be required to keep a single RocksDB up and running.

To find out how much memory is used for filter and index block the RocksDB getProperty API can be used to query the rocksdb.estimate-table-readers-mem value. To access the actual RocksDB instance from within a Kafka Streams application, you’ll have to bypass some of the abstractions Kafka Streams provides. While this isn’t typically recommended due to potential maintenance and compatibility issues, it can be done using reflection. So I do not recommend this for a production environment.

Regarding blocks pinned by iterators, that metric can be retrieved by accessing the configured cache and calling the getPinnedCache method.

Changing the defaults

Kafka streams allow you to implement the org.apache.kafka.streams.state.RocksDBConfigSetter interface in which you can set custom properties and access objects such as the cache and statistics of the RocksDB.

When setting the properties for your stream, the custom config setter can be registered with rocksdb.config.setter .

See the following for an example of how this can be achieved:

public class CustomRocksDbConfig implements RocksDBConfigSetter {
 
 @Override
 public void setConfig(
     String storeName,
     Options options,
     Map<String, Object> configs) {
   var cache = new LRUCache(10 * 1024 * 1024);
 
   var tableConfig = new BlockBasedTableConfigWithAccessibleCache();
   tableConfig.setBlockCache(cache);
   tableConfig.setBlockSize(2 * 1024 *1024);
   tableConfig.setFilterPolicy(new BloomFilter(5));
 
   var stats = new Statistics();
   options.setStatistics(stats);
   options.setTableFormatConfig(tableConfig);
 }

 @Override
 public void close(String storeName, Options options) { }
}

var props = new Properties();
// ...
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDbConfig.class.getName());

var builder = new StreamsBuilder();
// ...
var streams = new KafkaStreams(builder.build(), props);

streams.start();

Barely scratched the surface

I hope that this post gets you started on your journey into optimizing RocksDB configuration for your streaming applications. At the very least be aware that the default configuration of RocksDBStore already uses a considerable amount of memory. I strongly recommend reading the RocksDB wiki as it describes the internal workings of the database and has a guide on improving memory efficiency.

We value your privacy! We use cookies to enhance your browsing experience and analyse our traffic.
By clicking "Accept All", you consent to our use of cookies.
Dark blue outline of a cookie icon.