Multiversion Concurrency Control (MVCC for short) manages the read/write consistency, providing an interface for readers to determine what entries to ignore, and a mechanism for writers to obtain new write numbers, then “commit” the new writes for readers to read (thus forming atomic transactions).
Why MVCC is needed
Unlike traditional relational databases, HBase only provides ACID semantics on a per-row basis.
Write-Write Synchronization
Consider two concurrent writes to the same row that represent {company, role} combinations:
We know that HBase will perform the following steps for each write:
- Write to Write-Ahead-Log (WAL)
- Update MemStore: write each data cell [the (row, column) pair] to the memstore.
Then, assume we have no concurrency control over the writes and consider the following order of events:
At the end, we are left with the following state:
The simplest solution is to take an exclusive lock per row, which isolates writes that update the same row from each other. The write steps then become the following (the new steps are obtaining and releasing the row lock):
- Obtain Row Lock
- Write to Write-Ahead-Log (WAL)
- Update MemStore: write each cell to the memstore
- Release Row Lock
Row locks are sufficient for write-write synchronization.
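The four steps above can be sketched in Java as follows. This is only an illustration under stated assumptions, not HBase's code: the class RowLockedStore, its put and get methods, and the in-memory list standing in for the WAL are hypothetical names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy store with one exclusive lock per row; not part of HBase.
class RowLockedStore {
  private final Map<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();
  private final List<String> wal = new ArrayList<>();            // stand-in for the WAL
  private final Map<String, String> memstore = new ConcurrentHashMap<>();

  void put(String row, Map<String, String> columns) {
    ReentrantLock lock = rowLocks.computeIfAbsent(row, r -> new ReentrantLock());
    lock.lock();                                   // 1. obtain row lock
    try {
      synchronized (wal) {                         // 2. write to the WAL
        wal.add(row + "=" + columns);
      }
      for (Map.Entry<String, String> c : columns.entrySet()) {
        // 3. update MemStore, one cell at a time
        memstore.put(row + "/" + c.getKey(), c.getValue());
      }
    } finally {
      lock.unlock();                               // 4. release row lock
    }
  }

  // Reads do not take the row lock in this sketch.
  String get(String row, String column) {
    return memstore.get(row + "/" + column);
  }
}
```

Two threads calling put on the same row are serialized by the lock, so their cells cannot interleave. Note that get takes no lock, so a reader can still run in the middle of step 3.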
Read-Write Synchronization
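However, once reads are involved, row locks on writes are not enough: a read that arrives between the two MemStore updates of a single write can see a row that mixes cells from different writes.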
Therefore, we need some concurrency control to deal with read-write synchronization. The simplest solution would be to have the reads obtain and release the row locks in the same manner as the writes. This would resolve the ACID violation, but the downside is that our reads and writes would both contend for the row locks, slowing each other down.
How MVCC works
Instead, HBase uses a form of MVCC to avoid requiring the reads to obtain row locks. MVCC works in HBase as follows:
For writes:
- After acquiring the RowLock, each write operation is immediately assigned a write number.
- Each data cell in the write stores its write number.
- A write operation completes by declaring it is finished with the write number.
For reads:
- Each read operation is first assigned a read timestamp, called a read point.
- The read point is assigned to be the highest integer x such that all writes with write number <= x have been completed.
- A read r for a certain (row, column) combination returns the data cell with the matching (row, column) whose write number is the largest value that is less than or equal to the read point of r.
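The read rules can be written down as two small functions. The sketch below is a hypothetical helper (ReadPointRules, Version, readPoint and visibleValue are names made up for this example), and it assumes that write numbers are handed out consecutively starting at 1.

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the read rules; not part of HBase.
class ReadPointRules {
  // One stored version of a (row, column) cell.
  static final class Version {
    final long writeNumber;
    final String value;

    Version(long writeNumber, String value) {
      this.writeNumber = writeNumber;
      this.value = value;
    }
  }

  // The highest x such that every write numbered 1..x has completed.
  // Assumes write numbers are assigned consecutively starting at 1.
  static long readPoint(long highestAssigned, Set<Long> completed) {
    long x = 0;
    while (x < highestAssigned && completed.contains(x + 1)) {
      x++;
    }
    return x;
  }

  // The version with the largest write number <= readPoint, or null if none is visible.
  static String visibleValue(List<Version> versions, long readPoint) {
    Version best = null;
    for (Version v : versions) {
      if (v.writeNumber <= readPoint
          && (best == null || v.writeNumber > best.writeNumber)) {
        best = v;
      }
    }
    return best == null ? null : best.value;
  }
}
```

With write 1 completed and write 2 still in flight, readPoint(2, Set.of(1L)) is 1, so a cell that holds versions from both writes yields the version from write 1.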
Let’s look at the operations using Multiversion Concurrency Control:
Now, let’s consider a read that begins after the step Restaurant [wn=2] but before the step Waiter [wn=2]. By rules 1 and 2 for reads, its read point is 1. By rule 3, it reads the values with write number 1, leaving us with:
Let’s put this all together by listing the steps for a write with Multiversion Concurrency Control (the new steps required for read-write synchronization are acquiring and finishing the write number):
- Obtain Row Lock
- Acquire New Write Number
- Write to Write-Ahead-Log (WAL)
- Update MemStore: write each cell to the memstore
- Finish Write Number
- Release Row Lock
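The six steps can be sketched for a single row as follows. This is a simplified model under stated assumptions, not HBase's implementation: MvccWritePath, Cell, acquireWriteNumber and finishWriteNumber are hypothetical names, the WAL and MemStore are plain in-memory lists, and failure handling (rolling back a failed write) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-row model of the six write steps; not part of HBase.
class MvccWritePath {
  static final class Cell {
    final String column;
    final String value;
    final long writeNumber;

    Cell(String column, String value, long writeNumber) {
      this.column = column;
      this.value = value;
      this.writeNumber = writeNumber;
    }
  }

  private final ReentrantLock rowLock = new ReentrantLock();
  private final List<String> wal = new ArrayList<>();       // stand-in for the WAL, guarded by rowLock
  final List<Cell> memstore = new CopyOnWriteArrayList<>();  // stand-in for the MemStore
  private final TreeSet<Long> pending = new TreeSet<>();     // write numbers begun but not finished
  private long lastAssigned = 0;
  private long readPoint = 0;

  synchronized long acquireWriteNumber() {
    pending.add(++lastAssigned);
    return lastAssigned;
  }

  synchronized void finishWriteNumber(long writeNumber) {
    pending.remove(writeNumber);
    // Every write below the oldest unfinished one has completed.
    readPoint = pending.isEmpty() ? lastAssigned : pending.first() - 1;
  }

  synchronized long readPoint() {
    return readPoint;
  }

  void write(Map<String, String> columns) {
    rowLock.lock();                                  // 1. obtain row lock
    try {
      long wn = acquireWriteNumber();                // 2. acquire new write number
      wal.add("wn=" + wn + " " + columns);           // 3. write to the WAL
      for (Map.Entry<String, String> c : columns.entrySet()) {
        memstore.add(new Cell(c.getKey(), c.getValue(), wn));  // 4. update MemStore
      }
      finishWriteNumber(wn);                         // 5. finish write number
    } finally {
      rowLock.unlock();                              // 6. release row lock
    }
  }
}
```

Cells land in the MemStore during step 4, but the read point only moves in step 5, so a reader that picks up its read point mid-write ignores the cells of the unfinished write.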
MVCC in Source Code
The MultiVersionConcurrencyControl class implements Multiversion Concurrency Control in HBase.
A few member variables are listed here:
// MultiVersionConcurrencyControl.java
public class MultiVersionConcurrencyControl {
  final AtomicLong readPoint = new AtomicLong(0);
  final AtomicLong writePoint = new AtomicLong(0);
  private final Object readWaiters = new Object();
  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
}
From the doMiniBatchMutate method in HRegion.java, we can infer the order in which the member methods of MultiVersionConcurrencyControl are called:
- begin
- completeAndWait
- advanceTo
- complete
Next, let’s see what these methods do.
begin
begin starts a write transaction by:
- Incrementing writePoint.
- Creating a new WriteEntry with the new write number.
- Adding the new WriteEntry to the queue of ongoing writes.
- Running the supplied Runnable.
- Returning this WriteEntry instance.
// MultiVersionConcurrencyControl.java
private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
public WriteEntry begin(Runnable action) {
  synchronized (writeQueue) {
    long nextWriteNumber = writePoint.incrementAndGet();
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    action.run();
    return e;
  }
}
completeAndWait
To complete the write transaction and wait for it to become visible, call completeAndWait(WriteEntry). It waits until the read point catches up to our write. At the end of this call, the global read point is at least as large as the write point of the passed-in WriteEntry, so the write is visible to MVCC readers.
// MultiVersionConcurrencyControl.java
public void completeAndWait(WriteEntry e) {
  if (!complete(e)) {
    waitForRead(e);
  }
}
Let’s look at waitForRead(WriteEntry) first; complete(WriteEntry) is covered later.
// MultiVersionConcurrencyControl.java
private final Object readWaiters = new Object();
// Wait for the global readPoint to advance up to the passed in write entry number.
void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  int count = 0;
  synchronized (readWaiters) {
    while (readPoint.get() < e.getWriteNumber()) {
      if (count % 100 == 0 && count > 0) {
        LOG.warn("STUCK: " + this);
      }
      count++;
      try {
        readWaiters.wait(10);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup --and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}
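The wait/notify pattern used by waitForRead can be tried in isolation. The sketch below is a hypothetical stand-alone class (ReadPointWaiter, advance, waitFor and demo are invented names). Unlike the HBase method, it propagates InterruptedException instead of deferring it, and it omits the "STUCK" logging.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model of waiting for the read point; not part of HBase.
class ReadPointWaiter {
  private final AtomicLong readPoint = new AtomicLong(0);
  private final Object readWaiters = new Object();

  // Called by whoever finishes a write: publish the new read point and wake all waiters.
  void advance(long newReadPoint) {
    synchronized (readWaiters) {
      readPoint.set(newReadPoint);
      readWaiters.notifyAll();
    }
  }

  // Block until the read point reaches writeNumber; return the read point seen on exit.
  long waitFor(long writeNumber) throws InterruptedException {
    synchronized (readWaiters) {
      while (readPoint.get() < writeNumber) {
        // The timed wait re-checks the condition even if a notification is missed.
        readWaiters.wait(10);
      }
      return readPoint.get();
    }
  }

  // One thread advances the read point to 5 while the caller waits for it.
  static long demo() {
    ReadPointWaiter w = new ReadPointWaiter();
    Thread completer = new Thread(() -> w.advance(5));
    completer.start();
    try {
      long seen = w.waitFor(5);
      completer.join();
      return seen;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    }
  }
}
```

The loop re-checks the condition after every wakeup, so a spurious wakeup or a notification meant for a different write number cannot release the waiter early.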
advanceTo
advanceTo steps the MVCC forward to a new read/write basis. It calls tryAdvanceTo in a loop until the call succeeds or the write point has already reached newStartPoint.
// MultiVersionConcurrencyControl.java
public void advanceTo(long newStartPoint) {
  while (true) {
    long seqId = this.getWritePoint();
    if (seqId >= newStartPoint) break;
    if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
  }
}
/**
 * @param newStartPoint Point to move read and write points to.
 * @param expected If not -1 (#NONE)
 * @return Returns false if expected is not equal to the
 * current readPoint or if startPoint is less than current readPoint
 */
boolean tryAdvanceTo(long newStartPoint, long expected) {
  synchronized (writeQueue) {
    long currentRead = this.readPoint.get();
    long currentWrite = this.writePoint.get();
    if (currentRead != currentWrite) {
      throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
        ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
    }
    if (expected != NONE && expected != currentRead) {
      return false;
    }
    if (newStartPoint < currentRead) {
      return false;
    }
    readPoint.set(newStartPoint);
    writePoint.set(newStartPoint);
  }
  return true;
}
complete
complete marks the WriteEntry as complete and advances the read point as far as possible.
complete is called even if the write has FAILED (AFTER backing out the write transaction changes completely) so that the outstanding transaction can be cleaned up.
// MultiVersionConcurrencyControl.java
private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
public boolean complete(WriteEntry writeEntry) {
  synchronized (writeQueue) {
    writeEntry.markCompleted();
    long nextReadValue = NONE;
    boolean ranOnce = false;
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("Invariant in complete violated, nextReadValue="
            + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
        }
      }
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        break;
      }
    }
    if (!ranOnce) {
      throw new RuntimeException("There is no first!");
    }
    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        readPoint.set(nextReadValue);
        readWaiters.notifyAll();
      }
    }
    return readPoint.get() >= writeEntry.getWriteNumber();
  }
}
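To see why complete can return false, it helps to trace writes that finish out of order. The following is a stripped-down model of the begin/complete logic shown above, assuming a hypothetical class ToyMvcc with its own minimal WriteEntry. It leaves out the Runnable, the invariant checks, and the reader notification.

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified model of begin/complete; not the HBase class itself.
class ToyMvcc {
  static final class WriteEntry {
    final long writeNumber;
    boolean completed;

    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
  }

  private final AtomicLong readPoint = new AtomicLong(0);
  private final AtomicLong writePoint = new AtomicLong(0);
  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

  WriteEntry begin() {
    synchronized (writeQueue) {
      WriteEntry e = new WriteEntry(writePoint.incrementAndGet());
      writeQueue.add(e);
      return e;
    }
  }

  // Returns true if the read point has caught up to this write.
  boolean complete(WriteEntry e) {
    synchronized (writeQueue) {
      e.completed = true;
      // Pop completed entries from the head; stop at the first write still in flight.
      while (!writeQueue.isEmpty() && writeQueue.getFirst().completed) {
        readPoint.set(writeQueue.removeFirst().writeNumber);
      }
      return readPoint.get() >= e.writeNumber;
    }
  }

  long getReadPoint() {
    return readPoint.get();
  }
}
```

If writes 1, 2 and 3 begin in that order and write 2 completes first, complete returns false and the read point stays at 0, because write 1 still heads the queue. Once write 1 completes, the loop drains every completed entry behind it, and the read point moves past all of them in one step.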
Where MVCC is used
For writes, many methods in HRegion.java depend on MVCC:
- put
- checkAndPut
- delete
- checkAndDelete
- internalFlushcache
- mutateRow
- mutateRowsWithLocks
- batchMutate
For reads, the member private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints in HRegion.java records the read point of each scanner.
When you issue a get or scan in HBase, a RegionScannerImpl is created by calling getScanner.
// HRegion.java
public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
    throws IOException {
  return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
Notice the KeyValueScanner input parameter. The next method of StoreFileScanner (an implementation of KeyValueScanner) is called each time a new cell is fetched.
// StoreFileScanner.java
public Cell next() throws IOException {
  Cell retKey = cur;
  try {
    // only seek if we aren't at the end. cur == null implies 'end'.
    if (cur != null) {
      hfs.next();
      setCurrentCell(hfs.getCell());
      if (hasMVCCInfo || this.reader.isBulkLoaded()) {
        skipKVsNewerThanReadpoint();
      }
    }
  } catch (FileNotFoundException e) {
    throw e;
  } catch(IOException e) {
    throw new IOException("Could not iterate " + this, e);
  }
  return retKey;
}
protected boolean skipKVsNewerThanReadpoint() throws IOException {
  // We want to ignore all key-values that are newer than our current
  // readPoint
  Cell startKV = cur;
  while(enforceMVCC
      && cur != null
      && (cur.getSequenceId() > readPt)) {
    boolean hasNext = hfs.next();
    setCurrentCell(hfs.getCell());
    if (hasNext && this.stopSkippingKVsIfNextRow
        && getComparator().compareRows(cur, startKV) > 0) {
      return false;
    }
  }
  if (cur == null) {
    return false;
  }
  return true;
}
Reference