Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java-bigquery-jdbc/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ target-it/**
**/*logs*/**
**/ITBigQueryJDBCLocalTest.java
**/BigQueryStatementE2EBenchmark.java
.agents/

tools/**/*.class
tools/**/drivers/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -209,6 +210,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Boolean reqGoogleDriveScope;
private final Properties clientInfo = new Properties();
private boolean isReadOnlyTokenUsed = false;
private final ExecutorService metadataExecutor;
private final ExecutorService queryExecutor;

BigQueryConnection(String url) throws IOException {
this(url, DataSource.fromUrl(url));
Expand Down Expand Up @@ -344,6 +347,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {

this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
}
}

Expand Down Expand Up @@ -937,23 +942,91 @@ public void close() throws SQLException {
}

private void closeImpl() throws SQLException {
SQLException exceptionToThrow = null;
try {
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.shutdown();
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
this.bigQueryReadClient.close();
}

if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.shutdown();
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
this.bigQueryWriteClient.close();
}
if (this.metadataExecutor != null) {
Comment thread
Neenu1995 marked this conversation as resolved.
this.metadataExecutor.shutdown();
}
if (this.queryExecutor != null) {
this.queryExecutor.shutdown();
}

for (Statement statement : this.openStatements) {
statement.close();
try {
statement.close();
} catch (SQLException e) {
if (exceptionToThrow == null) {
exceptionToThrow = e;
} else {
exceptionToThrow.addSuppressed(e);
}
}
}
this.openStatements.clear();

boolean interrupted = Thread.currentThread().isInterrupted();

try {
if (this.bigQueryReadClient != null) {
if (interrupted) {
this.bigQueryReadClient.shutdownNow();
} else {
this.bigQueryReadClient.awaitTermination(1, TimeUnit.MINUTES);
}
}
if (this.bigQueryWriteClient != null) {
if (interrupted) {
this.bigQueryWriteClient.shutdownNow();
} else {
this.bigQueryWriteClient.awaitTermination(1, TimeUnit.MINUTES);
}
}
if (this.metadataExecutor != null) {
if (interrupted || !this.metadataExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.metadataExecutor.shutdownNow();
}
}
if (this.queryExecutor != null) {
if (interrupted || !this.queryExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
this.queryExecutor.shutdownNow();
}
}
} catch (InterruptedException e) {
interrupted = true;
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.shutdownNow();
}
if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.shutdownNow();
}
if (this.metadataExecutor != null) {
this.metadataExecutor.shutdownNow();
}
if (this.queryExecutor != null) {
this.queryExecutor.shutdownNow();
}
} finally {
try {
if (this.bigQueryReadClient != null) {
this.bigQueryReadClient.close();
}
} finally {
if (this.bigQueryWriteClient != null) {
this.bigQueryWriteClient.close();
}
}
}

if (interrupted) {
Thread.currentThread().interrupt();
throw new InterruptedException("Interrupted awaiting executor termination");
}
Comment thread
Neenu1995 marked this conversation as resolved.
} catch (ConcurrentModificationException ex) {
throw new BigQueryJdbcException("Concurrent modification during close", ex);
} catch (InterruptedException e) {
Expand All @@ -962,9 +1035,20 @@ private void closeImpl() throws SQLException {
BigQueryJdbcMdc.clear();
BigQueryJdbcRootLogger.closeConnectionHandler(this.connectionId);
}
if (exceptionToThrow != null) {
throw exceptionToThrow;
}
this.isClosed = true;
}

ExecutorService getExecutorService() {
return this.queryExecutor;
}

ExecutorService getMetadataExecutor() {
return this.metadataExecutor;
}

@Override
public boolean isClosed() {
return this.isClosed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Lightweight MDC implementation for the BigQuery JDBC driver using InheritableThreadLocal. */
class BigQueryJdbcMdc {
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcMdc.class.getName());

private static final InheritableThreadLocal<String> currentConnectionId =
new InheritableThreadLocal<>();

Expand Down Expand Up @@ -73,6 +77,28 @@ static ExecutorService newFixedThreadPool(int nThreads) {
return newFixedThreadPool(nThreads, Executors.defaultThreadFactory());
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new MdcThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new java.util.concurrent.SynchronousQueue<>(),
new MdcThreadFactory(threadFactory));
}

/**
* Creates a new cached thread pool ExecutorService that automatically propagates MDC connection
* context from the submitting thread to the executing thread.
*/
static ExecutorService newCachedThreadPool() {
return newCachedThreadPool(Executors.defaultThreadFactory());
}

private static class MdcThreadFactory implements ThreadFactory {
private final ThreadFactory delegate;

Expand All @@ -82,11 +108,16 @@ public MdcThreadFactory(ThreadFactory delegate) {

@Override
public Thread newThread(Runnable r) {
return delegate.newThread(
() -> {
clear();
r.run();
});
Thread t =
delegate.newThread(
() -> {
clear();
r.run();
});
if (t != null) {
t.setDaemon(true);
}
return t;
}
}

Expand All @@ -102,11 +133,37 @@ public MdcThreadPoolExecutor(
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

private final AtomicBoolean warningLogged = new AtomicBoolean(false);

private void monitorQueueSaturation(int queueSize) {
int corePoolSize = getCorePoolSize();
// Warn when queue size is >= corePoolSize * 5, with a minimum of 10 tasks to avoid false
// alerts for tiny pools
int warnThreshold = Math.max(10, corePoolSize * 5);
// Recovery reset threshold is corePoolSize * 2, with a minimum of 4 tasks
int recoveryThreshold = Math.max(4, corePoolSize * 2);

if (queueSize >= warnThreshold) {
if (warningLogged.compareAndSet(false, true)) {
LOG.warning(
"Thread pool is saturating. Core pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
corePoolSize, getActiveCount(), queueSize);
}
} else if (queueSize <= recoveryThreshold) {
if (warningLogged.get()) {
warningLogged.set(false);
}
}
Comment thread
Neenu1995 marked this conversation as resolved.
}

@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}

monitorQueueSaturation(getQueue().size());

if (command instanceof MdcFutureTask) {
super.execute(command);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,4 +1387,17 @@ private static void validateNonNegative(long val, String propertyName) {
"Invalid value for %s. It must be greater than or equal to 0.", propertyName));
}
}

/**
* Validates that a property value is greater than or equal to a minimum threshold. For thread
* pools, a minimum of 2 is enforced to ensure there are enough threads to handle concurrent
* coordination and avoid deadlock or thread starvation.
*/
private static void validateMin(long val, long min, String propertyName) {
if (val < min) {
throw new BigQueryJdbcRuntimeException(
String.format(
"Invalid value for %s. It must be greater than or equal to %d.", propertyName, min));
}
}
}
Loading
Loading