An issue with an excessive number of threads in the HBase client had annoyed me for quite a while. It pushed me to dig into how ExecutorService is used in the HBase client. That’s the origin of this blog.

Remember how to put some data into HBase?

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(TableName.valueOf(Bytes.toBytes(tableName)))
val p = new Put(Bytes.toBytes("rowId"))
p.addImmutable(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"))
table.put(p)
table.close()
conn.close()

Two classes are involved in this process:

  1. Connection
  2. Table

Now, let’s see where the ExecutorService instances live.

Connection

ConnectionImplementation, an implementation of Connection, has two private ExecutorService fields.

// ConnectionImplementation.java

class ConnectionImplementation implements ClusterConnection, Closeable {
  private volatile ExecutorService batchPool = null;
  private volatile ExecutorService metaLookupPool = null;
}

batchPool is a thread executor shared by all Table instances created by this connection, and metaLookupPool is a meta-lookup thread executor, likewise shared by all Table instances created by this connection.

My question was: which configuration decides the size of these pools? As the code shows,

// ConnectionImplementation.java

  // for batch pool
  private ExecutorService getBatchPool() {
    if (batchPool == null) {
      synchronized (this) {
        if (batchPool == null) {
          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
          this.batchPool = getThreadPool(threads, threads, "-shared", null);
          this.cleanupPool = true;
        }
      }
    }
    return this.batchPool;
  }

  // for meta pool
  private ExecutorService getMetaLookupPool() {
    if (this.metaLookupPool == null) {
      synchronized (this) {
        if (this.metaLookupPool == null) {
          //Some of the threads would be used for meta replicas
          //To start with, threads.max.core threads can hit the meta (including replicas).
          //After that, requests will get queued up in the passed queue, and only after
          //the queue is full, a new thread will be started
          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
          this.metaLookupPool = getThreadPool(
             threads,
             threads,
             "-metaLookup-shared-", new LinkedBlockingQueue<>());
        }
      }
    }
    return this.metaLookupPool;
  }
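
Both pools are created lazily, and the same configured value is used for both the core and the maximum pool size. So one obvious lever is to set these keys on the Configuration before the connection is built. A minimal sketch (the values here are arbitrary, not recommendations):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch: cap the two shared connection pools before createConnection().
val conf = HBaseConfiguration.create()
conf.setInt("hbase.hconnection.threads.max", 32)
conf.setInt("hbase.hconnection.meta.lookup.threads.max", 8)
val conn = ConnectionFactory.createConnection(conf)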

Table

In HTable, an independent pool is also initialized.

// HTable.java

public class HTable implements Table {
  private final ExecutorService pool;

  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
    if (maxThreads == 0) {
      maxThreads = 1; // is there a better default?
    }
    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);

    // Using the "direct handoff" approach, new threads will only be created
    // if it is necessary and will grow unbounded. This could be bad but in HCM
    // we only create as many Runnables as there are region servers. It means
    // it also scales when new region servers are added.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
      TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
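
Note the "direct handoff" comment: a SynchronousQueue has no capacity, so whenever a task is submitted and no worker thread is idle, the executor starts a brand-new thread instead of queueing. Combined with a default maximum of Integer.MAX_VALUE, an HTable pool can grow as fast as tasks are submitted. The toy snippet below (plain JDK, not HBase code) reproduces the same pool shape to show the effect:

import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Same shape as getDefaultExecutor(): core = 1, effectively unbounded max,
// SynchronousQueue as the work queue ("direct handoff").
val pool = new ThreadPoolExecutor(1, Int.MaxValue, 60L, TimeUnit.SECONDS,
  new SynchronousQueue[Runnable]())
pool.allowCoreThreadTimeOut(true)

// Every task submitted while all workers are busy starts a new thread,
// because a SynchronousQueue never buffers.
(1 to 50).foreach { _ =>
  pool.execute(() => Thread.sleep(1000))
}
println(s"threads created: ${pool.getPoolSize}")  // roughly 50
pool.shutdown()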

So far, I had found these configurations that limit thread counts:

  1. hbase.hconnection.threads.max
  2. hbase.hconnection.meta.lookup.threads.max
  3. hbase.htable.threads.max
  4. hbase.htable.threads.coresize

I tuned them, but nothing improved.
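
A quick sanity check at this point is to see which pool the runaway threads actually belong to, by grouping live threads by name; the "htable" prefix and the "-shared" name hint both appear in the code above. A rough diagnostic sketch:

// Count live threads by the name hints used by the pools above.
// Adjust the patterns to whatever jstack shows in your environment.
val threads = Thread.getAllStackTraces.keySet.toArray(Array.empty[Thread])
println("htable pool threads:     " + threads.count(_.getName.contains("htable")))
println("connection pool threads: " + threads.count(_.getName.contains("-shared")))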

Then it struck me that RpcClient is also involved, as described in A Lifecycle of HBase’s Put: Client-side.

After a quick look at AbstractRpcClient, I finally found the key configuration, hbase.client.ipc.pool.size, which was being misused in our code.

// AbstractRpcClient.java

public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  protected final PoolMap<ConnectionId, T> connections;

  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
      MetricsConnection metrics) {
    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
  }

  private static int getPoolSize(Configuration config) {
    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
  }
}
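
PoolMap keeps up to getPoolSize(conf) RpcConnection instances per ConnectionId, so a value larger than 1 gives the client several RPC connections per ConnectionId instead of one. Setting it explicitly on the client Configuration looks like this (a sketch; HConstants.HBASE_CLIENT_IPC_POOL_SIZE is the constant behind hbase.client.ipc.pool.size, and 1 is the default shown above):

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Sketch: pin the RPC connection pool size explicitly.
val conf = HBaseConfiguration.create()
conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1)
val conn = ConnectionFactory.createConnection(conf)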