Following the preceding post, A Lifecycle of HBase’s Put: Client-side, this one examines how a Put request performs its mutation on the HRegionServer. The whole process involves the WAL, the MemStore, coprocessors, and more. After reading this post, I hope their roles are clear to you.

What ClientServiceCallable does on the client side, if you remember, is call ClientProtos.ClientService.BlockingInterface’s mutate method to send the Put request to the RegionServer.

// ClientServiceCallable.java

@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
    RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {

  @Override
  protected void setStubByServiceName(ServerName serviceName) throws IOException {
    setStub(getConnection().getClient(serviceName));
  }

  protected ClientProtos.MutateResponse doMutate(ClientProtos.MutateRequest request)
  throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
    return getStub().mutate(getRpcController(), request);
  }
}
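
Before diving into the server side, here is roughly what this looks like from an application’s point of view. The snippet below is a minimal, illustrative sketch using the standard client API; the table, family, and qualifier names are made up.

// PutExample.java — a minimal client-side Put (illustrative names)

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutExample {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = connection.getTable(TableName.valueOf("test_table"))) {
      Put put = new Put(Bytes.toBytes("row-1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("value"));
      // Internally this ends up in ClientServiceCallable.doMutate shown above.
      table.put(put);
    }
  }
}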

HRegionServer

And on the server side, who accepts this request? Every RegionServer runs an HRegionServer launched from the command line. When HRegionServer is initialized, it starts RSRpcServices. See the definition of HRegionServer and its constructor.

// HRegionServer.java

public class HRegionServer extends HasThread implements
    RegionServerServices, LastSequenceId, ConfigurationObserver {

  protected final RSRpcServices rpcServices;

  /**
  * Starts a HRegionServer at the default location
  */
  public HRegionServer(Configuration conf) throws IOException {
    // ...

    rpcServices = createRpcServices();
    this.rpcServices.start();
  }
}

RSRpcServices

RSRpcServices implements ClientService.BlockingInterface and, of course, overrides the mutate method. The mutate method is simple: it converts the protobuf MutationProto into a Put, calls HRegion’s put method, then updates metrics.

// RSRpcServices.java

public class RSRpcServices implements HBaseRPCErrorHandler,
    AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
    ConfigurationObserver {

  final RpcServerInterface rpcServer;

  void start() {
    rpcServer.start();
  }

   /**
   * Mutate data in a table.
   */
  @Override
  public MutateResponse mutate(final RpcController rpcc,
      final MutateRequest request) throws ServiceException {
    // ...
    HBaseRpcController controller = (HBaseRpcController)rpcc;
    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;

    MutationProto mutation = request.getMutation();
    Put put = ProtobufUtil.toPut(mutation, cellScanner);

    HRegion region = getRegion(request.getRegion());
    region.put(put);
    // ... update metrics and build the MutateResponse to return
  }
}

HRegion

HRegion’s put runs a couple of sanity checks, then delegates to doBatchMutate, which wraps the single Put into a one-element batch and hands it to batchMutate (a sketch of doBatchMutate follows the code below).

// HRegion.java

public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
  @Override
  public void put(Put put) throws IOException {
    checkReadOnly();

    // Do a rough check that we have resources to accept a write.  The check is
    // 'rough' in that between the resource check and the call to obtain a
    // read lock, resources may run out.  For now, the thought is that this
    // will be extremely rare; we'll deal with it when it happens.
    checkResources();
    startRegionOperation(Operation.PUT);
    try {
      // All edits for the given row (across all column families) must happen atomically.
      doBatchMutate(put);
    } finally {
      closeRegionOperation(Operation.PUT);
    }
  }

  OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            callPreMutateCPHooks(batchOp);
          }
          // validate and prepare batch for write, after CP pre-hooks
          batchOp.checkAndPrepare(this);
          initialized = true;
        }
        doMiniBatchMutate(batchOp);
        long newSize = this.getMemStoreSize();
        requestFlushIfNeeded(newSize);
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }
}
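
doBatchMutate itself is not shown above. As a rough sketch (simplified from the HBase source; check your version for the exact form), it wraps the single mutation into a one-element batch and turns per-operation failures back into exceptions:

// HRegion.java (simplified sketch of doBatchMutate)

private void doBatchMutate(Mutation mutation) throws IOException {
  // Currently this path is only used for single puts and deletes, so no nonces.
  OperationStatus[] statuses = this.batchMutate(new Mutation[] { mutation });
  if (statuses[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
    throw new FailedSanityCheckException(statuses[0].getExceptionMsg());
  } else if (statuses[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
    throw new NoSuchColumnFamilyException(statuses[0].getExceptionMsg());
  }
}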

After some validation, doMiniBatchMutate is the worker that really writes, and requestFlushIfNeeded asks for the MemStore to be flushed to an HFile once it grows past its threshold. Let’s look at requestFlushIfNeeded first, since doMiniBatchMutate is more complicated and will take up a large chunk of this post.

requestFlushIfNeeded

// HRegion.java

public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
  private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
    if (memstoreTotalSize > this.getMemStoreFlushSize()) {
      requestFlush();
    }
  }

  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }
    requestFlush0(FlushLifeCycleTracker.DUMMY);
  }

  private void requestFlush0(FlushLifeCycleTracker tracker) {
    boolean shouldFlush = false;
    synchronized (writestate) {
      if (!this.writestate.isFlushRequested()) {
        shouldFlush = true;
        writestate.flushRequested = true;
      }
    }
    if (shouldFlush) {
      // Make request outside of synchronize block; HBASE-818.
      this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
      }
    } else {
      tracker.notExecuted("Flush already requested on " + this);
    }
  }
}
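
The threshold consulted by getMemStoreFlushSize is the table’s MemStore flush size, which falls back to the hbase.hregion.memstore.flush.size configuration (128 MB by default). As an illustrative fragment, assuming the default configuration mechanisms, it can be tuned cluster-wide or per table:

// Example (fragment): tuning the flush threshold, illustrative values

Configuration conf = HBaseConfiguration.create();
// Cluster-wide default; 134217728 (128 MB) unless overridden.
conf.setLong("hbase.hregion.memstore.flush.size", 256L * 1024 * 1024);

// Per-table override via the table descriptor (HBase 2.x API).
TableDescriptor desc = TableDescriptorBuilder
    .newBuilder(TableName.valueOf("test_table"))
    .setMemStoreFlushSize(256L * 1024 * 1024)
    .build();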

As we can see, the FlushRequester is called to trigger the flush operation. In its implementation, MemStoreFlusher, each flush task is appended to a BlockingQueue; FlushHandler threads keep taking entries off that queue and flush the cached edits to HDFS with the help of HRegion’s flushcache.

// MemStoreFlusher.java

class MemStoreFlusher implements FlushRequester {

  private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<>();
  private final Map<Region, FlushRegionEntry> regionsInQueue = new HashMap<>();
  private final FlushHandler[] flushHandlers;
  private List<FlushRequestListener> flushRequestListeners = new ArrayList<>(1);
  private final HRegionServer server;

  @Override
  public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
    r.incrementFlushesQueuedCount();
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue. It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      } else {
        tracker.notExecuted("Flush already requested on " + r);
      }
    }
  }

  private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
      FlushLifeCycleTracker tracker) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue. When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
    }

    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);

      // ...
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
      tracker.afterExecution();
    }
    return true;
  }

  private class FlushHandler extends HasThread {
    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          // ...
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }

      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
}
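
As an aside, the length of the flushHandlers array above, i.e. how many flusher threads a RegionServer runs, is configurable. If I read the source correctly, this is controlled by hbase.hstore.flusher.count:

// Example (fragment): running more concurrent flush handlers, illustrative value

Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.hstore.flusher.count", 4); // default is 2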

doMiniBatchMutate

The complete flow of doMiniBatchMutate boils down to the following steps:

  1. Try to acquire as many read locks as we can, and ensure we acquire at least one. Why read locks? See reference 5, Document how HRegion#doMiniBatchMutation is acquiring read row locks.
  2. Update any LATEST_TIMESTAMP timestamps.
  3. Build WAL edit.
  4. Append the final edit to WAL and sync, begin mvcc if no WriteEntry.
  5. Write back to memStore.
  6. Complete mvcc.

Two important concepts deserve emphasis; sketches of both follow this list.

  • mvcc, short for MultiVersionConcurrencyControl. It manages read/write consistency: it provides an interface for readers to determine which entries to ignore, and a mechanism for writers to obtain new write numbers and then “commit” the new writes for readers to read (thus forming atomic transactions).
  • RegionCoprocessorHost, which implements the coprocessor environment and runtime support for coprocessors loaded within a Region. Every step leaves some pre or post work to coprocessors.
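
To make the mvcc write protocol concrete, here is a minimal sketch of the pattern doMiniBatchMutate follows; the method names are the real MultiVersionConcurrencyControl API, while the surrounding write logic is elided.

// Sketch of the mvcc write protocol (simplified, mirrors doMiniBatchMutate below)

MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin(); // obtain a write number
try {
  // ... stamp cells with writeEntry.getWriteNumber() and apply them to the MemStore ...
  mvcc.completeAndWait(writeEntry); // "commit": block until the edit is visible to readers
  writeEntry = null;
} finally {
  if (writeEntry != null) {
    mvcc.complete(writeEntry); // error path: complete without waiting so the read point can advance
  }
}

On the coprocessor side, a RegionObserver can hook the pre/post points that RegionCoprocessorHost fires. Below is a minimal sketch: the class name and log messages are mine, and the hook signatures follow the HBase 2.x observer API, so they may differ slightly in other versions.

// AuditObserver.java — a hypothetical RegionObserver watching the put path

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditObserver implements RegionCoprocessor, RegionObserver {
  private static final Logger LOG = LoggerFactory.getLogger(AuditObserver.class);

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
      WALEdit edit, Durability durability) throws IOException {
    LOG.debug("About to put row {}", Bytes.toStringBinary(put.getRow()));
  }

  @Override
  public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
      WALEdit edit, Durability durability) throws IOException {
    LOG.debug("Put applied for row {}", Bytes.toStringBinary(put.getRow()));
  }
}

With those two concepts in mind, here is doMiniBatchMutate itself:
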
// HRegion.java

public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
  private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
    boolean replay = batchOp.isInReplay();
    long currentNonceGroup = HConstants.NO_NONCE;
    long currentNonce = HConstants.NO_NONCE;
    WALEdit walEdit = null;
    boolean locked = false;
    // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
    int firstIndex = batchOp.nextIndexToProcess;
    int lastIndexExclusive = firstIndex;
    boolean success = false;
    boolean doneByCoprocessor = false;
    int noOfPuts = 0;
    int noOfDeletes = 0;
    WriteEntry writeEntry = null;
    int cellCount = 0;
    /** Keep track of the locks we hold so we can release them in finally clause */
    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
    MemStoreSizing memStoreAccounting = new MemStoreSizing();
    try {
      // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
      int numReadyToWrite = 0;
      for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
        if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
        // If we haven't got any rows in our batch, we should block to get the next one.
        RowLock rowLock = null;
        try {
          rowLock = getRowLockInternal(mutation.getRow(), true);
        } catch (TimeoutIOException e) {
          // We will retry when other exceptions, but we should stop if we timeout .
          throw e;
        } catch (IOException ioe) {
          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
        }
        if (rowLock == null) {
          // We failed to grab another lock
          break; // Stop acquiring more rows for this batch
        } else {
          acquiredRowLocks.add(rowLock);
        }

        numReadyToWrite++;
        if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
          for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
            cellCount += cells.size();
          }
        }
      }

      // We've now grabbed as many mutations off the list as we can
      // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
      if (numReadyToWrite <= 0) {
        return;
      }

      // STEP 2. Update any LATEST_TIMESTAMP timestamps
      // We should record the timestamp only after we have acquired the rowLock,
      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
      long now = EnvironmentEdgeManager.currentTime();
      if (!replay) {
        byte[] byteNow = Bytes.toBytes(now);
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
          // skip invalid
          if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
            // lastIndexExclusive was incremented above.
            continue;
          }

          Mutation mutation = batchOp.getMutation(i);
          if (mutation instanceof Put) {
            updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
            noOfPuts++;
          } else {
            prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
            noOfDeletes++;
          }
          rewriteCellTags(batchOp.familyCellMaps[i], mutation);
          WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
          if (fromCP != null) {
            cellCount += fromCP.size();
          }
        }
      }
      lock(this.updatesLock.readLock(), numReadyToWrite);
      locked = true;

      // calling the pre CP hook for batch mutation
      if (!replay && coprocessorHost != null) {
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        if (coprocessorHost.preBatchMutate(miniBatchOp)) {
          doneByCoprocessor = true;
          return;
        } else {
          for (int i = firstIndex; i < lastIndexExclusive; i++) {
            if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
              // lastIndexExclusive was incremented above.
              continue;
            }
            // we pass (i - firstIndex) below since the call expects a relative index
            Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
            if (cpMutations == null) {
              continue;
            }
            Mutation mutation = batchOp.getMutation(i);
            boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
            // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
            for (int j = 0; j < cpMutations.length; j++) {
              Mutation cpMutation = cpMutations[j];
              checkAndPrepareMutation(cpMutation, replay, now);

              // Acquire row locks. If not, the whole batch will fail.
              acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));

              // Returned mutations from coprocessor correspond to the Mutation at index i. We can
              // directly add the cells from those mutations to the familyMaps of this mutation.
              Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
              // will get added to the memStore later
              mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);

              // The durability of returned mutation is replaced by the corresponding mutation.
              // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
              // cells of returned mutation.
              if (!skipWal) {
                for (List<Cell> cells : cpFamilyMap.values()) {
                  cellCount += cells.size();
                }
              }
            }
          }
        }
      }

      // STEP 3. Build WAL edit
      walEdit = new WALEdit(cellCount, replay);
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        // Skip puts that were determined to be invalid during preprocessing
        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
          continue;
        }

        Mutation m = batchOp.getMutation(i);
        // we use durability of the original mutation for the mutation passed by CP.
        if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
          recordMutationWithoutWal(m.getFamilyCellMap());
          continue;
        }

        long nonceGroup = batchOp.getNonceGroup(i);
        long nonce = batchOp.getNonce(i);
        // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
        // Given how nonces are originally written, these should be contiguous.
        // They don't have to be, it will still work, just write more WALEdits than needed.
        if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
          // Write what we have so far for nonces out to WAL
          appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);
          walEdit = new WALEdit(cellCount, replay);
          currentNonceGroup = nonceGroup;
          currentNonce = nonce;
        }

        // Add WAL edits by CP
        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
        if (fromCP != null) {
          for (Cell cell : fromCP.getCells()) {
            walEdit.add(cell);
          }
        }
        addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
      }

      // STEP 4. Append the final edit to WAL and sync.
      Mutation mutation = batchOp.getMutation(firstIndex);
      writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
          currentNonceGroup, currentNonce,
          replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
      if (!replay && writeEntry == null) {
        // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
        // transaction to get sequence id.
        writeEntry = mvcc.begin();
      }

      // STEP 5. Write back to memStore
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
          continue;
        }
        // We need to update the sequence id for following reasons.
        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
        // 2) If no WAL, FSWALEntry won't be used
        // we use durability of the original mutation for the mutation passed by CP.
        boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
        if (updateSeqId) {
          this.updateSequenceId(batchOp.familyCellMaps[i].values(),
            replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
        }
        applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting);
      }

      // update memstore size
      this.addAndGetMemStoreSize(memStoreAccounting);

      // calling the post CP hook for batch mutation
      if (!replay && coprocessorHost != null) {
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        coprocessorHost.postBatchMutate(miniBatchOp);
      }

      // STEP 6. Complete mvcc.
      if (writeEntry != null) {
        mvcc.completeAndWait(writeEntry);
        writeEntry = null;
      }
      if (replay) {
        this.mvcc.advanceTo(batchOp.getReplaySequenceId());
      }

      success = true;
    } finally {
      // Call complete rather than completeAndWait because we probably had error if walKey != null
      if (writeEntry != null) mvcc.complete(writeEntry);

      if (locked) {
        this.updatesLock.readLock().unlock();
      }
      releaseRowLocks(acquiredRowLocks);

      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
          batchOp.retCodeDetails[i] =
              success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
        }
      }

      // synced so that the coprocessor contract is adhered to.
      if (!replay && coprocessorHost != null && !doneByCoprocessor) {
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
          // only for successful puts
          if (batchOp.retCodeDetails[i].getOperationStatusCode()
              != OperationStatusCode.SUCCESS) {
            continue;
          }
          Mutation m = batchOp.getMutation(i);
          if (m instanceof Put) {
            coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
          } else {
            coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
          }
        }
      }

      // See if the column families were consistent through the whole thing.
      // if they were then keep them. If they were not then pass a null.
      // null will be treated as unknown.
      // Total time taken might be involving Puts and Deletes.
      // Split the time for puts and deletes based on the total number of Puts and Deletes.

      if (noOfPuts > 0) {
        // There were some Puts in the batch.
        if (this.metricsRegion != null) {
          this.metricsRegion.updatePut();
        }
      }
      if (noOfDeletes > 0) {
        // There were some Deletes in the batch.
        if (this.metricsRegion != null) {
          this.metricsRegion.updateDelete();
        }
      }

      if (coprocessorHost != null && !batchOp.isInReplay()) {
        // call the coprocessor hook to do any finalization steps
        // after the put is done
        MiniBatchOperationInProgress<Mutation> miniBatchOp =
          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
        coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
      }

      batchOp.nextIndexToProcess = lastIndexExclusive;
    }
  }
}
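
Step 4 delegates to doWALAppend, which is not reproduced in full here. Conceptually, it appends the edit, syncs according to durability, and hands back the mvcc WriteEntry created during the append. The following is a simplified sketch, not the verbatim source; createWALKey is a hypothetical helper standing in for the real WALKey construction.

// Conceptual sketch of doWALAppend (simplified)

private WriteEntry doWALAppend(WALEdit walEdit, Durability durability,
    List<UUID> clusterIds, long now, long nonceGroup, long nonce,
    long origLogSeqNum) throws IOException {
  // Build a WALKey for this region; the real constructor arguments are elided.
  WALKey walKey = createWALKey(clusterIds, now, nonceGroup, nonce);
  // Append the edit; the WAL assigns the sequence id, i.e. the mvcc write number.
  long txid = this.wal.append(getRegionInfo(), walKey, walEdit, true);
  // Sync according to the mutation's durability (e.g. SYNC_WAL vs ASYNC_WAL).
  if (txid != 0) {
    sync(txid, durability);
  }
  return walKey.getWriteEntry();
}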

Reference

  1. HBase - An Analysis of the Data Write Path (in Chinese)
  2. HBase Source Code Analysis: The HTable put Process (in Chinese)
  3. Analysis of the HBase put Flow on the RegionServer Side (in Chinese)
  4. Source Code Analysis of HBase's put Flow (in Chinese)
  5. Document how HRegion#doMiniBatchMutation is acquiring read row locks
  6. The usage of mutation from CP is weird.
  7. Inconsistent behavior for preBatchMutate in doMiniBatchMutate and processRowsWithLocks