{"id":362753,"date":"2015-10-29T09:00:59","date_gmt":"2015-10-29T16:00:59","guid":{"rendered":"https:\/\/www.microsoft.com\/en-us\/research\/?p=362753"},"modified":"2017-02-09T12:24:52","modified_gmt":"2017-02-09T20:24:52","slug":"evolution-bings-objectstore","status":"publish","type":"post","link":"https:\/\/www.microsoft.com\/en-us\/research\/blog\/evolution-bings-objectstore\/","title":{"rendered":"The Evolution of Bing\u2019s ObjectStore"},"content":{"rendered":"
By Vikas Sabharwal\u00a0and Vineet Pruthi, Microsoft Shared Platform Group<\/em><\/p>\n In late 2011, Bing\u2019s shared platform team started evaluating solutions for distributed NoSQL stores. We needed a fast, unified, scalable, distributed key-value store that could be turned into a shared platform for many internal Bing partners. We wanted to serve lookups with low single-digit millisecond latencies, and we had strict requirements for scale, latency, availability, and multi-tenancy. We set the following goals to meet before releasing a shared platform:<\/p>\n We realized there is no one-size-fits-all approach, so we wanted different scenarios to be able to use different indexing and storage techniques. We also wanted to support transparent, continuous migration from one storage engine to another while serving traffic through the same data interface.<\/p>\n We looked at some of the open source solutions available at the time, but our effort started before several of them had matured, and our requirements for availability, latency, and multi-tenancy led us in a different direction. We reached out to Microsoft Research and worked closely with them to build a high-performance storage platform from the ground up.<\/p>\n We named our platform ObjectStore<\/strong> and started working on it. We identified a set of core concepts we needed to build for the platform:<\/p>\n In this article, we focus on the Pluggable Storage Engine; at a later point, we will share more details on Replication, Coprocessors, secondary indexes, and automated repairs.<\/p>\n At Bing, we generate, process, and serve tons of data, and we wanted a key-value store that could support two classes of data update mechanisms quickly and efficiently. 
We settled on the following:<\/p>\n Regardless of the model, we wanted a pluggable storage engine that would let us replace one storage type with another while sharing common components, such as the request\/response interface and the data replication layer. We used this pluggability to serve both table types.<\/p>\n Figure 1 \u2013 Table Server<\/p><\/div>\n The bulk table store supports read-only tables with very low memory usage by keeping per-record overhead to a minimum. To achieve fast lookups, we keep the hashes of the keys in memory while keeping memory consumption low. We use a minimal perfect hashing<\/span><\/a> algorithm. New data is loaded atomically into the storage engine on different machines, without impacting lookup serving. Bulk stores support both delta and full refreshes through data publishing. We ensure that every request with multiple keys hits only a single version of the data.<\/p>\n The entire data set is sharded into partitions, and each partition is divided into buckets, which are created offline using Cosmos<\/a> as follows. All records within a bucket are split into groups of about 4 records each, and the groups are then sorted by the number of keys they contain. For each group, we search for a factor F between 0 and 255 for which none of the group\u2019s keys collide with a position already taken in the bucket; once such an F is found, those keys\u2019 positions in the bucket are marked as taken. Keys for which no such factor F exists are moved into a collisions table. We use 3 hash functions: one to identify the bucket, one to identify the group, and one to find the position of the value in the bucket. Empty slots in the bucket\u2019s position array (array B) are compressed away when the ordered list of values is written to the data file. These files are then shipped to the online Table Servers, which serve this key-value space.<\/p>\n Figure 2 \u2013 Offline preparation of minimal perfect hash using MapReduce<\/p><\/div>\n For every key in a read operation, 3 hashes are calculated. 
Every key that is read is served with about two in-memory lookups: one to determine the bucket, and another to find the position of the key\u2019s value in the group\u2019s loaded array. Keys that collided are served from the collisions table.<\/p>\n Data loading is orchestrated by a centralized controller called the Table Master. Each partition has at least 3 serving copies on different machines; we may add more if we need to support higher load. During the rollout of a new version of the data, we keep more than 3 serving copies to provide version consistency and higher read availability. For example, we add a 4th<\/sup> row of data holding the new version on a different set of machines, and unload the 1st<\/sup> row of old data once the 4th<\/sup> row is successfully responding to queries. The rollout proceeds row by row, and when it completes, the old version of the data is completely unloaded. Each table\u2019s data rollout is treated as a separate rollout unit. Because rollouts happen on demand for 100s of tables across 1000s of machines, only a small amount of additional spare capacity is needed for the 4th<\/sup> rows.<\/p>\n Bulk tables are very useful for scenarios with large keys, such as machine learning models for URLs, because we only store a small hash of each key in memory. The typical memory overhead of the bulk store implementation in ObjectStore is 14 bytes per record.<\/p>\n The point table store, which serves read\/write tables, is quite different from the bulk table store. For each partition, we select a primary replica and 2 or more secondary replicas. Read operations can go to any replica. Write operations go to the primary replica and are also sent to the secondary replicas. A write is acknowledged to the client once a quorum of replicas on different machines has accepted it. We created our own metadata service to manage and push information such as shard distribution and primary election. 
This metadata changes frequently because of heartbeats, process restarts, the addition of new data or compute workloads, primary lease expirations, and so on. The service can respond to metadata changes in under a second, which is critical for keeping writes highly available.<\/p>\n At its core, the LKRB point table uses linear hashing<\/span><\/a> to implement this distributed hash table, which gives it excellent lookup performance. In testing, it achieved more than 1M reads per second and more than 1M writes per second while offering sub-millisecond latencies. It is a lock-free, contention-free implementation based on a CACM paper from 1988. It provides fast, stable performance regardless of table size: the hash table expands and shrinks automatically as its data grows and shrinks, without any performance penalty. The LKRB hash table implementation is highly customizable, with options to configure space- and time-based expiration, compression, persistence, caching, and iterators, among others. LKRB tables can be scaled dynamically from MBs to TBs of data while offering the same performance guarantees.<\/p>\n\n
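The table grows without a performance penalty because of how linear hashing works. As a rough illustration, here is a minimal, single-threaded Python sketch of textbook linear hashing. It is not the LKRB code, which is lock-free and adds expiration, compression, persistence and more. The class name LinearHashTable and the parameters initial_buckets and max_load are hypothetical. A split pointer walks through the buckets of the current round. Whenever the load factor exceeds max_load, it splits one bucket at a time, so the table grows incrementally instead of rehashing everything at once. Shrinking works symmetrically by merging buckets and is omitted here.

```python
# Minimal single-threaded sketch of linear hashing, for illustration only.
# It is NOT the LKRB implementation, which is lock-free and far more featureful.
# The class name, the initial bucket count and max_load are hypothetical choices.


class LinearHashTable:
    def __init__(self, initial_buckets=4, max_load=0.75):
        self.n0 = initial_buckets  # bucket count before any split
        self.level = 0             # completed doubling rounds
        self.split = 0             # next bucket to split in the current round
        self.max_load = max_load   # average entries per bucket before growing
        self.count = 0
        self.buckets = [[] for _ in range(initial_buckets)]

    def _round_size(self):
        # Number of buckets at the start of the current round.
        return self.n0 * 2 ** self.level

    def _index(self, key):
        h = hash(key)  # built-in hash stands in for a real hash function
        size = self._round_size()
        i = h % size
        if i < self.split:
            # This bucket was already split in this round, so the key may
            # live in bucket i or i + size: use the next-level hash.
            i = h % (2 * size)
        return i

    def get(self, key, default=None):
        for k, v in self.buckets[self._index(key)]:
            if k == key:
                return v
        return default

    def put(self, key, value):
        bucket = self.buckets[self._index(key)]
        for pos, (k, _) in enumerate(bucket):
            if k == key:
                bucket[pos] = (key, value)  # overwrite an existing key
                return
        bucket.append((key, value))
        self.count += 1
        # Grow one bucket at a time until the load factor is back in range;
        # there is never a stop-the-world rehash of the whole table.
        while self.count > self.max_load * len(self.buckets):
            self._split_one()

    def _split_one(self):
        size = self._round_size()
        old = self.buckets[self.split]
        self.buckets.append([])  # new bucket has index self.split + size
        keep = []
        for k, v in old:
            if hash(k) % (2 * size) == self.split:
                keep.append((k, v))
            else:
                self.buckets[self.split + size].append((k, v))
        self.buckets[self.split] = keep
        self.split += 1
        if self.split == size:
            # Every bucket of this round has been split: start the next round.
            self.level += 1
            self.split = 0


table = LinearHashTable()
for i in range(100):
    table.put('key%d' % i, i)
print(table.get('key42'), len(table.buckets))  # prints: 42 134
```

Each split rehashes only the entries of a single bucket, so the cost of growth is spread across individual inserts. This is the main reason a linear-hashing table avoids the latency spikes of a full-table resize.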
\n
Pluggable Storage Engine<\/h2>\n
\n

Bulk Tables \u2013 Read-Only Stores<\/h3>\n

Point Tables \u2013 Read\/Write Stores<\/h3>\n
Disk Store Design & Performance<\/h4>\n