One issue, an excessive number of threads in the HBase client, annoyed me for quite a while. It pushed me to dig into how the HBase client uses ExecutorService,
which is the origin of this post.
Remember how to put some data into HBase?
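import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(TableName.valueOf(Bytes.toBytes(tableName)))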
val p = new Put(Bytes.toBytes("rowId"))
p.addImmutable(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"))
table.put(p)
table.close()
conn.close()
Two classes are involved in this process:
Connection
Table
Next, let’s see where ExecutorService is used.
Connection
ConnectionImplementation, the implementation of Connection, has two private ExecutorService fields.
// ConnectionImplementation.java
class ConnectionImplementation implements ClusterConnection, Closeable {
  private volatile ExecutorService batchPool = null;
  private volatile ExecutorService metaLookupPool = null;
}
batchPool is a thread executor shared by all Table instances created by this connection, and metaLookupPool is an executor for meta lookups, likewise shared by all Table instances created by this connection.
My question was: which configuration decides the size of these pools? As the code shows:
// ConnectionImplementation.java
// for batch pool
private ExecutorService getBatchPool() {
  if (batchPool == null) {
    synchronized (this) {
      if (batchPool == null) {
        int threads = conf.getInt("hbase.hconnection.threads.max", 256);
        this.batchPool = getThreadPool(threads, threads, "-shared", null);
        this.cleanupPool = true;
      }
    }
  }
  return this.batchPool;
}

// for meta pool
private ExecutorService getMetaLookupPool() {
  if (this.metaLookupPool == null) {
    synchronized (this) {
      if (this.metaLookupPool == null) {
        // Some of the threads would be used for meta replicas
        // To start with, threads.max.core threads can hit the meta (including replicas).
        // After that, requests will get queued up in the passed queue, and only after
        // the queue is full, a new thread will be started
        int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
        this.metaLookupPool = getThreadPool(
          threads,
          threads,
          "-metaLookup-shared-", new LinkedBlockingQueue<>());
      }
    }
  }
  return this.metaLookupPool;
}
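To see what passing the same value as both core and max size means in practice, here is a minimal sketch using only java.util.concurrent. It assumes getThreadPool (whose body is not shown above) forwards these values to a ThreadPoolExecutor with the given queue; the class and method names (BoundedPoolSketch, saturate) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of HBase.
class BoundedPoolSketch {

    // Submits `tasks` blocking tasks to a pool with core == max == `threads`
    // and returns { live threads, queued tasks } while all workers are busy.
    static int[] saturate(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            threads, threads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // idle threads may exit after 60s

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> awaitQuietly(release)); // each task blocks until released
        }

        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size() };

        release.countDown(); // let every task finish
        pool.shutdown();     // queued tasks still run, then the threads exit
        return snapshot;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = saturate(4, 10);
        System.out.println("threads=" + r[0] + " queued=" + r[1]); // threads=4 queued=6
    }
}
```

With an unbounded queue the pool never grows past its configured size: the extra tasks wait in the queue instead of spawning threads. Under that assumption, the two hconnection settings act as hard caps per connection.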
Table
HTable also initializes its own independent pool.
// HTable.java
public class HTable implements Table {
  private final ExecutorService pool;

  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
    if (maxThreads == 0) {
      maxThreads = 1; // is there a better default?
    }
    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);

    // Using the "direct handoff" approach, new threads will only be created
    // if it is necessary and will grow unbounded. This could be bad but in HCM
    // we only create as many Runnables as there are region servers. It means
    // it also scales when new region servers are added.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime,
      TimeUnit.SECONDS, new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable"));
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }
}
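The "direct handoff" comment is easier to appreciate with a small experiment. The sketch below rebuilds a pool with the same shape as the defaults above (core size 1, max Integer.MAX_VALUE, SynchronousQueue) using only the JDK. DirectHandoffSketch and threadsFor are hypothetical names, and the blocking tasks merely stand in for slow requests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of HBase.
class DirectHandoffSketch {

    // Submits `tasks` blocking tasks and returns how many threads the pool holds.
    static int threadsFor(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        pool.allowCoreThreadTimeOut(true);

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            // A SynchronousQueue has no capacity, so a task is handed to an idle
            // worker or, if none is waiting, a new thread is created for it.
            pool.execute(() -> awaitQuietly(release));
        }

        int size = pool.getPoolSize();

        release.countDown(); // unblock all tasks
        pool.shutdown();
        return size;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads=" + threadsFor(8)); // threads=8
    }
}
```

Every task that finds no idle worker gets a thread of its own, so the thread count follows the number of concurrently running tasks rather than any fixed limit.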
So far, I had found these configurations related to thread limits:
hbase.hconnection.threads.max
hbase.hconnection.meta.lookup.threads.max
hbase.htable.threads.max
hbase.htable.threads.coresize
I tuned them, but nothing improved.
Then it struck me that RpcClient is also involved, as described in A Lifecycle of HBase’s Put: Client-side.
After a quick look at AbstractRpcClient, I finally found the key configuration, the one misused in our code: hbase.client.ipc.pool.size.
// AbstractRpcClient.java
public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  protected final PoolMap<ConnectionId, T> connections;

  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
      MetricsConnection metrics) {
    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
  }

  private static int getPoolSize(Configuration config) {
    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
  }
}
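To get a feel for why the pool size matters, here is a rough sketch of a per-key pool that creates up to poolSize resources for each key and then reuses them in rotation. It is not HBase's PoolMap: the class RoundRobinPoolSketch, its methods, and the round-robin policy are assumptions for illustration. The keys stand in for ConnectionId and the values for RPC connections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a keyed connection pool, not HBase's PoolMap.
class RoundRobinPoolSketch<K, V> {
    private final int poolSize;
    private final Map<K, List<V>> pools = new HashMap<>();
    private final Map<K, Integer> cursors = new HashMap<>();

    RoundRobinPoolSketch(int poolSize) {
        this.poolSize = poolSize;
    }

    // Creates a new resource until the key's pool is full, then rotates.
    synchronized V getOrCreate(K key, Supplier<V> factory) {
        List<V> pool = pools.computeIfAbsent(key, k -> new ArrayList<>());
        if (pool.size() < poolSize) {
            V created = factory.get();
            pool.add(created);
            return created;
        }
        int next = cursors.merge(key, 1, Integer::sum) % poolSize;
        return pool.get(next);
    }

    // Total resources held across all keys.
    synchronized int totalResources() {
        int total = 0;
        for (List<V> pool : pools.values()) {
            total += pool.size();
        }
        return total;
    }

    public static void main(String[] args) {
        RoundRobinPoolSketch<String, Object> connections = new RoundRobinPoolSketch<>(4);
        for (String server : new String[] {"rs1", "rs2", "rs3"}) { // made-up server names
            for (int i = 0; i < 10; i++) {
                connections.getOrCreate(server, Object::new);
            }
        }
        System.out.println("connections=" + connections.totalResources()); // connections=12
    }
}
```

In this sketch the number of live connections is the pool size multiplied by the number of distinct keys. If each connection also owns a thread, which depends on the RpcClient implementation and is not shown in the excerpt above, the thread count scales the same way.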