Category: Servers and Storage

2011-01-10 22:43:57


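Large-scale Incremental Processing Using Distributed Transactions and Notifications

This post excerpts Google's paper Large-scale Incremental Processing Using Distributed Transactions and Notifications and analyzes what I consider its key points. The paper describes how distributed transactions are used to update a web index incrementally and in near real time. My knowledge of search architecture is limited, so much of the analysis looks only at the distributed-architecture side; please point out anything that is off. The paper explains clearly how 2PC and optimistic locking are applied in a real system, and reading it helps deepen one's understanding of distributed consistency, 2PC in particular.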


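  • The 2PC protocol itself does not deal with the high availability of participants or of the coordinator; both must be made highly available by other means. In this paper, participant availability is guaranteed by Bigtable, and optimistic locking is used to cope with coordinator crashes.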

Abstract

Updating an index of the web as documents are crawled requires continuously transforming a large repository of existing documents as new documents arrive. This task is one example of a class of data processing tasks that transform a large repository of data via small, independent mutations. These tasks lie in a gap between the capabilities of existing infrastructure. Databases do not meet the storage or throughput requirements of these tasks: Google’s indexing system stores tens of petabytes of data and processes billions of updates per day on thousands of machines. MapReduce and other batch-processing systems cannot process small updates individually as they rely on creating large batches for efficiency.

We have built Percolator, a system for incrementally processing updates to a large data set, and deployed it to create the Google web search index. By replacing a batch-based indexing system with an indexing system based on incremental processing using Percolator, we process the same number of documents per day, while reducing the average age of documents in Google search results by 50%.

Introduction

Skipped here.

Design

Percolator provides two main abstractions for performing incremental processing at large scale: ACID transactions over a random-access repository and observers, a way to organize an incremental computation. A Percolator system consists of three binaries that run on every machine in the cluster: a Percolator worker, a Bigtable tablet server, and a GFS chunkserver. All observers are linked into the Percolator worker, which scans the Bigtable for changed columns ("notifications") and invokes the corresponding observers as a function call in the worker process. The observers perform transactions by sending read/write RPCs to Bigtable tablet servers, which in turn send read/write RPCs to GFS chunkservers. The system also depends on two small services: the timestamp oracle and the lightweight lock service. The timestamp oracle provides strictly increasing timestamps: a property required for correct operation of the snapshot isolation protocol. Workers use the lightweight lock service to make the search for dirty notifications more efficient.

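Notifications resemble database triggers. At this point the implementation does not look very efficient: changed data is found by repeatedly scanning the data in Bigtable tablets. The schema has to support this scan, which means there must be a flag that records whether a row has changed. How this is implemented is something to watch for in the rest of the paper.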

Another question to keep in mind is how the workers use the lightweight lock service to make the search for dirty notifications more efficient.

From the programmer’s perspective, a Percolator repository consists of a small number of tables. Each table is a collection of "cells" indexed by row and column. Each cell contains a value: an uninterpreted array of bytes. (Internally, to support snapshot isolation, we represent each cell as a series of values indexed by timestamp.)

The design of Percolator was influenced by the requirement to run at massive scales and the lack of a requirement for extremely low latency. Relaxed latency requirements let us take, for example, a lazy approach to cleaning up locks left behind by transactions running on failed machines. This lazy, simple-to-implement approach potentially delays transaction commit by tens of seconds. This delay would not be acceptable in a DBMS running OLTP tasks, but it is tolerable in an incremental processing system building an index of the web. Percolator has no central location for transaction management; in particular, it lacks a global deadlock detector. This increases the latency of conflicting transactions but allows the system to scale to thousands of machines.

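Percolator puts scalability first and relaxes its latency requirements. Such a design does not suit a traditional DBMS; it fits offline services such as index building. It essentially implements distributed transactions with 2PC plus optimistic locking, with the client acting as the 2PC coordinator. The system has no high-availability design for the client, so a coordinator failure will very likely force transactions to be retried, which increases transaction latency.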

Bigtable Overview

Percolator is built on top of the Bigtable distributed storage system. Bigtable presents a multi-dimensional sorted map to users: keys are (row, column, timestamp) tuples. Bigtable provides lookup and update operations on each row, and Bigtable row transactions enable atomic read-modify-write operations on individual rows. Bigtable handles petabytes of data and runs reliably on large numbers of (unreliable) machines.

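Bigtable supports row-level transactions, including read-modify-write. I do not know how Bigtable's read-modify-write was originally implemented; most likely it uses optimistic locking. One simple implementation would let Bigtable compute and update on the server side, so that every operation of a row transaction happens inside the tablet server and the row lock never has to be exposed to the outside. The later part of the paper also touches on this approach.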

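Row-level locking with optimistic locking works as follows. Every row carries a version number. When a row transaction starts, copy-on-write is used to make a copy of the row, and all later modifications in the transaction are applied to that copy. Only when the row transaction finishes are the changes in the copy applied to the real data. At that point the version of the copy is compared with the version of the stored record: if they match, the transaction commits; if they differ, the row transaction fails to commit.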
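The version check described above can be sketched in a few lines of C++. This is only an illustration of the idea under stated assumptions: `VersionedRow` and `RowTxn` are hypothetical names, the row lives in memory, and nothing here reflects how Bigtable actually implements row transactions.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory row: column -> value, plus a version counter.
struct VersionedRow {
  std::map<std::string, std::string> columns;
  long version = 0;
};

// A row transaction edits a private copy and remembers the version it started from.
class RowTxn {
 public:
  explicit RowTxn(VersionedRow* row)
      : row_(row), copy_(row->columns), base_version_(row->version) {}

  std::string Read(const std::string& col) const {
    std::map<std::string, std::string>::const_iterator it = copy_.find(col);
    return it == copy_.end() ? std::string() : it->second;
  }

  void Write(const std::string& col, const std::string& value) {
    copy_[col] = value;
  }

  // Apply the copy only if nobody committed to the row since this transaction began.
  bool Commit() {
    if (row_->version != base_version_) return false;  // another commit won the race
    row_->columns = copy_;
    ++row_->version;
    return true;
  }

 private:
  VersionedRow* row_;
  std::map<std::string, std::string> copy_;
  long base_version_;
};
```

If two `RowTxn` objects are opened on the same row, the first `Commit()` bumps the version and the second one returns false, which is the "only one of them succeeds" behavior the paragraph relies on.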

A running Bigtable consists of a collection of tablet servers, each of which is responsible for serving several tablets (contiguous regions of the key space). A master coordinates the operation of tablet servers by, for example, directing them to load or unload tablets. A tablet is stored as a collection of read-only files in the Google SSTable format. SSTables are stored in GFS; Bigtable relies on GFS to preserve data in the event of disk loss. Bigtable allows users to control the performance characteristics of the table by grouping a set of columns into a locality group. The columns in each locality group are stored in their own set of SSTables, which makes scanning them less expensive since the data in other columns need not be scanned.

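Bigtable implements column-oriented storage by spreading the data of one row across several SSTables. A tablet consists of one or more series of SSTables, and each series holds one or more column families.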

The decision to build on Bigtable defined the overall shape of Percolator. Percolator maintains the gist of Bigtable’s interface: data is organized into Bigtable rows and columns, with Percolator metadata stored alongside in special columns (see Figure 5). Percolator’s API closely resembles Bigtable’s API: the Percolator library largely consists of Bigtable operations wrapped in Percolator-specific computation. The challenge, then, in implementing Percolator is providing the features that Bigtable does not: multirow transactions and the observer framework.

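Percolator is built on top of Bigtable and does not modify Bigtable in any way. Next to the actual data stored in Bigtable it adds a few columns (metadata) needed to implement distributed transactions, and the Percolator API is just a wrapper around the Bigtable API. The added columns are defined in the table below: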

Figure 5. Percolator metadata column definitions. The columns in the Bigtable representation of a Percolator column named "c".

Column    Use
c:lock    An uncommitted transaction is writing this cell; contains the location of the primary lock
c:write   Committed data present; stores the Bigtable timestamp of the data
c:data    Stores the data itself
c:notify  Hint: observers may need to run
c:ack_O   Observer "O" has run; stores start timestamp of successful last run


Transactions

Percolator provides cross-row, cross-table transactions with ACID snapshot-isolation semantics. Percolator users write their transaction code in an imperative language (currently C++) and mix calls to the Percolator API with their code. Figure 2 shows a simplified version of clustering documents by a hash of their contents. In this example, if Commit() returns false, the transaction has conflicted (in this case, because two URLs with the same content hash were processed simultaneously) and should be retried after a backoff. Calls to Get() and Commit() are blocking; parallelism is achieved by running many transactions simultaneously in a thread pool.

Figure 2. Percolator API usage example. Example usage of the Percolator API to perform basic checksum clustering and eliminate documents with the same content.

bool UpdateDocument(Document doc) {
  Transaction t(&cluster);
  t.Set(doc.url(), "contents", "document", doc.contents());
  int hash = Hash(doc.contents());

  // dups table maps hash -> canonical URL
  string canonical;
  if (!t.Get(hash, "canonical-url", "dups", &canonical)) {
    // No canonical yet; write myself in
    t.Set(hash, "canonical-url", "dups", doc.url());
  } // else this document already exists, ignore new copy
  return t.Commit();
}

While it is possible to incrementally process data without the benefit of strong transactions, transactions make it more tractable for the user to reason about the state of the system and to avoid the introduction of errors into a long-lived repository. For example, in a transactional web-indexing system the programmer can make assumptions like: the hash of the contents of a document is always consistent with the table that indexes duplicates. Without transactions, an ill-timed crash could result in a permanent error: an entry in the document table that corresponds to no URL in the duplicates table. Transactions also make it easy to build index tables that are always up to date and consistent. Note that both of these examples require transactions that span rows, rather than the single-row transactions that Bigtable already provides.

Percolator stores multiple versions of each data item using Bigtable’s timestamp dimension. Multiple versions are required to provide snapshot isolation, which presents each transaction with the appearance of reading from a stable snapshot at some timestamp. Writes appear in a different, later, timestamp. Snapshot isolation protects against write-write conflicts: if transactions A and B, running concurrently, write to the same cell, at most one will commit. Snapshot isolation does not provide serializability; in particular, transactions running under snapshot isolation are subject to write skew. The main advantage of snapshot isolation over a serializable protocol is more efficient reads. Because any timestamp represents a consistent snapshot, reading a cell requires only performing a Bigtable lookup at the given timestamp; acquiring locks is not necessary. Figure 3 illustrates the relationship between transactions under snapshot isolation.

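Snapshot isolation requires keeping multiple versions of the data, each version being a consistent snapshot. It prevents write-write conflicts: if two transactions base their updates on the same version of the data, only one of the two updates succeeds, and the algorithm must be designed to guarantee this. Snapshot isolation does not guarantee serializability; transactions that use this isolation level are generally insensitive to update order. Its advantage is that update transactions do not block reads: every read sees a consistent snapshot, so read requests need no locks.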
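As a rough illustration of "multiple versions, read at a snapshot", here is a minimal C++ sketch. `VersionedCell`, `CommitAt`, `ReadAt` and `HasCommitAfter` are hypothetical names chosen for this example, and a `std::map` stands in for Bigtable's timestamp dimension; it is not Percolator's real storage layout.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical multi-version cell: commit timestamp -> value.
class VersionedCell {
 public:
  void CommitAt(long commit_ts, const std::string& value) {
    versions_[commit_ts] = value;
  }

  // Snapshot read: newest version whose commit timestamp is <= snapshot_ts.
  // Returns false when nothing is visible at that snapshot. No lock is taken.
  bool ReadAt(long snapshot_ts, std::string* value) const {
    std::map<long, std::string>::const_iterator it =
        versions_.upper_bound(snapshot_ts);
    if (it == versions_.begin()) return false;
    --it;
    *value = it->second;
    return true;
  }

  // Write-write conflict check: was anything committed after start_ts?
  bool HasCommitAfter(long start_ts) const {
    return versions_.upper_bound(start_ts) != versions_.end();
  }

 private:
  std::map<long, std::string> versions_;
};
```

A reader with start timestamp 15 keeps seeing the version committed at 10 even after a newer version is committed at 20, while a writer that started at 15 can use `HasCommitAfter(15)` to detect that it must abort.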

Figure 3. Snapshot isolation example. Transactions under snapshot isolation perform reads at a start timestamp (represented here by an open square) and writes at a commit timestamp (closed circle). In this example, transaction 2 would not see writes from transaction 1 since transaction 2’s start timestamp is before transaction 1’s commit timestamp. Transaction 3, however, will see writes from both 1 and 2. Transaction 1 and 2 are running concurrently: if they both write the same cell, at least one will abort.


Because it is built as a client library accessing Bigtable, rather than controlling access to storage itself, Percolator faces a different set of challenges implementing distributed transactions than traditional PDBMSs. Other parallel databases integrate locking into the system component that manages access to the disk: since each node already mediates access to data on the disk it can grant locks on requests and deny accesses that violate locking requirements.

By contrast, any node in Percolator can (and does) issue requests to directly modify state in Bigtable: there is no convenient place to intercept traffic and assign locks. As a result, Percolator must explicitly maintain locks. Locks must persist in the face of machine failure; if a lock could disappear between the two phases of commit, the system could mistakenly commit two transactions that should have conflicted. The lock service must provide high throughput; thousands of machines will be requesting locks simultaneously. The lock service should also be low-latency; each Get() operation requires reading locks in addition to data, and we prefer to minimize this latency. Given these requirements, the lock server will need to be replicated (to survive failure), distributed and balanced (to handle load), and write to a persistent data store. Bigtable itself satisfies all of our requirements, and so Percolator stores its locks in special in-memory columns in the same Bigtable that stores data and reads or modifies the locks in a Bigtable row transaction when accessing data in that row.

We’ll now consider the transaction protocol in more detail. Figure 6 shows the pseudocode for Percolator transactions, and Figure 4 shows the layout of Percolator data and metadata during the execution of a transaction. These various metadata columns used by the system are described in Figure 5. The transaction’s constructor asks the timestamp oracle for a start timestamp (line 10), which determines the consistent snapshot seen by Get(). Calls to Set() are buffered (line 13) until commit time. The basic approach for committing buffered writes is two-phase commit, which is coordinated by the client. Transactions on different machines interact through row transactions on Bigtable tablet servers.


Figure 6. Algorithm pseudocode. Pseudocode for Percolator transaction protocol.

  1 class Transaction
  2 {
  3   struct Write {Row row; Column col; string value;};
  4   /// All writes are buffered in this array; nothing is really written until commit
  5   vector<Write> writes_;
  6   /// Start time of the transaction; it modifies on top of the last version committed before this time
  7   int start_ts_;
  8
  9   /// Fetch the base timestamp when the transaction is created
 10   Transaction() : start_ts_(oracle.GetTimestamp()) {}
 11
 12   /// All updates are buffered
 13   void Set(Write w) {writes_.push_back(w);}
 14
 15   /// Get here mainly implements read-modify-write inside an update transaction. An ordinary read need not
 16   /// run in a transaction or check whether someone holds a lock: it looks up the data version recorded in the latest write
 17   /// and then reads that version of the data directly
 18   bool Get(Row row, Column c, string *value)
 19   {
 20     while(true)
 21     {
 22       bigtable::Txn T = bigtable::StartRowTransaction(row);
 23       /// Check whether an update transaction is in progress
 24       if (T.Read(row, c+"lock", [0,start_ts_]))
 25       {
 26         /// There is a pending transaction; clean up or wait
 27         BackoffAndMaybeCleanupLock(row,c);
 28         continue;
 29       }
 30       /// Find the last transaction committed below start_ts_; every read goes through c.write, so uncommitted data is invisible to other transactions
 31       last_write = T.Read(row, c+"write", [0,start_ts_]);
 32       if(!last_write.found()) return false; /// no data
 33       /// Get the timestamp of the data that transaction committed
 34       int data_ts = last_write.start_timestamp();
 35       *value = T.Read(row, c+"data", [data_ts,data_ts]);
 36       return true;
 37     }
 38   }
 39
 40   /// Prewrite tries to lock cell w, returning false in case of conflict
 41   /// This is the first phase of two-phase commit
 42   /// There are two kinds of conflict:
 43   /// -# another process committed after this transaction started
 44   /// -# some other thread already holds a lock
 45   /// The row transaction guarantees that only one of several concurrent processes can lock the cell, which gives
 46   /// snapshot isolation
 47   /// Worth noting here is how the row transaction implements read-modify-write
 48   /// Prewrite never removes someone else's lock, so locks must be taken in a consistent order to prevent deadlock
 49   bool Prewrite(Write w, Write primary)
 50   {
 51     Column c = w.col;
 52     bigtable::Txn T = bigtable::StartRowTransaction(w.row);
 53
 54     /// Abort on writes after our start timestamp ...
 55     if (T.Read(w.row, c+"write", [start_ts_,MAX])) return false;
 56     /// ... or locks at any timestamp
 57     if (T.Read(w.row, c+"lock", [0,MAX])) return false;
 58
 59     /// The row transaction guarantees that only one concurrent process can lock the cell
 60     T.Write(w.row, c+"data", start_ts_, w.value);
 61     T.Write(w.row, c+"lock", start_ts_, {primary.row, primary.col});
 62
 63     return T.Commit();
 64   }
 65
 66   bool Commit()
 67   {
 68     Write primary = writes_[0];
 69     vector<Write> secondaries(writes_.begin() + 1, writes_.end());
 70     /// Lock the primary first
 71     if (!Prewrite(primary, primary)) return false;
 72     /// Then lock every other cell that is to be modified
 73     for(vector<Write>::iterator beg = secondaries.begin();
 74         beg != secondaries.end();
 75         beg ++)
 76     {
 77       if(!Prewrite(*beg, primary)) return false;
 78     }
 79
 80     /// Why not simply reuse start_ts_ here? That would not guarantee snapshot isolation
 81     /// Consider the following execution
 82     /// -# T A start, start_ts_ = 8
 83     /// -# T B start, start_ts_ = 9
 84     /// -# T A commit, if use start_ts_, then write = 8
 85     /// -# T B prewrite, then B will not discover that there is a commit transaction after its start
 86     /// So a fresh commit_ts is fetched in order to detect concurrent updates
 87     int commit_ts = oracle.GetTimestamp();
 88
 89     /// Commit the primary first
 90     Write p = primary;
 91     bigtable::Txn T = bigtable::StartRowTransaction(p.row);
 92     if(!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
 93     {
 94       /// Locking is optimistic, so between Prewrite and commit the lock may have been removed by another
 95       /// process; the transaction must then fail. Row transactions serialize lock removal and commit
 96       return false;
 97     }
 98     /// The write record stores start_ts_, which points to the data written at start_ts_
 99     T.Write(p.row, p.col+"write", commit_ts, start_ts_);
100     /// Unlock
101     T.Erase(p.row, p.col+"lock", commit_ts);
102     /// If updating the primary fails, the transaction fails
103     if(!T.Commit()) return false;
104
105     /// Second phase: write out write records for secondary cells
106     /// The updates below are not in a row transaction and a failure does not abort the transaction, so once the primary
107     /// commit succeeds the transaction is certain to be committed eventually
108     for(vector<Write>::iterator beg = secondaries.begin();
109         beg != secondaries.end();
110         beg ++)
111     {
112       /// Without a row transaction this write order must be kept:
113       /// write the write record first, then erase the lock; otherwise recovery is impossible after a failure
114       bigtable::Write(beg->row, beg->col+"write", commit_ts, start_ts_);
115       bigtable::Erase(beg->row, beg->col+"lock", commit_ts);
116     }
117     return true;
118   }
119 };


  1. Lock all participants and perform the prewrite (pre-update)
  2. Commit the primary
  3. Commit all the other participants


In the first phase of commit ("prewrite"), we try to lock all the cells being written. (To handle client failure, we designate one lock arbitrarily as the primary; we’ll discuss this mechanism below.) The transaction reads metadata to check for conflicts in each cell being written. There are two kinds of conflicting metadata: if the transaction sees another write record after its start timestamp, it aborts (line 55); this is the write-write conflict that snapshot isolation guards against. If the transaction sees another lock at any timestamp, it also aborts (line 57). It’s possible that the other transaction is just being slow to release its lock after having already committed below our start timestamp, but we consider this unlikely, so we abort. If there is no conflict, we write the lock and the data to each cell at the start timestamp (lines 60-63).

In the prewrite phase, c.write and c.lock are used to decide whether the transaction conflicts with another one; if it does, prewrite fails.
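The two conflict checks can be tried out against an in-memory stand-in for a single cell. This is a sketch under assumptions: `Cell` and this free-standing `Prewrite` are hypothetical, the three `std::map` members imitate the c:data, c:write and c:lock columns, and the function body is assumed to run atomically, as it would inside a Bigtable row transaction.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical in-memory stand-in for one Percolator cell and its metadata columns.
struct Cell {
  std::map<long, std::string> data;  // start_ts -> value
  std::map<long, long> write;        // commit_ts -> start_ts of the committed data
  std::map<long, std::string> lock;  // start_ts -> location of the primary lock
};

// Try to lock the cell for a transaction that started at start_ts.
// Returns false on either kind of conflict.
bool Prewrite(Cell* cell, long start_ts, const std::string& value,
              const std::string& primary) {
  assert(cell != nullptr);
  // Conflict 1: a write record at or after our start timestamp.
  if (cell->write.lower_bound(start_ts) != cell->write.end()) return false;
  // Conflict 2: a lock at any timestamp.
  if (!cell->lock.empty()) return false;
  // No conflict: write the data and the lock at the start timestamp.
  cell->data[start_ts] = value;
  cell->lock[start_ts] = primary;
  return true;
}
```

With this model, a transaction that started at 9 fails both when another transaction still holds a lock and when a write record with commit timestamp 10 already exists, while a transaction that starts at 11 goes through.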

If no cells conflict, the transaction may commit and proceeds to the second phase. At the beginning of the second phase, the client obtains the commit timestamp from the timestamp oracle (line 87). Then, at each cell (starting with the primary), the client releases its lock and makes its write visible to readers by replacing the lock with a write record. The write record indicates to readers that committed data exists in this cell; it contains a pointer to the start timestamp where readers can find the actual data. Once the primary’s write is visible (line 103), the transaction must commit since it has made a write visible to readers.

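Once the primary has been updated successfully, the transaction must be committed, because one of its updates is already visible to other transactions.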

Transaction processing is complicated by the possibility of client failure (tablet server failure does not affect the system since Bigtable guarantees that written locks persist across tablet server failures). If a client fails while a transaction is being committed, locks will be left behind. Percolator must clean up those locks or they will cause future transactions to hang indefinitely. Percolator takes a lazy approach to cleanup: when a transaction A encounters a conflicting lock left behind by transaction B, A may determine that B has failed and erase its locks.

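In this two-phase commit algorithm the client is the coordinator and the tablet servers are the participants. Because Bigtable guarantees the high availability of all participants (tablet servers), a participant failure has no effect. This is also the hardest part of understanding 2PC: the high availability of participants and coordinator is not a problem that 2PC solves; it has to be provided by other technical means. Percolator offers no high-availability solution for clients, so when a client exits abnormally the algorithm must roll back and unlock every uncommitted transaction (primary not committed), and roll forward and unlock every committed transaction (primary already committed).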

It is very difficult for A to be perfectly confident in its judgment that B is failed; as a result we must avoid a race between A cleaning up B’s transaction and a not-actually-failed B committing the same transaction. Percolator handles this by designating one cell in every transaction as a synchronizing point for any commit or cleanup operations. This cell’s lock is called the primary lock. Both A and B agree on which lock is primary (the location of the primary is written into the locks at all other cells). Performing either a cleanup or commit operation requires modifying the primary lock; since this modification is performed under a Bigtable row transaction, only one of the cleanup or commit operations will succeed. Specifically: before B commits, it must check that it still holds the primary lock and replace it with a write record. Before A erases B’s lock, A must check the primary to ensure that B has not committed; if the primary lock is still present, then it can safely erase the lock.

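Before committing, every transaction must first check that it still holds its lock, because the lock may have been released by another transaction. Releasing a lock is simple: just erase c.lock.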
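The race between a commit and a cleanup can be modeled on a single primary cell. In this sketch `PrimaryCell`, `CommitPrimary` and `CleanupPrimary` are hypothetical names, and each function body is assumed to execute atomically, which is the role the Bigtable row transaction plays in the real system.

```cpp
#include <cassert>
#include <map>
#include <set>

// Hypothetical primary cell: which start timestamps hold a lock, and the write records.
struct PrimaryCell {
  std::set<long> lock;         // start_ts values that currently hold the lock
  std::map<long, long> write;  // commit_ts -> start_ts
};

// The owner's commit: succeeds only if it still holds the primary lock.
bool CommitPrimary(PrimaryCell* p, long start_ts, long commit_ts) {
  assert(p != nullptr);
  if (p->lock.count(start_ts) == 0) return false;  // lock was cleaned up
  p->write[commit_ts] = start_ts;                  // replace the lock with a write record
  p->lock.erase(start_ts);
  return true;
}

// Another transaction's cleanup: succeeds only if the owner has not committed yet.
bool CleanupPrimary(PrimaryCell* p, long start_ts) {
  assert(p != nullptr);
  if (p->lock.count(start_ts) == 0) return false;  // already committed or cleaned up
  p->lock.erase(start_ts);
  return true;
}
```

Whichever of the two functions runs first on the same lock wins and the other returns false, so a transaction is never both committed and rolled back.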

When a client crashes during the second phase of commit, a transaction will be past the commit point (it has written at least one write record) but will still have locks outstanding. We must perform roll-forward on these transactions. A transaction that encounters a lock can distinguish between the two cases by inspecting the primary lock: if the primary lock has been replaced by a write record, the transaction which wrote the lock must have committed and the lock must be rolled forward, otherwise it should be rolled back (since we always commit the primary first, we can be sure that it is safe to roll back if the primary is not committed). To roll forward, the transaction performing the cleanup replaces the stranded lock with a write record as the original transaction would have done.

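When a transaction runs into a lock conflict, it can use the primary to determine the state of the transaction that owns the lock. How is that decided?

  1. Read c.lock to obtain the start_ts_ of the transaction concerned
  2. Read primary.c.lock over [start_ts_, MAX]
    1. If primary.c.lock was last updated at start_ts_ and records the same information as c.lock, the transaction has not committed yet and any action may be taken
    2. If primary.c.lock was last updated after start_ts_, read primary.c.write over [start_ts_, MAX]
      1. If the value of the first primary.c.write equals start_ts_, the transaction has committed; roll c forward
      2. Otherwise roll c back, since the transaction did not commit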


void RollBackOrRollForward(Row row, Column col)
{
  bigtable::Txn T = bigtable::StartRowTransaction(row);
  int last_txn_start_ts = 0;
  Write primary;
  if(!T.Read(row, col+"lock", [0,MAX]))
  {
    return; /// no pending lock
  }
  /// Get the time at which the lock was last taken
  primary = T.Read(row, col+"lock", [0,MAX]);
  last_txn_start_ts = primary.start_timestamp();
  if (primary.row == row)
  {
    /// This row holds the primary itself and it has not been committed
    T.Erase(row, col+"lock", last_txn_start_ts);
    T.Commit();
    return;
  }
  bigtable::Txn TP = bigtable::StartRowTransaction(primary.row);
  if(TP.Read(primary.row, primary.col+"lock", [last_txn_start_ts, last_txn_start_ts]))
  {
    /// The primary's transaction has not committed; not sure whether using last_txn_start_ts as the timestamp here is a problem
    TP.Erase(primary.row, primary.col + "lock", last_txn_start_ts);
    T.Erase(row, col+"lock", last_txn_start_ts);
    TP.Commit();
    T.Commit();
    return;
  }
  /// Get the primary's first write after last_txn_start_ts
  Write first_write_after_txn;
  if(!TP.Read(primary.row, primary.col+"write", [last_txn_start_ts,MAX]))
  {
    /// The transaction did not commit, but its primary lock was already released by someone else; just release this lock
    T.Erase(row, col+"lock", last_txn_start_ts);
    T.Commit();
    return;
  }
  first_write_after_txn = TP.Read(primary.row, primary.col+"write", [last_txn_start_ts,MAX]);
  if(first_write_after_txn.value == last_txn_start_ts)
  {
    /// The transaction has committed, so roll this cell forward: write the write record, then drop the lock
    T.Write(row, col+"write", first_write_after_txn.start_timestamp(), last_txn_start_ts);
    T.Erase(row, col+"lock", last_txn_start_ts);
  }
  else
  {
    /// The transaction failed and a later transaction committed successfully; abort this one
    T.Erase(row, col+"lock", last_txn_start_ts);
  }
  T.Commit();
  return;
}

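The algorithm first locks the current row and then locks the primary. This is not a good design, because it easily leads to deadlock-related problems. A better approach is to operate on the primary first and then, depending on the result, re-check (double check) and operate on the current row. Nested locking is used here only to keep the description of the algorithm simple.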

Since cleanup is synchronized on the primary lock, it is safe to clean up locks held by live clients; however, this incurs a performance penalty since rollback forces the transaction to abort. So, a transaction will not clean up a lock unless it suspects that a lock belongs to a dead or stuck worker. Percolator uses simple mechanisms to determine the liveness of another transaction. Running workers write a token into the Chubby lockservice to indicate they belong to the system; other workers can use the existence of this token as a sign that the worker is alive (the token is automatically deleted when the process exits). To handle a worker that is live, but not working, we additionally write the wall time into the lock; a lock that contains a too-old wall time will be cleaned up even if the worker’s liveness token is valid. To handle long-running commit operations, workers periodically update this wall time while committing.

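The method above is an optimization of the optimistic locking scheme. It requires two more in-memory columns in each row: the identifier of the Chubby file that belongs to the worker running the transaction, and the worker's wall time.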
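The liveness rule boils down to a small predicate. The sketch below assumes a hypothetical `LockInfo` record and a `max_age_sec` threshold; the paper does not give a concrete value for how old a wall time may be, so that parameter is purely an assumption of this example.

```cpp
#include <cassert>

// Hypothetical view of a lock, as far as cleanup is concerned.
struct LockInfo {
  bool owner_token_present;  // does the owner's liveness token still exist in Chubby?
  long wall_time_sec;        // wall time the owner last wrote into the lock
};

// Decide whether a conflicting lock may be cleaned up.
// max_age_sec is an assumed tuning knob, not a value taken from the paper.
bool ShouldCleanup(const LockInfo& lock, long now_sec, long max_age_sec) {
  assert(max_age_sec >= 0);
  if (!lock.owner_token_present) return true;         // the worker process is gone
  return now_sec - lock.wall_time_sec > max_age_sec;  // the worker is alive but stuck
}
```

A worker that keeps refreshing the wall time while committing never crosses the threshold, so its locks are left alone even during a long commit.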

Timestamp

The timestamp oracle is a server that hands out timestamps in strictly increasing order. Since every transaction requires contacting the timestamp oracle twice, this service must scale well. The oracle periodically allocates a range of timestamps by writing the highest allocated timestamp to stable storage; given an allocated range of timestamps, the oracle can satisfy future requests strictly from memory. If the oracle restarts, the timestamps will jump forward to the maximum allocated timestamp (but will never go backwards). To save RPC overhead (at the cost of increasing transaction latency) each Percolator worker batches timestamp requests across transactions by maintaining only one pending RPC to the oracle. As the oracle becomes more loaded, the batching naturally increases to compensate. Batching increases the scalability of the oracle but does not affect the timestamp guarantees. Our oracle serves around 2 million timestamps per second from a single machine.

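This part covers the scalability of the timestamp oracle. Inside the oracle the timestamp is effectively an in-memory column that can serve millions of requests per second, so scalability is not a concern. My only doubt is whether clients fetching timestamps in batches increases the number of conflicts. For example, if one worker holds the batch with the largest timestamps, then once the updates in that batch succeed, the transactions of other workers are very likely to fail, and their timestamps are wasted. Either way, the oracle's in-memory column already goes a long way toward supporting scalability.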
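The range-allocation trick is easy to model. The following C++ sketch is an assumption-laden toy: `StableStorage` is a plain struct standing in for durable storage, `range_size` is a made-up parameter, and there is no batching or RPC layer. It only shows why a restart jumps forward and never goes backwards.

```cpp
#include <cassert>

// Hypothetical stand-in for stable storage holding the highest allocated timestamp.
struct StableStorage {
  long highest_allocated;
};

class TimestampOracle {
 public:
  // On (re)start, resume from the persisted high-water mark so that
  // no timestamp handed out before the restart can be issued again.
  TimestampOracle(StableStorage* storage, long range_size)
      : storage_(storage),
        range_size_(range_size),
        next_(storage->highest_allocated),
        limit_(storage->highest_allocated) {
    assert(range_size > 0);
  }

  long GetTimestamp() {
    if (next_ == limit_) {
      // Range exhausted: one write to stable storage, then serve from memory.
      limit_ += range_size_;
      storage_->highest_allocated = limit_;
    }
    return ++next_;
  }

 private:
  StableStorage* storage_;
  long range_size_;
  long next_;   // last timestamp handed out
  long limit_;  // highest timestamp covered by the persisted range
};
```

With a range size of 100, the first oracle hands out 1, 2, 3 and so on after a single storage write; an oracle restarted from the same storage starts at 101, skipping the unused part of the old range.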

The transaction protocol uses strictly increasing timestamps to guarantee that Get() returns all committed writes before the transaction’s start timestamp. To see how it provides this guarantee, consider a transaction R reading at timestamp T_R and a transaction W that committed at timestamp T_W < T_R; we will show that R sees W’s writes. Since T_W < T_R, we know that the timestamp oracle gave out T_W before or in the same batch as T_R; hence, W requested T_W before R received T_R. We know that R can’t do reads before receiving its start timestamp T_R and that W wrote locks before requesting its commit timestamp T_W. Therefore, the above property guarantees that W must have at least written all its locks before R did any reads; R’s Get() will see either the fully-committed write record or the lock, in which case W will block until the lock is released. Either way, W’s write is visible to R’s Get().

Notification

Transactions let the user mutate the table while maintaining invariants, but users also need a way to trigger and run the transactions. In Percolator, the user writes code ("observers") to be triggered by changes to the table, and we link all the observers into a binary running alongside every tablet server in the system. Each observer registers a function and a set of columns with Percolator, and Percolator invokes the function after data is written to one of those columns in any row.

Percolator applications are structured as a series of observers; each observer completes a task and creates more work for "downstream" observers by writing to the table. In our indexing system, a MapReduce loads crawled documents into Percolator by running loader transactions, which trigger the document processor transaction to index the document (parse, extract links, etc.). The document processor transaction triggers further transactions like clustering. The clustering transaction, in turn, triggers transactions to export changed document clusters to the serving system.

Notifications are similar to database triggers or events in active databases, but unlike database triggers, they cannot be used to maintain database invariants. In particular, the triggered observer runs in a separate transaction from the triggering write, so the triggering write and the triggered observer’s writes are not atomic. Notifications are intended to help structure an incremental computation rather than to help maintain data integrity.

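A notification differs from a database trigger in that it is not atomic with the transaction that triggers it. Notifications are meant only for incremental computation, not for guaranteeing consistency.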

This difference in semantics and intent makes observer behavior much easier to understand than the complex semantics of overlapping triggers. Percolator applications consist of very few observers -- the Google indexing system has roughly 10 observers. Each observer is explicitly constructed in the main() of the worker binary, so it is clear what observers are active. It is possible for several observers to observe the same column, but we avoid this feature so it is clear what observer will run when a particular column is written. Users do need to be wary about infinite cycles of notifications, but Percolator does nothing to prevent this; the user typically constructs a series of observers to avoid infinite cycles.

We do provide one guarantee: at most one observer’s transaction will commit for each change of an observed column. The converse is not true, however: multiple writes to an observed column may cause the corresponding observer to be invoked only once. We call this feature message collapsing, since it helps avoid computation by amortizing the cost of responding to many notifications. For example, it is sufficient for http://google.com to be reprocessed periodically rather than every time we discover a new link pointing to it.


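  1. What happens if an observer fails?
  2. Suppose a change to a cell is noticed by an observer and is being processed, and during that processing another transaction modifies the cell again, so that another worker picks up the new update. What happens then?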

To provide these semantics for notifications, each observed column has an accompanying "acknowledgment" column for each observer, containing the latest start timestamp at which the observer ran. When the observed column is written, Percolator starts a transaction to process the notification. The transaction reads the observed column and its corresponding acknowledgment column. If the observed column was written after its last acknowledgment, then we run the observer and set the acknowledgment column to our start timestamp. Otherwise, the observer has already been run, so we do not run it again. Note that if Percolator accidentally starts two transactions concurrently for a particular notification, they will both see the dirty notification and run the observer, but one will abort because they will conflict on the acknowledgment column. We promise that at most one observer will commit for each notification.

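Every notification column has an accompanying acknowledgment column, which records the time at which the cell was last processed. Comparing the acknowledgment value with the last modification time of the notification column tells whether the change to the notification column has already been processed.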


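  1. The update of the acknowledgment column is placed in the same transaction as the changes made by the observer. If the transaction fails, the acknowledgment update is aborted with it, so the notification will be processed by another worker;
  2. Because the acknowledgment column is modified inside a transaction, only one of the two workers (the one handling the new update and the one handling the earlier update) can succeed
    1. If the first worker succeeds, the last modification time of the notification column is greater than the acknowledgment value, because the acknowledgment value is the start time of that worker's transaction; the new update is therefore still unacknowledged and will be handled by a later run;
    2. If the second worker's transaction succeeds, the processing of the two updates is merged.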
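The acknowledgment comparison and the resulting message collapsing can be sketched as follows. `ObservedCell`, `ShouldRunObserver` and `ProcessNotification` are hypothetical names; the sketch keeps only the two timestamps that matter and leaves out the surrounding Percolator transaction, which is what makes concurrent runs conflict in the real system.

```cpp
#include <cassert>

// Hypothetical per-row state for one observed column and one observer.
struct ObservedCell {
  long last_write_ts;  // commit timestamp of the latest write to the observed column
  long ack_ts;         // start timestamp of the observer's last successful run
};

// Run the observer only if the column changed after the last acknowledged run.
bool ShouldRunObserver(const ObservedCell& cell) {
  return cell.last_write_ts > cell.ack_ts;
}

// Meant to be called inside the observer's transaction, which started at start_ts.
// Returns true if the observer ran; any number of earlier writes collapse into one run.
bool ProcessNotification(ObservedCell* cell, long start_ts) {
  assert(cell != nullptr);
  if (!ShouldRunObserver(*cell)) return false;
  cell->ack_ts = start_ts;  // acknowledge everything written before this run started
  return true;
}
```

Two writes at timestamps 10 and 12 followed by a run that starts at 15 produce a single observer invocation, and a write that lands after the run has started remains unacknowledged and triggers a later run.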

To implement notifications, Percolator needs to efficiently find dirty cells with observers that need to be run. This search is complicated by the fact that notifications are rare: our table has trillions of cells, but, if the system is keeping up with applied load, there will only be millions of notifications. Additionally, observer code is run on a large number of client processes distributed across a collection of machines, meaning that this search for dirty cells must be distributed.

To identify dirty cells, Percolator maintains a special "notify" Bigtable column, containing an entry for each dirty cell. When a transaction writes an observed cell, it also sets the corresponding notify cell. The workers perform a distributed scan over the notify column to find dirty cells. After the observer is triggered and the transaction commits, we remove the notify cell. Since the notify column is just a Bigtable column, not a Percolator column, it has no transactional properties and serves only as a hint to the scanner to check the acknowledgment column to determine if the observer should be run.

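To find modified columns efficiently, a notify column is added to every row that holds a notification column. When a notification column is modified, the corresponding notify cell is set, and once the transaction that processes the notification commits, the notify cell is deleted. If the transaction commits but the notify cell is not deleted, nothing goes wrong, because the notify cell is only a hint. And because the delete can carry a timestamp, a committed transaction will not delete notify data produced by a later update. One could even apply an optimistic-locking style check and compare timestamps when deleting the notify cell.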

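The added column lives in a separate column family (the paper places it in its own locality group), so a scan only touches the small amount of updated data instead of the whole table. This is where Bigtable's column-oriented storage pays off.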
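
A rough sketch of the notify column as a scan hint follows. The `Table` class and its method names are hypothetical, and the timestamp-guarded delete models the idea from the commentary above rather than a documented Percolator call.

```python
class Table:
    """Hypothetical in-memory model: bulk data and the notify column are stored apart."""

    def __init__(self):
        self.data = {}    # row key -> (value, write_ts); stands in for the large data columns
        self.notify = {}  # row key -> write_ts; stands in for the small notify column

    def write_observed(self, key, value, ts):
        """Writing an observed cell also sets its notify cell."""
        self.data[key] = (value, ts)
        self.notify[key] = ts

    def scan_dirty(self):
        """Scan only the notify column, never the full data set."""
        return sorted(self.notify.items())

    def clear_notify(self, key, up_to_ts):
        """Delete the notify cell only if it is no newer than up_to_ts.

        A notify cell set by a later update therefore survives the delete.
        """
        ts = self.notify.get(key)
        if ts is not None and ts <= up_to_ts:
            del self.notify[key]
            return True
        return False
```

Because the notify cell is only a hint, a stale entry costs one extra acknowledgment check, while a delete that wrongly removed a newer entry would lose a notification; the timestamp guard covers the second case.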

To make this scan efficient, Percolator stores the notify column in a separate Bigtable locality group so that scanning over the column requires reading only the millions of dirty cells rather than the trillions of total data cells. Each Percolator worker dedicates several threads to the scan. For each thread, the worker chooses a portion of the table to scan by first picking a random Bigtable tablet, then picking a random key in the tablet, and finally scanning the table from that position. Since each worker is scanning a random region of the table, we worry about two workers running observers on the same row concurrently. While this behavior will not cause correctness problems due to the transactional nature of notifications, it is inefficient. To avoid this, each worker acquires a lock from a lightweight lock service before scanning the row. This lock server need not persist state since it is advisory and thus is very scalable.
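
The scan described above might look like the following sketch. Everything here is an assumption made for illustration: tablets are modelled as sorted lists of keys, `AdvisoryLockService` is an in-memory stand-in for the lightweight lock service, and the scan wraps around at the end of the table, which the paper does not specify.

```python
import bisect
import random


class AdvisoryLockService:
    """Hypothetical lock service: purely in memory, nothing is persisted."""

    def __init__(self):
        self._held = set()

    def try_acquire(self, row):
        if row in self._held:
            return False
        self._held.add(row)
        return True

    def release(self, row):
        self._held.discard(row)


def pick_start(tablets, rng):
    """Pick a random tablet, then a random key inside that tablet."""
    tablet = rng.choice(tablets)
    return rng.choice(tablet)


def scan_from(all_keys, start_key, dirty, locks, limit):
    """Walk the sorted keys from start_key and claim dirty rows that are not locked."""
    i = bisect.bisect_left(all_keys, start_key)
    claimed = []
    for step in range(len(all_keys)):
        key = all_keys[(i + step) % len(all_keys)]
        if key in dirty and locks.try_acquire(key):
            claimed.append(key)
            if len(claimed) == limit:
                break
    return claimed
```

The lock only prevents duplicated work. If it were lost, the acknowledgment column would still guarantee that at most one observer commits per notification.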

The random-scanning approach requires one additional tweak: when it was first deployed we noticed that scanning threads would tend to clump together in a few regions of the table, effectively reducing the parallelism of the scan. This phenomenon is commonly seen in public transportation systems where it is known as "platooning" or "bus clumping" and occurs when a bus is slowed down (perhaps by traffic or slow loading). Since the number of passengers at each stop grows with time, loading delays become even worse, further slowing the bus. Simultaneously, any bus behind the slow bus speeds up as it needs to load fewer passengers at each stop. The result is a clump of buses arriving simultaneously at a stop. Our scanning threads behaved analogously: a thread that was running observers slowed down while threads "behind" it quickly skipped past the now-clean rows to clump with the lead thread and failed to pass the lead thread because the clump of threads overloaded tablet servers. To solve this problem, we modified our system in a way that public transportation systems cannot: when a scanning thread discovers that it is scanning the same row as another thread, it chooses a new random location in the table to scan. To further the transportation analogy, the buses (scanner threads) in our city avoid clumping by teleporting themselves to a random stop (location in the table) if they get too close to the bus in front of them.

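This passage mainly describes how random jumps resolve the clumping of workers. The one open question for me is why a global random jump improves performance.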
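
The "teleport" rule itself is small enough to sketch. This is only an illustration under assumed names (`step_scanner`, rows numbered `0..num_rows-1`, a set of rows currently held by other scanners); the paper's own explanation for the benefit is that clumped threads overload the same tablet servers, so scattering them restores parallelism.

```python
import random


def step_scanner(pos, occupied, num_rows, rng):
    """Advance a scanner by one row, or jump somewhere random on a collision.

    Returns (new_position, jumped). `occupied` holds the rows other scanners
    are currently working on (hypothetical bookkeeping for this sketch).
    """
    nxt = (pos + 1) % num_rows
    if nxt in occupied:
        # Another thread is already on this row: pick a new random location
        # instead of queueing up behind it.
        return rng.randrange(num_rows), True
    return nxt, False
```

A scanner that lands on a busy row again simply jumps again on its next step, so no coordination beyond the per-row lock is needed.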

Finally, experience with notifications led us to introduce a lighter-weight but semantically weaker notification mechanism. We found that when many duplicates of the same page were processed concurrently, each transaction would conflict trying to trigger reprocessing of the same duplicate cluster. This led us to devise a way to notify a cell without the possibility of transactional conflict. We implement this weak notification by writing only to the Bigtable "notify" column. To preserve the transactional semantics of the rest of Percolator, we restrict these weak notifications to a special type of column that cannot be written, only notified. The weaker semantics also mean that multiple observers may run and commit as a result of a single weak notification (though the system tries to minimize this occurrence). This has become an important feature for managing conflicts; if an observer frequently conflicts on a hotspot, it often helps to break it into two observers connected by a non-transactional notification on the hotspot.
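
A weak notification can be pictured as a blind write to the notify column with no transactional bookkeeping. The sketch below uses a hypothetical `NotifyColumn` class; it shows why concurrent writers cannot conflict, but it does not model the weaker guarantee that several observers may run for one notification.

```python
class NotifyColumn:
    """Hypothetical model of the non-transactional notify column."""

    def __init__(self):
        self.cells = set()

    def weak_notify(self, row):
        """Blind, idempotent write: there is no lock and no ack column to conflict on."""
        self.cells.add(row)

    def drain(self):
        """Return the rows a scanner would find dirty, then clear them."""
        rows = sorted(self.cells)
        self.cells.clear()
        return rows
```

Many duplicate pages notifying the same cluster row collapse into a single dirty cell, which is the hotspot case the paper describes.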

Discussion

One of the inefficiencies of Percolator relative to a MapReduce-based system is the number of RPCs sent per work-unit. While MapReduce does a single large read to GFS and obtains all of the data for 10s or 100s of web pages, Percolator performs around 50 individual Bigtable operations to process a single document.

One source of additional RPCs occurs during commit. When writing a lock, we must do a read-modify-write operation requiring two Bigtable RPCs: one to read for conflicting locks or writes and another to write the new lock. To reduce this overhead, we modified the Bigtable API by adding conditional mutations which implements the read-modify-write step in a single RPC. Many conditional mutations destined for the same tablet server can also be batched together into a single RPC to further reduce the total number of RPCs we send. We create batches by delaying lock operations for several seconds to collect them into batches. Because locks are acquired in parallel, this adds only a few seconds to the latency of each transaction; we compensate for the additional latency with greater parallelism. Batching also increases the time window in which conflicts may occur, but in our low-contention environment this has not proved to be a problem.

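Implementing read-modify-write as a conditional write on the server side reduces the number of network round trips needed to process a transaction. Batching, as always, is an important way to cut round trips and improve performance.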
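
The effect of conditional mutations plus batching can be illustrated with a toy tablet server. `TabletServer` and `conditional_lock_batch` are hypothetical names, and the condition is simplified to "the cell holds no lock" (the paper also checks for conflicting writes).

```python
class TabletServer:
    """Hypothetical server that applies check-and-write requests locally."""

    def __init__(self):
        self.locks = {}  # cell -> owner transaction
        self.rpcs = 0    # number of round trips received

    def conditional_lock_batch(self, requests):
        """One RPC for many requests: write each lock only if the cell is unlocked.

        `requests` is a list of (cell, owner) pairs; returns one bool per request.
        """
        self.rpcs += 1
        results = []
        for cell, owner in requests:
            if cell in self.locks:
                results.append(False)  # conflicting lock: this mutation is rejected
            else:
                self.locks[cell] = owner
                results.append(True)
        return results
```

Without the conditional mutation each lock would cost a read RPC followed by a write RPC; here three lock attempts cost a single round trip, at the price of waiting for the batch to fill.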

We also perform the same batching when reading from the table: every read operation is delayed to give it a chance to form a batch with other reads to the same tablet server. This delays each read, potentially greatly increasing transaction latency. A final optimization mitigates this effect, however: prefetching. Prefetching takes advantage of the fact that reading two or more values in the same row is essentially the same cost as reading one value. In either case, Bigtable must read the entire SSTable block from the file system and decompress it. Percolator attempts to predict, each time a column is read, what other columns in a row will be read later in the transaction. This prediction is made based on past behavior. Prefetching, combined with a cache of items that have already been read, reduces the number of Bigtable reads the system would otherwise do by a factor of 10.
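
Prefetching combined with a read cache can be sketched as follows. The `PrefetchingReader` class, its `learn` method, and the way history is recorded are assumptions for illustration; the paper only says the prediction is based on past behavior.

```python
from collections import defaultdict


class PrefetchingReader:
    """Hypothetical reader that fetches predicted columns along with the requested one."""

    def __init__(self, table):
        self.table = table                 # row key -> {column: value}
        self.cache = {}                    # (row, column) -> value
        self.history = defaultdict(set)    # column -> columns usually read afterwards
        self.bigtable_reads = 0

    def learn(self, first_col, later_cols):
        """Record that reading first_col was followed by reads of later_cols."""
        self.history[first_col].update(later_cols)

    def read(self, row, col):
        if (row, col) in self.cache:
            return self.cache[(row, col)]
        # One row read serves the requested column and every predicted column.
        self.bigtable_reads += 1
        for c in {col} | self.history[col]:
            if c in self.table.get(row, {}):
                self.cache[(row, c)] = self.table[row][c]
        return self.cache.get((row, col))
```

With an accurate prediction, three column reads from the same row cost a single Bigtable read, which is the same-row property the paragraph above relies on.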


Early in the implementation of Percolator, we decided to make all API calls blocking and rely on running thousands of threads per machine to provide enough parallelism to maintain good CPU utilization. We chose this thread-per-request model mainly to make application code easier to write, compared to the event-driven model. Forcing users to bundle up their state each of the (many) times they fetched a data item from the table would have made application development much more difficult. Our experience with thread-per-request was, on the whole, positive: application code is simple, we achieve good utilization on many-core machines, and crash debugging is simplified by meaningful and complete stack traces. We encountered fewer race conditions in application code than we feared. The biggest drawbacks of the approach were scalability issues in the Linux kernel and Google infrastructure related to high thread counts. Our in-house kernel development team was able to deploy fixes to address the kernel issues.

Which technical approach you pick matters less than pushing that approach to its limit. Anything taken to the extreme becomes impressive.
