The MemStore holds in-memory modifications to the Store. This post digs into the source code to understand what it really does underneath.

MemStore interface

We start with the MemStore interface. Its functions should not be called in parallel; callers should hold the write and read locks.

Here are some important functions in this interface:

// Creates a snapshot of the current memstore
MemStoreSnapshot snapshot();

// Write an update
void add(final Cell cell, MemstoreSize memstoreSize);

// Update or insert the specified cells
void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize);

// Return scanners over the memstore, including scanners over the snapshot
List<KeyValueScanner> getScanners(long readPt) throws IOException;
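
As a rough illustration of the locking contract, the sketch below guards a toy store with a ReentrantReadWriteLock: updates share the read lock, while the snapshot swap takes the write lock. ToyLockedStore and its methods are hypothetical names made up for this example, not HBase classes, and the choice of which lock each operation takes is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical toy store for illustration only; not an HBase class.
class ToyLockedStore {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Thread-safe sorted set, so several writers may share the read lock.
  private ConcurrentSkipListSet<String> active = new ConcurrentSkipListSet<>();

  // Writers take the shared (read) lock: they may run alongside each other,
  // but never while a snapshot swap is in progress.
  void add(String cell) {
    lock.readLock().lock();
    try {
      active.add(cell);
    } finally {
      lock.readLock().unlock();
    }
  }

  // The swap takes the exclusive (write) lock, so no add can land in the
  // old set after it has been copied out for the flusher.
  List<String> snapshot() {
    lock.writeLock().lock();
    try {
      List<String> frozen = new ArrayList<>(active);
      active = new ConcurrentSkipListSet<>();
      return frozen;
    } finally {
      lock.writeLock().unlock();
    }
  }

  int activeSize() {
    lock.readLock().lock();
    try {
      return active.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    ToyLockedStore store = new ToyLockedStore();
    store.add("row2");
    store.add("row1");
    System.out.println("snapshot: " + store.snapshot());
    System.out.println("active size after snapshot: " + store.activeSize());
  }
}
```

The point of the split is that the expensive path (many concurrent adds) stays cheap, and only the rare swap blocks everyone.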

AbstractMemStore abstract class

MemStore is implemented by AbstractMemStore, an abstract class that implements the behaviour shared by all concrete memstore instances.

public abstract class AbstractMemStore implements MemStore {
  // active segment absorbs write operations
  protected volatile MutableSegment active;

  // Snapshot of memstore.  Made for flusher.
  protected volatile ImmutableSegment snapshot;

  @Override
  public void add(Cell cell, MemstoreSize memstoreSize) {
    Cell toAdd = maybeCloneWithAllocator(cell);
    boolean mslabUsed = (toAdd != cell);
    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
    // default MSLAB is ON and we might have copied cell to MSLAB area. If not we must do below deep
    // copy. Or else we will keep referring to the bigger chunk of memory and prevent it from
    // getting GCed.
    // Copy to MSLAB would not have happened if
    // 1. MSLAB is turned OFF. See "hbase.hregion.memstore.mslab.enabled"
    // 2. When the size of the cell is bigger than the max size supported by MSLAB. See
    // "hbase.hregion.memstore.mslab.max.allocation". This defaults to 256 KB
    // 3. When cells are from Append/Increment operation.
    if (!mslabUsed) {
      toAdd = deepCopyIfNeeded(toAdd);
    }
    internalAdd(toAdd, mslabUsed, memstoreSize);
  }

  /*
   * Internal version of add() that doesn't clone Cells with the
   * allocator, and doesn't take the lock.
   *
   * Callers should ensure they already have the read lock taken
   * @param toAdd the cell to add
   * @param mslabUsed whether using MSLAB
   * @param memstoreSize
   */
  private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemstoreSize memstoreSize) {
    active.add(toAdd, mslabUsed, memstoreSize);
    setOldestEditTimeToNow();
    checkActiveSize();
  }
}

The add function follows these steps:

  1. Call maybeCloneWithAllocator, which allocates a slice in the MSLAB, copies the passed Cell into it, and returns a new Cell instance over the copied data.
  2. If the cell was not copied in step 1 (MSLAB is off, the cell is too large, or it comes from an Append/Increment), make a deep copy instead.
  3. Call internalAdd, which delegates to the active MutableSegment and adds the cell to its CellSet.
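
To make the copy-or-deep-copy decision concrete, here is a minimal sketch with a single fixed-size chunk. ToyCell, ToySlab and ToyAddPath are hypothetical stand-ins invented for this example; the real MSLAB manages many chunks and is considerably more involved, so treat this only as an outline of the control flow.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for a cell: a view over part of a byte array.
final class ToyCell {
  final byte[] backing;
  final int offset;
  final int length;

  ToyCell(byte[] backing, int offset, int length) {
    this.backing = backing;
    this.offset = offset;
    this.length = length;
  }

  // Returns a fresh array holding only this cell's bytes.
  byte[] value() {
    return Arrays.copyOfRange(backing, offset, offset + length);
  }
}

// Hypothetical single-chunk allocator (a big simplification of MSLAB).
final class ToySlab {
  private final byte[] chunk;
  private final int maxAllocation;
  private int next = 0;

  ToySlab(int chunkSize, int maxAllocation) {
    this.chunk = new byte[chunkSize];
    this.maxAllocation = maxAllocation;
  }

  // Copies the cell into the chunk and returns a cell over the copy.
  // Returns the same instance when the cell is too large or the chunk is full.
  ToyCell maybeClone(ToyCell cell) {
    if (cell.length > maxAllocation || next + cell.length > chunk.length) {
      return cell;
    }
    System.arraycopy(cell.backing, cell.offset, chunk, next, cell.length);
    ToyCell copy = new ToyCell(chunk, next, cell.length);
    next += cell.length;
    return copy;
  }
}

final class ToyAddPath {
  // Try the slab first; fall back to a deep copy so the (possibly large)
  // request buffer is no longer referenced and can be garbage collected.
  static ToyCell prepare(ToyCell cell, ToySlab slab) {
    ToyCell toAdd = slab.maybeClone(cell);
    boolean slabUsed = (toAdd != cell);
    if (!slabUsed) {
      byte[] own = cell.value();
      toAdd = new ToyCell(own, 0, own.length);
    }
    return toAdd;
  }

  public static void main(String[] args) {
    byte[] rpcBuffer = "xxhelloyy".getBytes(StandardCharsets.US_ASCII);
    ToySlab slab = new ToySlab(16, 4);
    ToyCell small = prepare(new ToyCell(rpcBuffer, 2, 3), slab);
    ToyCell large = prepare(new ToyCell(rpcBuffer, 2, 5), slab);
    System.out.println("small still on request buffer: " + (small.backing == rpcBuffer));
    System.out.println("large still on request buffer: " + (large.backing == rpcBuffer));
  }
}
```

Either way, the cell that reaches the segment no longer points at the request buffer, which is the whole purpose of steps 1 and 2.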

Next, the upsert function:

  @Override
  public void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize) {
    for (Cell cell : cells) {
      upsert(cell, readpoint, memstoreSize);
    }
  }

  /*
   * Inserts the specified Cell into MemStore and deletes any existing
   * versions of the same row/family/qualifier as the specified Cell.
   * <p>
   * First, the specified Cell is inserted into the Memstore.
   * <p>
   * If there are any existing Cell in this MemStore with the same row,
   * family, and qualifier, they are removed.
   * <p>
   * Callers must hold the read lock.
   *
   * @param cell the cell to be updated
   * @param readpoint readpoint below which we can safely remove duplicate KVs
   * @param memstoreSize
   */
  private void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
    // Add the Cell to the MemStore
    // Use the internalAdd method here since we (a) already have a lock
    // and (b) cannot safely use the MSLAB here without potentially
    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
    // test that triggers the pathological case if we don't avoid MSLAB
    // here.
    // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). We
    // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
    // prevent it from getting GCed.
    cell = deepCopyIfNeeded(cell);
    this.active.upsert(cell, readpoint, memstoreSize);
    setOldestEditTimeToNow();
    checkActiveSize();
  }

In short, this upsert does two things:

  1. Deep copy the cell (MSLAB is deliberately not used here).
  2. Call upsert on the active MutableSegment.

The upsert function in MutableSegment:

  public void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
    internalAdd(cell, false, memstoreSize);

    // Get the Cells for the row/family/qualifier regardless of timestamp.
    // For this case we want to clean up any other puts
    Cell firstCell = CellUtil.createFirstOnRowColTS(cell, HConstants.LATEST_TIMESTAMP);
    SortedSet<Cell> ss = this.tailSet(firstCell);
    Iterator<Cell> it = ss.iterator();
    // versions visible to oldest scanner
    int versionsVisible = 0;
    while (it.hasNext()) {
      Cell cur = it.next();

      if (cell == cur) {
        // ignore the one just put in
        continue;
      }
      // check that this is the row and column we are interested in, otherwise bail
      if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
        // only remove Puts that concurrent scanners cannot possibly see
        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
          if (versionsVisible >= 1) {
            // if we get here we have seen at least one version visible to the oldest scanner,
            // which means we can prove that no scanner will see this version

            // false means there was a change, so give us the size.
            // TODO when the removed cell ie.'cur' having its data in MSLAB, we can not release that
            // area. Only the Cell object as such going way. We need to consider cellLen to be
            // decreased there as 0 only. Just keeping it as existing code now. We need to know the
            // removed cell is from MSLAB or not. Will do once HBASE-16438 is in
            int cellLen = getCellLength(cur);
            long heapSize = heapSizeChange(cur, true);
            this.incSize(-cellLen, -heapSize);
            if (memstoreSize != null) {
              memstoreSize.decMemstoreSize(cellLen, heapSize);
            }
            it.remove();
          } else {
            versionsVisible++;
          }
        }
      } else {
        // past the row or column, done
        break;
      }
    }
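  }

It works as follows:

  1. Add the current Cell to the CellSet.
  2. Get the Cells for the same row/family/qualifier, regardless of timestamp.
  3. Keep the first Put that is visible at readpoint and remove the older Puts that no scanner can see.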
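
The same walk can be sketched over a plain ConcurrentSkipListSet. ToyKv and ToyUpsertSegment are hypothetical names for this example; the sketch ignores column families, cell types and size accounting, and only keeps the ordering (newest first within a column) and the readpoint check.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical simplified cell: no family, no type, no value.
final class ToyKv {
  final String row;
  final String qualifier;
  final long timestamp;
  final long seqId;

  ToyKv(String row, String qualifier, long timestamp, long seqId) {
    this.row = row;
    this.qualifier = qualifier;
    this.timestamp = timestamp;
    this.seqId = seqId;
  }
}

final class ToyUpsertSegment {
  // Row and qualifier ascending, then newest timestamp and sequence id first.
  static final Comparator<ToyKv> ORDER = (a, b) -> {
    int c = a.row.compareTo(b.row);
    if (c != 0) return c;
    c = a.qualifier.compareTo(b.qualifier);
    if (c != 0) return c;
    c = Long.compare(b.timestamp, a.timestamp);
    if (c != 0) return c;
    return Long.compare(b.seqId, a.seqId);
  };

  final ConcurrentSkipListSet<ToyKv> cells = new ConcurrentSkipListSet<>(ORDER);

  void upsert(ToyKv cell, long readpoint) {
    cells.add(cell);
    // Smallest possible key for this row/qualifier: sorts before every version.
    ToyKv first = new ToyKv(cell.row, cell.qualifier, Long.MAX_VALUE, Long.MAX_VALUE);
    Iterator<ToyKv> it = cells.tailSet(first).iterator();
    int versionsVisible = 0;
    while (it.hasNext()) {
      ToyKv cur = it.next();
      if (cur == cell) {
        continue; // ignore the one just put in
      }
      if (!cur.row.equals(cell.row) || !cur.qualifier.equals(cell.qualifier)) {
        break; // past the row or column, done
      }
      if (cur.seqId <= readpoint) {
        if (versionsVisible >= 1) {
          it.remove(); // a newer visible version exists, nobody can see this one
        } else {
          versionsVisible++;
        }
      }
    }
  }

  public static void main(String[] args) {
    ToyUpsertSegment segment = new ToyUpsertSegment();
    segment.cells.add(new ToyKv("r1", "a", 100, 1));
    segment.cells.add(new ToyKv("r1", "a", 200, 2));
    segment.upsert(new ToyKv("r1", "a", 300, 3), 2);
    System.out.println("cells left: " + segment.cells.size());
  }
}
```

Note that the newly added cell is skipped without being counted as visible, so one older version at or below the readpoint always survives for scanners that cannot see the new cell yet.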

DefaultMemStore class

public class DefaultMemStore extends AbstractMemStore {

  /**
   * Creates a snapshot of the current memstore.
   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
   */
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      this.snapshotId = EnvironmentEdgeManager.currentTime();
      if (!this.active.isEmpty()) {
        ImmutableSegment immutableSegment = SegmentFactory.instance().
            createImmutableSegment(this.active);
        this.snapshot = immutableSegment;
        resetActive();
      }
    }
    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
  }

  @Override
  /*
   * Scanners are ordered from 0 (oldest) to newest in increasing order.
   */
  public List<KeyValueScanner> getScanners(long readPt) throws IOException {
    List<KeyValueScanner> list = new ArrayList<>();
    long order = snapshot.getNumOfSegments();
    order = addToScanners(active, readPt, order, list);
    addToScanners(snapshot.getAllSegments(), readPt, order, list);
    return list;
  }
}
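
The snapshot logic boils down to "swap the active set into the snapshot slot unless the previous snapshot is still there". A minimal sketch of that state machine follows; ToySnapshotStore is a hypothetical name, and plain sorted sets of strings stand in for segments.

```java
import java.util.Collections;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical toy version of the active/snapshot swap; not an HBase class.
final class ToySnapshotStore {
  private NavigableSet<String> active = new TreeSet<>();
  private NavigableSet<String> snapshot = Collections.emptyNavigableSet();

  void add(String cell) {
    active.add(cell);
  }

  NavigableSet<String> snapshot() {
    if (!snapshot.isEmpty()) {
      // Previous snapshot was never cleared: do nothing and return it again.
      return snapshot;
    }
    if (!active.isEmpty()) {
      // Freeze the current active set and start a fresh one for new writes.
      snapshot = Collections.unmodifiableNavigableSet(active);
      active = new TreeSet<>();
    }
    return snapshot;
  }

  // Called by the flusher once the snapshot has been written out.
  void clearSnapshot() {
    snapshot = Collections.emptyNavigableSet();
  }

  int activeSize() {
    return active.size();
  }

  public static void main(String[] args) {
    ToySnapshotStore store = new ToySnapshotStore();
    store.add("a");
    store.add("b");
    System.out.println("first snapshot: " + store.snapshot());
    store.add("c");
    System.out.println("second call, not cleared yet: " + store.snapshot());
    store.clearSnapshot();
    System.out.println("after clearing: " + store.snapshot());
  }
}
```

Writes that arrive between snapshot() and clearSnapshot() simply accumulate in the new active set and are picked up by the next snapshot.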

CompactingMemStore class

CompactingMemStore is a memstore implementation that supports in-memory compaction. A compaction pipeline is added between the active set and the snapshot data structures; it consists of a list of kv-sets that are subject to compaction. Like the snapshot, all pipeline components are read-only; updates only affect the active set. To ensure this property, it takes advantage of the existing blocking mechanism: the active set is pushed to the pipeline while holding the region’s updatesLock in exclusive mode. Periodically, a compaction is applied in the background to all pipeline components, resulting in a single read-only component. The “old” components are discarded when no scanner is reading them.

public class CompactingMemStore extends AbstractMemStore {
 /**
   * Push the current active memstore segment into the pipeline
   * and create a snapshot of the tail of current compaction pipeline
   * Snapshot must be cleared by call to {@link #clearSnapshot(long)}.
   * @return {@link MemStoreSnapshot}
   */
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("FLUSHING TO DISK: region "
            + getRegionServices().getRegionInfo().getRegionNameAsString() + "store: "
            + getFamilyName());
      }
      stopCompaction();
      pushActiveToPipeline(this.active);
      snapshotId = EnvironmentEdgeManager.currentTime();
      // in both cases whatever is pushed to snapshot is cleared from the pipeline
      if (compositeSnapshot) {
        pushPipelineToSnapshot();
      } else {
        pushTailToSnapshot();
      }
    }
    return new MemStoreSnapshot(snapshotId, this.snapshot);
  }
}
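
The active/pipeline/snapshot flow can be sketched with a deque of sorted sets. ToyCompactingStore is a hypothetical name; the sketch leaves out locking, scanners and the background thread, and its "compaction" is a plain merge, so it only shows how segments move between the three stages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

// Hypothetical toy pipeline; not an HBase class.
final class ToyCompactingStore {
  private TreeSet<String> active = new TreeSet<>();
  // Head holds the newest segment, tail the oldest.
  private final Deque<TreeSet<String>> pipeline = new ArrayDeque<>();

  void add(String cell) {
    active.add(cell);
  }

  // Move the active set into the pipeline and start a fresh active set.
  void pushActiveToPipeline() {
    if (!active.isEmpty()) {
      pipeline.addFirst(active);
      active = new TreeSet<>();
    }
  }

  // Merge all pipeline segments into a single segment.
  void compact() {
    if (pipeline.size() < 2) {
      return;
    }
    TreeSet<String> merged = new TreeSet<>();
    for (TreeSet<String> segment : pipeline) {
      merged.addAll(segment);
    }
    pipeline.clear();
    pipeline.addFirst(merged);
  }

  // Either the whole pipeline (composite) or only its tail goes to the
  // snapshot; whatever is taken is removed from the pipeline.
  List<String> snapshot(boolean composite) {
    pushActiveToPipeline();
    List<String> out = new ArrayList<>();
    if (composite) {
      while (!pipeline.isEmpty()) {
        out.addAll(pipeline.pollLast());
      }
    } else if (!pipeline.isEmpty()) {
      out.addAll(pipeline.pollLast());
    }
    return out;
  }

  int pipelineSize() {
    return pipeline.size();
  }

  public static void main(String[] args) {
    ToyCompactingStore store = new ToyCompactingStore();
    store.add("a");
    store.pushActiveToPipeline();
    store.add("b");
    System.out.println("tail snapshot: " + store.snapshot(false));
    System.out.println("segments left in pipeline: " + store.pipelineSize());
  }
}
```

Taking only the tail keeps the newer segments in memory, where a later compaction can still merge them before they are flushed.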