Apache HBase

Monday September 09, 2019

Introducing "hbtop", a real-time monitoring tool for HBase modeled after Unix's 'top' command

by Toshihiro Suzuki, HBase Committer

hbtop is a real-time monitoring tool for HBase modeled after Unix's _top_ command. It can display summary information as well as metrics per Region/Namespace/Table/RegionServer. With this tool, you can see metrics sorted by a selected field and filter them to see only the metrics you are interested in. Also, with the drill-down feature, you can dig into hot-spotting Regions.

hbtop (https://issues.apache.org/jira/browse/HBASE-11062) will be available in the upcoming HBase 2.1.7, 2.2.2, and 2.3.0 releases, and it is being actively backported to branch-1.

Run hbtop with the following command:

$ hbase hbtop

In this case, hbtop connects using the values of hbase.client.zookeeper.quorum and zookeeper.znode.parent from the hbase-site.xml on the classpath, or their default values if they are not set. Alternatively, you can specify your own ZooKeeper quorum and znode parent as follows:

$ hbase hbtop -Dhbase.client.zookeeper.quorum=<zookeeper quorum> -Dzookeeper.znode.parent=<znode parent>

Run hbtop and you'll see something like the following:

Top screen

The top screen consists of a summary section and a metrics section. In the summary section, you can see the HBase version, the cluster ID, the number of region servers, the region count, the average cluster load, and Aggregated Request/s. In the metrics section, you can see metrics per Region/Namespace/Table/RegionServer, depending on the selected mode. The top screen is refreshed periodically, every 3 seconds by default.

You can scroll the metric records in the metrics section.

Scrolling metric records

Command line arguments

Argument          Description
-d,--delay <arg>  The refresh delay (in seconds); default is 3 seconds
-h,--help         Print usage; for help while the tool is running, press the h key
-m,--mode <arg>   The mode: n (Namespace)|t (Table)|r (Region)|s (RegionServer); default is r (Region)

Mode

You can change the mode by pressing the m key in the top screen.

Changing mode

You can change the refresh delay by pressing the d key in the top screen.

Changing the refresh delay

You can move to the fields screen by pressing the f key in the top screen. In the fields screen, you can change the displayed fields by selecting a field and pressing the d key or the space key.

Changing the displayed fields

You can move to the fields screen by pressing the f key in the top screen. In the fields screen, you can change the sort field by selecting a field and pressing the s key. You can also change the sort order (ascending or descending) by pressing the R key.

Changing the sort field

You can move to the fields screen by pressing the f key in the top screen. In the fields screen, you can also change the order of the fields.

Changing the order of the fields


You can filter the metric records with the filter feature. You can add filters by pressing the o key for case-insensitive matching or the O key for case-sensitive matching.

Adding filters

The syntax is as follows:

<Field><Operator><Value>

For example, we can add filters like the following:

NAMESPACE==default
REQ/S>1000

The operators you can specify are as follows:

Operator  Description
=         Partial match
==        Exact match
>         Greater than
>=        Greater than or equal to
<         Less than
<=        Less than or equal to
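A short sketch makes the operator semantics concrete. The Java class below is hypothetical and is not hbtop's code. It models a metric record as a map from field name to displayed value and locates the operator at the first =, < or > character. When both sides parse as numbers it compares numerically, falling back to string order otherwise. Those parsing and comparison rules are assumptions for illustration.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of hbtop-style record filters; not hbtop's own code. */
final class RecordFilterSketch {

    /**
     * Evaluates a filter such as "NAMESPACE==default" or "REQ/S>1000" against one record,
     * modeled here (by assumption) as a map from field name to displayed value.
     */
    static boolean matches(String filter, Map<String, String> record, boolean ignoreCase) {
        // The field name ends at the first operator character.
        int i = 0;
        while (i < filter.length() && "=<>".indexOf(filter.charAt(i)) < 0) {
            i++;
        }
        if (i == 0 || i == filter.length()) {
            throw new IllegalArgumentException("Expected <Field><Operator><Value>: " + filter);
        }
        String field = filter.substring(0, i);
        // "==", ">=" and "<=" are two characters long; "=", ">" and "<" are one.
        boolean twoChars = i + 1 < filter.length() && filter.charAt(i + 1) == '=';
        String op = filter.substring(i, twoChars ? i + 2 : i + 1);
        String expected = filter.substring(i + op.length());
        String actual = record.get(field);
        if (actual == null) {
            return false; // unknown field: treated as no match (assumption)
        }
        if (ignoreCase) { // like filters added with the o key; the O key is case-sensitive
            actual = actual.toLowerCase(Locale.ROOT);
            expected = expected.toLowerCase(Locale.ROOT);
        }
        switch (op) {
            case "=":
                return actual.contains(expected); // partial match
            case "==":
                return actual.equals(expected);   // exact match
            default:
                return compare(actual, expected, op);
        }
    }

    private static boolean compare(String actual, String expected, String op) {
        int c;
        try {
            c = Double.compare(Double.parseDouble(actual), Double.parseDouble(expected));
        } catch (NumberFormatException e) {
            c = actual.compareTo(expected); // non-numeric values: plain string order (assumption)
        }
        switch (op) {
            case ">":  return c > 0;
            case ">=": return c >= 0;
            case "<":  return c < 0;
            case "<=": return c <= 0;
            default:   throw new IllegalArgumentException("Unknown operator: " + op);
        }
    }

    public static void main(String[] args) {
        Map<String, String> region = Map.of("NAMESPACE", "default", "TABLE", "usertable", "REQ/S", "1520");
        System.out.println(matches("NAMESPACE==default", region, false)); // true
        System.out.println(matches("REQ/S>1000", region, false));         // true
        System.out.println(matches("TABLE=USER", region, true));          // true: partial, ignoring case
        System.out.println(matches("TABLE=USER", region, false));         // false: case-sensitive
    }
}
```

Comparing numerically matters for fields like REQ/S. Under string order, a value of "9" would wrongly count as greater than "1000".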

You can see the current filters by pressing the ^o key (Ctrl+o) and clear them by pressing the = key.

Showing and clearing filters

Drilling down

You can drill down on a metric record by pressing the i key in the top screen. With this feature, you can easily find hot regions in a top-down manner.

Drilling down
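The sketch below illustrates the mode and drill-down ideas. It rolls hypothetical region records up into per-table totals, the kind of view Table mode shows. It then drills down into one table's regions, sorted hottest first. The RegionRecord type and its fields are assumptions, not hbtop classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of mode aggregation and drill-down; not hbtop's own code. */
final class DrillDownSketch {

    /** A minimal per-region metric record; the fields are assumptions for illustration. */
    static final class RegionRecord {
        final String namespace;
        final String table;
        final String region;
        final long requestsPerSecond;

        RegionRecord(String namespace, String table, String region, long requestsPerSecond) {
            this.namespace = namespace;
            this.table = table;
            this.region = region;
            this.requestsPerSecond = requestsPerSecond;
        }
    }

    /** Table-mode style view: total requests/second per "namespace:table", sorted by name. */
    static Map<String, Long> byTable(List<RegionRecord> regions) {
        Map<String, Long> totals = new TreeMap<>();
        for (RegionRecord r : regions) {
            totals.merge(r.namespace + ":" + r.table, r.requestsPerSecond, Long::sum);
        }
        return totals;
    }

    /** Drill-down: the regions of one table, hottest (highest requests/second) first. */
    static List<RegionRecord> drillDown(List<RegionRecord> regions, String namespace, String table) {
        List<RegionRecord> result = new ArrayList<>();
        for (RegionRecord r : regions) {
            if (r.namespace.equals(namespace) && r.table.equals(table)) {
                result.add(r);
            }
        }
        result.sort(Comparator.comparingLong((RegionRecord rec) -> rec.requestsPerSecond).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<RegionRecord> regions = List.of(
            new RegionRecord("default", "orders", "orders-region-1", 40),
            new RegionRecord("default", "orders", "orders-region-2", 900),
            new RegionRecord("default", "users", "users-region-1", 60));
        System.out.println(byTable(regions)); // {default:orders=940, default:users=60}
        for (RegionRecord r : drillDown(regions, "default", "orders")) {
            System.out.println(r.region + " " + r.requestsPerSecond); // orders-region-2 comes first
        }
    }
}
```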

Help screen

You can see the help screen by pressing the h key in the top screen.

Help screen

How hbtop gets the metrics data

hbtop gets the metrics from ClusterMetrics, which is returned as the result of a call to Admin#getClusterMetrics() on the current HMaster. To add metrics to hbtop, they need to be exposed via ClusterMetrics.
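Request counts in ClusterMetrics are cumulative. For example, in the HBase 2.x client API you reach RegionMetrics#getRequestCount() through ClusterMetrics#getLiveServerMetrics() and ServerMetrics#getRegionMetrics(). A per-second figure therefore has to be derived from two successive snapshots. The sketch below shows one way to do that. It is an assumption about the approach, not a description of hbtop's internals. The class and method names are hypothetical, and the counts are passed in as plain maps instead of being fetched from a live cluster.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: derives requests/second from cumulative request counters. */
final class RequestRateSketch {
    private Map<String, Long> previousCounts = new HashMap<>();
    private long previousTimestampMillis = -1; // -1 means no snapshot has been seen yet

    /**
     * Records a snapshot of cumulative per-region request counts taken at timestampMillis and
     * returns each region's rate since the previous snapshot. Regions seen for the first time,
     * or whose counter decreased, get a rate of 0 for this round.
     */
    Map<String, Double> update(Map<String, Long> currentCounts, long timestampMillis) {
        Map<String, Double> rates = new HashMap<>();
        double elapsedSeconds = (timestampMillis - previousTimestampMillis) / 1000.0;
        for (Map.Entry<String, Long> e : currentCounts.entrySet()) {
            Long previous = previousCounts.get(e.getKey());
            long delta = previous == null ? 0 : e.getValue() - previous;
            boolean valid = previousTimestampMillis >= 0 && elapsedSeconds > 0 && delta >= 0;
            rates.put(e.getKey(), valid ? delta / elapsedSeconds : 0.0);
        }
        previousCounts = new HashMap<>(currentCounts);
        previousTimestampMillis = timestampMillis;
        return rates;
    }

    /** Sums per-region rates into a single cluster-wide requests/second figure. */
    static double aggregate(Map<String, Double> rates) {
        double total = 0;
        for (double r : rates.values()) {
            total += r;
        }
        return total;
    }

    public static void main(String[] args) {
        RequestRateSketch sketch = new RequestRateSketch();
        sketch.update(Map.of("region-a", 100L, "region-b", 50L), 0L);
        Map<String, Double> rates = sketch.update(Map.of("region-a", 400L, "region-b", 110L), 3000L);
        System.out.println(rates.get("region-a")); // 100.0
        System.out.println(rates.get("region-b")); // 20.0
        System.out.println(aggregate(rates));      // 120.0
    }
}
```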

Feedback, bugs, and enhancements are all welcome!

The Reference Guide - hbtop:


Thursday February 28, 2019

NoSQL Day 2019

On May 21st in Washington, DC, there will be a one-day community event for Apache Accumulo, HBase, and Phoenix called NoSQL Day. We hope that these three Apache communities can come together to share stories from the field and learn from one another. This event is being offered by the DataWorks Summit organization, prior to their DataWorks Summit event May 20th through 23rd.

At this time, we are looking for speakers, attendees, and sponsors for the event. For speakers, we hope to see a wide breadth of subjects and focus, anything from performance, scaling, real-life applications, dev-ops, or best-practices. All speakers are welcome! Abstracts can be submitted here.

For attendees, we want to get the best and brightest from each of the respective communities, because the organizers believe we have much to learn from each other. We’ve tried to keep costs down to make this approachable for all.

Finally, sponsors are the major enabler for providing events like these at low cost to attendees. If you are interested in a corporate sponsorship, please feel free to contact Josh Elser for more information.

For questions, please feel free to mail Josh Elser or the HBase user mailing list.

Friday August 31, 2018

Beijing in Summertime: HBaseConAsia2018

by Yu Li, Chair of the HBaseConAsia2018 Conference Committee and member of the HBase PMC and Michael Stack, HBase PMC-er.

A crowd of us gathered at the Gehua New Century Hotel in northeastern Beijing to attend a lively HBaseConAsia2018. It was an all-day community-run event held on August 17th. Admission was free thanks to our generous host, the Alibaba Group. This was the second HBaseConAsia conference. The first was hosted by Huawei in Shenzhen, in 2017.

The majority of the talks were in Chinese [1] but slides were in English. Below are links that point into slideshare where you’ll find slides and video. There were three tracks -- internals, ops, and ecosystem -- of heavy-duty hbasing.

Your authors led off the conference with a keynote on the “State of Apache HBase”. It covered general state (“stable”), the effort getting hbase-2.0.0 out the door, and toward the end, a couple of features that would sit pretty in any hbase-3.0.0.

The second (dense) keynote was by Chunhui Shen (long-time HBase PMC member) and Long Cao. They talked of “Recent Developments” around HBase at Alibaba and at Alibaba Cloud (“Aliyun”). Alibaba has been using HBase since 0.20.6! and keeps up its own version internally (“AliHB”), where they dev features that later get pushed upstream to Apache. The node and cluster counts were, ahem, impressive, as is the ecosystem built up over time to serve different workload types. Near-future projects include separating compute from storage and deploying hbase on pangu, the Alibaba filesystem. This talk also served as an overview for several later sessions, where topics mentioned in the morning were given a deep dive.

One such deep-dive was the first talk in track one, on “Using CCSMap to improve HBase YGC time” by Chance Li and Lijin Bin. CCSMap is a stripped-down ConcurrentSkipListMap for use in place of the native Java implementation that HBase currently uses. Look for it in an upcoming Apache HBase release.

Ramkrishna Vasudevan and Anoop Sam John, HBase PMCers from Intel, described their recent development work, WALLess HBase with persistent memory devices.

A bunch of talks were focused on hbase deploys at various companies. The Xiaomi folks, heavyweight contributors to Apache HBase with lots of HBase deployed at scale, gave two talks, one by Xie Gang on “HDFS optimizations for HBase at Xiaomi”, and “HBase at Xiaomi” by HBase Committer, Guanghao Zhang. JingYi Yao talked on “HBase at Didi” (GeoMesa, JanusGraph, and Phoenix). Other interesting deploy-types were described in “HBase at China Telecom”, “@ China Life Insurance”, “@ Meituan”, and “@ Lianjia”.

The boys from Pinterest, Chenji Pan and Lianghong Xu, did “Improving HBase reliability at Pinterest with geo-replication and efficient backup” and our WenLong (Allan) Yang described an interesting scheme for treating hot and cold data differently in “Separating hot-cold data into heterogeneous storage based on layered compaction”.

Apache Kylin and JanusGraph run on HBase, and each had a dedicated session. AntsDB is a fun new project that puts a MySQL face on an Apache HBase cluster. It’s easy to set up and works nicely.

Alibaba described how they do security, Phoenix, and Spark in their cloud HBase offering, ApsaraDB, as well as an interesting backup solution they use internally that requires “zero-modification” to HBase. “The Application of HBase in New Energy Vehicle Monitoring System” was by Yan Yu, Chetankumar Jyestaram Khatri talked on “Scaling 30 TB's of Data lake with Apache HBase and Scala DSL at Production”, Biju Nair of Bloomberg described “Serving billions of queries in millisecond latency”, and Pankaj Kumar, Wei Zhi, and Chaoqiang Zhong of last year’s hosts Huawei presented on “HBase and OpenTSDB practice at Huawei” (Huawei also offer HBase in the cloud).

At the end of all sessions, a worn-out PMC took questions from a worn-out audience.

On the day after, HBase contributors came together for a fruitful dev meeting. Rough notes were posted to the dev@hbase mailing list.

Let us finish this short report with a picture of all the HBaseConAsia2018 speakers taken at the end of the day after all was done.

  1. https://www.quora.com/Is-it-appropriate-to-refer-to-Mandarin-simply-as-Chinese

Friday July 14, 2017


(This is the second of a two-part post. The first part can be found at this spot.)


Using a technique called key-flattening, a document can be shredded by storing each value in the document according to the path from the root to the name of the element containing the value.  HDocDB uses this approach.

Column Family: default
Row Key        Column: <property 1 path>  Column: <property 2 path>
<document ID>  <property 1 value>         <property 2 value>
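Here is a minimal key-flattening sketch. It assumes the document is held as nested Java maps and lists, joins path segments with dots, and gives list elements an [index] suffix. HDocDB's actual path encoding may differ, and the class name is hypothetical. Each resulting entry corresponds to one column qualifier and its value in the row keyed by the document ID.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical key-flattening sketch; the path syntax is an assumption, not HDocDB's encoding. */
final class KeyFlatteningSketch {

    /** Flattens a nested document into (path -> leaf value) pairs, one pair per column. */
    static Map<String, Object> flatten(Map<String, ?> document) {
        Map<String, Object> columns = new LinkedHashMap<>();
        flattenInto("", document, columns);
        return columns;
    }

    private static void flattenInto(String prefix, Object node, Map<String, Object> columns) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String path = prefix.isEmpty() ? e.getKey().toString() : prefix + "." + e.getKey();
                flattenInto(path, e.getValue(), columns);
            }
        } else if (node instanceof List) {
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                flattenInto(prefix + "[" + i + "]", list.get(i), columns);
            }
        } else {
            columns.put(prefix, node); // a leaf value is stored under its full path from the root
        }
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "Ada");
        doc.put("address", Map.of("city", "London"));
        doc.put("tags", List.of("math", "engines"));
        System.out.println(flatten(doc));
        // {name=Ada, address.city=London, tags[0]=math, tags[1]=engines}
    }
}
```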

The document can also be stored as a binary value, in which case support for Medium Objects (MOBs) can be used if the documents are large.  This approach is described in the book Architecting HBase Applications.

Column Family: default
Row Key        Column: body
<document ID>  <reference to MOB>


There are many ways to store a graph in HBase.  One method is to use an adjacency list, where each vertex stores its neighbors in the same row.  This is the approach taken in JanusGraph.

Column Family: default
Row Key      Column: <edge 1 key>  Column: <edge 2 key>  Column: <property 1 name>  Column: <property 2 name>
<vertex ID>  <edge 1 properties>   <edge 2 properties>   <property 1 value>         <property 2 value>

In the table above, the edge key is actually composed of a number of parts, including the label, direction, edge ID, and adjacent vertex ID.
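Below is one hypothetical way to pack those parts into a single column qualifier. It writes a length-prefixed UTF-8 label, a direction byte, and then the edge ID and adjacent vertex ID as 8-byte big-endian values. JanusGraph uses its own, more compact binary encoding, so this layout is only an illustration. Every edge with the same label shares the same leading bytes. As a result, a vertex's edges end up grouped by label, and by direction within a label, in the row's sorted columns.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical composite edge-key layout; not JanusGraph's actual encoding. */
final class EdgeKeySketch {
    static final byte OUT = 0;
    static final byte IN = 1;

    /** Encodes label, direction, edge ID and adjacent vertex ID into one column qualifier. */
    static byte[] encode(String label, byte direction, long edgeId, long adjacentVertexId) {
        byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + labelBytes.length + 1 + 8 + 8)
            .putInt(labelBytes.length) // length prefix so the label can be skipped when decoding
            .put(labelBytes)
            .put(direction)
            .putLong(edgeId)
            .putLong(adjacentVertexId)
            .array();
    }

    /** Reads the direction byte that follows the label. */
    static byte direction(byte[] qualifier) {
        int labelLength = ByteBuffer.wrap(qualifier).getInt(0);
        return qualifier[4 + labelLength];
    }

    /** Reads the adjacent vertex ID, which follows the label, direction and edge ID. */
    static long adjacentVertexId(byte[] qualifier) {
        int labelLength = ByteBuffer.wrap(qualifier).getInt(0);
        return ByteBuffer.wrap(qualifier).getLong(4 + labelLength + 1 + 8);
    }

    public static void main(String[] args) {
        byte[] q = encode("follows", OUT, 42L, 7L);
        System.out.println(q.length);            // 28
        System.out.println(adjacentVertexId(q)); // 7
    }
}
```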

Alternatively, a separate table to represent edges can be used, in which case the incident vertices are stored in the same row as an edge.   This may scale better if the adjacency list is large, such as in a social network.  This is the approach taken in both Zen and HGraphDB.

Column Family: default
Row Key      Column: <property 1 name>  Column: <property 2 name>
<vertex ID>  <property 1 value>         <property 2 value>

Column Family: default
Row Key      Column: fromVertex  Column: toVertex  Column: <property 1 name>  Column: <property 2 name>
<edge ID>    <vertex ID>         <vertex ID>       <property 1 value>         <property 2 value>

When storing edges in a separate table, additional index tables must be used to provide efficient access to the incident edges of a vertex.  For example, the full list of tables in HGraphDB can be viewed here.


A queue can be modeled by using a row key composed of the consumer ID and a counter.  Both Cask and Box implement queues in this manner.

Column Family: default
Row Key                  Column: metadata    Column: body
<consumer ID + counter>  <message metadata>  <message body>
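Here is a minimal sketch of such a row key. It assumes a 4-byte consumer ID followed by an 8-byte big-endian counter. HBase orders row keys by unsigned byte comparison. A fixed-width big-endian encoding of a non-negative counter therefore keeps each consumer's messages in counter order, whereas a decimal string would put "10" before "2". The layout is an assumption for illustration; Cask and Box each have their own encodings.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical <consumer ID + counter> row-key layout; not Cask's or Box's actual encoding. */
final class QueueRowKeySketch {

    /** Builds a 12-byte row key: a 4-byte consumer ID followed by an 8-byte big-endian counter. */
    static byte[] rowKey(int consumerId, long counter) {
        if (counter < 0) {
            throw new IllegalArgumentException("counter must be non-negative to sort correctly");
        }
        return ByteBuffer.allocate(12).putInt(consumerId).putLong(counter).array();
    }

    /** Compares keys as unsigned bytes, the way HBase orders row keys. */
    static int compareRowKeys(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // Counter 2 sorts before counter 10 for the same consumer.
        System.out.println(compareRowKeys(rowKey(7, 2), rowKey(7, 10)) < 0); // true
        // All of consumer 7's messages sort before consumer 8's.
        System.out.println(compareRowKeys(rowKey(7, Long.MAX_VALUE), rowKey(8, 0)) < 0); // true
    }
}
```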

Cask also uses coprocessors for efficient scan filtering and queue trimming, and Apache Tephra for transactional queue processing.


The Metrics archetype is a variant of the Entity archetype in which the column values are counters or some other aggregate.

Column Family: default
Row Key      Column: <property 1 name>  Column: <property 2 name>
<entity ID>  <property 1 counter>       <property 2 counter>

HGraphDB is actually a combination of the Graph and Metrics archetypes, as arbitrary counters can be stored on either vertices or edges.

(This is the second of a two-part article. The first part can be found over here). 


by Robert Yokota, HBase Contributor

(This post originally appeared on Robert's personal blog. It is reposted here as a two-parter. The second part can be found here.)

At Yammer, we’ve transitioned away from polyglot persistence to persistence consolidation. In a microservice architecture, the principle that each microservice should be responsible for its own data had led to a proliferation of different types of data stores at Yammer. This in turn led to multiple efforts to make sure that each data store could be easily used, monitored, operationalized, and maintained. In the end, we decided it would be more efficient, both architecturally and organizationally, to reduce the number of data store types in use at Yammer to as few as possible.

Today HBase is the primary data store for non-relational data at Yammer (we use PostgreSQL for relational data).  Microservices are still responsible for their own data, but the data is segregated by cluster boundaries or mechanisms within the data store itself (such as HBase namespaces or PostgreSQL schemas).

HBase was chosen for a number of reasons, including its performance, scalability, reliability, its support for strong consistency, and its ability to support a wide variety of data models.  At Yammer we have a number of services that rely on HBase for persistence in production:

  • Feedie, a feeds service
  • RoyalMail, an inbox service
  • Ocular, for tracking messages that a user has viewed
  • Streamie, for storing activity streams
  • Prankie, a ranking service with time-based decay
  • Authlog, for authorization audit trails
  • Spammie, for spam monitoring and blocking
  • Graphene, a generic graph modeling service

HBase is able to satisfy the persistence needs of several very different domains. Of course, there are some use cases for which HBase is not recommended, for example, when using raw HDFS would be more efficient, or when ad-hoc querying via SQL is preferred (although projects like Apache Phoenix can provide SQL on top of HBase).

Previously, Lars George and Jonathan Hsieh from Cloudera attempted to survey the most commonly occurring use cases for HBase, which they referred to as application archetypes.  In their presentation, they categorized archetypes as either “good”, “bad”, or “maybe” when used with HBase. Below I present an augmented listing of their “good” archetypes, along with pointers to projects that implement them.


The Entity archetype is the most natural of the archetypes.  HBase, being a wide column store, can represent the entity properties with individual columns.  Projects like Apache Gora and HEntityDB support this archetype.

Column Family: default
Row Key      Column: <property 1 name>  Column: <property 2 name>
<entity ID>  <property 1 value>         <property 2 value>

Entities can also be stored in the same manner as with a key-value store.  In this case the entity would be serialized as a binary or JSON value in a single column.

Column Family: default
Row Key      Column: body
<entity ID>  <entity blob>


The Sorted Collection archetype is a generalization of the original Messaging archetype that was presented.  In this archetype the entities are stored as binary or JSON values, with the column qualifier being the value of the sort key to use.  For example, in a messaging feed, the column qualifier would be a timestamp or a monotonically increasing counter of some sort.  The column qualifier can also be “inverted” (such as by subtracting a numeric ID from the maximum possible value) so that entities are stored in descending order.

Column Family: default
Row Key          Column: <sort key 1 value>  Column: <sort key 2 value>
<collection ID>  <entity 1 blob>             <entity 2 blob>
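The “inverted” qualifier can be sketched as follows. The sketch assumes non-negative sort keys, such as millisecond timestamps, encoded as 8-byte big-endian values. Subtracting from Long.MAX_VALUE keeps the result non-negative, so under HBase's unsigned byte ordering the newest entry sorts first. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of an inverted (descending-order) column qualifier. */
final class InvertedQualifierSketch {

    /** Encodes a non-negative sort key so that larger values sort first. */
    static byte[] invertedQualifier(long sortKey) {
        if (sortKey < 0) {
            throw new IllegalArgumentException("sort key must be non-negative");
        }
        return ByteBuffer.allocate(8).putLong(Long.MAX_VALUE - sortKey).array();
    }

    /** Recovers the original sort key from an inverted qualifier. */
    static long decode(byte[] qualifier) {
        return Long.MAX_VALUE - ByteBuffer.wrap(qualifier).getLong();
    }

    public static void main(String[] args) {
        byte[] older = invertedQualifier(1_000L);
        byte[] newer = invertedQualifier(2_000L);
        // Qualifiers are ordered as unsigned bytes, so the newer entry now comes first.
        System.out.println(Arrays.compareUnsigned(newer, older) < 0); // true
        System.out.println(decode(newer));                            // 2000
    }
}
```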

Alternatively, each entity can be stored as a set of properties.  This is similar to how Cassandra implements CQL.  HEntityDB supports storing entity collections in this manner.

Column Family: default
Row Key          Column: <sort key 1 value + property 1 name>  Column: <sort key 1 value + property 2 name>  Column: <sort key 2 value + property 1 name>  Column: <sort key 2 value + property 2 name>
<collection ID>  <property 1 of entity 1>                      <property 2 of entity 1>                      <property 1 of entity 2>                      <property 2 of entity 2>

In order to access entities by some other value than the sort key, additional column families representing indices can be used.

                 Column Family: sorted                                   Column Family: index
Row Key          Column: <sort key 1 value>  Column: <sort key 2 value>  Column: <index 1 value>  Column: <index 2 value>
<collection ID>  <entity 1 blob>             <entity 2 blob>             <entity 1 blob>          <entity 2 blob>

To prevent the collection from growing unbounded, a coprocessor can be used to trim the sorted collection during compactions.  If index column families are used, the coprocessor would also remove corresponding entries from the index column families when trimming the sorted collection.  At Yammer, both the Feedie and RoyalMail services use this technique.  Both services also use server-side filters for efficient pagination of the sorted collection during queries.
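The trimming rule itself can be sketched independently of the coprocessor plumbing. Below, a NavigableMap stands in for one row's sorted columns. It uses non-inverted sort keys, so the oldest entry comes first. The index family is simplified to a map from index value to sort key. Both choices are assumptions, and the class name is hypothetical. A real implementation would apply this logic inside a compaction hook of a RegionObserver coprocessor, which the sketch does not attempt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of trimming a sorted collection and its index; no coprocessor code. */
final class CollectionTrimSketch {

    /**
     * Keeps only the maxSize entries with the largest sort keys and drops index entries that
     * point at trimmed sort keys.
     */
    static <E> void trim(NavigableMap<Long, E> sorted, Map<String, Long> index, int maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("maxSize must be non-negative");
        }
        Set<Long> removed = new HashSet<>();
        while (sorted.size() > maxSize) {
            removed.add(sorted.pollFirstEntry().getKey()); // smallest (oldest) sort key first
        }
        index.values().removeIf(removed::contains); // removing from values() drops the mapping
    }

    public static void main(String[] args) {
        TreeMap<Long, String> feed = new TreeMap<>();
        feed.put(1L, "m1");
        feed.put(2L, "m2");
        feed.put(3L, "m3");
        Map<String, Long> index = new HashMap<>(Map.of("alice", 1L, "bob", 3L));
        trim(feed, index, 2);
        System.out.println(feed);  // {2=m2, 3=m3}
        System.out.println(index); // {bob=3}
    }
}
```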

Continued here...

Sunday April 09, 2017

Accordion: Developer View of In-Memory Compaction

by Anastasia Braginsky (HBase Committer), Eshcar Hillel (HBase Committer) and Edward Bortnikov (Contributor) of Yahoo! Research

In-memory compaction (the Accordion project) has demonstrated sizable improvements in HBase’s write amplification and read/write performance. In this post, we describe the design behind Accordion’s algorithms and how it fits within the HBase internals.

What’s New

Accordion affects the regionserver package. Its centerpiece component is the CompactingMemStore class, which inherits from AbstractMemStore and is a sibling of DefaultMemStore. In contrast with DefaultMemStore, which maintains a monolithic dynamic (mutable) index to cell storage, CompactingMemStore manages multiple indexes, ordered by creation time. The youngest index is mutable, whereas the rest are immutable.

Cell indexes are implemented as descendants of the CellSet class that provides the basic NavigableMap access to cells. In addition to the traditional ConcurrentSkipListMap mutable index, Accordion introduces an immutable CellArrayMap index - a space-efficient ordered array that uses binary search. CellArrayMap is allocated on heap.
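The idea behind an immutable, binary-searched index can be sketched as below. This is not HBase's CellArrayMap, which implements the full NavigableMap contract over Cell objects. This hypothetical String-keyed class just flattens a frozen ConcurrentSkipListMap into two parallel sorted arrays. That avoids per-entry skip-list nodes while still supporting exact and ceiling lookups.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical flat sorted index; a simplified stand-in for the CellArrayMap idea. */
final class FlatSortedIndexSketch {
    private final String[] keys;   // sorted ascending, never modified after construction
    private final String[] values; // values[i] belongs to keys[i]

    /** Flattens a skip-list index that is assumed to be no longer modified. */
    FlatSortedIndexSketch(ConcurrentSkipListMap<String, String> source) {
        keys = new String[source.size()];
        values = new String[source.size()];
        int i = 0;
        for (Map.Entry<String, String> e : source.entrySet()) { // iterates in ascending key order
            keys[i] = e.getKey();
            values[i] = e.getValue();
            i++;
        }
    }

    /** Exact lookup via binary search; returns null if the key is absent. */
    String get(String key) {
        int idx = Arrays.binarySearch(keys, key);
        return idx >= 0 ? values[idx] : null;
    }

    /** Smallest key greater than or equal to the given key (like NavigableMap#ceilingKey), or null. */
    String ceilingKey(String key) {
        int idx = Arrays.binarySearch(keys, key);
        int pos = idx >= 0 ? idx : -idx - 1; // binarySearch encodes the insertion point as -(pos) - 1
        return pos < keys.length ? keys[pos] : null;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<String, String> active = new ConcurrentSkipListMap<>();
        active.put("row3", "c");
        active.put("row1", "a");
        active.put("row2", "b");
        FlatSortedIndexSketch flat = new FlatSortedIndexSketch(active);
        System.out.println(flat.get("row2"));         // b
        System.out.println(flat.ceilingKey("row1x")); // row2
    }
}
```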

Accordion introduces the Segment abstraction, which encapsulates the combination of the CellSet and associated metadata (time range tracker, MSLAB reference, size counters, etc.). Beforehand, these (gory) details were managed directly by the MemStore. The abstract Segment class manages a single CellSet and its metadata. It has two subclasses:  MutableSegment and ImmutableSegment. The latter can either manage an immutable CellSet, or provide a read-only wrapper to a mutable CellSet. The CompositeImmutableSegment class extends ImmutableSegment; it provides a similar API for a fixed set of segments.

Segment’s are scannable. The traversal is provided by the SegmentScanner class that implements the KeyValueScanner interface. SegmentScanner exploits the NavigableMap API implemented by the CellSet encapsulated by the segment.

CompactingMemStore manages one MutableSegment (in what follows, active) and multiple ImmutableSegment’s. It supports the top-level scan mechanism via a list of SegmentScanner’s, each referring to one segment. In this context, the MemStoreScanner class became deprecated and was eliminated in HBase 2.0.

Figure 1 depicts the Segment and cell index (NavigableMap) class hierarchies.


Figure 1. Segment and cell index (NavigableMap) class hierarchies.

Immutable segments are created upon in-memory flush. Following this, they travel through an interim pipeline (CompactionPipeline class) to the snapshot buffer from where they are flushed to disk, and finally released. Pipeline is accessed in parallel by multiple tasks; in what follows, we discuss how its thread-safety and correctness are guaranteed. The snapshot is simpler because its content never changes; it is implemented as CompositeImmutableSegment.  

In-memory flushes trigger in-memory compactions. The latter replace one or more segments in pipeline with semantically equivalent but more memory-efficient representations. The MemStoreCompactor class is an algorithmic tool that implements the in-memory compaction policies. It uses the MemStoreSegmentsIterator helper class to traverse the segments. Figure 2 depicts the classes that implement in-memory compaction.


Figure 2. Classes that implement in-memory compaction.

The StoreScanner class implements a consistent scan mechanism for HRegion. It maintains a heap of KeyValueScanner’s to merge the MemStore data with the on-disk HFile data. CompactingMemStore returns a subset of these scanners (list of SegmentScanner instances) for all its Segment’s.

MemStoreCompactor exploits the same mechanism, via the MemStoreSegmentsIterator helper; it only iterates through immutable segments. Figure 3 depicts the classes involved in in-memory compaction.


Figure 3. Classes involved in in-memory compaction.

Managing the Compacting Memstore State

MemStore’s in HBase run processing tasks concurrently with serving normal read and write requests, for example, flushing data from RAM to disk. In CompactingMemStore, there are more concurrent scenarios, as in-memory flushes and compactions introduce more complexity. Here, the pipeline is the most complex component, since it is accessed by multiple tasks in parallel.

Our guiding principles are:

  1. Correctness. Data retrieval semantics are preserved - in particular, data is never lost.

  2. Performance. Infrequent flushes and compactions, which happen in the background, do not affect the datapath operations, namely scans.

Let us take a quick look at how these principles manifest in the CompactionPipeline design.

Data Structures and Synchronization

Pipeline contains a double-ended queue of ImmutableSegment’s ordered by segment creation time. It is accessed by scans (read) as well as flushes and compactions (update). Since the segments are immutable, it is sufficient to provide the reader with a clone of the queue. One way to go would be to clone upon each scan, under the protection of a reentrant shared lock. We chose a more efficient copy-on-write approach. Namely, only the update operations synchronize on the pipeline. Each update modifies the queue and then publishes a fresh read-only copy of it through a volatile reference. The subsequent scans retrieve this copy lock-free. Note that if some segments are removed from the queue by in-memory compaction or disk flush in parallel with an ongoing scan, correctness is not affected because the data does not disappear. Rather, it may be referenced from multiple locations (for instance, both pipeline and snapshot). The scan algorithm filters the duplicates.

In-memory compaction swaps one or more segments in the queue with new (compacted) segments. Similarly to scan, it is a long-running operation, which should not disrupt the concurrent datapath operations. In order to achieve this, we implemented compaction in a non-blocking way. CompactionPipeline maintains a version that is promoted each time the queue tail is modified. When the compaction starts, it records this version. Upon completion, it atomically checks whether the version changed in the meantime, and atomically swaps the segments if it did not. This opportunistic approach succeeds in most cases. Since in-memory compaction is an optimization, it is fine for it to fail on rare occasions. The version counter (long) is volatile, that is, changes to it are atomic and immediately observable.
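The copy-on-write and version-check scheme described in the two preceding paragraphs can be modeled in a few dozen lines. This is a simplified, hypothetical sketch, not HBase's CompactionPipeline. Segments are plain strings and the youngest segment sits at the head. The version is bumped only on tail changes, and a compaction that finds the version changed simply gives up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/** Hypothetical model of a copy-on-write, versioned segment pipeline; not HBase's CompactionPipeline. */
final class PipelineSketch {

    /** Segments captured by a compaction, together with the version observed at that moment. */
    static final class VersionedSegments {
        final List<String> segments;
        final long version;

        VersionedSegments(List<String> segments, long version) {
            this.segments = segments;
            this.version = version;
        }
    }

    private final LinkedList<String> pipeline = new LinkedList<>(); // youngest first; guarded by this
    private volatile List<String> readOnlyCopy = Collections.emptyList();
    private volatile long version = 0; // bumped only when the tail changes

    /** Scans read the published copy without taking any lock. */
    List<String> segmentsForScan() {
        return readOnlyCopy;
    }

    /** In-memory flush: add the newest segment at the head; the tail is untouched, so no version bump. */
    synchronized void pushHead(String segment) {
        pipeline.addFirst(segment);
        publish();
    }

    /** Compaction start: remember which segments are being compacted and the current version. */
    synchronized VersionedSegments versionedList() {
        return new VersionedSegments(readOnlyCopy, version);
    }

    /** Compaction end: replace the captured suffix with one segment, unless the tail changed meanwhile. */
    synchronized boolean swapSuffix(VersionedSegments captured, String compacted) {
        if (captured.version != version) {
            return false; // a concurrent tail change happened; this optimization simply gives up
        }
        for (int i = 0; i < captured.segments.size(); i++) {
            pipeline.removeLast(); // unchanged version means the captured segments are still the oldest
        }
        pipeline.addLast(compacted);
        version++;
        publish();
        return true;
    }

    /** A tail modification, e.g. handing the oldest segment to a disk flush. */
    synchronized String removeOldest() {
        String oldest = pipeline.removeLast();
        version++;
        publish();
        return oldest;
    }

    // Called with the lock held: replace the read-only copy with a fresh snapshot.
    private void publish() {
        readOnlyCopy = Collections.unmodifiableList(new ArrayList<>(pipeline));
    }

    public static void main(String[] args) {
        PipelineSketch p = new PipelineSketch();
        p.pushHead("s1");
        p.pushHead("s2");
        VersionedSegments captured = p.versionedList();     // compaction starts over [s2, s1]
        p.pushHead("s3");                                   // concurrent in-memory flush at the head
        System.out.println(p.swapSuffix(captured, "c12")); // true
        System.out.println(p.segmentsForScan());           // [s3, c12]
        VersionedSegments stale = p.versionedList();
        p.removeOldest();                                   // the tail changes
        System.out.println(p.swapSuffix(stale, "x"));      // false
    }
}
```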

Detailed Scenarios

Scan Operation (in particular, Get). The SegmentScanner’s are created (non-atomically) in the order of data movement between the MemStore segments, to preserve correctness. For example, in the course of scanner set creation a segment can move from active to pipeline, in which case it will be referenced by two scanners - however, no data is lost. The merge algorithm eliminates the redundant results that stem from the overlap.

In-Memory Flush (happens when active overflows). A dedicated worker (1) blocks updates for the region (via RegionServicesForStores), (2) creates a new ImmutableSegment that wraps active, (3) atomically inserts it into pipeline, (4) creates a new MutableSegment and flips the active reference to it, (5) unblocks the updates, and (6) calls MemStoreCompactor.

Disk Flush (happens when the region overflows, and decides to free up space in RAM). A dedicated worker (1) forces in-memory flush (to guarantee there is at least one segment in the pipeline), (2) creates a new CompositeImmutableSegment from all segments in the read-only clone of pipeline and flips the snapshot reference, (3) atomically removes references to segments in snapshot from CompactionPipeline, and (4) scans snapshot (merge across multiple segments) and flushes the results to disk.

In-Memory Compaction (triggered by in-memory flush, except in the disk flush case). (1) Retrieves a versioned copy of pipeline, (2) builds a new (compacted) ImmutableSegment, (3) atomically, if the version did not change, swaps one or more segments in pipeline with the new segment (the swap target depends on the compaction policy, see below).

Note that all the atomic sections are extremely lightweight. They only include manipulation of a few references, and avoid any computation and copy.

In-Memory Compaction Policies

MemStoreCompactor provides two compaction policies: BASIC and EAGER.

The BASIC policy is a low-cost/low-overhead alternative that merges the indexes of all segments in pipeline into a single flat index. It does not eliminate redundancies, in order to avoid cell data copy. Namely, once the number of segments in pipeline exceeds N, the algorithm scans the CellSet’s of N+1 youngest segments in pipeline, and copies the KeyValue references to a new CellArrayMap. The scan retrieves all the KeyValue’s in the original CellSet’s ordered by key and version (non-SQM matcher).

The EAGER policy is a high-cost/high-reward alternative that both flattens the index and eliminates redundancies across segments. It scans all the segments in pipeline, and merges them into one segment encapsulating a new CellArrayMap index. Redundant data versions are eliminated in the course of scan (SQM matcher). If the MemStore uses MSLAB cell storage, then the data is copied to new (compact) MSLAB’s under the new index. This policy trades extra data copy and GC overhead for maximal memory efficiency.
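A toy model of the difference between the two policies follows. It is a hypothetical sketch, not MemStoreCompactor, and each segment is a list of (key, sequence id, value) cells. BASIC merges the given segments into one sorted flat list and keeps every version. EAGER additionally keeps only the newest version of each key, as if the column family retained a single version. Deletes, TTLs, configured maximum versions, and MSLAB copying are not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical toy model of the BASIC and EAGER policies; not HBase's MemStoreCompactor. */
final class CompactionPolicySketch {

    /** A toy cell: row key, sequence id (higher means newer) and value. */
    static final class Cell {
        final String key;
        final long seqId;
        final String value;

        Cell(String key, long seqId, String value) {
            this.key = key;
            this.seqId = seqId;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "@" + seqId + "=" + value;
        }
    }

    // Key ascending, then newest version first, the order in which a scan returns versions.
    private static final Comparator<Cell> ORDER =
        Comparator.comparing((Cell c) -> c.key)
            .thenComparing(Comparator.comparingLong((Cell c) -> c.seqId).reversed());

    /** BASIC: merge the given segments into one flat sorted index; every version is kept. */
    static List<Cell> basic(List<List<Cell>> segments) {
        List<Cell> merged = new ArrayList<>();
        for (List<Cell> segment : segments) {
            merged.addAll(segment);
        }
        merged.sort(ORDER); // a full sort stands in for the merge scan over already-sorted segments
        return merged;
    }

    /** EAGER (simplified): merge as BASIC does, then keep only the newest version of each key. */
    static List<Cell> eager(List<List<Cell>> segments) {
        List<Cell> result = new ArrayList<>();
        for (Cell c : basic(segments)) {
            boolean duplicateKey = !result.isEmpty() && result.get(result.size() - 1).key.equals(c.key);
            if (!duplicateKey) {
                result.add(c); // the first cell seen for a key is its newest version
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Cell>> segments = List.of(
            List.of(new Cell("cart:1", 5, "3 items"), new Cell("cart:2", 6, "1 item")),
            List.of(new Cell("cart:1", 2, "2 items")));
        System.out.println(basic(segments)); // [cart:1@5=3 items, cart:1@2=2 items, cart:2@6=1 item]
        System.out.println(eager(segments)); // [cart:1@5=3 items, cart:2@6=1 item]
    }
}
```

The shopping-cart keys echo the high-churn workloads for which eager compaction pays off. The redundant older version of cart:1 disappears only under EAGER.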

Disk Flush Policy and WAL Truncation

HBase 2.0 introduces a notion of sloppy MemStore’s, that is, MemStore implementations that dynamically expand and contract their RAM footprint over time. CompactingMemStore is currently the only sloppy MemStore implementation. When a region triggers a flush to disk to free up memory, sloppy stores are the last candidates for flush. The rationale is that they manage their memory more efficiently than DefaultMemStore over time, and therefore should be prioritized for remaining in RAM.

Disk flushes trigger WAL truncation (archiving), as the WAL entries corresponding to persisted data versions become obsolete. The region maintains an estimate of the lower bound (minimum sequence id) of non-flushed data among all its stores; the log entries below this bound can be safely removed. Prior to Accordion, this maintenance was simple. Since DefaultMemStore dumps its whole in-memory content to disk, the store-level minimum sequence id was reset when a flush was scheduled, and re-installed by the first put operation to occur after the flush.

Since sloppy stores can flush in-memory data to disk partially (for example, CompactingMemStore can flush any suffix of CompactionPipeline), the minimum sequence id maintenance becomes more subtle, to avoid data loss. Namely, every segment maintains its own minimum sequence id, and therefore the CompactingMemStore lower bound is the minimum among all segments. Note that this is just a conservative estimate. For example, an eager in-memory compaction that happens concurrently with a disk flush might eliminate redundant cells and thereby lift the lower bound. However, this estimate is safe because the value can only grow monotonically over time. It can be safely computed at any time; no atomicity is required while retrieving the segment lower bounds.
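The conservative lower bound amounts to a minimum over per-segment values, as in the hypothetical sketch below. Each segment is modeled only by the smallest sequence id it holds. An empty store reports Long.MAX_VALUE as a "nothing unflushed" sentinel; that sentinel is an assumption of the sketch, not HBase's convention.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-segment minimum sequence ids and the resulting lower bounds. */
final class SequenceIdBoundSketch {

    /** Minimal segment model: only the smallest sequence id among its cells matters here. */
    static final class Segment {
        final long minSequenceId;

        Segment(long minSequenceId) {
            this.minSequenceId = minSequenceId;
        }
    }

    /** Lower bound of unflushed data in one store: the minimum over its segments. */
    static long storeLowerBound(List<Segment> segments) {
        long min = Long.MAX_VALUE; // sentinel for "no unflushed data" (sketch-specific)
        for (Segment s : segments) {
            min = Math.min(min, s.minSequenceId);
        }
        return min;
    }

    /** Region-wide bound: WAL entries with sequence ids below this value are safe to archive. */
    static long regionLowerBound(List<List<Segment>> stores) {
        long min = Long.MAX_VALUE;
        for (List<Segment> store : stores) {
            min = Math.min(min, storeLowerBound(store));
        }
        return min;
    }

    public static void main(String[] args) {
        List<Segment> storeA = new ArrayList<>(List.of(new Segment(140), new Segment(95), new Segment(80)));
        List<Segment> storeB = List.of(new Segment(110));
        System.out.println(regionLowerBound(List.of(storeA, storeB))); // 80
        storeA.remove(storeA.size() - 1); // the oldest segment of store A is flushed to disk
        System.out.println(regionLowerBound(List.of(storeA, storeB))); // 95
    }
}
```

Removing a segment from the set being minimized can never lower the minimum. This is consistent with the bound only growing over time as segments are flushed.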

If the WAL grows too big despite the truncation efforts, the periodic LogRoller process kicks in and forces a full flush to disk. This generic mechanism guarantees that the recovery after a crash does not need to replay the entire history, and it also trims the WAL. In other words, however efficient, in-memory compaction does not eliminate disk flushes entirely; rather, it pushes them further into the future. Note that when EAGER compaction is adopted, periodic flushing is even more important, because the WAL stores all the data redundancies that are eliminated by the compaction algorithm.


In this blog post, we covered Accordion’s internals: new classes, relationships, and execution flows. We also zoomed in on the synchronization scheme that guarantees thread-safety, and shed light on the compaction policy implementations.

We thank Michael Stack, Anoop Sam John and Ramkrishna Vasudevan for their continuous support that made this project happen.

Accordion: HBase Breathes with In-Memory Compaction

by Anastasia Braginsky (HBase Committer), Eshcar Hillel (HBase Committer) and Edward Bortnikov (Contributor) of Yahoo! Research

Modern products powered by HBase place ever-increasing demands on its read and write performance. Ideally, HBase applications would like to enjoy the speed of in-memory databases without giving up on the reliable persistent storage guarantees. We introduce a new algorithm in HBase 2.0, named Accordion, which takes a significant step towards this goal.

HBase partitions the data into regions controlled by a cluster of RegionServer’s. The internal (vertical) scalability of RegionServer is crucial for end-user performance as well as for the overall system utilization. Accordion improves the RegionServer scalability via a better use of RAM. It accommodates more data in memory and writes to disk less frequently. This manifests in multiple desirable phenomena. First, HBase’s disk occupancy and write amplification are reduced. Second, more reads and writes get served from RAM, and fewer are stalled by disk I/O; in other words, HBase’s performance is increased. Traditionally, these different metrics were considered at odds, and tuned at each other’s expense. With Accordion, they all get improved simultaneously.

Accordion is inspired by the Log-Structured-Merge (LSM) tree design pattern that governs the HBase storage organization. An HBase region is stored as a sequence of searchable key-value maps. The topmost is a mutable in-memory store, called MemStore, which absorbs the recent write (put) operations. The rest are immutable HDFS files, called HFiles. Once a MemStore overflows, it is flushed to disk, creating a new HFile. HBase adopts multi-version concurrency control, that is, MemStore stores all data modifications as separate versions. Multiple versions of one key may therefore reside in MemStore and the HFile tier. A read (get) operation, which retrieves the value by key, scans the HFile data in BlockCache, seeking the latest version. To reduce the number of disk accesses, HFiles are merged in the background. This process, called compaction, removes the redundant cells and creates larger files.

LSM trees deliver superior write performance by transforming random application-level I/O into sequential disk I/O. However, their traditional design makes no attempt to compact the in-memory data. This stems from historical reasons: LSM trees were designed at a time when RAM was a scarce resource, so MemStore capacity was small. With recent changes in the hardware landscape, the overall MemStore memory managed by a RegionServer can be multiple gigabytes, leaving a lot of headroom for optimization.

Accordion reapplies the LSM principle to MemStore in order to eliminate redundancies and other overhead while the data is still in RAM. Doing so decreases the frequency of flushes to HDFS, thereby reducing the write amplification and the overall disk footprint. With fewer flushes, write operations stall less frequently on MemStore overflow, so write performance improves. Less data on disk also implies less pressure on the block cache, higher hit rates, and eventually better read response times. Finally, fewer disk writes also mean less compaction happening in the background, i.e., fewer cycles are stolen from productive (read and write) work. All in all, in-memory compaction can be seen as a catalyst that enables the system as a whole to move faster.

Accordion currently provides two levels of in-memory compaction: basic and eager. The former applies generic optimizations that benefit all data update patterns. The latter is most useful for applications with high data churn, like producer-consumer queues, shopping carts, shared counters, etc. All these use cases feature frequent updates of the same keys, which generate multiple redundant versions that eager compaction can eliminate. On the flip side, eager compaction may incur compute overhead (more memory copies and garbage collection), which may affect response times under intensive write loads. The overhead is high if the MemStore uses on-heap MemStore-Local Allocation Buffer (MSLAB) allocation; this configuration is not advised in conjunction with eager compaction. See more details about Accordion’s compaction algorithms in the next sections.

Future implementations may tune the optimal compaction policy automatically, based on the observed workload.

How To Use

The in-memory compaction level can be configured both globally and per column family. The supported levels are none (legacy implementation), basic, and eager.

By default, all tables apply basic in-memory compaction. This global configuration can be overridden in hbase-site.xml, as follows:





The level can also be configured in the HBase shell per column family, as follows:  

create '<tablename>',


Performance Gains, or Why You Should Care

We stress-tested HBase extensively via the popular Yahoo! Cloud Serving Benchmark (YCSB). Our experiments used 100-200 GB datasets and exercised a variety of representative workloads. The results demonstrate significant performance gains delivered by Accordion.

Heavy-tailed (Zipf) distribution. The first experiment exercises a workload in which key popularities follow a Zipf distribution, as in most real-life scenarios. In this context, when 100% of the operations are writes, Accordion achieves up to a 30% reduction in write amplification, a 20% increase in write throughput, and a 22% reduction in GC. When 50% of the operations are reads, the tail read latency is reduced by 12%.

Uniform distribution. The second experiment exercises a workload in which all keys are equally popular. In this context, under 100% writes, Accordion delivers up to a 25% reduction in write amplification, a 50% increase in write throughput, and a 36% reduction in GC. The tail read latencies are not impacted (which is expected, due to the complete lack of locality).

How Accordion Works

High Level Design. Accordion introduces CompactingMemStore, a MemStore implementation that applies compaction internally. In contrast to the default MemStore, which maintains all data in one monolithic data structure, Accordion manages it as a sequence of segments. The youngest segment, called active, is mutable; it absorbs the put operations. Upon overflow (by default at 32MB, i.e., 25% of the MemStore size bound), the active segment is moved to an in-memory pipeline and becomes immutable. We call this an in-memory flush. Get operations scan through these segments and the HFiles (the latter are accessed via the block cache, as usual in HBase).

CompactingMemStore may merge multiple immutable segments in the background from time to time, creating larger and leaner segments. The pipeline is therefore “breathing” (expanding and contracting), similar to accordion bellows.
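The following Java sketch illustrates the active segment and the in-memory flush described above. The class `PipelineSketch`, its rough byte accounting, and the use of `TreeMap`s as segments are hypothetical; the only figure taken from the text is the flush point of 25% of the MemStore size bound, which with the 32MB default implies a 128MB bound. For brevity the active segment overwrites repeated keys instead of keeping versions, and background merging of pipeline segments is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of an active segment plus an in-memory pipeline; not HBase's CompactingMemStore. */
public class PipelineSketch {
    static final long MEMSTORE_BOUND = 128L << 20;           // 32 MB / 25% implies a 128 MB bound
    static final long IN_MEMORY_FLUSH = MEMSTORE_BOUND / 4;  // 25% of the bound = 32 MB

    private TreeMap<String, byte[]> active = new TreeMap<>();               // mutable; absorbs puts
    private long activeBytes = 0;
    private final Deque<Map<String, byte[]>> pipeline = new ArrayDeque<>(); // immutable, newest first

    void put(String key, byte[] value) {
        active.put(key, value);
        activeBytes += key.length() + value.length;          // rough size accounting (assumption)
        if (activeBytes >= IN_MEMORY_FLUSH) {
            inMemoryFlush();
        }
    }

    /** In-memory flush: the active segment joins the pipeline and becomes read-only. */
    void inMemoryFlush() {
        pipeline.addFirst(Collections.unmodifiableMap(active));
        active = new TreeMap<>();
        activeBytes = 0;
    }

    /** Check the active segment, then pipeline segments from newest to oldest. */
    byte[] get(String key) {
        byte[] v = active.get(key);
        if (v != null) return v;
        for (Map<String, byte[]> segment : pipeline) {
            v = segment.get(key);
            if (v != null) return v;
        }
        return null;  // a real store would go on to the HFiles via the block cache
    }

    int pipelineLength() { return pipeline.size(); }
    int activeCells() { return active.size(); }

    public static void main(String[] args) {
        PipelineSketch store = new PipelineSketch();
        byte[] oneMb = new byte[1 << 20];  // one shared payload keeps the demo's heap small
        for (int i = 0; i < 100; i++) store.put("k" + i, oneMb);
        System.out.println(store.pipelineLength() + " " + store.activeCells()); // 3 4
    }
}
```

With 1 MB values, every 32nd put triggers an in-memory flush, so 100 puts leave three immutable segments in the pipeline and four cells in the active segment.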

When a RegionServer decides to flush one or more MemStores to disk to free up memory, it considers CompactingMemStores only after the other MemStores that have overflowed. The rationale is to prolong the lifetime of MemStores that manage their memory efficiently, in order to reduce the overall I/O. When such a flush does happen, all pipeline segments are moved to a composite snapshot, merged, and streamed to a new HFile.

Figure 1 illustrates the structure of CompactingMemStore versus the traditional design.

Figure 1. CompactingMemStore vs DefaultMemStore

Segment Structure. Like the default MemStore, CompactingMemStore maintains an index on top of cell storage to allow fast search by key. Traditionally, this index was implemented as a Java skiplist (ConcurrentSkipListMap), a dynamic but wasteful data structure that manages a lot of small objects. CompactingMemStore uses a space-efficient flat layout for immutable segment indexes. This universal optimization helps all compaction policies reduce the RAM overhead, even when the data has little or no redundancy. Once a segment is added to the pipeline, the store serializes its index into a sorted array named CellArrayMap that is amenable to fast binary search.
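As a rough illustration of why a flat index is cheaper, this Java sketch flattens a `ConcurrentSkipListMap` into two sorted arrays once a segment becomes immutable, and then looks keys up with `Arrays.binarySearch`. The class `FlatIndex` is hypothetical and stores plain `String` keys and `Object` values rather than HBase cells; it only shows the skiplist-to-sorted-array idea behind CellArrayMap.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical flat index in the spirit of CellArrayMap; not HBase's class. */
public class FlatIndex {
    private final String[] keys;   // sorted and immutable
    private final Object[] cells;  // cells[i] belongs to keys[i]

    /** Flatten a skiplist index; assumes the source segment is no longer being modified. */
    FlatIndex(ConcurrentSkipListMap<String, Object> skipList) {
        int n = skipList.size();
        keys = new String[n];
        cells = new Object[n];
        int i = 0;
        for (Map.Entry<String, Object> e : skipList.entrySet()) {  // iterates in key order
            keys[i] = e.getKey();
            cells[i] = e.getValue();
            i++;
        }
    }

    /** O(log n) lookup over two arrays instead of a graph of skiplist nodes. */
    Object get(String key) {
        int pos = Arrays.binarySearch(keys, key);
        return pos >= 0 ? cells[pos] : null;
    }

    int size() { return keys.length; }

    public static void main(String[] args) {
        ConcurrentSkipListMap<String, Object> active = new ConcurrentSkipListMap<>();
        active.put("row-3", "c");
        active.put("row-1", "a");
        active.put("row-2", "b");
        FlatIndex flat = new FlatIndex(active);
        System.out.println(flat.get("row-2") + " " + flat.get("row-9")); // b null
    }
}
```

The skiplist remains the right structure while a segment is mutable; flattening only pays off once no more inserts can arrive.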

CellArrayMap supports both direct allocation of cells from the Java heap and custom allocation from MSLABs, either on-heap or off-heap. The implementation differences are abstracted away via the helper KeyValue objects that are referenced from the index (Figure 2). CellArrayMap itself is always allocated on-heap.

Figure 2. Immutable segment with a flat CellArrayMap index and MSLAB cell storage.

Compaction Algorithms. The in-memory compaction algorithms maintain a single flat index on top of the pipelined segments. This saves space, especially when the data items are small, and therefore pushes the disk flush further out in time. A single index allows searching in one place, thereby bounding the tail read latency.

When an active segment is flushed in memory, it is queued to the compaction pipeline, and a background merge task is immediately scheduled. The merge task simultaneously scans all the segments in the pipeline (similarly to on-disk compaction) and merges their indexes into one. The differences between the basic and eager compaction policies manifest in how they handle the cell data. Basic compaction does not eliminate the redundant data versions, in order to avoid physical copying; it just rearranges the references to the KeyValue objects. Eager compaction, on the contrary, filters out the duplicates. This comes at the cost of extra compute and data migration; for example, with MSLAB storage the surviving cells are copied to the newly created MSLAB(s). The compaction overhead pays off when the data is highly redundant.
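The difference between the two policies can be sketched in a few lines of Java. This is a hypothetical model, not HBase's implementation: each segment is a list of `Cell` objects (row, sequence number, value), `basic` merges the segment indexes by rearranging references only, and `eager` additionally drops older versions of each row, assuming the column family retains a single version. Creating a new `Cell` for each survivor stands in for copying it into a new MSLAB.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of basic vs eager in-memory compaction; not HBase code. */
public class InMemoryCompaction {
    static final class Cell {
        final String row;
        final long seq;
        final String value;
        Cell(String row, long seq, String value) { this.row = row; this.seq = seq; this.value = value; }
    }

    // Row ascending; newest version (highest seq) first within a row.
    static final Comparator<Cell> ORDER =
        Comparator.comparing((Cell c) -> c.row).thenComparing(c -> c.seq, Comparator.reverseOrder());

    /** Basic: merge all segment indexes into one; every version survives, no cell data is copied. */
    static List<Cell> basic(List<List<Cell>> pipeline) {
        List<Cell> index = new ArrayList<>();
        for (List<Cell> segment : pipeline) index.addAll(segment);  // references only
        index.sort(ORDER);
        return index;
    }

    /** Eager: merge, then keep only the newest version of each row (single-version assumption). */
    static List<Cell> eager(List<List<Cell>> pipeline) {
        List<Cell> out = new ArrayList<>();
        String lastRow = null;
        for (Cell c : basic(pipeline)) {
            if (!c.row.equals(lastRow)) {
                out.add(new Cell(c.row, c.seq, c.value));  // stands in for copying into a new MSLAB
                lastRow = c.row;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Cell>> pipeline = List.of(
            List.of(new Cell("cart-1", 5, "3 items"), new Cell("counter", 6, "42")),
            List.of(new Cell("cart-1", 1, "1 item"), new Cell("counter", 2, "41")));
        System.out.println(basic(pipeline).size() + " vs " + eager(pipeline).size()); // 4 vs 2
    }
}
```

With high-churn data like the shopping cart and counter above, eager compaction halves the index in this toy example, which is the kind of redundancy that lets the extra copying pay off.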

Future implementations of compaction may automate the choice between the basic and eager compaction policies. For example, the algorithm might try eager compaction once in a while, and schedule the next compaction based on the value delivered (i.e., the fraction of data eliminated). Such an approach could relieve the system administrator from deciding a priori, and adapt to changing access patterns.


In this blog post, we covered Accordion’s basic principles, configuration, performance gains, and some details of the in-memory compaction algorithms. The next post will focus on system internals for HBase developers.

We thank Michael Stack, Anoop Sam John and Ramkrishna Vasudevan for their continuous support that made this project happen.

Monday March 27, 2017

HBase on Azure: Import/Export snapshots from/to ADLS

by Apekshit Sharma, HBase Committer.


Azure Data Lake Store (ADLS) is Microsoft’s cloud alternative for Apache HDFS. In this blog, we’ll see how to use it as a backup for storing snapshots of Apache HBase tables. You can export snapshots to ADLS for backup; and for recovery, import the snapshot back to HDFS and use it to clone/restore the table. In this post, we’ll go over the configuration changes needed to make the HDFS client talk to ADLS, and the commands to copy HBase table snapshots from HDFS to ADLS and vice versa.


“The Azure Data Lake store is an Apache Hadoop file system compatible with Hadoop Distributed File System (HDFS) and works with the Hadoop ecosystem.”

ADLS can be treated like any HDFS service, except that it’s in the cloud. But then how do applications talk to it? That’s where the hadoop-azure-datalake module comes into the picture. It enables an HDFS client to talk to ADLS whenever the following access path syntax is used:

adl://<Account Name>.azuredatalakestore.net/

For example:

$ hdfs dfs -mkdir adl://<Account Name>.azuredatalakestore.net/test_dir

However, before it can access any data in ADLS, the module needs to be able to authenticate to Azure. That requires a few configuration changes. These we describe in the next section.

Configuration changes

ADLS requires an OAuth2 bearer token to be present in the request’s HTTPS headers. Users who have access to an ADLS account can obtain this token from the Azure Active Directory (Azure AD) service. To allow an HDFS client to authenticate to ADLS and access data, you’ll need to specify these tokens in core-site.xml using the following four configurations:



To find the values for dfs.adls.oauth2.* configurations, refer to this document.

Since all files/folders in ADLS are owned by the account owner, its ACL environment doesn’t work well with that of HDFS, which can have multiple users. Because the user issuing commands through the HDFS client will be different from the user in Azure AD, any operation that checks ACLs will fail. To work around this issue, use the following configuration, which tells the HDFS client to assume, for ADLS requests, that the current user owns all files.


Make sure to deploy the above configuration changes to the cluster.

Export snapshot to ADLS

Here are the steps to export a snapshot from HDFS to ADLS.

  1. Create a new directory in ADLS to store snapshots.

$ hdfs dfs -mkdir adl://appy.azuredatalakestore.net/hbase

$ hdfs dfs -ls adl://appy.azuredatalakestore.net/

Found 1 items

drwxr-xr-x   - systest hdfs          0 2017-03-21 23:43 adl://appy.azuredatalakestore.net/hbase

  2. Create the snapshot. To learn more about this feature and how to create/list/restore snapshots, refer to the HBase Snapshots section in the HBase reference guide.

  3. Export the snapshot to ADLS.

$ sudo -u hbase hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <snapshot_name> -copy-to adl://appy.azuredatalakestore.net/hbase


17/03/21 23:50:24 INFO snapshot.ExportSnapshot: Copy Snapshot Manifest

17/03/21 23:50:48 INFO snapshot.ExportSnapshot: Export Completed: snapshot_1

  4. Verify that the snapshot was copied to ADLS.

$ hbase snapshotinfo -snapshot <snapshot_name> -remote-dir adl://appy.azuredatalakestore.net/hbase

Snapshot Info


  Name: snapshot_1

  Type: FLUSH

 Table: t

Format: 2

Created: 2017-03-21T23:42:56

  5. It’s now safe to delete the local snapshot (the one in HDFS).

Restore/Clone table from a snapshot in ADLS

If you have a snapshot in ADLS which you want to use either to restore an original table to a previous state, or create a new table by cloning, follow the steps below.

  1. Copy the snapshot back from ADLS to HDFS. Make sure to copy it to the ‘hbase’ directory on HDFS, because that’s where the HBase service looks for snapshots.

$ sudo -u hbase hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <snapshot_name> -copy-from adl://appy.azuredatalakestore.net/hbase -copy-to hdfs:///hbase

  2. Verify that the snapshot exists in HDFS. (Note that there is no -remote-dir parameter.)

$ hbase snapshotinfo -snapshot snapshot_1

Snapshot Info


  Name: snapshot_1

  Type: FLUSH

 Table: t

Format: 2

Created: 2017-03-21T23:42:56

  3. Follow the instructions in the HBase Snapshots section of the HBase reference guide to restore/clone from the snapshot.


The Azure module in HDFS makes it easy to interact with ADLS. We can keep using the commands we already know, and our applications that use the HDFS client just need a few configuration changes. What a seamless integration! In this blog, we got a glimpse of the HBase integration with Azure: using ADLS as a backup for storing snapshots. Let’s see what the future has in store for us. Maybe an HBase cluster fully backed by ADLS!

Thursday March 09, 2017

Offheap Read-Path in Production - The Alibaba story

By Yu Li (HBase Committer/Alibaba), Yu Sun (Alibaba), Anoop Sam John (HBase PMC/Intel), and Ramkrishna S Vasudevan (HBase PMC/Intel)


HBase is the core storage system in Alibaba’s Search Infrastructure. Critical e-commerce data about products, sellers, promotions, etc. is synced into HBase from various online databases. We query HBase to build and provide real-time updates on the search index. In addition, user behavior data, such as impressions, clicks and transactions, is also streamed into HBase. This data serves as feature data for our online machine learning system, which optimizes personalized search results in real time. The whole system produces mixed workloads on HBase that include bulkload/snapshot for full index building, batch mutation for real-time index updates, and streaming/continuous queries for online machine learning. Our biggest HBase cluster has reached more than 1500 nodes and 200,000 regions. It routinely serves tens of millions of QPS.

Both latency and throughput are important for our HBase deployment. From the latency perspective, it directly affects how quickly users can search for an item after it has been posted, as well as how ‘real-time’ our inventory accounting can be. From the throughput perspective, it determines the speed of machine learning processing, and thus the accuracy of the recommendations made. What’s more, since data is distributed through the cluster and accesses are balanced, applications are sensitive to latency spikes on a single node, which makes GC a critical factor in our system’s serving capability.

By caching more data in memory, the read latency (and throughput) can be greatly improved. If we can get our data from local cache, we save having to make a trip to HDFS. Apache HBase has two layers of data caching. There is what we call “L1” caching, our first caching tier – which caches data in an on heap Least Recently Used (LRU) cache -- and then there is an optional, “L2” second cache tier (aka Bucket Cache).

Bucket Cache can be configured to keep its data in a file -- i.e. caching data in a local file on disk -- or in memory. File mode is usually able to cache more data, but reading from a file incurs more latency than reading from memory. Bucket Cache can also be configured to use memory outside of the Java heap space (‘offheap’), so users generally configure a large L2 cache with offheap memory along with a smaller on heap L1 cache.

At Alibaba we use an offheap L2 cache, dedicating 12GB to Bucket Cache on each node. We also backported a patch currently only in the master branch (to be shipped in the coming hbase-2.0.0) that makes the HBase read path run offheap end-to-end. This combination improved our average throughput significantly. In the sections below, we’ll first talk about why the off-heaping has to be end-to-end, then describe how we backported the feature from the master branch to our customized 1.1.2, and finally show the performance of the end-to-end offheap read path in an A/B test and on Singles’ Day (11/11/2016).

Necessity of End-to-end Off-heaping

Before going offheap, the QPS curve on our A/B test cluster looked like this:


We could see that there were dips in average throughput. Concurrently, the average latency would be high during these times.

Checking RegionServer logs, we could see that long GC pauses were happening. Further analysis indicated that when disk IO is fast enough, as on PCIe-SSD, blocks would be evicted from cache quite frequently even when there was a high cache hit ratio. The eviction rate was so high that GC couldn’t keep up, causing frequent long GC pauses that hurt throughput.

Looking to improve throughput, we tried the existing Bucket Cache in 1.1.2 but found GC was still heavy. In other words, although Bucket Cache in branch-1 (the branch for current stable releases) already supports using offheap memory, it tends to generate lots of garbage. To understand why end-to-end off-heaping is necessary, let’s see how reads from Bucket Cache work in branch-1. But before we do this, let’s understand how the Bucket Cache itself is organized.

The allocated offheap memory is reserved as DirectByteBuffers, each of size 4 MB. So physically, the entire memory area is split into many buffers of 4 MB each. On top of this physical layout, we impose a logical division. Each logical area is sized to accommodate HFile blocks of a particular size (remember that HFiles are read block by block, and each block is cached in the L1 or L2 cache). The logical splits accommodate HFile blocks from 4 KB to 512 KB (this is the default; sizes are configurable). Each split has more than one slot into which we can insert a block. When caching, we find an appropriately sized split, then an empty slot within it, and insert the block there. Remember that all slots are offheap. For more details on Bucket Cache, refer here [4]. Refer to the HBase Reference Guide [5] for how to set up Bucket Cache.
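The "find an appropriately sized split, then an empty slot" step can be sketched as a size-class allocator. In this hypothetical Java example, the slot sizes (powers of two from 4 KB to 512 KB) and the 2 MB logical area per size class are assumptions chosen for readability; as noted above, the real sizes are configurable, and the real allocator has its own bookkeeping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of size-class slot allocation; not HBase's BucketAllocator. */
public class BucketSketch {
    // Assumed slot sizes covering the 4 KB..512 KB range mentioned in the text.
    static final int[] SLOT_SIZES = {4 << 10, 8 << 10, 16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10, 512 << 10};
    static final long AREA_PER_SIZE = 2L << 20;  // assumption: each size class owns a 2 MB logical area

    private final List<Deque<Long>> freeSlots = new ArrayList<>();

    BucketSketch() {
        for (int c = 0; c < SLOT_SIZES.length; c++) {
            Deque<Long> slots = new ArrayDeque<>();
            long base = c * AREA_PER_SIZE;           // logical areas laid out back to back
            for (long off = 0; off + SLOT_SIZES[c] <= AREA_PER_SIZE; off += SLOT_SIZES[c]) {
                slots.add(base + off);               // offset within the whole offheap region
            }
            freeSlots.add(slots);
        }
    }

    /** Index of the smallest slot size that fits the block, or -1 if the block is too large. */
    static int sizeClass(int blockSize) {
        for (int c = 0; c < SLOT_SIZES.length; c++) {
            if (blockSize <= SLOT_SIZES[c]) return c;
        }
        return -1;
    }

    /** Offset of a free slot for the block, or -1 if no slot is available. */
    long allocate(int blockSize) {
        int c = sizeClass(blockSize);
        if (c < 0) return -1;
        Long off = freeSlots.get(c).poll();
        return off == null ? -1 : off;
    }

    /** Return a slot to its size class when the block is evicted. */
    void free(long offset, int blockSize) {
        freeSlots.get(sizeClass(blockSize)).addFirst(offset);
    }

    public static void main(String[] args) {
        BucketSketch cache = new BucketSketch();
        long a = cache.allocate(60 * 1024);  // a ~60 KB block lands in the 64 KB class
        long b = cache.allocate(60 * 1024);
        System.out.println(a + " " + b);     // 8388608 8454144
    }
}
```

The smallest-fitting size class wastes at most the gap between a block and its slot, which is why the split sizes are tuned to the block sizes in use.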

In branch-1, when a read is served from the L2 cache, we have to copy the entire block into a temporary onheap area. This is because the HBase read path assumes block data is backed by an onheap byte array. Also, given the physical and logical split described above, an HFile block’s data may be spread across 2 physical ByteBuffers.
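The branch-1 copy, including the case where a block straddles two physical buffers, can be sketched with plain `java.nio`. The class `SpanningRead` and its methods are hypothetical; only the 4 MB buffer size comes from the text. A block’s global offset is mapped to a buffer index and an offset within that buffer, and the copy loop continues into the next buffer when the block crosses a 4 MB boundary.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of copying a cached block out of fixed-size direct buffers. */
public class SpanningRead {
    static final int BUFFER_SIZE = 4 << 20;  // 4 MB per DirectByteBuffer, as in the text

    /** Copy [offset, offset + length) into a new on-heap array, crossing buffer boundaries if needed. */
    static byte[] copyToHeap(ByteBuffer[] buffers, long offset, int length) {
        byte[] out = new byte[length];       // fresh on-heap allocation per read: the garbage
        int copied = 0;
        while (copied < length) {
            long pos = offset + copied;
            int idx = (int) (pos / BUFFER_SIZE);      // which physical buffer
            int within = (int) (pos % BUFFER_SIZE);   // offset inside that buffer
            int n = Math.min(length - copied, BUFFER_SIZE - within);
            ByteBuffer src = buffers[idx].duplicate(); // shared content, independent position
            src.position(within);
            src.get(out, copied, n);
            copied += n;
        }
        return out;
    }

    /** Number of physical buffers a block touches. */
    static int buffersTouched(long offset, int length) {
        return (int) ((offset + length - 1) / BUFFER_SIZE - offset / BUFFER_SIZE) + 1;
    }

    public static void main(String[] args) {
        ByteBuffer[] buffers = {ByteBuffer.allocateDirect(BUFFER_SIZE), ByteBuffer.allocateDirect(BUFFER_SIZE)};
        long offset = BUFFER_SIZE - 1000;    // a 64 KB block starting 1000 bytes before the boundary
        int length = 64 * 1024;
        buffers[0].put(BUFFER_SIZE - 1000, (byte) 7);  // first byte of the block
        buffers[1].put(length - 1000 - 1, (byte) 9);   // last byte of the block
        byte[] block = copyToHeap(buffers, offset, length);
        System.out.println(buffersTouched(offset, length) + " " + block[0] + " " + block[length - 1]); // 2 7 9
    }
}
```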

When a random row read happens in our system, even if the data is available in the L2 cache, we end up reading the entire block -- usually ~64k in size -- into a temporary onheap allocation for every row read. This creates lots of garbage (and note that without the HBASE-14463 fix, this copy from offheap to onheap reduced read performance a lot). Our read workload is so heavy that this copying triggers frequent GCs, so we had to find a way to avoid copying block data from the offheap cache into temporary onheap arrays.

How was it achieved? - Our Story

The HBASE-11425 work (Cell/DBB end-to-end on the read path) in the master branch avoids the need to copy offheap block data back to onheap when reading. The entire read path is changed to work directly off the offheap Bucket Cache area and serve data directly from there to clients (see the details of this work and its performance improvements here [1] and [2]). So we decided to backport this work from the master branch to our custom HBase version based on 1.1.2.
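The idea behind serving reads without the on-heap copy can be illustrated with `java.nio` views. This is a hypothetical sketch, not the HBASE-11425 code, which changes the whole read path (see [1] and [2]): `copyBlock` mimics the branch-1 behavior of allocating a block-sized array per read, while `viewCell` returns a read-only view over just the cell’s bytes in the direct buffer, so no block-sized garbage is created.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical contrast between copying a cached block on-heap and reading a view of it. */
public class ZeroCopyRead {
    /** branch-1 style: allocate a block-sized on-heap array on every read and copy into it. */
    static byte[] copyBlock(ByteBuffer cache, int blockOffset, int blockLength) {
        byte[] onHeap = new byte[blockLength];
        ByteBuffer src = cache.duplicate();  // shares content, own position/limit
        src.clear();
        src.position(blockOffset);
        src.get(onHeap, 0, blockLength);
        return onHeap;
    }

    /** Offheap style: a read-only view of just the requested bytes; nothing is copied. */
    static ByteBuffer viewCell(ByteBuffer cache, int cellOffset, int cellLength) {
        ByteBuffer view = cache.duplicate();
        view.clear();
        view.position(cellOffset);
        view.limit(cellOffset + cellLength);
        return view.slice().asReadOnlyBuffer();
    }

    public static void main(String[] args) {
        ByteBuffer cache = ByteBuffer.allocateDirect(1 << 20);  // stand-in for the offheap Bucket Cache
        byte[] cell = "row1/cf:q/value".getBytes(StandardCharsets.UTF_8);
        int blockOffset = 4096;
        int cellOffset = blockOffset + 100;
        ByteBuffer writer = cache.duplicate();
        writer.position(cellOffset);
        writer.put(cell);

        byte[] copy = copyBlock(cache, blockOffset, 64 * 1024);     // 64 KB allocated for one cell
        ByteBuffer view = viewCell(cache, cellOffset, cell.length); // no block-sized allocation
        System.out.println(copy.length + " " + view.remaining());   // 65536 15
    }
}
```

The view object itself is tiny; the design choice is to move the cost from copying bytes to carrying offsets and lengths through the read path.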

The backport took about 2 person-months, including getting familiar with and analyzing the JIRAs to port, fixing unit test failures, fixing problems found in functional testing (HBASE-16609/16704), and resolving compatibility issues (HBASE-16626). The full list of JIRAs we backported is here [3]; please refer to it for more details if interested.

Regarding configuration: since the tables of different applications use different block sizes -- from 4KB to 512KB -- the default bucket splits just worked for our use case. We also kept the default values for the other configurations after careful testing, and even after tuning in production. Our configs are listed below:

Alibaba’s Bucket Cache related configuration





















How it works? - A/B Test and Singles’ Day

We tested the performance on our A/B test cluster (450 physical machines, each with 256 GB memory and 64 cores) after backporting and got better throughput, as illustrated below:


Note that the average throughput graph is now much steadier, and there are no more dips in throughput over time.

The version with the offheap read path feature was released on October 10th and has been online ever since (more than 4 months). Together with the NettyRpcServer patch (HBASE-15756), we successfully made it through our 2016 Singles’ Day, with peaks at 100K QPS on a single RS.



[1] https://blogs.apache.org/hbase/entry/offheaping_the_read_path_in

[2] http://www.slideshare.net/HBaseCon/offheaping-the-apache-hbase-read-path

[3] https://issues.apache.org/jira/browse/HBASE-17138

[4] https://issues.apache.org/jira/secure/attachment/12562209/Introduction%20of%20Bucket%20Cache.pdf

[5] http://hbase.apache.org/book.html#offheap.blockcache

Thursday December 29, 2016

HGraphDB: Apache HBase As An Apache TinkerPop Graph Database

Robert Yokota is a Software Engineer at Yammer.

An earlier version of this post was published here on Robert's blog.

Be sure to also check out the excellent follow-on post Graph Analytics on HBase with HGraphDB and Giraph.


The use of graph databases is common among social networking companies. A social network can easily be represented as a graph model, so a graph database is a natural fit. For instance, Facebook has a graph database called Tao, Twitter has FlockDB, and Pinterest has Zen. At Yammer, an enterprise social network, we rely on Apache HBase for much of our messaging infrastructure, so I decided to see if HBase could also be used for some graph modelling and analysis.

Below I put together a wish list of what I wanted to see in a graph database.

  • It should be implemented directly on top of HBase.
  • It should support the TinkerPop 3 API.
  • It should allow the user to supply IDs for both vertices and edges.
  • It should allow user-supplied IDs to be either strings or numbers.
  • It should allow property values to be of arbitrary type, including maps, arrays, and serializable objects.
  • It should support indexing vertices by label and property.
  • It should support indexing edges by label and property, specific to a given vertex.
  • It should support range queries and pagination with both vertex indices and edge indices.

I did not find a graph database that met all of the above criteria. For instance, Titan is a graph database that supports the TinkerPop API, but it is not implemented directly on HBase. Rather, it is implemented on top of an abstraction layer that can be integrated with Apache HBase, Apache Cassandra, or Berkeley DB as its underlying store. Also, Titan does not support user-supplied IDs. Apache S2Graph (incubating) is a graph database that is implemented directly on HBase, and it supports both user-supplied IDs and indices on edges, but it does not yet support the TinkerPop API or indices on vertices.

This led me to create HGraphDB, a TinkerPop 3 layer for HBase. It provides support for all of the above bullet points. Feel free to try it out if you are interested in using HBase as a graph database.

Tuesday May 24, 2016

Why We Use HBase: Recap so far

Just a quick note: If you haven't seen them yet you should check out the first four entries in our "Why We Use HBase" series. More guest posts in the series on deck soon.

  1. Scalable Distributed Transactional Queues on Apache HBase

  2. Medium Data and Universal Data Systems (great read!)

  3. Imgur Notifications: From MySQL to HBase

  4. Investing In Big Data: Apache HBase

 - Andrew


Tuning G1GC For Your HBase Cluster

Graham Baecher is a Senior Software Engineer on HubSpot's infrastructure team and Eric Abbott is a Staff Software Engineer at HubSpot.

An earlier version of this post was published here on the HubSpot Product and Engineering blog.


HBase is the big data store of choice for engineering at HubSpot. It’s a complicated data store with a multitude of levers and knobs that can be adjusted to tune performance. We’ve put a lot of effort into optimizing the performance and stability of our HBase clusters, and recently discovered that suboptimal G1GC tuning was playing a big part in issues we were seeing, especially with stability.

Each of our HBase clusters is made up of 6 to 40 AWS d2.8xlarge instances serving terabytes of data. Individual instances handle sustained loads over 50k ops/sec with peaks well beyond that. This post will cover the efforts undertaken to iron out G1GC-related performance issues in these clusters. If you haven't already, we suggest getting familiar with the characteristics, quirks, and terminology of G1GC first.

We first discovered that G1GC might be a source of pain while investigating frequent

“...FSHLog: Slow sync cost: ### ms...”

messages in our RegionServer logs. Their occurrence correlated very closely to GC pauses, so we dug further into RegionServer GC. We discovered three issues related to GC:

  • One cluster was losing nodes regularly due to long GC pauses.
  • The overall GC time was between 15-25% during peak hours.
  • Individual GC events were frequently above 500ms, with many over 1s.

Below are the 7 tuning iterations we tried in order to solve these issues, and how each one played out. As a result, we developed a step-by-step summary for tuning HBase clusters.

Original GC Tuning State

The original JVM tuning was based on an Intel blog post, and over time morphed into the following configuration just prior to our major tuning effort.

-Xmx32g -Xms32g

32 GB heap, initial and max should be the same

-XX:G1NewSizePercent=3-9

Minimum size for Eden each epoch, differs by cluster


Optimistic target, most clusters take 100+ ms


Better stack traces in some circumstances, traded for a bit more CPU usage


Helps keep a lid on reference processing time issues we were seeing


Alleged to protect against a bizarre Linux issue


Alleged to save some CPU cycles in between GC epochs

GC logging verbosity as shown below was cranked up to a high enough level of detail for our homegrown gc_log_visualizer script. The majority of graphs in this document were created with gc_log_visualizer, while others were snapshotted from SignalFX data collected through our CollectD GC metrics plugin.

Our GC logging params:


Starting Point: Heap Sizes and Overall Time Spent in GC

With the highly detailed GC logs came the following chart of heap state. Eden size is in red and stays put at its minimum (G1NewSizePercent), 9% of total heap. Tenured size, or Working set + waste, floats in a narrow band between 18 and 20 GB. With Eden a flat line, the total heap line mirrors the Tenured line, just 9% of heap higher.

The black horizontal line just under 15GB marks the InitiatingHeapOccupancyPercent (aka “IHOP”), at its default setting of 45%. The purple squares are the amount of Tenured space reclaimable at the start of a mixed GC event. The floor of the band of purple squares is 5% of heap, the value of G1HeapWastePercent.

The next graph shows a red “+” at each minute boundary, representing the total percent of wall time the JVM spent in stop-the-world (STW) pauses, doing no useful work. The overall time spent in GC for this HBase instance during this period is 15-25%. For reference, an application tier server spending 20%+ of its time in GC is considered stuck in “GC Hell” and in desperate need of tuning.
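The per-minute metric in that graph is simple arithmetic: the sum of STW pause durations in a window divided by the window length. Here is a Java sketch with made-up pause data (30 pauses of 400 ms in one minute); the class name is illustrative, and the 20% threshold helper encodes the “GC Hell” rule of thumb quoted above for application tier servers.

```java
import java.util.Arrays;

/** Illustrative calculation of the "percent of wall time in STW" metric; not the authors' tooling. */
public class GcTimePercent {
    /** Sum of STW pauses in a window, as a percentage of the window length. */
    static double stwPercent(long[] pauseMillis, long windowMillis) {
        long total = 0;
        for (long p : pauseMillis) total += p;
        return 100.0 * total / windowMillis;
    }

    /** Rule of thumb from the text: 20%+ of time in GC means "GC Hell". */
    static boolean inGcHell(double percent) {
        return percent >= 20.0;
    }

    public static void main(String[] args) {
        long[] pauses = new long[30];
        Arrays.fill(pauses, 400L);                      // made-up: 30 pauses of 400 ms
        double pct = stwPercent(pauses, 60_000L);       // one-minute window
        System.out.println(pct + " " + inGcHell(pct));  // 20.0 true
    }
}
```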

Tuning #1 Goal: Lower Total Time in GC - Action: Raise IHOP

One aspect that stands out clearly in the previous graph of heap sizes is that the working set is well above the IHOP. Tenured being higher than IHOP generally results in an overabundance of MPCMC runs (wastes CPU) and consequently an overabundance of Mixed GC cycles resulting in a higher ratio of expensive GC events vs cheap GC events. By moving IHOP a bit higher than Tenured, we expect fewer Mixed GC events to reclaim larger amounts of space, which should translate to less overall time spent in STW.
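The IHOP numbers above can be checked with a little arithmetic. This hypothetical Java helper computes the marking threshold for the 32 GB heap at the default 45% (about 14.4 GB, the line “just under 15GB”), and the smallest whole IHOP percentage that sits above a Tenured size of 20 GB, the top of the observed band. How much margin to add on top of that minimum is a judgment call the sketch does not make.

```java
/** Illustrative IHOP arithmetic for the heap described above; not a tuning tool. */
public class IhopMath {
    /** Heap occupancy (GB) at which G1 starts concurrent marking, for a given IHOP percent. */
    static double ihopThresholdGb(double heapGb, int ihopPercent) {
        return heapGb * ihopPercent / 100.0;
    }

    /** Smallest whole IHOP percent strictly above the given Tenured size. */
    static int minIhopAbove(double heapGb, double tenuredGb) {
        return (int) Math.floor(100.0 * tenuredGb / heapGb) + 1;
    }

    public static void main(String[] args) {
        System.out.println(ihopThresholdGb(32, 45)); // 14.4: the line "just under 15GB"
        System.out.println(minIhopAbove(32, 20));    // 63: above a Tenured band topping out at 20 GB
    }
}
```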

After we raised the IHOP value on an HBase instance, the following before/after (above/below) graphs show that the frequency of Mixed GC events indeed drops dramatically while the reclaimable amount rises.

Considering that at least half of Mixed GC events on this instance took 200-400ms, we expected the reduced number of Mixed GC events to outweigh any increase in individual Mixed GC times, so that overall GC time would drop. That expectation held true: overall time spent in GC dropped from 4-12% to 1-8%.

The following graphs show before/after on the STW times for all Mixed GC events. Note the drastic drop in frequency while individual STW times don't seem to change.

Result: Success

This test was considered successful. We made the change across all HBase clusters to use a significantly larger IHOP value than the default of 45%.

Tuning #2 Goal: Lower Total Time in GC - Action: Increase Eden

Fixing the IHOP value to be higher than working set was basically fixing a misconfiguration. There was very little doubt that nonstop MPCMC + Mixed GC events was an inefficient behavior. Increasing Eden size, on the other hand, had a real possibility of increasing all STW times, both Minor and Mixed. GC times are driven by the amount of data being copied (surviving) from one epoch to the next, and databases like HBase are expected to have very large caches. A 10+ GB cache could very well have high churn and thus high object survival rates.

The effective Eden size for our HBase clusters is driven by the minimum Eden value G1NewSizePercent because the MaxGCPauseMillis target of 50ms is never met.

For this test, we raised Eden from 9% to 20% through G1NewSizePercent.

Effects on Overall GC Time

Looking at the following graphs, we see that overall time spent in GC may have dropped a little for this one hour time window from one day to the next.

Individual STW times

Looking at the STW times for just the Minor GC events, there is a noticeable jump in the floor of STW times.

To-space Exhaustion Danger

As mentioned in the G1GC Foundational blog post, G1ReservePercent is ignored when the minimum end of the Eden range is used. The working set on a few of our HBase clusters is in the 70-75% range, which combined with a min Eden of 20% would leave only 5-10% of heap free for emergency circumstances. The downside of running out of free space, thus triggering To-space Exhaustion, is a 20+ sec GC pause and the effective death of the HBase instance. While the instance would recover, the other HBase instances in the cluster would consider it long dead before the GC pause completed.
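The headroom concern is straightforward arithmetic on the percentages above. This Java sketch assumes the 32 GB heap from the original tuning state; the class and method names are illustrative.

```java
/** Illustrative headroom arithmetic for the To-space exhaustion risk; percentages from the text. */
public class EdenHeadroom {
    /** Percent of heap left free when Tenured and the Eden floor are both fully used. */
    static int freePercent(int workingSetPercent, int minEdenPercent) {
        return 100 - workingSetPercent - minEdenPercent;
    }

    static double toGb(double heapGb, int percent) {
        return heapGb * percent / 100.0;
    }

    public static void main(String[] args) {
        double heapGb = 32;                   // assumption: the 32 GB heap from the original tuning
        for (int ws : new int[] {70, 75}) {
            int free = freePercent(ws, 20);   // Eden floor raised to 20%
            System.out.println(ws + "% working set -> " + free + "% free = " + toGb(heapGb, free) + " GB");
        }
    }
}
```

With a 70-75% working set and a 20% Eden floor, only about 1.6 to 3.2 GB of the heap would remain free for emergencies.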

Result: Failure

The overall time spent in GC did drop a little, as theoretically expected; unfortunately, the low end of Minor GC stop-the-world times increased by a similar percentage. In addition, the risk of To-space exhaustion increased. The approach of increasing G1NewSizePercent to reduce overall time spent in GC didn't look promising and was not rolled out to any clusters.

Tuning #3 Goal: Reduce Individual STW Durations - Action: SurvivorRatio and MaxTenuringThreshold

In the previous tuning approach, we found that STW times increased as Eden size increased. We took some time to dig a little deeper into Survivor space to determine if there was any To-space overflow or if objects could be promoted faster to reduce the amount of object copying being done. To collect the Survivor space tenuring distribution data in the GC logs we enabled PrintTenuringDistribution and restarted a few select instances.

To-space overflow is the phenomenon where the Survivor To space isn't large enough to fit all the live data in Eden at the end of an epoch. Any live data evacuated after Survivor To is full is added to Free regions, which are then added to Tenured. If this overflow is transient data, putting it in Tenured is inefficient, as it will be expensive to clean out. If that were the case, we'd need to lower SurvivorRatio, since a smaller Eden-to-Survivor ratio gives a larger Survivor space.

On the other hand, consider a use case where any object that survives two epochs will also survive ten epochs. In that case, by ensuring that any object that survives a second epoch is immediately promoted to Tenured, we would see a performance improvement since we wouldn’t be copying it around in the GC events that followed.

Here’s some data collected from the PrintTenuringDistribution parameter:

Desired survivor size 192937984 bytes, new threshold 2 (max 15)
- age 1: 152368032 bytes, 152368032 total
- age 2: 147385840 bytes, 299753872 total
[Eden: 2656.0M(2656.0M)->0.0B(2624.0M) Survivors: 288.0M->320.0M Heap: 25.5G(32.0G)->23.1G(32.0G)]

An Eden size of 2656 MB with SurvivorRatio = 8 (default) yields a 2656/8 = 332 MB survivor space. In the example entries, we see enough room to hold two ages of survivor objects. The second age here is about 5 MB smaller than the first age, indicating that in the interval between GC events only 5/152 = 3.3% of the data was transient. We can reasonably assume the other 97% of the data is some kind of caching. By setting MaxTenuringThreshold = 1, we optimize for the 97% of cached data to be promoted to Tenured after surviving its second epoch, and hopefully shave a few ms of object copy time off each GC event.
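The survivor-space arithmetic can be reproduced from the log excerpt. The sketch below is hypothetical. It parses only the `- age N:` lines in the format shown above. Like the analysis, it compares the age-1 and age-2 cohorts from a single GC entry, which assumes a roughly steady allocation rate between GCs.

```python
import re

# Matches lines such as "- age   1:  152368032 bytes,  152368032 total"
AGE_LINE = re.compile(r"-\s*age\s+(\d+):\s+(\d+) bytes,\s+(\d+) total")


def survivor_ages(log_text):
    """Map tenuring age -> bytes at that age, from one PrintTenuringDistribution entry."""
    return {int(age): int(size) for age, size, _total in AGE_LINE.findall(log_text)}


def transient_fraction(ages):
    """Share of age-1 bytes that did not survive to age 2 (steady-state assumption)."""
    return (ages[1] - ages[2]) / ages[1]


def survivor_space_mb(eden_mb, survivor_ratio=8):
    """Survivor space implied by Eden size and SurvivorRatio, as in the calculation above."""
    return eden_mb / survivor_ratio


SAMPLE = """Desired survivor size 192937984 bytes, new threshold 2 (max 15)
- age 1: 152368032 bytes, 152368032 total
- age 2: 147385840 bytes, 299753872 total
"""

ages = survivor_ages(SAMPLE)
print(f"survivor space: {survivor_space_mb(2656):.0f} MB")
print(f"transient between GCs: {transient_fraction(ages):.1%}")
```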

Result: Theoretical Success

Unfortunately we don't have any nice graphs available to show these effects in isolation. We consider the theory sound and rolled out MaxTenuringThreshold = 1 to all our clusters.

Tuning #4 Goal: Eliminate Long STW Events - Action: G1MixedGCCountTarget & G1HeapWastePercent

Next, we wanted to see what we could do about eliminating the high end of Mixed GC pauses. Looking at a 5-minute interval of our Mixed GC STW times, we saw a pattern of sharply increasing pauses across each cycle of 6 mixed GCs:

That in and of itself should not be considered unusual; after all, that behavior is how the G1GC algorithm got its name. Each Mixed GC event will evacuate 1 / G1MixedGCCountTarget of the high-waste regions (regions with the most non-live data). Since G1 prioritizes regions with the most garbage, each successive Mixed GC event will be evacuating regions with more and more live objects. The chart shows the performance effects of clearing out regions with more and more live data: Mixed event times start at 100ms at the beginning of a mixed GC cycle and range upwards past 600ms by the end.

In our case, we were seeing occasional pauses at the end of some cycles that were several seconds long. Even though they were rare enough that our average pause time was reasonable, pauses that long are still a serious concern.

Two levers can be used together to lessen the “scraping the bottom of the barrel” effect of cleaning up regions with a lot of live data:

G1HeapWastePercent: default (5) → 10. Allow twice as much wasted space in Tenured. Having 5% waste resulted in 6 of the 8 potential Mixed GC events occurring in each Mixed GC cycle. Bumping to 10% waste should chop off 1-2 more of the most expensive events of the Mixed GC cycle.

G1MixedGCCountTarget: default (8) → 16. Double the target number of Mixed GC events in each Mixed GC cycle, but halve the work done by each GC event. Although this increases the number of Mixed GC events, STW times of individual Mixed events should drop noticeably.

In combination, we expected doubling the target count to drop the average Mixed GC time, and increasing the allowable waste to eliminate the most expensive Mixed GC time. There should be some synergy, as more heap waste should also mean regions are on average slightly less live when collected.
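To see why the two levers interact, here is a toy model of one Mixed GC cycle. It is a simplification under stated assumptions, not G1's actual algorithm:

- Candidates are sorted most-garbage-first.
- Each Mixed GC takes a fixed ceil(candidates / G1MixedGCCountTarget) regions.
- The cycle ends once remaining reclaimable garbage is at most G1HeapWastePercent of the heap.
- Live data copied stands in for pause cost.

The model ignores pause-time prediction, G1MixedGCLiveThresholdPercent and other caps, and the region liveness values are made up.

```python
import math


def mixed_gc_cycle(live_percent, heap_regions, count_target=8, waste_percent=5):
    """Live data copied by each Mixed GC in one cycle of a simplified G1 model.

    live_percent: liveness (0-100) of each old-region collection candidate.
    Returns one entry per Mixed GC: live data copied, in units of 1% of a region.
    """
    candidates = sorted(live_percent)  # most garbage (lowest liveness) first
    per_gc = math.ceil(len(candidates) / count_target)  # regions per Mixed GC
    copied = []
    while candidates:
        reclaimable = sum(100 - live for live in candidates)  # garbage left, same units
        # Stop once remaining garbage is at most G1HeapWastePercent of the heap.
        if reclaimable <= waste_percent * heap_regions:
            break
        batch, candidates = candidates[:per_gc], candidates[per_gc:]
        copied.append(sum(batch))
    return copied


regions = [10] * 4 + [30] * 4 + [50] * 4 + [70] * 4  # hypothetical candidate liveness
for count_target, waste in ((8, 5), (16, 10)):
    print(count_target, waste, mixed_gc_cycle(regions, 64, count_target, waste))
```

With these hypothetical regions, the default settings (8, 5) copy [20, 20, 60, 60] per event, while (16, 10) copies [10, 10, 10, 10]. The expensive tail of the cycle disappears.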

Waste heap values of 10% and 15% were both examined in a test environment. (Don’t be alarmed by the high average pause times - this test environment was running under heavy load, on less capable hardware than our production servers.)

Above: 10% heap waste; below: 15% heap waste:

The results are very similar. 15% performed slightly better, but in the end we decided that 15% waste was more than necessary. 10% was enough to clear out the "scraping the bottom of the barrel" effect such that the 1+ sec Mixed GC STW times all but disappeared in production.

Result: Success

Doubling G1MixedGCCountTarget from 8 to 16 and G1HeapWastePercent from 5 to 10 succeeded in eliminating the frequent 1s+ Mixed GC STW times. We kept these changes and rolled them out across all our clusters.

Tuning #5 Goal: Stop Losing Nodes - Action: Heap Size and HBase Configs

While running load tests to gauge the effects of the parameters above, we also began to dig into what looked like evidence of memory leaks in a production cluster. In the following graph we see the heap usage slowly grow over time until To-space Exhaustion, triggering a Full GC with a long enough pause to get the HBase instance booted from the cluster and killed:

We've got several HBase clusters, and only one cluster occasionally showed this type of behavior. If this issue were a memory leak, we'd expect the issue to arise more broadly, so it looks like HBase is using more memory in this cluster than we expected. To understand why, we looked into the heap breakdown of our RegionServers. The vast majority of an HBase RegionServer’s Tenured space is allocated to three main areas:

  • Memstore: region server’s write cache; default configuration caps this at 40% of heap.
  • Block Cache: region server’s read cache; default config caps at 40% of heap.
  • Overhead: the vast majority of HBase’s in-memory overhead is contained in a “static index”. The size of this index isn’t capped or configurable, but HBase assumes this won’t be an issue since the combined cap for memstore and block cache can’t exceed 80%.

We have metrics for the size of each of these, from the RegionServer’s JMX stats: “memStoreSize,” “blockCacheSize,” and “staticIndexSize.” The stats from our clusters show that HBase will use basically all of the block cache capacity you give it, but memstore and static index sizes depend on cluster usage and tables. Memstore fluctuates over time, while static index size depends on the RegionServer’s StoreFile count and average row key size.

It turned out, for the cluster in question, that the HBase caches and overhead combined were actually using more space than our JVM was tuned to handle. Not only were memstore and block cache close to capacity (12 GB block cache, memstore rising past 10 GB), but the static index size was also unexpectedly large, at 6 GB. Combined, this put desired Tenured space at 28+ GB, while our IHOP was set at 24 GB, so the upward trend of our Tenured space was just the legitimate memory usage of the RegionServer.

With this in mind, we judged the maximum expected heap use for each cluster’s RegionServers by looking at the cluster maximum memstore size, block cache size, and static index size over the previous month, and assuming max expected usage to be 110% of each value. We then used that number to set the block cache and memstore size caps (hfile.block.cache.size and hbase.regionserver.global.memstore.size) in our HBase configs.

The fourth component of Tenured space is the heap waste, in our case 10% of the heap size. We could now confidently tune our IHOP threshold by summing the max expected memstore, block cache, static index size, 10% heap for heap waste, and finally 10% more heap as a buffer to avoid constant mixed GCs when working set is maxed (as described in Tuning #1).

However, before we went ahead and blindly set this value, we had to consider the uses of heap other than Tenured space. Eden requires a certain amount of heap (determined by G1NewSizePercent), and a certain amount (default 10%) is Reserved free space. IHOP + Eden + Reserved must be ≤ 100% in order for a tuning to make sense; in cases where our now-precisely-calculated IHOP was too large for this to be possible, we had to expand our RegionServer heaps. To determine minimum acceptable heap size, assuming 10% Reserved space, we used this formula:

Heap ≥ (M + B + O + E) ÷ 0.7

    • M = max expected memstore size
    • B = max expected block cache size
    • O = max expected overhead (static index)
    • E = minimum Eden size

When those four components add up to ≤ 70% of the heap, then there will be enough room for 10% Reserved space, 10% heap waste, and 10% buffer between max working set and IHOP.
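Eden is configured as a percentage of heap (G1NewSizePercent) rather than in GB, so E itself depends on the heap size. Substituting E = e × Heap into the inequality and solving gives Heap ≥ (M + B + O) ÷ (0.7 - e). A sketch of that rearrangement follows; the function name is hypothetical.

```python
def min_heap_gb(memstore_gb, block_cache_gb, static_index_gb, eden_percent):
    """Smallest heap with M + B + O + E <= 70% of heap, where E = eden_percent of heap."""
    eden_fraction = eden_percent / 100
    if eden_fraction >= 0.7:
        raise ValueError("Eden alone would use the whole 70% budget")
    # Heap * 0.7 >= M + B + O + e * Heap  <=>  Heap >= (M + B + O) / (0.7 - e)
    return (memstore_gb + block_cache_gb + static_index_gb) / (0.7 - eden_fraction)


heap = min_heap_gb(memstore_gb=10, block_cache_gb=12, static_index_gb=6, eden_percent=8)
print(f"minimum heap: {heap:.1f} GB")
```

For example, take the observed values from the problem cluster above (10 GB memstore, 12 GB block cache, 6 GB static index, without the 110% scaling) and a hypothetical 8% Eden. That gives a minimum heap of about 45.2 GB.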

Result: Success

We reviewed memory usage of each of our clusters and calculated correct heap sizes and IHOP thresholds for each. Rolling out these changes immediately ended the To-space Exhaustion events we were seeing on the problematic cluster.

Tuning #6 Goal: Eliminate Long STW Events - Action: Increase G1 Region Size

We’d rolled out HBase block cache & memstore config changes, changes to G1HeapWastePercent and G1MixedGCCountTarget, and an increase in heap size on a couple of clusters (32 GB → 40+ GB) to accommodate larger IHOP. In general things were smooth, but there were still occasional Mixed GC events taking longer than we were comfortable with, especially on the clusters whose heap had increased. Using gc_log_visualizer, we looked into which phase of Mixed GC was the most volatile and noticed that Scan RS times correlated:

A few Google searches indicated that the Scan RS (remembered set scan) time in the GC logs is the time taken examining all the regions that reference the tenured regions being collected. In our most recent tuning changes, heap size had been bumped up; however, G1HeapRegionSize remained fixed at 16 MB. Increasing G1HeapRegionSize to 32 MB eliminated those high scan times:

Result: Success

Halving the G1 region count cleared out the high volatility in Scan RS times. According to G1GC documentation, the ideal region count is 2048, so 16 MB regions were perfect for a 32 GB heap. However, this tuning case led us to believe that for HBase heaps without a clear choice of region size, in our case 40+ GB, it’s much better to err on the side of fewer, larger regions.

Tuning #7 Goal: Preventing RegionServer To-space Exhaustion - Action: Extra Heap as Buffer

At this point, our RegionServers were tuned and running much shorter and less frequent GC events. IHOP rested above Tenured, while Tenured + Eden remained under the target of 90% of total heap. Yet once in a while, a RegionServer would still die from a Full GC event triggered by To-space Exhaustion, as shown in the following graph.

It looks like we did everything right: there’s lots of reclaimable space, and Tenured space drops well below IHOP with each Mixed GC. But right at the end, heap usage spikes up and we hit To-space Exhaustion. And while the HBase client whose requests caused this problem could likely be improved to avoid this*, we can’t rely on our various HBase clients to behave perfectly all the time.

In the scenario above, very bursty traffic caused Tenured space to fill up the heap before the MPCMC could complete and enable a Mixed GC run. To tune around this, we simply added heap space**, while adjusting IHOP and G1NewSizePercent down to keep them at the same GB values they had been at before. By doing this we increased the buffer of free space in the heap above our original 10% default, for additional protection against spikes in memory usage.

Result: Success

Increasing heap buffer space on clusters whose HBase clients are known to be occasionally bursty has all but eliminated Full GC events on our RegionServers.


* Block cache churn correlates very closely with time spent in Mixed GC events on our clusters (see chart below). A high volume of Get and Scan requests with caching enabled unnecessarily (e.g. requested data isn’t expected to be in cache and isn’t expected to be requested again soon) will increase cache churn as data is evicted from cache to make room for the Get/Scan results. This will raise the RegionServer’s time in GC and could contribute to instability as described in this section.

Chart: % time in Mixed GC is in yellow (left axis); MB/sec cache churn is in blue (right axis):

** Another potential way to tune around this issue is by increasing ConcGCThreads (default is 1/4 * ParallelGCThreads). ConcGCThreads is the number of threads used to do the MPCMC, so increasing it could mean the MPCMC finishes sooner and the RegionServer can start a Mixed GC before Tenured space fills the heap. At HubSpot we’ve been satisfied with the results of increasing our heap size and haven’t tried experimenting with this value.

Overall Results: Goals Achieved!

After these cycles of debugging and tuning G1GC for our HBase clusters, we’ve improved performance in all the areas we were seeing problems originally:

  • Stability: no more To-space Exhaustion events causing Full GCs.
  • 99th percentile performance: greatly reduced frequency of long STW times.
  • Avg. performance: overall time spent in GC STW significantly reduced.

Summary: How to Tune Your HBase Cluster

Based on our findings, here’s how we recommend you tune G1GC for your HBase cluster(s):

Before you start: GC and HBase monitoring

  • Track block cache, memstore and static index size metrics for your clusters in whatever tool you use for charts and monitoring, if you don’t already. These are found in the RegionServer JMX metrics:
    • memStoreSize
    • blockCacheSize
    • staticIndexSize

  • You can use our collectd plugin to track G1GC performance over time, and our gc_log_visualizer for insight on specific GC logs. In order to use these you’ll have to log GC details on your RegionServers:
    • -Xloggc:$GC_LOG_PATH
    • -verbosegc
    • -XX:+PrintGC
    • -XX:+PrintGCDateStamps
    • -XX:+PrintAdaptiveSizePolicy
    • -XX:+PrintGCDetails
    • -XX:+PrintGCApplicationStoppedTime
    • -XX:+PrintTenuringDistribution
    • Also recommended is some kind of GC log rotation, e.g.:
      • -XX:+UseGCLogFileRotation
      • -XX:NumberOfGCLogFiles=5
      • -XX:GCLogFileSize=20M

Step 0: Recommended Defaults

  • We recommend the following JVM parameters and values as defaults for your HBase RegionServers (as explained in Original GC Tuning State):
    • -XX:+UseG1GC
    • -XX:+UnlockExperimentalVMOptions
    • -XX:MaxGCPauseMillis=50
      • This value is intentionally very low and doesn’t actually represent a pause time upper bound. We recommend keeping it low to pin Eden to the low end of its range (see Tuning #2).
    • -XX:-OmitStackTraceInFastThrow
    • -XX:ParallelGCThreads=N, where N = 8 + (logical processors - 8) × 5/8, rounded down (for more than 8 logical processors)
    • -XX:+ParallelRefProcEnabled
    • -XX:+PerfDisableSharedMem
    • -XX:-ResizePLAB 
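The ParallelGCThreads entry is a formula rather than a literal value. The helper below evaluates it with integer arithmetic and assembles the Step 0 flags. To our understanding this matches HotSpot's own default for machines with more than 8 logical processors, but treat the helper names as hypothetical.

```python
import os


def parallel_gc_threads(logical_processors):
    """All CPUs up to 8, then 5/8 of each additional CPU (integer arithmetic)."""
    if logical_processors <= 8:
        return logical_processors
    return 8 + (logical_processors - 8) * 5 // 8


def step0_flags(logical_processors):
    """Recommended default RegionServer JVM flags from Step 0."""
    return [
        "-XX:+UseG1GC",
        "-XX:+UnlockExperimentalVMOptions",
        "-XX:MaxGCPauseMillis=50",
        "-XX:-OmitStackTraceInFastThrow",
        f"-XX:ParallelGCThreads={parallel_gc_threads(logical_processors)}",
        "-XX:+ParallelRefProcEnabled",
        "-XX:+PerfDisableSharedMem",
        "-XX:-ResizePLAB",
    ]


if __name__ == "__main__":
    print(" ".join(step0_flags(os.cpu_count() or 8)))
```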

Step 1: Determine maximum expected HBase usage

  • As discussed in the Tuning #5 section, before you can properly tune your cluster you need to know your max expected block cache, memstore, and static index usage.
    • Using the RegionServer JMX metrics mentioned above, look for the maximum value of each metric across the cluster:
      • Maximum block cache size.
      • Maximum memstore size.
      • Maximum static index size.
    • Scale each maximum by 110% to accommodate slight increases in max usage. This is your usage cap for that metric: e.g. a 10 GB max recorded memstore → 11 GB memstore cap.
      • Ideally, you’ll have these metrics tracked for the past week or month, and you can find the maximum values over that time. If not, be more generous than 110% when calculating memstore and static index caps. Memstore size especially can vary widely over time. 

Step 2: Set Heap size, IHOP, and Eden size

  • Start with Eden size relatively low: 8% of heap is a good initial value if you’re not sure.
    • -XX:G1NewSizePercent=8
    • See Tuning #2 for more about Eden sizing.
      • Increasing Eden size will increase individual GC pauses, but slightly reduce overall % time spent in GC.
      • Decreasing Eden size will have the opposite effect: shorter pauses, slightly more overall time in GC.

  • Determine necessary heap size, using Eden size and usage caps from Step 1:
    • From Tuning #5: Heap ≥ (M + B + O + E) ÷ 0.7
      • M = memstore cap, GB
      • B = block cache cap, GB
      • O = static index cap, GB
      • E = Eden size, GB
    • Set JVM args for fixed heap size based on the calculated value, e.g.:
      • -Xms40960m -Xmx40960m

  • Set IHOP in the JVM, based on usage caps and heap size:
    • IHOP = (memstore cap’s % heap + block cache cap’s % heap + overhead cap’s % heap + 20)
    • -XX:InitiatingHeapOccupancyPercent=IHOP
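Step 2's IHOP formula and the IHOP + Eden + Reserved ≤ 100% check from Tuning #5 can be combined into a small flag generator. This is a sketch with a few assumptions:

- The default 10% G1ReservePercent applies.
- IHOP is rounded up, so the full 20-point allowance for heap waste and buffer is kept.
- The function names are hypothetical.

G1NewSizePercent also needs -XX:+UnlockExperimentalVMOptions from Step 0.

```python
import math


def ihop_percent(memstore_cap_gb, block_cache_cap_gb, static_index_cap_gb, heap_gb):
    """IHOP = memstore % + block cache % + static index % of heap, plus 20 (10% waste + 10% buffer)."""
    usage_percent = 100 * (memstore_cap_gb + block_cache_cap_gb + static_index_cap_gb) / heap_gb
    # Round away float noise (e.g. 55.00000000000001) before rounding up to a whole percent.
    return math.ceil(round(usage_percent, 6)) + 20


def step2_flags(heap_gb, ihop, eden_percent, reserve_percent=10):
    """JVM flags for Step 2; rejects tunings where IHOP + Eden + Reserved exceeds 100%."""
    if ihop + eden_percent + reserve_percent > 100:
        raise ValueError("IHOP + Eden + Reserved > 100%: increase the heap size")
    heap_mb = math.ceil(heap_gb * 1024)  # fixed heap: -Xms equals -Xmx
    return [
        f"-Xms{heap_mb}m",
        f"-Xmx{heap_mb}m",
        f"-XX:G1NewSizePercent={eden_percent}",
        f"-XX:InitiatingHeapOccupancyPercent={ihop}",
    ]


ihop = ihop_percent(memstore_cap_gb=8, block_cache_cap_gb=10, static_index_cap_gb=4, heap_gb=40)
print(" ".join(step2_flags(heap_gb=40, ihop=ihop, eden_percent=8)))
```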

Step 3: Adjust HBase configs based on usage caps

  • Set block cache cap and memstore cap ratios in HBase configs, based on usage caps and total heap size. In hbase-site.xml:
    • hfile.block.cache.size → block cache cap ÷ heap size
    • hbase.regionserver.global.memstore.size → memstore cap ÷ heap size 
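The Step 3 ratios are simple divisions. The sketch below also enforces the 80% combined limit for memstore and block cache mentioned in Tuning #5, and renders the result as hbase-site.xml properties. Helper names are hypothetical.

```python
def hbase_site_ratios(block_cache_cap_gb, memstore_cap_gb, heap_gb):
    """Step 3: express the usage caps as fractions of the heap."""
    block_cache = block_cache_cap_gb / heap_gb
    memstore = memstore_cap_gb / heap_gb
    if block_cache + memstore > 0.8:
        raise ValueError("block cache + memstore fractions exceed 0.8 of the heap")
    return {
        "hfile.block.cache.size": block_cache,
        "hbase.regionserver.global.memstore.size": memstore,
    }


def to_hbase_site_xml(props):
    """Render properties as <property> elements for hbase-site.xml."""
    parts = []
    for name, value in props.items():
        parts.append(
            "  <property>\n"
            f"    <name>{name}</name>\n"
            f"    <value>{value:.3f}</value>\n"
            "  </property>"
        )
    return "\n".join(parts)


ratios = hbase_site_ratios(block_cache_cap_gb=10, memstore_cap_gb=8, heap_gb=40)
print(to_hbase_site_xml(ratios))
```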

Step 4: Adjust additional recommended JVM flags for GC performance

  • From Tuning #3:
    • -XX:MaxTenuringThreshold=1

  • From Tuning #4:
    • -XX:G1HeapWastePercent=10
    • -XX:G1MixedGCCountTarget=16

  • From Tuning #6:
    • -XX:G1HeapRegionSize=#M
    • # must be a power of 2, in range [1..32].
    • Ideally, # is such that: heap size ÷ # MB = 2048 regions.
    • If your heap size doesn’t provide a clear choice for region size, err on the side of fewer, larger regions. Larger regions reduce GC pause time volatility.
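The region-size rule can be expressed as: take heap ÷ 2048, then round up to the next power of two within [1..32] MB. Rounding up follows the advice to err toward fewer, larger regions. The helper name is ours, not a JVM API.

```python
def g1_region_size_mb(heap_mb, target_regions=2048):
    """Power-of-two region size in [1, 32] MB, rounded up from heap / target_regions."""
    ideal = heap_mb / target_regions
    size = 1
    while size < ideal and size < 32:
        size *= 2  # round up: prefer fewer, larger regions
    return size


for heap_gb in (32, 40):
    size = g1_region_size_mb(heap_gb * 1024)
    print(f"{heap_gb} GB heap: -XX:G1HeapRegionSize={size}M ({heap_gb * 1024 // size} regions)")
```

A 32 GB heap yields 16 MB regions (2048 regions), and a 40 GB heap yields 32 MB (1280 regions), matching the Tuning #6 change.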

Step 5: Run it!

  • Restart your RegionServers with these settings and see how they look.

    • Remember that you can adjust Eden size as described above, to optimize either for shorter individual GCs or for less overall time in GC. If you do, make sure to maintain Eden + IHOP ≤ 90%.

    • If your HBase clients can have very bursty traffic, consider adding heap space outside of IHOP and Eden (so that IHOP + Eden adds up to 80%, for example).

      • Remember to update % and ratio configs along with the new heap size.

      • Details and further suggestions about this are found in Tuning #7.

Saturday April 23, 2016

HDFS HSM and HBase: Conclusions (Part 7 of 7)

This is part 7 of a 7-part report by HBase contributor Jingcheng Du and HDFS contributor Wei Zhou (Jingcheng and Wei are both software engineers at Intel).

  1. Introduction
  2. Cluster Setup
  3. Tuning
  4. Experiment
  5. Experiment (continued)
  6. Issues
  7. Conclusions


There are many things to consider when choosing the hardware for a cluster. According to the test results, in the SSD-related cases the network utilization between DataNodes is larger than 10Gbps. If you are using a 10Gbps switch, the network will be the bottleneck and impact performance. We suggest either extending the network bandwidth by network bonding, or upgrading to a more powerful switch with higher bandwidth. In cases 1T_HDD and 1T_RAM_HDD, the network utilization is lower than 10 Gbps most of the time, so using a 10 Gbps switch to connect DataNodes is fine.

In all 1T dataset tests, 1T_RAM_SSD shows the best performance. An appropriate mix of different types of storage can improve HBase write performance. First, write the latency-sensitive and blocking data to faster storage, and write the data that is rarely compacted and accessed to slower storage. Second, avoid mixing types of storage with a large performance gap, such as in 1T_RAM_HDD.

The hardware design limits the total disk bandwidth, so eight SSDs offer hardly any advantage over four SSDs. Either enhance the hardware by using HBA cards to remove this limitation for eight SSDs, or mix the storage types appropriately. According to the test results, to achieve a better balance between performance and cost, using four SSDs and four HDDs achieves good performance (102% of the throughput and 101% of the latency of eight SSDs) at a much lower price. The RAMDISK/SSD tiered storage is the winner in both throughput and latency among all the tests, so if cost is not an issue and maximum performance is needed, RAMDISK (an extremely high-speed block device, e.g. an NVMe PCI-E SSD)/SSD should be chosen.

Avoid using a large number of flushers/compactors when most of the data is written to HDD. Reads and writes share a single channel per HDD, so too many flushers and compactors running at the same time can slow down HDD performance.

During the tests, we found some things that can be improved in both HBase and HDFS.

In HBase, the memstore fills up quickly when the WALs are stored on fast storage; this can lead to regular long GC pauses. It would be better for HBase to have an off-heap memstore.

In HDFS, each DataNode uses a single shared lock when creating/finalizing blocks. A slow operation in one DataXceiver can block the create/finalize operations of other DataXceivers on the same DataNode, no matter what storage they are using. We need to eliminate this blocking across storage types, and a finer-grained lock mechanism that isolates operations on different blocks is needed (HDFS-9668). It would also be good to implement a latency-aware VolumeChoosingPolicy in HDFS to remove slow volumes from the candidates.

RoundRobinVolumeChoosingPolicy can lead to load imbalance in HDFS with tiered storage (HDFS-9608).

In HDFS, renaming a file into a location with a different storage policy does not actually move its blocks. We need to move the HDFS blocks asynchronously in such a case.


The authors would like to thank Weihua Jiang, the former manager of the big data team at Intel, for leading this performance evaluation, and thank Anoop Sam John (Intel), Apekshit Sharma (Cloudera), Jonathan Hsieh (Cloudera), Michael Stack (Cloudera), Ramkrishna S. Vasudevan (Intel), Sean Busbey (Cloudera) and Uma Gangumalla (Intel) for their review and guidance.

HDFS HSM and HBase: Issues (Part 6 of 7)

This is part 6 of a 7-part report by HBase contributor Jingcheng Du and HDFS contributor Wei Zhou (Jingcheng and Wei are both software engineers at Intel).

Issues and Improvements

This section describes the issues we found during the tests.


Long-time BLOCKED threads in DataNode

In the 1T_RAM_HDD test, we observed substantial periods of zero throughput in the YCSB client. After digging into the thread stacks, we found that many DataXceiver threads in the DataNode were stuck in the BLOCKED state for a long time. We observed this in other cases too, but it occurred most often in 1T_RAM_HDD.

Each DataNode has a single instance of FsDatasetImpl with many synchronized methods, and DataXceiver threads synchronize on this instance when creating/finalizing blocks. A slow create/finalize operation in one DataXceiver thread can therefore block create/finalize operations in all other DataXceiver threads. The following table shows the time consumed by these operations in 1T_RAM_HDD:

Synchronized methods | Max exec time (ms), light load | Max wait time (ms), light load | Max exec time (ms), heavy load | Max wait time (ms), heavy load

Table 13. DataXceiver threads and time consumed

Both the execution time and the wait time of the synchronized methods increased dramatically as the system load increased. The time spent waiting for locks can be up to tens of seconds. Slow operations usually come from slow storage such as HDD, and they can hold up concurrent create/finalize operations on fast storage, so HDFS/HBase cannot make full use of tiered storage.

A finer grained lock mechanism in DataNode is needed to fix this issue. We are working on this improvement now (HDFS-9668).
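The effect of a single DataNode-wide lock can be illustrated with a toy FIFO simulation. This is not FsDatasetImpl. It assumes each create/finalize operation holds the lock for its whole duration and that waiters are served in arrival order. It compares one lock per DataNode with a hypothetical one-lock-per-storage-type design; HDFS-9668 may use a different granularity.

```python
from collections import defaultdict


def worst_wait_ms(ops, lock_per_storage):
    """Toy FIFO lock model for block create/finalize operations on one DataNode.

    ops: (arrival_ms, duration_ms, storage_type) tuples.
    lock_per_storage=False models a single DataNode-wide lock;
    True models a hypothetical lock per storage type.
    Returns the worst wait observed for each storage type.
    """
    lock_free_at = defaultdict(int)  # lock key -> time the lock is next free
    worst = defaultdict(int)
    for arrival, duration, storage in sorted(ops):
        key = storage if lock_per_storage else "datanode"
        start = max(arrival, lock_free_at[key])
        worst[storage] = max(worst[storage], start - arrival)
        lock_free_at[key] = start + duration
    return dict(worst)


# One slow HDD operation followed by two quick RAMDISK operations (made-up timings).
ops = [(0, 5000, "DISK"), (10, 5, "RAM_DISK"), (20, 5, "RAM_DISK")]
print("single lock:     ", worst_wait_ms(ops, lock_per_storage=False))
print("per-storage lock:", worst_wait_ms(ops, lock_per_storage=True))
```

With the shared lock, the first RAMDISK operation waits 4990 ms behind the slow HDD operation. With per-storage locks it proceeds immediately.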

Load Imbalance in HDFS with Tiered Storage

In tiered storage cases, we found that utilization is uneven among volumes of the same storage type when using RoundRobinVolumeChoosingPolicy. The root cause is that RoundRobinVolumeChoosingPolicy uses a single shared counter to choose volumes for all storage types. This can be unfair when choosing volumes for a particular storage type, so the volumes at the tail of the configured data directories have a lower chance of being written.

The situation becomes even worse when there are different numbers of volumes of different storage types. We have filed this issue in JIRA (HDFS-9608) and provided a patch.
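A simplified model shows how a single shared counter skews placement. The sketch assumes each call starts from the shared index, wrapping to 0 when the index is past the end of the requested type's volume list, and it ignores free-space checks. Volume names and the alternating request pattern are hypothetical, and the real RoundRobinVolumeChoosingPolicy code may differ in detail.

```python
from collections import Counter


class RoundRobinChooser:
    """Simplified round-robin volume chooser (free-space checks omitted)."""

    def __init__(self, shared_counter):
        self.shared_counter = shared_counter
        self.counters = {}  # counter key -> next index

    def choose(self, storage_type, volumes):
        key = "shared" if self.shared_counter else storage_type
        index = self.counters.get(key, 0)
        if index >= len(volumes):  # another storage type moved the counter past this list
            index = 0
        self.counters[key] = (index + 1) % len(volumes)
        return volumes[index]


VOLUMES = {
    "SSD": ["ssd0", "ssd1", "ssd2", "ssd3"],
    "DISK": ["hdd0", "hdd1", "hdd2", "hdd3"],
}


def simulate(shared_counter, requests):
    chooser = RoundRobinChooser(shared_counter)
    return Counter(chooser.choose(t, VOLUMES[t]) for t in requests)


requests = ["SSD", "DISK"] * 400  # alternating block writes
print("shared counter:  ", dict(simulate(True, requests)))
print("per-type counter:", dict(simulate(False, requests)))
```

With SSD and DISK requests alternating, the shared counter sends every SSD write to ssd0 or ssd2 and every DISK write to hdd1 or hdd3, leaving half the volumes idle. Per-type counters spread the 800 writes evenly, 100 per volume.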

Asynchronous File Movement Across Storage When Renaming in HDFS

Currently, data blocks in HDFS are placed on different types of storage media according to pre-specified storage policies when they are created. After that, the blocks remain where they are until the external Mover tool is run. Mover scans the whole namespace and moves any data blocks that are not stored on the storage media their policy specifies.

With tiered storage, when we rename a file/directory from a location with one storage policy to a location with a different one, the blocks of that file, or of all files under that directory, have to be moved to the right storage. HDFS does not currently do this.

Non-configurable Storage Type and Policy

Currently in HDFS both storage type and storage policy are predefined in source code. This makes it inconvenient to add implementations for new devices and policies. It is better to make them configurable.

No Optimization for Certain Storage Type

Currently there is no difference in the execution path for different storage types. As more and more high performance storage devices are adopted, the performance gap between storage types will become larger, and the optimization for certain types of storage will be needed.

Take writing a certain amount of data into HDFS as an example. If users want to minimize the total write time, the optimal approach for HDD may be to use compression to save disk I/O. For RAMDISK, writing directly is more suitable, as it avoids the overhead of compression. This scenario requires per-storage-type configuration, which the current implementation does not support.


Disk Bandwidth Limitation

In the section 50GB Dataset in a Single Storage, the performance difference between four SSDs and eight SSDs is very small. The root cause is that the total bandwidth available to the eight SSDs is limited by upper-level hardware controllers. Figure 16 illustrates the motherboard design. The eight disk slots connect to two different SATA controllers (Ports 0:3 - SATA and Port 0:3 - sSATA). As highlighted in the red rectangle, the maximum bandwidth available for the two controllers in the server is 2*6 Gbps = 1536 MB/s.

Figure 16. Hardware design of server motherboard

Maximum throughput for a single disk was measured with the FIO tool.

Storage medium | Read BW (MB/s) | Write BW (MB/s)

Table 14. Maximum throughput of storage media

Note: RAMDISK is essentially memory and does not go through the same controllers as SSDs and HDDs, so it does not have the 2*6Gbps limitation. RAMDISK data is listed in the table for comparison.

According to Table 14, the write bandwidth of eight SSDs is 447 x 8 = 3576 MB/s. This exceeds the controllers’ 1536 MB/s physical limit, so only 1536 MB/s is available to all eight SSDs. HDD is not affected by this limit, as the total HDD bandwidth (127 x 8 = 1016 MB/s) is below it. This greatly impacts the performance of the storage system.
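The cap arithmetic can be sketched as a simple min(). This assumes one aggregate limit shared by all eight slots, which is how the calculation above treats it, and uses the article's 1536 MB/s conversion. The function name is hypothetical.

```python
# The article's controller limit: 2 controllers x 6 Gbps, converted as 12 x 1024 / 8 = 1536 MB/s.
CONTROLLER_CAP_MB_S = 2 * 6 * 1024 / 8


def aggregate_write_mb_s(disks, per_disk_write_mb_s, cap_mb_s=CONTROLLER_CAP_MB_S):
    """Write bandwidth the disks could deliver together, capped by the shared controllers."""
    return min(disks * per_disk_write_mb_s, cap_mb_s)


for label, disks, per_disk in (("8 x SSD", 8, 447), ("4 x SSD", 4, 447), ("8 x HDD", 8, 127)):
    raw = disks * per_disk
    print(f"{label}: {raw} MB/s raw -> {aggregate_write_mb_s(disks, per_disk):.0f} MB/s usable")
```

Under this simple model even four SSDs (1788 MB/s raw) exceed the cap. That is consistent with the small four-versus-eight SSD difference reported earlier.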

We suggest one of the following:

  • Enhance the hardware by using more HBA cards to eliminate this design limitation.

  • Use SSD and HDD together with an appropriate ratio (for example four SSDs and four HDDs) to achieve a better balance between performance and cost.

Disk I/O Bandwidth and Latency Varies for Ports

As described in section Disk Bandwidth Limitation, four of the eight disks connect to Ports 0:3 - SATA and the rest connect to Port 0:3 - sSATA; the total bandwidth of the two controllers is 12Gbps. We found that this bandwidth is not divided evenly among the disk channels.

We ran a test in which each SSD (sda, sdb, sdc and sdd connect to Port 0:3 - sSATA; sde, sdf, sdg and sdh connect to Ports 0:3 - SATA) is written by an individual FIO process. We expected all eight disks to be written at the same speed. However, according to the iostat output, the 1536 MB/s of bandwidth is not divided evenly between the two controllers and eight disk channels. As shown in Figure 17, the four SSDs connected to Ports 0:3 - SATA obtain more I/O bandwidth (213.5MB/s*4) than the others (107MB/s*4).

We suggest that you consider the controller limitation and storage bandwidth when setting up a cluster. Using four SSDs and four HDDs in a node is a reasonable choice, and it is better to connect the four SSDs to Ports 0:3 - SATA.

Additionally, with the existing VolumeChoosingPolicy, a disk with higher latency may receive the same workload as disks with lower latency, which slows down overall performance. We suggest implementing a latency-aware VolumeChoosingPolicy in HDFS.

Figure 17. Different write speed and await time of disks
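As one possible shape for the latency-aware VolumeChoosingPolicy suggested above, the sketch below keeps an exponentially weighted moving average of each volume's I/O await time and picks the lowest. It is entirely hypothetical and is not an HDFS API. The class name, the smoothing factor, and feeding it iostat-style await samples are all our assumptions. Always choosing the minimum can herd writes onto one disk. Here the rising await time of a busy disk counters that, but a production policy might prefer a weighted random choice.

```python
class LatencyAwareVolumeChooser:
    """Hypothetical chooser: prefer the volume with the lowest smoothed await time."""

    def __init__(self, free_bytes, alpha=0.3):
        self.free_bytes = dict(free_bytes)  # volume name -> free bytes
        self.alpha = alpha  # weight of the newest latency sample
        self.ewma_ms = {v: None for v in self.free_bytes}  # None = no sample yet

    def record_latency(self, volume, await_ms):
        """Fold a new await sample into the volume's exponentially weighted average."""
        old = self.ewma_ms[volume]
        if old is None:
            self.ewma_ms[volume] = await_ms
        else:
            self.ewma_ms[volume] = self.alpha * await_ms + (1 - self.alpha) * old

    def choose(self, block_size):
        candidates = [v for v, free in self.free_bytes.items() if free >= block_size]
        if not candidates:
            raise RuntimeError("no volume has enough free space")
        # Unsampled volumes count as 0 ms so they get tried; ties keep configuration order.
        chosen = min(candidates, key=lambda v: self.ewma_ms[v] or 0.0)
        self.free_bytes[chosen] -= block_size
        return chosen


chooser = LatencyAwareVolumeChooser({"sdb": 10**12, "sdf": 10**12})
chooser.record_latency("sdb", 40.0)  # disk on the slower port
chooser.record_latency("sdf", 5.0)  # disk on the faster port
print(chooser.choose(128 * 1024 * 1024))
```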

HDFS HSM and HBase: Experiment (continued) (Part 5 of 7)

This is part 5 of a 7-part report by HBase contributor Jingcheng Du and HDFS contributor Wei Zhou (Jingcheng and Wei are both software engineers at Intel).

1TB Dataset in a Single Storage

Figures 6 and 7 show the performance for the 1TB dataset on HDD and SSD. Due to limited memory capacity, the 1TB dataset was not tested on RAMDISK.

Figure 6. YCSB throughput of a single storage type with 1TB dataset

Figure 7. YCSB latency of a single storage type with 1TB dataset

The throughput and latency on SSD are both better than on HDD (134% throughput and 35% latency). This is consistent with the 50GB dataset test.

The throughput benefit of SSD differs between the 50GB and 1TB tests (128% versus 134%); SSD gains more in the 1TB test. This is because many more I/O-intensive events, such as compactions, occur in the 1TB dataset test than in the 50GB one, which shows the superiority of SSD in large-data scenarios. Figure 8 shows the changes in network throughput during the tests.

Figure 8. Network throughput measured for case 1T_HDD and 1T_SSD

In the 1T_HDD case the network throughput is lower than 10Gbps, while in the 1T_SSD case it can be much larger than 10Gbps. This means that if we used a 10Gbps switch in the 1T_SSD case, the network would be the bottleneck.

Figure 9. Disk throughput measured for case 1T_HDD and 1T_SSD

In Figure 9, we can see that the bottleneck for these two cases is disk bandwidth.

  • In 1T_HDD, the throughput is almost 1000 MB/s at the beginning of the test, but after a while it drops, because slow flushes cause regions to hit their memstore limits.

  • In the 1T_SSD case, the throughput seems to be limited by a ceiling of around 1300 MB/s, nearly the same as the bandwidth limit of the SATA controllers. To further improve throughput, more SATA controllers (e.g. an HBA card) are needed rather than more SSDs.

During the 1T_SSD test, we observed that the operation latencies of the eight SSDs in each node differ considerably, as shown in the following chart. Figure 10 includes the latency of only two disks: sdb represents the disks with high latency and sdf represents the disks with low latency.

Figure 10. I/O await time measured for different disks

Four of the disks have better latency than the other four. This is caused by the hardware design; you can find the details in Disk I/O Bandwidth and Latency Varies for Ports. With the existing VolumeChoosingPolicy, a disk with higher latency may receive the same workload as disks with lower latency, which slows down overall performance. We suggest implementing a latency-aware VolumeChoosingPolicy in HDFS.

Performance Estimation for RAMDISK with 1TB Dataset

We cannot measure the performance of RAMDISK with the 1TB dataset because of RAMDISK's limited capacity. Instead, we estimate its performance by analyzing the results of the HDD and SSD cases.

The performance of the 1TB and 50GB datasets is quite close for both HDD and SSD.

The throughput difference between the 50GB and 1TB datasets is 2.89% for HDD and 1.41% for SSD.

If we take the average of these two values as the throughput change for RAMDISK between the 50GB and 1TB datasets, it is around 2.15% ((2.89%+1.41%)/2=2.15%), so the throughput for RAMDISK with the 1TB dataset should be

406577×(1+2.15%)=415318 (ops/sec)

Figure 11. YCSB throughput estimation for RAMDISK with 1TB dataset
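The estimate is plain arithmetic: average the relative 50GB-to-1TB changes measured for HDD and SSD, then apply that average to the measured 50GB RAMDISK throughput (406577 ops/sec). A minimal sketch follows; the function name is hypothetical, and the result inherits the assumption that RAMDISK scales like the average of HDD and SSD.

```python
def estimate_from_mean_change(measured, relative_changes):
    """Scale a measured value by the mean of relative changes observed elsewhere."""
    mean_change = sum(relative_changes) / len(relative_changes)
    return measured * (1 + mean_change)


# 50GB RAMDISK throughput and the HDD/SSD 50GB->1TB changes quoted in the text
estimate = estimate_from_mean_change(406577, [0.0289, 0.0141])
print(round(estimate))  # 415318
```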

Please note: the throughput does not drop much in the 1TB dataset cases compared with the 50GB cases because they do not use the same number of pre-split regions: the table is pre-split into 18 regions in the 50GB cases and into 210 regions in the 1TB cases.
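Pre-splitting a table into N regions requires N - 1 split keys, so 210 regions means 209 split points. The post does not say how the split keys were chosen; as a hedged sketch, the function below produces evenly spaced keys under the hypothetical assumption that row keys are a fixed prefix followed by a zero-padded decimal number, so that HBase's lexicographic order matches numeric order.

```python
def even_split_keys(num_regions, key_space, prefix="user", width=10):
    """Return num_regions - 1 split keys that cut [0, key_space) into equal ranges.

    Assumes (hypothetically) that row keys are prefix + a zero-padded decimal
    number, so lexicographic order matches numeric order.
    """
    if num_regions < 1:
        raise ValueError("num_regions must be >= 1")
    if key_space > 10 ** width:
        raise ValueError("key_space does not fit in the padded width")
    # Integer arithmetic keeps the split points exact and strictly increasing.
    return [f"{prefix}{i * key_space // num_regions:0{width}d}" for i in range(1, num_regions)]


keys_1tb = even_split_keys(210, 10**10)
print(len(keys_1tb), keys_1tb[0], keys_1tb[-1])
```

A list like this could be passed to the HBase shell's `create` command through its `SPLITS` option; the shell also accepts `NUMREGIONS` together with a `SPLITALGO` such as `HexStringSplit` when keys are uniformly distributed hex strings.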

Performance for Tiered Storage

In this section, we study HBase write performance on tiered storage (i.e. different storage types mixed together in one test). This shows what performance can be achieved by mixing fast and slow storage, and helps us find the best balance between performance and cost.

Figure 12 and Figure 13 show the performance for tiered storage. You can find the description of each case in Table 1.

Most of the cases that introduce fast storage have better throughput and latency. Unsurprisingly, 1T_RAM_SSD has the best performance among them. The real surprise is that, after introducing RAMDISK, the throughput of 1T_RAM_HDD is worse than that of 1T_HDD (-11%) and 1T_RAM_SSD_All_HDD is worse than 1T_SSD_All_HDD (-2%); in addition, 1T_SSD is worse than 1T_SSD_HDD (-2%).

Figure 12. YCSB throughput data for tiered storage

Figure 13. YCSB latency data for tiered storage

We also investigated how much data is written to each storage type by collecting information from one DataNode.

Figure 14. Distribution of data blocks on each storage of HDFS in one DataNode

As shown in Figure 14, more data is generally written to disks in test cases with higher throughput. Fast storage accelerates flushes and compactions, which leads to more flushes and compactions and therefore more data written to disks. In some RAMDISK-related cases only the WAL can be written to RAMDISK, and 1216 GB of WAL is written to one DataNode.
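The post does not say how the per-storage numbers in Figure 14 were collected. One hedged way to measure cumulative writes per storage type on Linux is to diff the sectors-written counters in /proc/diskstats and group devices by the HDFS storage type (for example `[SSD]` or `[DISK]` in `dfs.datanode.data.dir`) of the directory each one hosts. The device-to-type mapping is a hypothetical input; the counters include all writes to a device, not only HDFS, and a tmpfs-backed RAMDISK has no block device and would not appear at all.

```python
SECTOR_BYTES = 512  # /proc/diskstats counts sectors in 512-byte units


def sectors_written(diskstats_text):
    """Map device name -> cumulative sectors written (the 10th column of /proc/diskstats)."""
    out = {}
    for line in diskstats_text.splitlines():
        f = line.split()
        if len(f) >= 10:
            out[f[2]] = int(f[9])
    return out


def written_by_storage_type(before, after, device_types):
    """Bytes written per storage type between two snapshots, plus each type's share in percent.

    device_types (hypothetical input) maps a block device to the HDFS storage
    type of the DataNode directory it hosts, e.g. {"sdb": "SSD", "sdc": "DISK"}.
    """
    totals = {}
    for dev, storage_type in device_types.items():
        delta = (after.get(dev, 0) - before.get(dev, 0)) * SECTOR_BYTES
        totals[storage_type] = totals.get(storage_type, 0) + delta
    grand = sum(totals.values())
    shares = {t: 100.0 * b / grand for t, b in totals.items()} if grand else {}
    return totals, shares
```

Taking one snapshot before a test run and one after gives the kind of per-type split quoted later (for example, 94% of the data on SSD and 6% on HDD in the SSD/HDD mixing case).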

For the tests without SSD (1T_HDD and 1T_RAM_HDD), we deliberately limit the number of flush and compaction actions by using fewer flushers and compactors. This is because HDD has limited IOPS: too many concurrent reads and writes hurt HDD performance, which eventually slows down the whole system.
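The number of flushers and compactors is set in hbase-site.xml. As a minimal sketch, the helper below renders the relevant HBase properties (`hbase.hstore.flusher.count`, `hbase.regionserver.thread.compaction.small`, `hbase.regionserver.thread.compaction.large`) as a configuration fragment. The values are placeholders chosen for illustration; the post does not state the counts used in these tests.

```python
import xml.etree.ElementTree as ET


def hbase_site_xml(props):
    """Render a dict of HBase settings as an hbase-site.xml <configuration> element."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    return ET.tostring(root, encoding="unicode")


# Placeholder values only: not the counts used in these tests.
HDD_FLUSH_COMPACTION = {
    "hbase.hstore.flusher.count": 1,                  # memstore flush threads per RegionServer
    "hbase.regionserver.thread.compaction.small": 1,  # threads for small compactions
    "hbase.regionserver.thread.compaction.large": 1,  # threads for large compactions
}

print(hbase_site_xml(HDD_FLUSH_COMPACTION))
```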

Many DataNode threads can be BLOCKED for up to tens of seconds in 1T_RAM_HDD. We observe this in other cases as well, but it happens most often in 1T_RAM_HDD. This is because each DataNode holds one big lock when creating and finalizing HDFS blocks, and these methods sometimes take tens of seconds (see Long-time BLOCKED threads in DataNode). The more often these methods are called (in HBase they are used by the flusher, the compactor, and the WAL), the more often the blocking occurs. Writing the WAL in HBase requires creating and finalizing blocks, which can be blocked, and consequently users' writes are blocked too. Multiple WALs with a large number of groups, or one WAL per region, might also encounter this problem, especially on HDD.

With the written data distribution in mind, let’s look back at the performance results in Figure 12 and Figure 13. From them, we make the following observations:

  1. Mixing SSD and HDD greatly improves performance compared with pure HDD (136% of the throughput and 35% of the latency). But fully replacing HDD with SSD does not improve on mixing SSD/HDD (98% of the throughput and 99% of the latency). This is because the hardware design cannot split the I/O bandwidth evenly across all eight disks, and in the SSD/HDD mixing case 94% of the data is written to SSD while only 6% is written to HDD. This strongly suggests that mixing SSD and HDD achieves the best balance between performance and cost. More information is in Disk Bandwidth Limitation and Disk I/O Bandwidth and Latency Varies for Ports.

  2. Adding RAMDISK to SSD/HDD tiered storage gives different results in 1T_RAM_SSD_All_HDD and 1T_RAM_SSD_HDD. 1T_RAM_SSD_HDD, in which only a little data is written to HDD, improves performance over the SSD/HDD mixing cases. 1T_RAM_SSD_All_HDD, in which a large amount of data is written to HDD, performs worse than the SSD/HDD mixing cases. This means that if HBase distributes data appropriately between SSD and HDD, mixing RAMDISK/SSD/HDD can deliver good performance.

  3. RAMDISK/SSD tiered storage wins in both throughput and latency (109% of the throughput and 67% of the latency of the pure SSD case). So if cost is not an issue and maximum performance is needed, RAMDISK/SSD should be chosen.

Throughput drops by 11% from 1T_HDD to 1T_RAM_HDD. The first reason is that 1T_RAM_HDD uses RAMDISK, which consumes part of the RAM and leaves the OS buffer cache less memory for caching data.

Further, with 1T_RAM_HDD the YCSB client can push data at very high speed, so cells accumulate very quickly in the memstore while flushes and compactions on HDD are slow. As a result, RegionTooBusyException occurs more often (Figure 15 shows a much larger memstore in 1T_RAM_HDD than in 1T_HDD), and we observe much longer GC pauses in 1T_RAM_HDD than in 1T_HDD, up to 20 seconds per minute.

Figure 15. Memstore size in 1T_RAM_HDD and 1T_HDD
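HBase blocks updates to a region, and clients receive RegionTooBusyException, once the region's memstore exceeds `hbase.hregion.memstore.flush.size` multiplied by `hbase.hregion.memstore.block.multiplier`. The sketch below is a deliberately simplified model (hypothetical function names and example rates) of why fast ingest over slow HDD flushes reaches that limit: the memstore grows at the difference between ingest and flush rates. It ignores flush snapshots and global memstore limits.

```python
import math

MB = 1024 * 1024


def blocking_limit(flush_size, block_multiplier):
    """Memstore size at which a region blocks updates: flush.size * block.multiplier."""
    return flush_size * block_multiplier


def seconds_until_blocked(limit, ingest_per_s, flush_per_s, current=0):
    """Simplified model: the memstore grows at (ingest - flush) until it reaches the limit."""
    if current >= limit:
        return 0.0
    growth = ingest_per_s - flush_per_s
    if growth <= 0:
        return math.inf  # flushing keeps up, so the region never blocks in this model
    return (limit - current) / growth


# Hypothetical example: 128 MB flush size, multiplier 4, 100 MB/s in, 36 MB/s flushed.
limit = blocking_limit(128 * MB, 4)
print(seconds_until_blocked(limit, 100 * MB, 36 * MB))  # 8.0
```

In this model, raising ingest speed (as RAMDISK WALs do) without raising flush speed only shortens the time until the region blocks.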

Finally, when we try to increase the number of flushers and compactors, performance gets even worse, for the reasons given earlier for using fewer flushers and compactors in the HDD-related tests (see Long-time BLOCKED threads in DataNode).

The performance drop of 1T_RAM_SSD_All_HDD compared with 1T_SSD_All_HDD (-2%) has the same causes.

We suggest:

  1. Use reasonable configurations for flushers and compactors, especially in HDD-related cases.

  2. Don’t combine storage types with large performance gaps, such as directly mixing RAMDISK and HDD.

  3. In many cases, we observe long GC pauses of around 10 seconds per minute. We need to implement an off-heap memstore in HBase to solve the long GC pause issues.

  4. Implement a finer-grained lock mechanism in DataNode.
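A finer-grained lock mechanism in the DataNode could take many forms. One common pattern is lock striping: a fixed pool of locks indexed by block ID, so operations on unrelated blocks no longer serialize on one DataNode-wide lock. The Python sketch below only illustrates the pattern; the class and function names are hypothetical and it does not reflect DataNode internals.

```python
import threading


class StripedBlockLocks:
    """Hypothetical sketch: a fixed pool of locks indexed by block ID.

    Blocks that map to different stripes can be created or finalized in
    parallel; only blocks sharing a stripe contend with each other.
    """

    def __init__(self, stripes=64):
        if stripes < 1:
            raise ValueError("stripes must be >= 1")
        self._locks = [threading.Lock() for _ in range(stripes)]

    def lock_for(self, block_id):
        # Python's % with a positive modulus is never negative,
        # so negative block IDs also map to a valid stripe.
        return self._locks[block_id % len(self._locks)]


locks = StripedBlockLocks()


def finalize_block(block_id):
    with locks.lock_for(block_id):
        pass  # slow per-block work (e.g. syncing and renaming files) would go here
```

The trade-off is that any operation touching many blocks at once must acquire several stripes in a consistent order to avoid deadlock.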

Go to part 6, Issues


