feat(gax): implement Scotty Resumable Upload protocol #13423
Code Review
This pull request introduces the Scotty Resumable Upload Protocol (RUP) for Java into gax-java and gax-httpjson, adding new callable wrappers, request/response models, and a state machine for managing resumable uploads. It also updates the hermetic build generation tools to support unversioned and proto-only libraries. Feedback on the implementation highlights several critical issues: potential resource leaks with unclosed streams and readers, a partial skip bug with InputStream.skip, a potential ClassCastException when handling multi-valued headers, and a failure to handle finalized uploads during recovery. Additionally, the reviewer recommended letting InterruptedException bubble up to prevent rapid spinning, clamping timeout values to prevent integer overflow, and adding randomized jitter to the backoff algorithm.
for (Map.Entry<String, Object> entry : requestHeaders.getHeaders().entrySet()) {
  String key = entry.getKey();
  String value = (String) entry.getValue();

  // Prefix metadata headers to prevent collision with physical request metadata
  if (isMetadataHeaderDenylisted(key)) {
    httpRequest.getHeaders().set("X-Goog-Upload-Header-" + key, value);
  } else {
    httpRequest.getHeaders().set(key, value);
  }
}
Casting the header value directly to String will throw a ClassCastException at runtime if any of the headers are multi-valued (which are represented as List<String> in HttpJsonMetadata). Since HttpHeaders.set natively accepts Object (including List and other types), we should avoid casting and pass the value directly.
for (Map.Entry<String, Object> entry : requestHeaders.getHeaders().entrySet()) {
  String key = entry.getKey();
  Object value = entry.getValue();
  // Prefix metadata headers to prevent collision with physical request metadata
  if (isMetadataHeaderDenylisted(key)) {
    httpRequest.getHeaders().set("X-Goog-Upload-Header-" + key, value);
  } else {
    httpRequest.getHeaders().set(key, value);
  }
}
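To make the failure mode concrete, here is a minimal, self-contained sketch that uses plain `java.util` maps as stand-ins for `HttpJsonMetadata` and `HttpHeaders`. The class `HeaderCopySketch`, the method `copyHeaders`, and the denylist contents are hypothetical names chosen for illustration and are not part of the PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the header-copy loop; plain maps replace the real HTTP classes.
final class HeaderCopySketch {
  // Assumed denylist, for illustration only.
  private static final Set<String> DENYLIST = Collections.singleton("content-type");

  static Map<String, Object> copyHeaders(Map<String, Object> source) {
    Map<String, Object> target = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : source.entrySet()) {
      String key = entry.getKey();
      // No cast: the value may be a String or a List<String>.
      Object value = entry.getValue();
      if (DENYLIST.contains(key.toLowerCase(Locale.ROOT))) {
        target.put("X-Goog-Upload-Header-" + key, value);
      } else {
        target.put(key, value);
      }
    }
    return target;
  }
}
```

With a source map that holds `Arrays.asList("a", "b")` under one key, `copyHeaders` passes the list through unchanged, whereas `(String) entry.getValue()` on the same entry would throw `ClassCastException`.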
private ResponseT transmitRemaining(String uploadUrl, long offset, Instant deadline) throws Exception {
  HttpRequestFactory requestFactory = getRequestFactory();

  InputStream stream = uploadRequest.getStreamProvider().get();
  if (offset > 0) {
    long skipped = stream.skip(offset);
    if (skipped < offset) {
      throw new IOException("Failed to skip stream bytes to offset: " + offset);
    }
  }

  // Wrap the stream in custom HttpContent that updates the progress listener
  HttpContent payload = new ProgressReportingHttpContent(
      stream, uploadRequest.getTotalBytes(), offset, uploadRequest.getProgressListener());

  GenericUrl url = new GenericUrl(uploadUrl);
  HttpRequest httpRequest = requestFactory.buildPostRequest(url, payload);
  configureTimeouts(httpRequest, deadline);

  httpRequest.getHeaders().set("X-Goog-Upload-Command", "upload, finalize");
  httpRequest.getHeaders().set("X-Goog-Upload-Offset", String.valueOf(offset));

  updateProgress(offset, ResumableUploadProgressListener.State.IN_PROGRESS);

  HttpResponse response = null;
  try {
    response = httpRequest.execute();
    String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status");

    if (!"final".equalsIgnoreCase(status)) {
      throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders())
          .setMessage("Resumable upload failed: Status is not final")
          .build();
    }

    InputStreamReader reader = new InputStreamReader(response.getContent(), StandardCharsets.UTF_8);
    ResponseT parsedResponse = methodDescriptor.getResponseParser().parse(reader, callOptions.getTypeRegistry());

    updateProgress(uploadRequest.getTotalBytes() > 0 ? uploadRequest.getTotalBytes() : offset,
        ResumableUploadProgressListener.State.COMPLETED);

    return parsedResponse;
  } finally {
    if (response != null) {
      response.disconnect();
    }
  }
}
This method has three critical issues:
- Resource Leak: The InputStream retrieved from uploadRequest.getStreamProvider().get() is never closed, leading to potential file descriptor or socket leaks.
- Resource Leak: The InputStreamReader used to parse the response is never closed.
- Partial Skip Bug: InputStream.skip(long) is not guaranteed to skip the entire requested offset in a single call. It can return fewer bytes than requested even if more bytes are available. We should use Guava's ByteStreams.skipFully to guarantee the correct offset is reached.

Using try-with-resources for both the stream and the reader, and utilizing ByteStreams.skipFully, resolves all of these issues cleanly.
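The partial-skip problem can be reproduced without any HTTP machinery. The sketch below is illustrative only: `OneByteSkipStream` is a hypothetical stream whose `skip` advances at most one byte per call (which the `InputStream` contract permits), and `skipFully` is a stdlib-only loop in the spirit of Guava's `ByteStreams.skipFully`, not a copy of it.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class SkipFullySketch {
  /** Hypothetical stream whose skip() advances at most one byte per call. */
  static final class OneByteSkipStream extends ByteArrayInputStream {
    OneByteSkipStream(byte[] data) {
      super(data);
    }

    @Override
    public synchronized long skip(long n) {
      return super.skip(Math.min(n, 1));
    }
  }

  /** Skips exactly n bytes, or throws EOFException if the stream ends first. */
  static void skipFully(InputStream in, long n) throws IOException {
    long remaining = n;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (in.read() >= 0) {
        remaining--; // skip() made no progress, so consume one byte by reading
      } else {
        throw new EOFException("Stream ended with " + remaining + " bytes left to skip");
      }
    }
  }
}
```

On an eight-byte `OneByteSkipStream`, a single `skip(5)` returns 1, so the original `skipped < offset` check would fail an upload that is actually resumable, while `skipFully(stream, 5)` leaves the stream positioned at byte index 5.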
private ResponseT transmitRemaining(String uploadUrl, long offset, Instant deadline) throws Exception {
  HttpRequestFactory requestFactory = getRequestFactory();
  try (InputStream stream = uploadRequest.getStreamProvider().get()) {
    if (offset > 0) {
      com.google.common.io.ByteStreams.skipFully(stream, offset);
    }
    // Wrap the stream in custom HttpContent that updates the progress listener
    HttpContent payload = new ProgressReportingHttpContent(
        stream, uploadRequest.getTotalBytes(), offset, uploadRequest.getProgressListener());
    GenericUrl url = new GenericUrl(uploadUrl);
    HttpRequest httpRequest = requestFactory.buildPostRequest(url, payload);
    configureTimeouts(httpRequest, deadline);
    httpRequest.getHeaders().set("X-Goog-Upload-Command", "upload, finalize");
    httpRequest.getHeaders().set("X-Goog-Upload-Offset", String.valueOf(offset));
    updateProgress(offset, ResumableUploadProgressListener.State.IN_PROGRESS);
    HttpResponse response = null;
    try {
      response = httpRequest.execute();
      String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status");
      if (!"final".equalsIgnoreCase(status)) {
        throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders())
            .setMessage("Resumable upload failed: Status is not final")
            .build();
      }
      try (InputStreamReader reader = new InputStreamReader(response.getContent(), StandardCharsets.UTF_8)) {
        ResponseT parsedResponse = methodDescriptor.getResponseParser().parse(reader, callOptions.getTypeRegistry());
        updateProgress(uploadRequest.getTotalBytes() > 0 ? uploadRequest.getTotalBytes() : offset,
            ResumableUploadProgressListener.State.COMPLETED);
        return parsedResponse;
      }
    } finally {
      if (response != null) {
        response.disconnect();
      }
    }
  }
}

String status = response.getHeaders().getFirstHeaderStringValue("X-Goog-Upload-Status");

if ("final".equalsIgnoreCase(status)) {
  // Already finalized, query command behaves like final
  throw new HttpResponseException.Builder(response.getStatusCode(), response.getStatusMessage(), response.getHeaders())
      .setMessage("Query returned final status. Re-executing state machine...")
      .build();
}
If the connection drops on the final chunk, the server may successfully receive and finalize the upload, but the client will not receive the successful response. When the client triggers recovery by calling recoverOffset, the server will return final status to the query command.
Currently, recoverOffset throws a fatal HttpResponseException when the status is final, which terminates the state machine and reports the upload as failed, even though it succeeded on the server. Instead, we should parse the response body of the query response and return it as the final result.
To implement this cleanly without throwing exceptions for control flow, we can introduce a RecoveryResult wrapper:
private static final class RecoveryResult<ResponseT> {
  private final long offset;
  private final ResponseT response;
  private final boolean finalized;

  private RecoveryResult(long offset, ResponseT response, boolean finalized) {
    this.offset = offset;
    this.response = response;
    this.finalized = finalized;
  }

  // Upload is still active: carry the recovered offset, no response yet.
  static <ResponseT> RecoveryResult<ResponseT> active(long offset) {
    return new RecoveryResult<>(offset, null, false);
  }

  // Upload was already finalized by the server: carry the parsed response.
  static <ResponseT> RecoveryResult<ResponseT> finalized(ResponseT response) {
    return new RecoveryResult<>(-1, response, true);
  }
}

Then update recoverOffset to return RecoveryResult<ResponseT> and handle the final status by parsing the response, and update the state machine loop to return the parsed response if recovery.finalized is true.
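As a rough sketch of how the state machine could branch on such a result, the example below uses functional stand-ins for the real calls. The names `RecoverySketch` and `resume` are hypothetical, and the `Supplier` and `LongFunction` parameters are assumptions that replace `recoverOffset` and `transmitRemaining` for illustration.

```java
import java.util.function.LongFunction;
import java.util.function.Supplier;

final class RecoverySketch {
  /** Outcome of a query command: either a resume offset or an already-final response. */
  static final class RecoveryResult<T> {
    final long offset;
    final T response;
    final boolean finalized;

    private RecoveryResult(long offset, T response, boolean finalized) {
      this.offset = offset;
      this.response = response;
      this.finalized = finalized;
    }

    static <T> RecoveryResult<T> active(long offset) {
      return new RecoveryResult<>(offset, null, false);
    }

    static <T> RecoveryResult<T> finalized(T response) {
      return new RecoveryResult<>(-1, response, true);
    }
  }

  /** Returns the server's response if already finalized, otherwise resumes from the offset. */
  static <T> T resume(
      Supplier<RecoveryResult<T>> recoverOffset, LongFunction<T> transmitRemaining) {
    RecoveryResult<T> recovery = recoverOffset.get();
    if (recovery.finalized) {
      return recovery.response; // the upload succeeded server-side; nothing left to send
    }
    return transmitRemaining.apply(recovery.offset);
  }
}
```

The point of the wrapper is that the "already finalized" case becomes an ordinary return value rather than an exception, so the retry loop does not need a special catch branch to distinguish success from failure.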
private void sleep(long ms) {
  try {
    Thread.sleep(ms);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}
Catching InterruptedException and only resetting the interrupt flag without throwing or returning causes the state machine loop to continue spinning rapidly when the thread is cancelled or interrupted. We should let InterruptedException bubble up so that the state machine aborts immediately.
private void sleep(long ms) throws InterruptedException {
  Thread.sleep(ms);
}

References
- In Java, do not swallow InterruptedException. When catching it, restore the thread's interrupted status by calling Thread.currentThread().interrupt() and handle the interruption appropriately, such as by throwing a relevant exception (e.g., SpannerException) to signal that the operation cannot proceed.
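A small sketch of the propagating variant shows why it stops the loop. The class `InterruptibleRetrySketch` and its `retry` method are hypothetical and only model the shape of a retry loop, not the PR's state machine.

```java
import java.util.function.BooleanSupplier;

final class InterruptibleRetrySketch {
  /**
   * Runs the attempt up to maxAttempts times, sleeping between failures.
   * Returns the 1-based attempt number that succeeded, or -1 if none did.
   * An interrupt during (or before) a sleep aborts the loop immediately.
   */
  static int retry(BooleanSupplier attempt, int maxAttempts, long delayMs)
      throws InterruptedException {
    for (int i = 1; i <= maxAttempts; i++) {
      if (attempt.getAsBoolean()) {
        return i;
      }
      Thread.sleep(delayMs); // throws InterruptedException if the thread is interrupted
    }
    return -1;
  }
}
```

If the calling thread is interrupted, the first `Thread.sleep` throws and no further attempts run. With the swallowing version, each subsequent `Thread.sleep` would also throw immediately because the flag is restored, so the remaining attempts would run back to back with no delay.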
private void configureTimeouts(HttpRequest request, Instant deadline) {
  long remainingMs = Duration.between(Instant.now(), deadline).toMillis();
  if (remainingMs <= 0) {
    remainingMs = 1; // force timeout
  }
  request.setConnectTimeout((int) remainingMs);
  request.setReadTimeout((int) remainingMs);
}
Casting remainingMs directly to int can cause integer overflow if the configured deadline is extremely long (greater than ~24.8 days), resulting in negative or incorrect timeouts. We should clamp the value to Integer.MAX_VALUE before casting.
private void configureTimeouts(HttpRequest request, Instant deadline) {
  long remainingMs = Duration.between(Instant.now(), deadline).toMillis();
  if (remainingMs <= 0) {
    remainingMs = 1; // force timeout
  }
  int timeoutMs = (int) Math.min(remainingMs, Integer.MAX_VALUE);
  request.setConnectTimeout(timeoutMs);
  request.setReadTimeout(timeoutMs);
}
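The overflow is easy to check in isolation. In this sketch, `TimeoutClampSketch` and `toTimeoutMillis` are hypothetical names, and the helper takes a `Duration` directly rather than a deadline so that the result is deterministic.

```java
import java.time.Duration;

final class TimeoutClampSketch {
  /** Converts a remaining duration to an int millisecond timeout in [1, Integer.MAX_VALUE]. */
  static int toTimeoutMillis(Duration remaining) {
    long ms = remaining.toMillis();
    if (ms <= 0) {
      return 1; // deadline already passed: use the smallest positive timeout
    }
    return (int) Math.min(ms, Integer.MAX_VALUE);
  }
}
```

For a 30-day deadline (2,592,000,000 ms), a bare `(int)` cast wraps around to a negative number, while `toTimeoutMillis(Duration.ofDays(30))` returns `Integer.MAX_VALUE`. If Guava is already on the classpath, `Ints.saturatedCast(long)` performs the same clamping.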
private long calculateBackoff(int attempt) {
  long baseDelay = 500; // 500ms
  long maxDelay = 30000; // 30s
  long delay = (long) (baseDelay * Math.pow(2, attempt));
  return Math.min(delay, maxDelay);
}
The exponential backoff algorithm lacks randomized jitter. Without jitter, multiple clients retrying after a transient failure or outage will synchronize and retry at the exact same intervals, leading to thundering herd problems. Adding a small randomized jitter is a standard best practice.
private long calculateBackoff(int attempt) {
  long baseDelay = 500; // 500ms
  long maxDelay = 30000; // 30s
  long delay = (long) (baseDelay * Math.pow(2, attempt));
  long jitter = java.util.concurrent.ThreadLocalRandom.current().nextLong(0, 250);
  return Math.min(delay + jitter, maxDelay);
}
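One thing to watch with additive jitter applied before the cap: once the exponential term reaches 30 s (from attempt 6 onward with a 500 ms base), `Math.min` returns exactly `maxDelay` and the jitter no longer has any effect. A possible alternative is the "full jitter" style, sketched below under the assumption that a uniformly random delay between zero and the capped ceiling is acceptable here. `JitteredBackoffSketch` and `ceilingMs` are hypothetical names, and this is a different technique from the suggestion above, not a restatement of it.

```java
import java.util.concurrent.ThreadLocalRandom;

final class JitteredBackoffSketch {
  static final long BASE_DELAY_MS = 500;
  static final long MAX_DELAY_MS = 30_000;

  /** Exponential ceiling for the given attempt, capped at MAX_DELAY_MS without overflow. */
  static long ceilingMs(int attempt) {
    // Bound the shift: 500 << 16 already exceeds the cap, so larger shifts add nothing.
    int shift = Math.max(0, Math.min(attempt, 16));
    return Math.min(BASE_DELAY_MS << shift, MAX_DELAY_MS);
  }

  /** Full jitter: a uniformly random delay in [0, ceilingMs(attempt)]. */
  static long calculateBackoff(int attempt) {
    return ThreadLocalRandom.current().nextLong(ceilingMs(attempt) + 1);
  }
}
```

Bounding the shift also sidesteps the arithmetic edge case at very large attempt counts, where `(long) (baseDelay * Math.pow(2, attempt))` saturates at `Long.MAX_VALUE` and adding a positive jitter to it would wrap negative.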
Draft PR implementing the Scotty Resumable Upload protocol in GAX. Includes design documentation in SCOTTY_DESIGN.md and full unit tests.