Multiversion Concurrency Control (MVCC for short) manages read/write consistency. It gives readers a way to determine which entries to ignore, and gives writers a mechanism to obtain new write numbers and then “commit” those writes so readers can see them (thus forming atomic transactions).

Why MVCC is needed

Unlike traditional relational databases, HBase only provides ACID semantics on a per-row basis.

Write-Write Synchronization

Consider two concurrent writes to the same row that represent {company, role} combinations:

[Figure: Two writes to the same row]

HBase performs the following steps for each write:

  1. Write to Write-Ahead-Log (WAL)
  2. Update MemStore: write each data cell [the (row, column) pair] to the memstore.

Now assume there is no concurrency control over the writes, and consider the following order of events:

At the end, we are left with the following state:

The simplest solution is to provide an exclusive lock per row, which isolates writes that update the same row. Our new list of steps for writes is as follows (steps 1 and 4 are new):

  1. Obtain Row Lock
  2. Write to Write-Ahead-Log (WAL)
  3. Update MemStore: write each cell to the memstore
  4. Release Row Lock

This solves write-write synchronization.
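The locked write path can be sketched in Java as a toy in-memory store. This is a minimal sketch under stated assumptions: RowLockedStore, the List standing in for the WAL, and the map standing in for the MemStore are hypothetical names, not HBase classes. Because each write holds its row's lock through steps 2 and 3, two writers to the same row can no longer interleave their cells.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy store (not HBase code) following the locked write path.
final class RowLockedStore {
  // One exclusive lock per row key, created on first use.
  private final ConcurrentHashMap<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
  // Stand-in for the Write-Ahead-Log: an append-only list of edits.
  private final List<String> wal = new ArrayList<>();
  // Stand-in for the MemStore: row -> (column -> value).
  private final ConcurrentHashMap<String, Map<String, String>> memstore = new ConcurrentHashMap<>();

  void write(String row, Map<String, String> cells) {
    ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
    lock.lock();                                      // 1. Obtain row lock
    try {
      synchronized (wal) {                            // 2. Write to WAL
        wal.add(row + " " + cells);
      }
      Map<String, String> rowCells =
          memstore.computeIfAbsent(row, r -> new ConcurrentHashMap<>());
      for (Map.Entry<String, String> cell : cells.entrySet()) {
        rowCells.put(cell.getKey(), cell.getValue()); // 3. Update MemStore, one cell at a time
      }
    } finally {
      lock.unlock();                                  // 4. Release row lock
    }
  }

  // Copy of the row's current cells. Reads take no lock here, which is
  // exactly the read-write problem discussed in the next section.
  Map<String, String> snapshot(String row) {
    return Map.copyOf(memstore.getOrDefault(row, Map.of()));
  }
}
```

Without the row lock, the last company value and the last role value written could come from different writers; with it, once all writers finish, the row holds one writer's complete set of cells.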

Read-Write Synchronization

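However, row locks on writes alone do not make reads safe. A read that runs while a write is partway through updating the MemStore can see some cells from the new write and others from the old one, which violates per-row atomicity.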

Therefore, we need some concurrency control to deal with read-write synchronization. The simplest solution would be to have the reads obtain and release the row locks in the same manner as the writes. This would resolve the ACID violation, but the downside is that our reads and writes would both contend for the row locks, slowing each other down.

How MVCC works

Instead, HBase uses a form of MVCC to avoid requiring the reads to obtain row locks. MVCC works in HBase as follows:

For writes:

  1. After acquiring the row lock, each write operation is immediately assigned a write number.
  2. Each data cell in the write stores its write number.
  3. A write operation completes by declaring that it has finished with its write number.

For reads:

  1. Each read operation is first assigned a read timestamp, called a read point.
  2. The read point is assigned to be the highest integer x such that all writes with write number <= x have been completed.
  3. A read r for a certain (row, column) combination returns the data cell with the matching (row, column) whose write number is the largest value that is less than or equal to the read point of r.
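Read rules 2 and 3 can be written as two small functions. This is a sketch with hypothetical names (ReadRules, readPoint, read); it assumes write numbers start at 1 and keeps the versions of one (row, column) in a NavigableMap keyed by write number.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

// Hypothetical helpers illustrating read rules 2 and 3 (not HBase code).
final class ReadRules {
  // Rule 2: the highest x such that every write numbered 1..x has completed.
  static long readPoint(Set<Long> completedWriteNumbers) {
    long x = 0;
    while (completedWriteNumbers.contains(x + 1)) {
      x++;
    }
    return x;
  }

  // Rule 3: the value whose write number is the largest one <= readPoint,
  // or null if this (row, column) has no version visible at readPoint.
  static String read(NavigableMap<Long, String> versionsByWriteNumber, long readPoint) {
    Map.Entry<Long, String> e = versionsByWriteNumber.floorEntry(readPoint);
    return e == null ? null : e.getValue();
  }
}
```

A gap matters for rule 2: if writes 1 and 3 have completed but write 2 has not, readPoint still returns 1, so write 3 stays invisible until write 2 finishes.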

Let’s look at the operations using Multiversion Concurrency Control:

Now consider a read that begins after the step Restaurant [wn=2] but before the step Waiter [wn=2]. Write 2 has not finished yet, so by read rules 1 and 2 the read point is 1. By read rule 3, the read returns the values with write number 1, leaving us with:

Putting this all together, here are the steps for a write with Multiversion Concurrency Control (steps 2 and 5 are new and are required for read-write synchronization):

  1. Obtain Row Lock
  2. Acquire New Write Number
  3. Write to Write-Ahead-Log (WAL)
  4. Update MemStore: write each cell to the memstore
  5. Finish Write Number
  6. Release Row Lock
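The six steps, together with a read path that takes no row lock, can be sketched as a toy region. ToyMvccRegion and its methods are hypothetical; the WAL step is omitted, and a TreeSet of unfinished write numbers stands in for HBase's queue of ongoing writes. The read point is recomputed each time a write finishes, following read rule 2.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy region (not HBase code): row locks serialize writers to a
// row, and write numbers keep readers from seeing half-finished writes.
final class ToyMvccRegion {
  private final ConcurrentHashMap<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
  // "row/column" -> (write number -> value)
  private final ConcurrentHashMap<String, ConcurrentSkipListMap<Long, String>> memstore =
      new ConcurrentHashMap<>();
  private long lastWriteNumber = 0;                         // guarded by this
  private final TreeSet<Long> unfinished = new TreeSet<>(); // guarded by this
  private volatile long readPoint = 0;

  void write(String row, Map<String, String> cells) {
    ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
    lock.lock();                                            // 1. Obtain row lock
    try {
      long wn = acquireWriteNumber();                       // 2. Acquire new write number
      // 3. Write to WAL: omitted in this sketch.
      for (Map.Entry<String, String> cell : cells.entrySet()) { // 4. Update MemStore
        memstore.computeIfAbsent(row + "/" + cell.getKey(), k -> new ConcurrentSkipListMap<>())
            .put(wn, cell.getValue());
      }
      finishWriteNumber(wn);                                // 5. Finish write number
    } finally {
      lock.unlock();                                        // 6. Release row lock
    }
  }

  private synchronized long acquireWriteNumber() {
    long wn = ++lastWriteNumber;
    unfinished.add(wn);
    return wn;
  }

  // Read rule 2: every write numbered below the oldest unfinished one is complete.
  private synchronized void finishWriteNumber(long wn) {
    unfinished.remove(wn);
    readPoint = unfinished.isEmpty() ? lastWriteNumber : unfinished.first() - 1;
  }

  // Lock-free read: fix a read point once, then apply read rule 3 per column.
  Map<String, String> read(String row, String... columns) {
    long rp = readPoint;
    Map<String, String> result = new TreeMap<>();
    for (String column : columns) {
      ConcurrentSkipListMap<Long, String> versions = memstore.get(row + "/" + column);
      Map.Entry<Long, String> e = versions == null ? null : versions.floorEntry(rp);
      if (e != null) {
        result.put(column, e.getValue());
      }
    }
    return result;
  }
}
```

Because a reader fixes its read point once and every cell of a write shares one write number, a read sees either all of a write's cells or none of them. Unlike HBase's complete, this sketch does not handle a write that fails midway.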

MVCC in Source Code

The MultiVersionConcurrencyControl class implements Multiversion Concurrency Control in HBase.

Its key member variables are:

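// MultiVersionConcurrencyControl.java

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

public class MultiVersionConcurrencyControl {
  // Highest write number such that it and all earlier writes are complete; readers use it.
  final AtomicLong readPoint = new AtomicLong(0);
  // Highest write number handed out to a writer so far.
  final AtomicLong writePoint = new AtomicLong(0);
  // Monitor that threads wait on until readPoint advances.
  private final Object readWaiters = new Object();

  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
}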

From the doMiniBatchMutate method in HRegion.java, we can infer the order in which MultiVersionConcurrencyControl’s methods are called:

  1. begin
  2. completeAndWait
  3. advanceTo
  4. complete

Next, let’s see what these methods do.

begin

begin starts a write transaction by:

  1. Incrementing writePoint to obtain the next write number.
  2. Creating a new WriteEntry with that write number.
  3. Adding the new WriteEntry to the queue of ongoing writes.
  4. Running the supplied Runnable action, still inside the writeQueue lock.
  5. Returning the WriteEntry instance.

// MultiVersionConcurrencyControl.java

private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

public WriteEntry begin(Runnable action) {
  synchronized (writeQueue) {
    long nextWriteNumber = writePoint.incrementAndGet();
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    action.run();
    return e;
  }
}
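Because the action runs inside synchronized (writeQueue), actions run in exactly the same order as the write numbers are assigned. The sketch below checks that property with several threads; BeginSketch is a hypothetical stripped-down copy of the excerpt above, not the HBase class.

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stripped-down copy of begin(), for illustration only.
final class BeginSketch {
  static final class WriteEntry {
    private final long writeNumber;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
    long getWriteNumber() { return writeNumber; }
  }

  final AtomicLong writePoint = new AtomicLong(0);
  final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

  WriteEntry begin(Runnable action) {
    synchronized (writeQueue) {
      WriteEntry e = new WriteEntry(writePoint.incrementAndGet());
      writeQueue.add(e);
      // No other thread can begin a write until this action returns, so
      // writePoint still equals e's write number while the action runs.
      action.run();
      return e;
    }
  }
}
```

The same lock also guarantees that writeQueue is ordered by write number, which complete relies on later.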

completeAndWait

To complete the write transaction and wait for it to become visible, call completeAndWait(WriteEntry). It waits until the read point catches up with our write. When the call returns, the global read point is at least as large as the write number of the passed-in WriteEntry, so the write is visible to MVCC readers.

// MultiVersionConcurrencyControl.java

public void completeAndWait(WriteEntry e) {
  if (!complete(e)) {
    waitForRead(e);
  }
}

Let’s look at waitForRead(WriteEntry) first; complete(WriteEntry) is covered in detail later.

// MultiVersionConcurrencyControl.java

private final Object readWaiters = new Object();

// Wait for the global readPoint to advance up to the passed in write entry number.
void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  int count = 0;
  synchronized (readWaiters) {
    while (readPoint.get() < e.getWriteNumber()) {
      if (count % 100 == 0 && count > 0) {
        LOG.warn("STUCK: " + this);
      }
      count++;
      try {
        readWaiters.wait(10);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup --and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}
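The handshake between waitForRead and the notifyAll in complete can be isolated in a small model. ReadPointWaiter and its advance method are hypothetical; advance plays the role of the part of complete that sets readPoint and wakes waiters. As in the excerpt, the timed wait re-checks the condition every 10 ms, and an interrupt does not abort the wait but is re-asserted on the way out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the waitForRead/complete handshake (not HBase code).
final class ReadPointWaiter {
  private final AtomicLong readPoint = new AtomicLong(0);
  private final Object readWaiters = new Object();

  // Stand-in for complete(): publish a new read point and wake all waiters.
  void advance(long newReadPoint) {
    synchronized (readWaiters) {
      readPoint.set(newReadPoint);
      readWaiters.notifyAll();
    }
  }

  // Block until readPoint >= writeNumber.
  void waitForRead(long writeNumber) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (readPoint.get() < writeNumber) {
        try {
          readWaiters.wait(10);   // timed, so a missed notify only costs 10 ms
        } catch (InterruptedException ie) {
          interrupted = true;     // remember it, keep waiting
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag for the caller
    }
  }

  long get() { return readPoint.get(); }
}
```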

advanceTo

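advanceTo steps the MVCC forward onto a new read/write basis by moving both the read point and the write point to newStartPoint. It calls tryAdvanceTo in a loop until that call succeeds or the write point is already at or beyond newStartPoint. tryAdvanceTo throws if the read point and the write point differ, that is, if some write has begun but not yet completed.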

// MultiVersionConcurrencyControl.java

public void advanceTo(long newStartPoint) {
  while (true) {
    long seqId = this.getWritePoint();
    if (seqId >= newStartPoint) break;
    if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
  }
}

/**
 * @param newStartPoint Point to move read and write points to.
 * @param expected If not -1 (#NONE)
 * @return Returns false if expected is not equal to the
 * current readPoint or if startPoint is less than current readPoint
 */
boolean tryAdvanceTo(long newStartPoint, long expected) {
  synchronized (writeQueue) {
    long currentRead = this.readPoint.get();
    long currentWrite = this.writePoint.get();
    if (currentRead != currentWrite) {
      throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
        ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
    }
    if (expected != NONE && expected != currentRead) {
      return false;
    }

    if (newStartPoint < currentRead) {
      return false;
    }

    readPoint.set(newStartPoint);
    writePoint.set(newStartPoint);
  }
  return true;
}
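The retry logic of advanceTo and the guards in tryAdvanceTo can be exercised with a simplified copy. AdvanceSketch is hypothetical: it guards with a private lock object instead of writeQueue, throws IllegalStateException instead of a bare RuntimeException, and adds a made-up beginWrite helper that hands out a write number without completing it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified copy of advanceTo/tryAdvanceTo (not HBase code).
final class AdvanceSketch {
  static final long NONE = -1;
  private final AtomicLong readPoint = new AtomicLong(0);
  private final AtomicLong writePoint = new AtomicLong(0);
  private final Object lock = new Object();

  void advanceTo(long newStartPoint) {
    while (true) {
      long seqId = writePoint.get();
      if (seqId >= newStartPoint) break;                 // already far enough
      if (tryAdvanceTo(newStartPoint, seqId)) break;     // else retry with a fresh seqId
    }
  }

  boolean tryAdvanceTo(long newStartPoint, long expected) {
    synchronized (lock) {
      long currentRead = readPoint.get();
      long currentWrite = writePoint.get();
      if (currentRead != currentWrite) {                 // a write is still in flight
        throw new IllegalStateException(
            "currentRead=" + currentRead + ", currentWrite=" + currentWrite);
      }
      if (expected != NONE && expected != currentRead) return false; // stale expectation
      if (newStartPoint < currentRead) return false;                  // never move backwards
      readPoint.set(newStartPoint);
      writePoint.set(newStartPoint);
      return true;
    }
  }

  // Hypothetical helper: start a write without completing it.
  void beginWrite() {
    synchronized (lock) {
      writePoint.incrementAndGet();
    }
  }

  long getReadPoint() { return readPoint.get(); }
  long getWritePoint() { return writePoint.get(); }
}
```

Advancing to a point that has already been reached is a no-op, while advancing with a write in flight fails loudly rather than silently skipping that write's number.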

complete

complete marks the WriteEntry as complete and advances the read point as much as possible.

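complete is called even if the write has failed (after the write transaction’s changes have been completely backed out), so that the outstanding transaction is cleaned up. It returns true if the read point has reached this entry’s write number; completeAndWait only calls waitForRead when it returns false.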

// MultiVersionConcurrencyControl.java

private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

public boolean complete(WriteEntry writeEntry) {
  synchronized (writeQueue) {
    writeEntry.markCompleted();
    long nextReadValue = NONE;
    boolean ranOnce = false;
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();

      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("Invariant in complete violated, nextReadValue="
              + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
        }
      }

      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }

    if (!ranOnce) {
      throw new RuntimeException("There is no first!");
    }

    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        readPoint.set(nextReadValue);
        readWaiters.notifyAll();
      }
    }
    return readPoint.get() >= writeEntry.getWriteNumber();
  }
}
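The key property of complete is that writes may finish out of order, but the read point only advances over a contiguous run of finished entries at the head of the queue. CompleteSketch is a hypothetical simplification that keeps that loop and drops the invariant check, the "There is no first!" check, and the readWaiters notification.

```java
import java.util.LinkedList;

// Hypothetical simplification of begin()/complete() (not HBase code).
final class CompleteSketch {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
  private long writePoint = 0;
  private long readPoint = 0;

  synchronized WriteEntry begin() {
    WriteEntry e = new WriteEntry(++writePoint);
    writeQueue.add(e);
    return e;
  }

  // Returns true if the entry is now visible to readers.
  synchronized boolean complete(WriteEntry e) {
    e.completed = true;
    // Pop finished entries off the head; stop at the first unfinished one.
    while (!writeQueue.isEmpty() && writeQueue.getFirst().completed) {
      readPoint = writeQueue.removeFirst().writeNumber;
    }
    return readPoint >= e.writeNumber;
  }

  synchronized long readPoint() { return readPoint; }
}
```

For example, with writes 1, 2 and 3 in flight, completing 2 first leaves the read point at 0 and returns false (so completeAndWait would wait). Completing 1 then moves the read point straight to 2.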

Where MVCC is used

For writes, many methods in HRegion.java depend on MVCC:

  1. put
  2. checkAndPut
  3. delete
  4. checkAndDelete
  5. internalFlushcache
  6. mutateRow
  7. mutateRowsWithLocks
  8. batchMutate

For reads, HRegion.java keeps a member, private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints, that records the read point of each open scanner.
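One use for recorded read points is finding the smallest read point still held by any open scanner; HRegion has a getSmallestReadPoint method for this purpose. The code below is a hypothetical sketch of that bookkeeping, not its implementation: ScannerReadPoints, open, close and advance are made-up names, and a plain Object stands in for a RegionScanner.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of per-scanner read point bookkeeping (not HBase code).
final class ScannerReadPoints {
  private final AtomicLong mvccReadPoint = new AtomicLong(0);
  private final ConcurrentHashMap<Object, Long> scannerReadPoints = new ConcurrentHashMap<>();

  // Opening a scanner pins the MVCC read point current at that moment.
  long open(Object scanner) {
    synchronized (scannerReadPoints) {
      long rp = mvccReadPoint.get();
      scannerReadPoints.put(scanner, rp);
      return rp;
    }
  }

  void close(Object scanner) {
    scannerReadPoints.remove(scanner);
  }

  // Stand-in for writes completing and moving the MVCC read point forward.
  void advance(long newReadPoint) {
    mvccReadPoint.set(newReadPoint);
  }

  // Smallest read point among the MVCC read point and every open scanner.
  long smallestReadPoint() {
    synchronized (scannerReadPoints) {
      long min = mvccReadPoint.get();
      for (long rp : scannerReadPoints.values()) {
        min = Math.min(min, rp);
      }
      return min;
    }
  }
}
```

A long-running scanner therefore holds the smallest read point down until it is closed, even as newer writes complete.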

When you run a Get or a Scan in HBase, a RegionScannerImpl is created by calling getScanner.

// HRegion.java

public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
  return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

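Notice the additionalScanners parameter, a list of KeyValueScanner. StoreFileScanner, one implementation of KeyValueScanner, returns a new cell each time its next method is called, and skips cells whose sequence id is newer than the scanner’s read point: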

// StoreFileScanner.java

public Cell next() throws IOException {
  Cell retKey = cur;

  try {
    // only seek if we aren't at the end. cur == null implies 'end'.
    if (cur != null) {
      hfs.next();
      setCurrentCell(hfs.getCell());
      if (hasMVCCInfo || this.reader.isBulkLoaded()) {
        skipKVsNewerThanReadpoint();
      }
    }
  } catch (FileNotFoundException e) {
    throw e;
  } catch(IOException e) {
    throw new IOException("Could not iterate " + this, e);
  }
  return retKey;
}

protected boolean skipKVsNewerThanReadpoint() throws IOException {
  // We want to ignore all key-values that are newer than our current
  // readPoint
  Cell startKV = cur;
  while(enforceMVCC
      && cur != null
      && (cur.getSequenceId() > readPt)) {
    boolean hasNext = hfs.next();
    setCurrentCell(hfs.getCell());
    if (hasNext && this.stopSkippingKVsIfNextRow
        && getComparator().compareRows(cur, startKV) > 0) {
      return false;
    }
  }

  if (cur == null) {
    return false;
  }

  return true;
}
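The combination of next and skipKVsNewerThanReadpoint can be modeled over a sorted in-memory list. ToyCell and ToyStoreFileScanner are hypothetical; the sketch always applies the filter, as if hasMVCCInfo and enforceMVCC were both true, ignores stopSkippingKVsIfNextRow, and performs the same skip once in the constructor to stand in for the initial seek. Cells are assumed to be in scan order, with newer versions of a column first.

```java
import java.util.List;

// Hypothetical cell: only the fields this sketch needs.
final class ToyCell {
  final String row, column, value;
  final long sequenceId;
  ToyCell(String row, String column, long sequenceId, String value) {
    this.row = row; this.column = column; this.sequenceId = sequenceId; this.value = value;
  }
}

// Hypothetical scanner over a sorted list, mimicking next() + skip logic.
final class ToyStoreFileScanner {
  private final List<ToyCell> cells;
  private final long readPt;
  private int pos = 0;
  private ToyCell cur;

  ToyStoreFileScanner(List<ToyCell> cells, long readPt) {
    this.cells = cells;
    this.readPt = readPt;
    cur = cells.isEmpty() ? null : cells.get(0);
    skipNewerThanReadPoint();          // stand-in for the initial seek
  }

  // Ignore every cell written after this scanner's read point.
  private void skipNewerThanReadPoint() {
    while (cur != null && cur.sequenceId > readPt) {
      pos++;
      cur = pos < cells.size() ? cells.get(pos) : null;
    }
  }

  // Return the current cell, then advance and skip invisible cells.
  ToyCell next() {
    ToyCell ret = cur;
    if (cur != null) {
      pos++;
      cur = pos < cells.size() ? cells.get(pos) : null;
      skipNewerThanReadPoint();
    }
    return ret;
  }
}
```

With a read point of 1, a company cell written with sequence id 2 is skipped and the sequence id 1 version is returned instead, matching the Restaurant example earlier.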
