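For batch puts, it’s better to construct a list of puts and call HTable.put(final List<Put> puts) than to call put once per Put. The client groups the list by region server and commits each group in a single multi RPC, instead of sending one RPC per Put. (In older HBase versions, whether the whole list was flushed at once also depended on the size of the client-side write buffer; in the code examined below, client-side buffering is the job of BufferedMutator.)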
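To see why the list form helps, here is a toy model in plain Java (no HBase dependency) that only counts RPCs. The `rowToServer` map is a hypothetical stand-in for the region location lookup, and retries are ignored. Calling put once per row costs one RPC per row, while the batched path costs one multi RPC per distinct region server.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model, not HBase code: compares RPC counts for single vs. batched puts.
final class RpcCountSketch {
    private RpcCountSketch() {}

    // One put call per row: one RPC each.
    static int rpcsOneByOne(List<String> rows) {
        return rows.size();
    }

    // put(List): rows are grouped by the (hypothetical) server hosting them,
    // and each group travels in one multi RPC.
    static int rpcsBatched(List<String> rows, Map<String, String> rowToServer) {
        Set<String> servers = new HashSet<>();
        for (String row : rows) {
            String server = rowToServer.get(row);
            if (server == null) {
                throw new IllegalArgumentException("no location for row " + row);
            }
            servers.add(server);
        }
        return servers.size();
    }
}
```

With four rows spread across two servers, the batched path needs 2 RPCs instead of 4.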

This post does not cover BufferedMutator; it will be discussed in a future post.

HTable

The batch method does the essential work for batch puts, as HTable.java shows:

// HTable.java

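  public void put(final List<Put> puts) throws IOException {
    // Validate every Put on the client before anything is sent.
    for (Put put : puts) {
      validatePut(put);
    }
    // One result slot per action, filled in by batch().
    Object[] results = new Object[puts.size()];
    try {
      batch(puts, results, writeRpcTimeoutMs);
    } catch (InterruptedException e) {
      // Report the interrupt through the IOException-based API, keeping the cause.
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    }
  }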
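The pattern in put(List<Put>) can be reproduced outside HBase. It validates the whole list before sending anything and reports an interrupt as an InterruptedIOException. In this sketch the string rows, `Sender` and `validate` are hypothetical stand-ins for Put, batch and validatePut.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;

// Hypothetical sketch of the validate-then-batch shape of HTable.put(List<Put>).
final class PutListSketch {
    private PutListSketch() {}

    // Hypothetical stand-in for batch(...): may block and be interrupted.
    interface Sender {
        void send(List<String> rows) throws InterruptedException, IOException;
    }

    // Hypothetical stand-in for validatePut: reject rows that could never be written.
    static void validate(String row) {
        if (row == null || row.isEmpty()) {
            throw new IllegalArgumentException("empty put");
        }
    }

    static void putAll(List<String> rows, Sender sender) throws IOException {
        // Validate every element first, so one bad entry fails the whole call
        // before anything has been sent.
        for (String row : rows) {
            validate(row);
        }
        try {
            sender.send(rows);
        } catch (InterruptedException e) {
            // Same idiom as HTable: keep the IOException contract, preserve the cause.
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        }
    }
}
```

Validating up front means an invalid element never leaves part of the list written and part rejected by the client-side check.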

The following diagram shows the interactions involved in processing a batch list.

The classes involved include:

  • AsyncProcessTask
  • AsyncProcess
  • RequestController
  • ClusterConnection
  • AsyncRequestFuture
  • RpcRetryingCaller

The class diagram below shows how they relate.

Next, let’s look at the source code of the batch method.

// HTable.java

  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
      throws InterruptedException, IOException {
    AsyncProcessTask task = AsyncProcessTask.newBuilder()
            .setPool(pool)
            .setTableName(tableName)
            .setRowAccess(actions)
            .setResults(results)
            .setRpcTimeout(rpcTimeout)
            .setOperationTimeout(operationTimeoutMs)
            .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
            .build();
    AsyncRequestFuture ars = multiAp.submit(task);
    ars.waitUntilDone();
    if (ars.hasError()) {
      throw ars.getErrors();
    }
  }

The batch method:

  1. Creates an AsyncProcessTask.
  2. Submits the new task to AsyncProcess (multiAp).
  3. Waits until the returned AsyncRequestFuture is done.
  4. Throws the collected errors if any action failed.
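These steps can be imitated with a plain ExecutorService. The sketch submits the work, blocks until every part has finished, and then throws a single exception carrying all failures, much like `throw ars.getErrors()`. `BatchSketch` and `BatchErrors` are hypothetical names. The real AsyncProcess also groups actions per server and retries them, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch of "submit, wait until done, throw collected errors".
final class BatchSketch {
    private BatchSketch() {}

    // Hypothetical aggregate error, playing the role of ars.getErrors().
    static final class BatchErrors extends Exception {
        final List<Throwable> causes;

        BatchErrors(List<Throwable> causes) {
            super(causes.size() + " action(s) failed");
            this.causes = causes;
        }
    }

    static void batch(List<Callable<Object>> actions, Object[] results, ExecutorService pool)
            throws InterruptedException, BatchErrors {
        // Submit every action to the pool.
        List<Future<Object>> futures = new ArrayList<>();
        for (Callable<Object> action : actions) {
            futures.add(pool.submit(action));
        }
        // Wait until all of them are done, recording results and errors.
        List<Throwable> errors = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                results[i] = futures.get(i).get();
            } catch (ExecutionException e) {
                // In this sketch the result slot of a failed action stays null.
                errors.add(e.getCause());
            }
        }
        // Report all failures at once.
        if (!errors.isEmpty()) {
            throw new BatchErrors(errors);
        }
    }
}
```

Successful actions still land in the results array even when others fail, so the caller can inspect partial results after catching the exception.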

AsyncProcessTask

AsyncProcessTask holds the attributes of a task that AsyncProcess will execute. It is created through a Builder that carries these attributes:

  • Thread pool
  • Table name
  • Rows
  • Callbacks
  • Results array
  • Timeouts (RPC timeout and operation timeout)

// AsyncProcessTask.java

public class AsyncProcessTask<T> {
  public static Builder newBuilder() {
    return new Builder();
  }

  public static class Builder<T> {

    private ExecutorService pool;
    private TableName tableName;
    private RowAccess<? extends Row> rows;
    private SubmittedRows submittedRows = SubmittedRows.ALL;
    private Batch.Callback<T> callback;
    private boolean needResults;
    private int rpcTimeout;
    private int operationTimeout;
    private CancellableRegionServerCallable callable;
    private Object[] results;

    // fluent setters (setPool, setTableName, setRowAccess, ...) and build()
  }
}
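Since the setters are elided above, here is a trimmed-down, hypothetical builder in the same style. It has fluent setters that return the builder, the same SubmittedRows.ALL default as the field declaration above, and an immutable task produced by build(). The enum values other than ALL and the null check in build() are assumptions of this sketch.

```java
import java.util.Objects;

// Hypothetical, trimmed-down task object in the style of AsyncProcessTask.
final class TaskSketch {
    enum SubmittedRows { ALL, AT_LEAST_ONE, NORMAL }  // values besides ALL are assumed

    final String tableName;
    final SubmittedRows submittedRows;
    final int rpcTimeoutMs;
    final int operationTimeoutMs;

    private TaskSketch(Builder b) {
        this.tableName = b.tableName;
        this.submittedRows = b.submittedRows;
        this.rpcTimeoutMs = b.rpcTimeoutMs;
        this.operationTimeoutMs = b.operationTimeoutMs;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private String tableName;
        private SubmittedRows submittedRows = SubmittedRows.ALL;  // default, as in AsyncProcessTask
        private int rpcTimeoutMs;
        private int operationTimeoutMs;

        Builder setTableName(String tableName) { this.tableName = tableName; return this; }
        Builder setSubmittedRows(SubmittedRows rows) { this.submittedRows = rows; return this; }
        Builder setRpcTimeout(int ms) { this.rpcTimeoutMs = ms; return this; }
        Builder setOperationTimeout(int ms) { this.operationTimeoutMs = ms; return this; }

        // Assumption of this sketch: a task without a table is rejected.
        TaskSketch build() {
            Objects.requireNonNull(tableName, "tableName");
            return new TaskSketch(this);
        }
    }
}
```

Usage mirrors the batch method: `TaskSketch.newBuilder().setTableName("t1").setRpcTimeout(1000).build()` yields a task with submittedRows ALL.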

AsyncProcess

Once built, the AsyncProcessTask is submitted to AsyncProcess, which allows a continuous flow of requests from the caller.

From the list it is given, AsyncProcess extracts the operations it can send, i.e. the operations on regions that are not considered busy. The process is asynchronous: submit returns as soon as it has finished iterating over the list. If, and only if, the maximum number of concurrent tasks is reached, the call to submit blocks. Alternatively, the caller can call submitAll, in which case all the operations are sent; HTable.batch asks for this behavior by setting SubmittedRows.ALL on its task. Each call to submit returns a future-like object (an AsyncRequestFuture) that can be used to track the progress of the operations.
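Reduced to its essentials, a single submit pass might look like the sketch below. Rows on busy regions stay in the caller's list for a later call, and the rest are removed and returned for sending. The row-to-region map and the set of busy regions are hypothetical inputs; how HBase actually decides that a region is busy is outside this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of one submit pass, not HBase code.
final class SubmitSketch {
    private SubmitSketch() {}

    // Removes and returns the rows whose region is known and not busy; the other
    // rows stay in `pending` so that a later submit call can pick them up.
    static List<String> extractSendable(List<String> pending,
                                        Map<String, String> rowToRegion,
                                        Set<String> busyRegions) {
        List<String> sendable = new ArrayList<>();
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String row = it.next();
            String region = rowToRegion.get(row);
            if (region != null && !busyRegions.contains(region)) {
                sendable.add(row);
                it.remove();
            }
        }
        return sendable;
    }
}
```

In this model, submitAll corresponds to skipping the busy check and taking every row.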

Retries are managed internally by this class.

// AsyncProcess.java

class AsyncProcess {

  private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
      boolean atLeastOne) throws InterruptedIOException {
    // ...
  }
}

The submit method:

  1. Waits until there is at least one free slot for a new task.
  2. Locates the regions using ClusterConnection (details in How Does Client Find the Region).
  3. Groups the actions per region server.
  4. Sends the grouped actions (one MultiAction per server) through AsyncRequestFutureImpl.
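Steps 2 and 3 amount to a two-level grouping: rows are grouped first by server, then by region within each server. That is roughly the shape a MultiAction carries for one server. The sketch below uses hypothetical lookup maps in place of ClusterConnection. Step 1 could be modeled with a java.util.concurrent.Semaphore sized to the task limit, which is omitted here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of steps 2 and 3 of submit, not HBase code.
final class GroupSketch {
    private GroupSketch() {}

    // Returns server -> (region -> rows), preserving the input order of rows.
    static Map<String, Map<String, List<String>>> groupByServer(
            List<String> rows,
            Map<String, String> rowToRegion,      // hypothetical region lookup (step 2)
            Map<String, String> regionToServer) { // hypothetical region location
        Map<String, Map<String, List<String>>> byServer = new LinkedHashMap<>();
        for (String row : rows) {
            String region = rowToRegion.get(row);
            String server = regionToServer.get(region);
            // Step 3: one bucket per server, one list per region inside it.
            byServer.computeIfAbsent(server, s -> new LinkedHashMap<>())
                    .computeIfAbsent(region, r -> new ArrayList<>())
                    .add(row);
        }
        return byServer;
    }
}
```

Two regions hosted on the same server end up in one server bucket, so they still travel in a single request to that server.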

From here, AsyncRequestFutureImpl takes over.

AsyncRequestFutureImpl

A note on how this class works (one instance per AsyncProcess submit): initially, all requests are split into groups by server, and a request is sent to each server in parallel. The RPC calls are not asynchronous, so one thread per server is used. Every time some actions fail, regions or locations might have changed, so the failed actions are re-grouped by server and region and these groups are sent in parallel too. In case of retries, the result is a “tree” of threads in which each parent exits after scheduling its children. This is why much of the code doesn’t require any synchronization.
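The retry “tree” can be modeled with an ExecutorService. Each task sends one server group and collects the failures. It then re-locates and re-groups those failures into child tasks and returns without waiting for them. Everything here is hypothetical: `locate` stands in for the region lookup and `trySend` for the RPC. The point is that each task only touches its own lists, so the only shared state is a pair of thread-safe sets and a CountDownLatch used to wait for the whole tree.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Toy model of the retry "tree"; not HBase code.
final class RetryTreeSketch {
    private final ExecutorService pool;
    private final Function<String, String> locate;      // row -> server; may change between attempts
    private final BiPredicate<String, String> trySend;  // (server, row) -> did the write succeed?
    private final int maxAttempts;
    private final CountDownLatch finished;              // counted down once per row
    final Set<String> done = ConcurrentHashMap.newKeySet();
    final Set<String> failed = ConcurrentHashMap.newKeySet();

    RetryTreeSketch(ExecutorService pool, Function<String, String> locate,
                    BiPredicate<String, String> trySend, int maxAttempts, int totalRows) {
        this.pool = pool;
        this.locate = locate;
        this.trySend = trySend;
        this.maxAttempts = maxAttempts;
        this.finished = new CountDownLatch(totalRows);
    }

    void start(List<String> rows) {
        send(rows, 1);
    }

    boolean await(long seconds) throws InterruptedException {
        return finished.await(seconds, TimeUnit.SECONDS);
    }

    // (Re-)group rows by their current server and give each group its own task.
    private void send(List<String> rows, int attempt) {
        Map<String, List<String>> byServer = new LinkedHashMap<>();
        for (String row : rows) {
            byServer.computeIfAbsent(locate.apply(row), s -> new ArrayList<>()).add(row);
        }
        for (Map.Entry<String, List<String>> e : byServer.entrySet()) {
            pool.submit(() -> sendToServer(e.getKey(), e.getValue(), attempt));
        }
    }

    // One node of the tree: send, schedule children for the failures, then exit.
    private void sendToServer(String server, List<String> rows, int attempt) {
        List<String> retry = new ArrayList<>();
        for (String row : rows) {
            if (trySend.test(server, row)) {
                done.add(row);
                finished.countDown();
            } else if (attempt < maxAttempts) {
                retry.add(row);
            } else {
                failed.add(row);
                finished.countDown();
            }
        }
        if (!retry.isEmpty()) {
            send(retry, attempt + 1);  // children re-locate, so moved regions are picked up
        }
    }
}
```

If a row's region moves after the first failure, `locate` returns the new server on the next attempt and the child task succeeds there.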

// AsyncRequestFutureImpl.java

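class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
                       int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
    int actionsRemaining = actionsByServer.size();
    // One MultiAction per region server
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // Client backoff may split one server's actions into several runnables
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // Simplified: run the last runnable on the current thread if reuseThread is set,
      // and hand all the others to the thread pool
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread) {
          runnable.run();
        } else {
          pool.submit(runnable);
        }
      }
    }
    // ... (actionsForReplicaThread handling omitted)
  }

  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
                                                                   MultiAction multiAction,
                                                                   int numAttempt) {
    // ...

    // Group the region actions by backoff delay: one DelayingRunner per delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());
    // ... (fill 'actions' from multiAction)

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }

      runnable = Trace.wrap(traceText, runnable);
      toReturn.add(runnable);
    }

    return toReturn;
  }
}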
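The backoff branch can be illustrated in isolation. Region actions that share the same delay go into one runnable, and a runnable with a positive delay is wrapped so that it sleeps before doing the real work, which is the role DelayingRunner plays. The per-region delays and the `log` list are hypothetical inputs for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of grouping actions by backoff delay; not HBase code.
final class BackoffSketch {
    private BackoffSketch() {}

    // Plays the role of DelayingRunner: sleep first, then run the real request.
    static final class DelayedRunnable implements Runnable {
        final long sleepMs;
        final Runnable inner;

        DelayedRunnable(long sleepMs, Runnable inner) {
            this.sleepMs = sleepMs;
            this.inner = inner;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            inner.run();
        }
    }

    // regionBackoffMs: region -> suggested backoff (hypothetical input).
    // Returns one runnable per distinct delay; a zero delay is not wrapped.
    static List<Runnable> runnablesFor(Map<String, Long> regionBackoffMs, List<String> log) {
        Map<Long, List<String>> byDelay = new TreeMap<>();
        for (Map.Entry<String, Long> e : regionBackoffMs.entrySet()) {
            byDelay.computeIfAbsent(e.getValue(), d -> new ArrayList<>()).add(e.getKey());
        }
        List<Runnable> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : byDelay.entrySet()) {
            List<String> regions = e.getValue();
            Runnable request = () -> log.add("send " + regions);  // stand-in for the server request
            out.add(e.getKey() > 0 ? new DelayedRunnable(e.getKey(), request) : request);
        }
        return out;
    }
}
```

Grouping by delay keeps the number of runnables small: regions without backoff still share a single request, and only the delayed group pays for the sleep.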

SingleServerRequestRunnable, a nested class of AsyncRequestFutureImpl, is a Runnable (which can be submitted to a thread pool) that sends a MultiAction to a single server. Because the server call is synchronous, it is run on a thread pool.

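// AsyncRequestFutureImpl.java

final class SingleServerRequestRunnable implements Runnable {
  private final MultiAction multiAction;
  private final int numAttempt;
  private final ServerName server;
  private final Set<CancellableRegionServerCallable> callsInProgress;

  @Override
  public void run() {
    // simplified: bookkeeping and most error handling omitted
    CancellableRegionServerCallable callable = createCallable(server, tableName, multiAction);
    RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable, rpcTimeout);
    try {
      // One synchronous RPC; retries are driven by AsyncRequestFutureImpl, not by the caller
      AbstractResponse res = caller.callWithoutRetries(callable, operationTimeout);
      // ... process res: failed actions are re-grouped and sent again
    } catch (IOException e) {
      // ... the call as a whole failed: its actions are retried or reported as failed
    }
  }
}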
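Because each SingleServerRequestRunnable blocks on a synchronous call, most runnables go to the thread pool. However, sendMultiAction in HBase can run the last runnable on the thread that is already sending when `reuseThread` is true; some versions add extra conditions to this. The sketch below is a simplified, hypothetical version of that dispatch loop.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch of the reuseThread dispatch idea; not HBase code.
final class DispatchSketch {
    private DispatchSketch() {}

    // Submits every runnable to the pool, except that the last one runs on the
    // calling thread when reuseThread is true, saving one thread hand-off.
    static void dispatch(List<Runnable> runnables, ExecutorService pool, boolean reuseThread) {
        int remaining = runnables.size();
        for (Runnable r : runnables) {
            if (--remaining == 0 && reuseThread) {
                r.run();
            } else {
                pool.submit(r);
            }
        }
    }
}
```

When a retry round produces only one server group, this means no new thread is needed at all: the current thread simply performs the call itself.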

Finally, every request is sent by RpcRetryingCaller. For more about RpcRetryingCaller, see the separate post on it.