Put inserts or updates a single row of an HBase table. To understand it well, it helps to follow its lifecycle, which can be broken into two parts:

  1. Client side: how the Put request is built and sent to the right RegionServer.
  2. Server side: how the RegionServer handles the Put request and updates the row.

To keep this post to a reasonable length, only the client side is discussed here.

A sequence diagram shows how the classes interact, so let’s start with one.

As the diagram above shows, the roles involved in this interaction are:

  • HTable launches the Put operation.
  • ServerCallable is constructed in HTable and builds the mutation request.
  • RpcRetryingCaller takes over once its callWithRetries method is called, and coordinates the other classes.
  • Connection finds the destination RegionServer and returns it to RpcRetryingCaller.
  • RpcController sets the RPC properties before the request is sent.

Next, a more detailed class diagram is needed:

Now, please allow me to show the source code, since it never lies.

Table

Table is used to communicate with a single HBase table; an instance is obtained from a Connection. The Table interface has two methods for Put: one for a single operation and one for batch puts.

// Table.java

@InterfaceAudience.Public
public interface Table extends Closeable {
  /**
   * Puts some data in the table.
   */
  void put(Put put) throws IOException;

  /**
   * Batch puts the specified data into the table.
   */
  void put(List<Put> puts) throws IOException;
}

HTable is an implementation of Table and overrides these two methods. It’s very lightweight (many thanks to ThreadPoolExecutor): get one as needed and just close it when done.

This post leaves out batch puts, which are a totally different story from a single-row put.

// HTable.java

@InterfaceAudience.Private
@InterfaceStability.Stable
public class HTable implements Table {

  private final ClusterConnection connection;

  @Override
  public void put(final Put put) throws IOException {
    validatePut(put);
    ClientServiceCallable<Void> callable =
        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
            this.rpcControllerFactory.newController(), put.getPriority()) {
          @Override
          protected Void rpcCall() throws Exception {
            MutateRequest request =
                RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
                  put);
            doMutate(request);
            return null;
          }
        };
    rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
      this.operationTimeoutMs);
  }

  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
    if (put.isEmpty()) {
      throw new IllegalArgumentException("No columns to insert");
    }
    if (maxKeyValueSize > 0) {
      for (List<Cell> list : put.getFamilyCellMap().values()) {
        for (Cell cell : list) {
          if (KeyValueUtil.length(cell) > maxKeyValueSize) {
            throw new IllegalArgumentException("KeyValue size too large");
          }
        }
      }
    }
  }
}

We can briefly list what the put method does:

  1. Validates the put.
  2. Creates a ClientServiceCallable, passing in the Connection and an RpcController, and overrides its rpcCall method.
  3. Creates an RpcRetryingCaller from the RpcCallerFactory and calls its callWithRetries method, passing in the callable from step 2.

Take a quick look at validatePut. It rejects an empty put; otherwise, if a maximum KeyValue size has been configured (a value greater than zero), it iterates over all the cells to make sure none of them exceeds that size.
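The validation just described can be sketched with plain Java collections. This is only a simplified model, not HBase code: the class name PutValidationSketch, the map of column family to encoded cells, and the use of the byte array length as the cell size are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the validation step; every name here is hypothetical. */
final class PutValidationSketch {

  /**
   * Rejects an empty put, then (only when a positive limit is configured)
   * rejects any cell whose size exceeds the limit.
   *
   * @param familyToCells hypothetical model: column family -> encoded cells
   * @param maxCellSize   size limit in bytes; zero or negative disables the check
   */
  static void validate(Map<String, List<byte[]>> familyToCells, int maxCellSize) {
    if (familyToCells.isEmpty()) {
      throw new IllegalArgumentException("No columns to insert");
    }
    if (maxCellSize > 0) {
      for (List<byte[]> cells : familyToCells.values()) {
        for (byte[] cell : cells) {
          if (cell.length > maxCellSize) {
            throw new IllegalArgumentException("KeyValue size too large");
          }
        }
      }
    }
  }
}
```

Because the check runs on the client before any RPC is built, a malformed put fails fast without ever reaching a RegionServer.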

Let’s skip ServerCallable for now, which we will look at closely later, and dive into RpcRetryingCaller first, since its callWithRetries method takes the ServerCallable as an input parameter.

RpcRetryingCaller

RpcRetryingCaller is also an interface, with no member fields. It leaves callWithRetries to its implementation, RpcRetryingCallerImpl.

// RpcRetryingCaller.java

@InterfaceAudience.Public
public interface RpcRetryingCaller<T> {

  /**
   * Retries if invocation fails.
   */
  T callWithRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException;
}

We can skip the rest of the class and go straight to the callWithRetries method.

public class RpcRetryingCallerImpl<T> implements RpcRetryingCaller<T> {
  @Override
  public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException {
    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions = new ArrayList<>();
    tracker.start();
    context.clear();
    for (int tries = 0;; tries++) {
      long expectedSleep;
      try {
        // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
        callable.prepare(tries != 0);
        interceptor.intercept(context.prepare(callable, tries));
        return callable.call(getTimeout(callTimeout));
      } catch (PreemptiveFastFailException e) {
        throw e;
      } catch (Throwable t) {
        Throwable e = t.getCause();
        ExceptionUtil.rethrowIfInterrupt(t);
        Throwable cause = t.getCause();
        if (cause instanceof DoNotRetryIOException) {
          // Fail fast
          throw (DoNotRetryIOException) cause;
        }
        // translateException throws exception when should not retry: i.e. when request is bad.
        interceptor.handleFailure(context, t);
        t = translateException(t);

        if (tries > startLogErrorsCnt) {
          LOG.info("Call exception, tries=" + tries + ", maxAttempts=" + maxAttempts + ", started="
              + (EnvironmentEdgeManager.currentTime() - tracker.getStartTime()) + " ms ago, "
              + "cancelled=" + cancelled.get() + ", msg="
              + t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail());
        }

        callable.throwable(t, maxAttempts != 1);
        RetriesExhaustedException.ThrowableWithExtraContext qt =
            new RetriesExhaustedException.ThrowableWithExtraContext(t,
                EnvironmentEdgeManager.currentTime(), toString());
        exceptions.add(qt);
        if (tries >= maxAttempts - 1) {
          throw new RetriesExhaustedException(tries, exceptions);
        }
        // If the server is dead, we need to wait a little before retrying, to give
        // a chance to the regions to be moved
        // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
        // special when encountering CallQueueTooBigException, see #HBASE-17114
        long pauseBase = (t instanceof CallQueueTooBigException) ? pauseForCQTBE : pause;
        expectedSleep = callable.sleep(pauseBase, tries);

        // If, after the planned sleep, there won't be enough time left, we stop now.
        long duration = singleCallDuration(expectedSleep);
        if (duration > callTimeout) {
          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
              ": " +  t.getMessage() + " " + callable.getExceptionMessageAdditionalDetail();
          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
        }
      } finally {
        interceptor.updateFailureInfo(context);
      }
      try {
        if (expectedSleep > 0) {
          synchronized (cancelled) {
            if (cancelled.get()) return null;
            cancelled.wait(expectedSleep);
          }
        }
        if (cancelled.get()) return null;
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted after " + tries
            + " tries while maxAttempts=" + maxAttempts);
      }
    }
  }
}

Well, that’s still a long snippet. Let’s strip it down to the essentials.

public T callWithRetries(RetryingCallable<T> callable, int callTimeout) {
  for (int tries = 0;; tries++) {
    try {
      callable.prepare(tries != 0);
      return callable.call(getTimeout(callTimeout));
    } catch (Throwable t) {
      callable.throwable(t, maxAttempts != 1);
    }
  }
}

Now it becomes clear. RpcRetryingCaller invokes the ServerCallable’s

  1. prepare()
  2. call()
  3. throwable(), if prepare() or call() throws any Throwable.
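The three-step loop above can be tried out in isolation. The following is a minimal sketch under stated assumptions: SketchCallable and RetryLoopSketch are hypothetical names, the backoff multipliers are illustrative rather than values read from HBase, and exhausted retries are reported with a plain Exception instead of RetriesExhaustedException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down counterpart of RetryingCallable; not the HBase interface. */
interface SketchCallable<T> {
  void prepare(boolean reload) throws Exception;  // locate the server; reload on retries
  T call(int callTimeoutMs) throws Exception;     // perform the RPC
  void throwable(Throwable t, boolean retrying);  // react to a failure, e.g. drop cached state
}

final class RetryLoopSketch {
  // Illustrative backoff multipliers; an assumption, not HBase's actual table.
  private static final int[] BACKOFF = {1, 2, 3, 5, 10};

  static <T> T callWithRetries(SketchCallable<T> callable, int callTimeoutMs,
      int maxAttempts, long pauseMs) throws Exception {
    List<Throwable> failures = new ArrayList<>();
    for (int tries = 0;; tries++) {
      try {
        callable.prepare(tries != 0);         // reload is true on every retry
        return callable.call(callTimeoutMs);  // success ends the loop
      } catch (Exception e) {
        callable.throwable(e, maxAttempts != 1);
        failures.add(e);
        if (tries >= maxAttempts - 1) {
          // Out of attempts: report every failure seen so far.
          Exception exhausted = new Exception("Failed after " + (tries + 1) + " attempts");
          failures.forEach(exhausted::addSuppressed);
          throw exhausted;
        }
      }
      // Back off before the next attempt, pausing longer as tries grows.
      Thread.sleep(pauseMs * BACKOFF[Math.min(tries, BACKOFF.length - 1)]);
    }
  }
}
```

A callable that fails twice and then succeeds is prepared three times, with reload set to false only on the first attempt. That flag is what lets prepare() refresh stale location information on a retry.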

Next, it’s time for ServerCallable to take the stage.

ServerCallable

At the root of the ServerCallable hierarchy is RetryingCallable, an interface for a callable that will be retried. It declares two important methods, prepare() and call(), which are used throughout the rest of the Put process.

// RetryingCallable.java

@InterfaceAudience.Private
public interface RetryingCallable<T> {
  /**
   * Prepare by setting up any connections to servers, etc., ahead of call invocation.
   */
  void prepare(final boolean reload) throws IOException;

  /**
   * Computes a result, or throws an exception if unable to do so.
   */
  T call(int callTimeout) throws Exception;
}

RegionServerCallable implements RetryingCallable and provides the prepare() and call() methods. More importantly, it introduces a field named stub, an RPC client stub, which sends the request underneath.

// RegionServerCallable.java

@InterfaceAudience.Private
public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> {
  private final Connection connection;
  protected HRegionLocation location;
  protected S stub;
  protected final RpcController rpcController;

  /**
   * Override that changes call Exception from {@link Exception} to {@link IOException}.
   * Also does set up of the rpcController.
   */
  public T call(int callTimeout) throws IOException {
    try {
      // Iff non-null and an instance of a SHADED rpcController, do config! Unshaded -- i.e.
      // com.google.protobuf.RpcController or null -- will just skip over this config.
      if (getRpcController() != null) {
        RpcController shadedRpcController = (RpcController)getRpcController();
        // Do a reset to clear previous states, such as CellScanner.
        shadedRpcController.reset();
        if (shadedRpcController instanceof HBaseRpcController) {
          HBaseRpcController hrc = (HBaseRpcController)getRpcController();
          // If it is an instance of HBaseRpcController, we can set priority on the controller based
          // off the tableName. Set call timeout too.
          hrc.setPriority(tableName);
          hrc.setPriority(priority);
          hrc.setCallTimeout(callTimeout);
        }
      }
      return rpcCall();
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }

  /**
   * Run the RPC call. Implement this method. To get at the rpcController that has been created
   * and configured to make this rpc call, use getRpcController(). We are trying to contain
   * rpcController references so we don't pollute codebase with protobuf references; keep the
   * protobuf references contained and only present in a few classes rather than all about the
   * code base.
   * @throws Exception
   */
  protected abstract T rpcCall() throws Exception;

  public void prepare(final boolean reload) throws IOException {
    // check table state if this is a retry
    if (reload && tableName != null && !tableName.equals(TableName.META_TABLE_NAME)
        && getConnection().isTableDisabled(tableName)) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      this.location = regionLocator.getRegionLocation(row);
    }
    if (this.location == null) {
      throw new IOException("Failed to find location, tableName=" + tableName +
          ", row=" + Bytes.toString(row) + ", reload=" + reload);
    }
    setStubByServiceName(this.location.getServerName());
  }

  /**
   * Set the RPC client stub
   * @param serviceName to get the rpc stub for
   * @throws IOException When client could not be created
   */
  protected abstract void setStubByServiceName(ServerName serviceName) throws IOException;
}

The prepare() method:

  1. checks the table state, but only if this is a retry.
  2. gets the location of the region that holds the given row.
  3. sets the RPC client stub for the server hosting that region.

For details on how the region’s location is found, see: How Does Client Find the Region

setStubByServiceName is an abstract method that must be implemented by RegionServerCallable’s subclasses.
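To make the three prepare() steps concrete, here is a toy model. Every type and name in it is hypothetical: a TreeMap from region start key to server name stands in for the real region lookup (which goes through the Connection and its cached locations), and a string stands in for the RPC stub. The only HBase fact it relies on is that a row belongs to the region with the greatest start key not larger than the row.

```java
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of the three prepare() steps; none of these types are HBase API. */
final class PrepareSketch {
  private final String tableName;
  private final Set<String> disabledTables;
  private final TreeMap<String, String> regionStartKeyToServer;
  String server;  // plays the role of "location"
  String stub;    // plays the role of the RPC client stub

  PrepareSketch(String tableName, Set<String> disabledTables,
      TreeMap<String, String> regionStartKeyToServer) {
    this.tableName = tableName;
    this.disabledTables = disabledTables;
    this.regionStartKeyToServer = regionStartKeyToServer;
  }

  void prepare(String row, boolean reload) throws IOException {
    // 1. Only a retry pays for the table-state check.
    if (reload && disabledTables.contains(tableName)) {
      throw new IOException(tableName + " is disabled.");
    }
    // 2. Pick the region with the greatest start key <= row.
    Map.Entry<String, String> region = regionStartKeyToServer.floorEntry(row);
    if (region == null) {
      throw new IOException("Failed to find location, tableName=" + tableName
          + ", row=" + row + ", reload=" + reload);
    }
    server = region.getValue();
    // 3. Bind a stub for the server that hosts the region.
    stub = "stub@" + server;
  }
}
```

With regions starting at "" and "m", the row "apple" resolves to the first region's server and "zebra" to the second. A disabled table is only detected when reload is true, which matches the retry-only check in the real method.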

And the call() method:

  1. gets the RpcController, resets it, then sets its priority and timeout if it is an HBaseRpcController.
  2. calls rpcCall() to do the remaining work.

Like setStubByServiceName, rpcCall() is also abstract and left for subclasses to implement.
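This split between a concrete call() and an abstract rpcCall() is the template method pattern. A minimal sketch of it follows, assuming hypothetical classes SketchController and CallTemplateSketch in place of the HBase ones: call() does the shared controller setup and exception narrowing, and the subclass supplies only the request-specific work.

```java
import java.io.IOException;

/** Hypothetical controller holding per-request RPC settings; not HBaseRpcController. */
final class SketchController {
  int priority;
  int callTimeoutMs;

  void reset() {
    priority = 0;
    callTimeoutMs = 0;
  }
}

/** Template method: call() does the shared setup, subclasses supply rpcCall(). */
abstract class CallTemplateSketch<T> {
  protected final SketchController controller = new SketchController();
  private final int priority;

  CallTemplateSketch(int priority) {
    this.priority = priority;
  }

  public final T call(int callTimeoutMs) throws IOException {
    try {
      controller.reset();                        // clear state left by a previous attempt
      controller.priority = priority;
      controller.callTimeoutMs = callTimeoutMs;
      return rpcCall();                          // subclass-specific work
    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(e);                  // narrow Exception to IOException
    }
  }

  protected abstract T rpcCall() throws Exception;
}
```

An anonymous subclass that overrides only rpcCall(), in the same way HTable’s put method does, inherits the controller setup for free on every attempt.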

One subclass of RegionServerCallable, ClientServiceCallable, gets its stub from the Connection’s client. We will come back to Connection later.

In the definition of ClientServiceCallable, the stub’s type is fixed to ClientProtos.ClientService.BlockingInterface. With the stub in hand, ClientServiceCallable can finally send the mutate request. Recall that in HTable’s put method, the anonymous ClientServiceCallable overrides rpcCall() to call doMutate.

// ClientServiceCallable.java

@InterfaceAudience.Private
public abstract class ClientServiceCallable<T> extends
    RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {

  @Override
  protected void setStubByServiceName(ServerName serviceName) throws IOException {
    setStub(getConnection().getClient(serviceName));
  }

  protected ClientProtos.MutateResponse doMutate(ClientProtos.MutateRequest request)
  throws org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
    return getStub().mutate(getRpcController(), request);
  }
}

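ScannerCallable, a subclass of ClientServiceCallable, implements rpcCall() as expected. (For Put, the anonymous subclass in HTable’s put method plays this role; ScannerCallable is simply another concrete example.) A reduced version is listed below: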

// ScannerCallable.java

@InterfaceAudience.Private
public class ScannerCallable extends ClientServiceCallable<Result[]> {
  @Override
  protected Result[] rpcCall() throws Exception {
    ScanResponse response;
    if (this.scannerId == -1L) {
      response = openScanner();
    } else {
      response = next();
    }

    Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);

    updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
    return rrs;
  }

  private ScanResponse openScanner() throws IOException {
    incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
    ScanRequest request = RequestConverter.buildScanRequest(
      getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
    try {
      ScanResponse response = getStub().scan(getRpcController(), request);
      long id = response.getScannerId();
      if (logScannerActivity) {
        LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
          + " on region " + getLocation().toString());
      }
      if (response.hasMvccReadPoint()) {
        this.scan.setMvccReadPoint(response.getMvccReadPoint());
      }
      this.scannerId = id;
      return response;
    } catch (Exception e) {
      throw ProtobufUtil.handleRemoteException(e);
    }
  }
}

When no scanner is open yet, rpcCall() calls the private method openScanner, which invokes scan on the stub returned by getStub(), a ClientProtos.ClientService.BlockingInterface, to get the response.

RpcController

HBaseRpcController is an easy-to-understand interface that extends protobuf’s RpcController. It carries some properties of the request, such as priority and timeout.

// HBaseRpcController.java

@InterfaceAudience.Private
public interface HBaseRpcController extends RpcController, CellScannable {

  // Priority for this request
  void setPriority(int priority);
  int getPriority();

  int getCallTimeout();
  void setCallTimeout(int callTimeout);
}

HBaseRpcControllerImpl consists of some getters and setters. Nothing needs special emphasis.

// HBaseRpcControllerImpl.java

@InterfaceAudience.Private
public class HBaseRpcControllerImpl implements HBaseRpcController {
  @Override
  public void reset() {
    priority = 0;
    cellScanner = null;
    exception = null;
    callTimeout = null;
    // In the implementations of some callable with replicas, rpc calls are executed in a executor
    // and we could cancel the operation from outside which means there could be a race between
    // reset and startCancel. Although I think the race should be handled by the callable since the
    // reset may clear the cancel state...
    synchronized (this) {
      done = false;
      cancelled = false;
      cancellationCbs.clear();
    }
  }
}
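The reason reset() matters is that one controller instance is reused across retry attempts, so state from a failed attempt must not leak into the next one. The sketch below is a hypothetical model, not the HBase class: ControllerStateSketch and its Runnable-based cancellation callbacks are assumptions chosen to keep the example stdlib-only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a reusable per-call RPC controller; not the HBase class. */
final class ControllerStateSketch {
  private int priority;
  private Integer callTimeoutMs;  // null means "no timeout set"
  private boolean cancelled;
  private final List<Runnable> cancellationCallbacks = new ArrayList<>();

  void setPriority(int priority) { this.priority = priority; }
  int getPriority() { return priority; }
  void setCallTimeout(int callTimeoutMs) { this.callTimeoutMs = callTimeoutMs; }
  boolean hasCallTimeout() { return callTimeoutMs != null; }

  synchronized void notifyOnCancel(Runnable callback) { cancellationCallbacks.add(callback); }
  synchronized boolean isCancelled() { return cancelled; }

  /** Marks the call cancelled and runs each registered callback once. */
  void startCancel() {
    List<Runnable> toRun;
    synchronized (this) {
      cancelled = true;
      toRun = new ArrayList<>(cancellationCallbacks);
      cancellationCallbacks.clear();
    }
    toRun.forEach(Runnable::run);  // run outside the lock
  }

  /** Returns the controller to a clean state so the next attempt can reuse it. */
  void reset() {
    priority = 0;
    callTimeoutMs = null;
    synchronized (this) {          // same lock as startCancel()
      cancelled = false;
      cancellationCallbacks.clear();
    }
  }
}
```

After reset(), a cancellation registered for the previous attempt no longer fires. That is also the flip side noted in the source comment: a reset that races with a cancel can wipe out the cancel state.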

Connection

Finally, don’t forget Connection, which is responsible for locating the region during this process.

For more, see How Does Client Find the Region.

Reference

  1. HBase Source Code Analysis: The HTable put Process (original title: HBase源码分析:HTable put过程)