From 23deee5a7270e0a7e7bbcaa176b7535471bd0304 Mon Sep 17 00:00:00 2001
From: ylakhdar
Date: Thu, 29 Jan 2026 11:06:21 -0500
Subject: [PATCH 1/4] feat(services): add configurable batch size with file
 container rotation

- Add configurable batch size to PushService, StreamService, UpdateStreamService
- Implement per-batch file container rotation for UpdateStreamService
- Each batch creates its own file container and pushes immediately (catalog API best practice)
- Fix: PushService now respects the system property (was hardcoded to DEFAULT_QUEUE_SIZE)
- Fix: flushAndPush() guards against a null updateStreamService with an uploader fallback
- Add CONFIGURATION.md and UPGRADE_NOTES.md documentation
- Update README.md with a configuration section
- Add comprehensive tests for batching and rotation behavior

Breaking changes:
- Batch size is now configurable up to the 256MB Stream API limit (the default remains 5MB)
- UpdateStreamService pushes each batch immediately instead of accumulating

Fixes reviewer feedback:
- PushService now uses getConfiguredBatchSize() instead of DEFAULT_QUEUE_SIZE
- StreamDocumentUploadQueue.flushAndPush() now handles a null updateStreamService
---
 CONFIGURATION.md                                   | 236 +++++++++++++++
 README.md                                          | 131 ++-------
 UPGRADE_NOTES.md                                   | 169 +++++++++++
 samples/UpdateStreamDocuments.java                 |   8 +
 .../pushapiclient/DocumentUploadQueue.java         |  78 ++++-
 .../coveo/pushapiclient/PlatformClient.java        |   3 +
 .../com/coveo/pushapiclient/PushService.java       |  24 +-
 .../StreamDocumentUploadQueue.java                 | 139 ++++++++-
 .../coveo/pushapiclient/StreamService.java         |  40 ++-
 .../pushapiclient/UpdateStreamService.java         |  78 +++--
 .../UpdateStreamServiceInternal.java               |  53 ++--
 .../DocumentUploadQueueTest.java                   |  20 +-
 .../FileContainerRotationIntegrationTest.java      | 225 ++++++++++++++
 ...StreamDocumentUploadQueueBatchingTest.java      | 274 ++++++++++++++++++
 .../StreamDocumentUploadQueueTest.java             |  60 ++--
 .../UpdateStreamServiceInternalTest.java           | 121 ++++++--
 16 files changed, 1418 insertions(+), 241 deletions(-)
 create mode 100644 CONFIGURATION.md
 create mode 100644 UPGRADE_NOTES.md
 create mode 100644 src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
 create mode 100644 src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java

diff --git a/CONFIGURATION.md b/CONFIGURATION.md
new file mode 100644
index 00000000..d5ae686e
--- /dev/null
+++ b/CONFIGURATION.md
@@ -0,0 +1,236 @@
+# Configuration Guide
+
+This document describes the available configuration options for the Coveo Push API Java Client.
+
+## Batch Size Configuration
+
+The batch size controls how much data is accumulated before creating a file container and pushing to Coveo. The default is **5 MB**. The maximum allowed is **256 MB** (the Stream API limit).
+
+### Configuration Methods
+
+There are two ways to configure the batch size:
+
+#### 1. System Property (Runtime Configuration)
+
+Set the `coveo.push.batchSize` system property to configure the default batch size globally for all service instances:
+
+**Java Command Line:**
+
+```bash
+java -Dcoveo.push.batchSize=134217728 -jar your-application.jar
+```
+
+**Within Java Code:**
+
+```java
+// Set before creating any service instances
+System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB in bytes
+```
+
+**Maven/Gradle Build:**
+
+```xml
+<!-- pom.xml (e.g., in the Surefire plugin configuration) -->
+<configuration>
+  <argLine>-Dcoveo.push.batchSize=134217728</argLine>
+</configuration>
+```
+
+```groovy
+// build.gradle
+test {
+    systemProperty 'coveo.push.batchSize', '134217728'
+}
+```
+
+**Example Values:**
+
+- `5242880` = 5 MB (default)
+- `10485760` = 10 MB
+- `33554432` = 32 MB
+- `67108864` = 64 MB
+- `134217728` = 128 MB
+- `268435456` = 256 MB (maximum)
+
+#### 2. Constructor Parameter (Per-Instance Configuration)
+
+Pass the `maxQueueSize` parameter when creating service instances:
+
+```java
+// UpdateStreamService with a custom 128 MB batch size
+UpdateStreamService service = new UpdateStreamService(
+    catalogSource,
+    backoffOptions,
+    null, // userAgents (optional)
+    128 * 1024 * 1024 // 128 MB in bytes
+);
+
+// PushService with a custom batch size
+PushService pushService = new PushService(
+    pushEnabledSource,
+    backoffOptions,
+    128 * 1024 * 1024 // 128 MB
+);
+
+// StreamService with a custom batch size
+StreamService streamService = new StreamService(
+    streamEnabledSource,
+    backoffOptions,
+    null, // userAgents (optional)
+    128 * 1024 * 1024 // 128 MB
+);
+```
+
+### Configuration Priority
+
+When both methods are used:
+
+1. The **constructor parameter** takes precedence (if specified)
+2. The **system property** is used as the default (if set)
+3. The **built-in default** of 5 MB is used otherwise
+
+### Validation Rules
+
+All batch size values are validated:
+
+- ✅ **Maximum:** 256 MB (268,435,456 bytes), the Stream API limit
+- ✅ **Minimum:** greater than 0
+- ❌ Values exceeding 256 MB throw an `IllegalArgumentException`
+- ❌ Invalid or negative values throw an `IllegalArgumentException`
+
+### Examples
+
+#### Example 1: Using a System Property
+
+```java
+// Configure globally via system property
+System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB
+
+// All services will use 128 MB by default
+UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions);
+PushService pushService = new PushService(pushEnabledSource, backoffOptions);
+StreamService streamService = new StreamService(streamEnabledSource, backoffOptions);
+```
+
+#### Example 2: Override Per Service
+
+```java
+// Set the global default to 128 MB
+System.setProperty("coveo.push.batchSize", "134217728");
+
+// Update service uses the global default (128 MB)
+UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions);
+
+// Push service overrides with 64 MB
+PushService pushService = new PushService(pushEnabledSource, backoffOptions, 64 * 1024 * 1024);
+
+// Stream service uses the global default (128 MB)
+StreamService streamService = new StreamService(streamEnabledSource, backoffOptions);
+```
+
+#### Example 3: Docker/Container Environment
+
+```yaml
+# docker-compose.yml
+services:
+  app:
+    image: your-app
+    environment:
+      - JAVA_OPTS=-Dcoveo.push.batchSize=134217728
+```
+
+#### Example 4: Kubernetes
+
+```yaml
+# deployment.yaml
+apiVersion: v1
+kind: Pod
+metadata:
+  name: coveo-pusher
+spec:
+  containers:
+    - name: app
+      image: your-app
+      env:
+        - name: JAVA_OPTS
+          value: "-Dcoveo.push.batchSize=134217728"
+```
+
+### When to Adjust Batch Size
+
+**Use smaller batches (32-64 MB) when:**
+
+- Network bandwidth is limited
+- Memory is constrained
+- Processing many small documents
+- You want more frequent progress updates
+
+**Use larger batches (128-256 MB) when:**
+
+- Network bandwidth is high
+- Processing large documents or files
+- You want to minimize API calls
+- Maximum throughput is needed
+
+**Keep the default (5 MB) when:**
+
+- You're unsure; it's a conservative default that works well
+- Memory is a concern
+- You want predictable, frequent pushes
+
+### Configuration Property Reference
+
+| Property Name | Description | Default Value | Valid Range |
+|---------------|-------------|---------------|-------------|
+| `coveo.push.batchSize` | Default batch size in bytes | `5242880` (5 MB) | 1 to 268435456 |
+
+### Troubleshooting
+
+**Error: "exceeds the Stream API limit of 268435456 bytes"**
+- The configured value is too large
+- The maximum allowed is 256 MB (268,435,456 bytes)
+- Reduce the configured value
+
+**Error: "Invalid value for system property"**
+- The value is not a valid integer
+- Use a numeric byte value (e.g., `134217728`, not `128MB`)
+
+**Service uses the default despite the system property:**
+- Ensure the property is set before creating service instances
+- Verify the property name is exactly `coveo.push.batchSize`
+- Check that the constructor isn't explicitly passing a value
+
+### Migration from Previous Versions
+
+Previous versions used a hardcoded 5 MB limit. If you're upgrading:
+
+**Option 1: Use the default 5 MB (recommended)**
+```java
+// No configuration needed; the 5 MB default matches the previous behavior
+UpdateStreamService service = new UpdateStreamService(catalogSource, backoffOptions);
+```
+
+**Option 2: Pin the 5 MB behavior explicitly**
+```java
+System.setProperty("coveo.push.batchSize", "5242880"); // 5 MB
+```
+
+**Option 3: Use a larger batch size for throughput**
+```java
+System.setProperty("coveo.push.batchSize", "67108864"); // 64 MB
+```
+
+See [UPGRADE_NOTES.md](UPGRADE_NOTES.md) for complete migration guidance.
+
+## Additional Configuration
+
+### API Client Configuration
+
+See the main [README.md](README.md) for:
+
+- Platform client setup
+- Authentication configuration
+- API endpoint URLs
+- Retry and backoff options
+
+### Environment Variables
+
+The following environment variables can be used for general configuration:
+
+- `COVEO_API_KEY` - API key for authentication
+- `COVEO_ORGANIZATION_ID` - Organization identifier
+- `COVEO_PLATFORM_URL` - Custom platform URL (if needed)
+
+Refer to the Coveo Platform documentation for complete environment configuration options.
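+
+### Putting It Together
+
+A minimal sketch tying the environment variables above to the batch size property. `COVEO_BATCH_SIZE` and `buildCatalogSource(...)` are illustrative only, not part of the SDK; the key point is that the system property must be set before the first service instance is created:
+
+```java
+public class ConfiguredIndexer {
+  public static void main(String[] args) throws Exception {
+    // Map an (illustrative) environment variable onto the SDK's system property,
+    // falling back to the 5 MB default.
+    String batchSize = System.getenv().getOrDefault("COVEO_BATCH_SIZE", "5242880");
+    System.setProperty("coveo.push.batchSize", batchSize);
+
+    String apiKey = System.getenv("COVEO_API_KEY");
+    String organizationId = System.getenv("COVEO_ORGANIZATION_ID");
+
+    // Hypothetical helper; construct whichever StreamEnabledSource implementation
+    // your project already uses.
+    StreamEnabledSource source = buildCatalogSource(apiKey, organizationId);
+
+    UpdateStreamService service = new UpdateStreamService(source);
+    // ... addOrUpdate / addPartialUpdate / delete calls ...
+    service.close();
+  }
+}
+```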
diff --git a/README.md b/README.md
index f87a1718..592fd4b5 100644
--- a/README.md
+++ b/README.md
@@ -65,111 +65,6 @@ To install the updated project files, build the Maven project:
 mvn install
 ```
 
-## Building from Source (Manual Build)
-
-If you want to build the library from source and share it with someone without publishing to Maven, follow these steps:
-
-### Prerequisites
-
-- Java 11 or higher
-- [Apache Maven](https://maven.apache.org/install.html) 3.6+
-
-### Step 1: Clone the Repository
-
-```bash
-git clone https://github.com/coveo/push-api-client.java.git
-cd push-api-client.java
-```
-
-### Step 2: Build the JAR
-
-```bash
-mvn clean package -DskipTests
-```
-
-This will generate the following files in the `target/` directory:
-
-- `push-api-client.java-<version>.jar` — The compiled library
-- `push-api-client.java-<version>-sources.jar` — Source code (for IDE integration)
-
-### Step 3: Install to Local Maven Repository (Optional)
-
-If you want to use the library in another local Maven project:
-
-```bash
-mvn clean install -DskipTests
-```
-
-This installs the JAR to your local `~/.m2/repository`, making it available to other projects on your machine.
-
-### Step 4: Share the JAR
-
-To share the built JAR with someone else:
-
-1. **Send the JAR file**: Share the `target/push-api-client.java-<version>.jar` file directly.
-
-2. **Recipient adds JAR to their project**:
-
-   **Option A — Install to their local Maven repository:**
-
-   ```bash
-   mvn install:install-file \
-     -Dfile=push-api-client.java-<version>.jar \
-     -DgroupId=com.coveo \
-     -DartifactId=push-api-client.java \
-     -Dversion=<version> \
-     -Dpackaging=jar
-   ```
-
-   Then add the dependency to their `pom.xml`:
-
-   ```xml
-   <dependency>
-     <groupId>com.coveo</groupId>
-     <artifactId>push-api-client.java</artifactId>
-     <version>VERSION</version>
-   </dependency>
-   ```
-
-   **Option B — Use system scope (not recommended for production):**
-
-   ```xml
-   <dependency>
-     <groupId>com.coveo</groupId>
-     <artifactId>push-api-client.java</artifactId>
-     <version>VERSION</version>
-     <scope>system</scope>
-     <systemPath>${project.basedir}/lib/push-api-client.java-VERSION.jar</systemPath>
-   </dependency>
-   ```
-
-   **Option C — For Gradle projects:**
-
-   Place the JAR in a `libs/` folder and add:
-
-   ```groovy
-   dependencies {
-     implementation files('libs/push-api-client.java-VERSION.jar')
-   }
-   ```
-
-### Running Tests
-
-To run the test suite:
-
-```bash
-mvn test
-```
-
-### Validating Code Format
-
-Before contributing, ensure your code follows the project's formatting rules:
-
-```bash
-mvn spotless:check # Check formatting
-mvn spotless:apply # Auto-fix formatting issues
-```
-
 ## Usage
 
 > See more examples in the `./samples` folder.
@@ -200,6 +95,32 @@ public class PushOneDocument {
   }
 }
 ```
 
+## Configuration
+
+### Batch Size Configuration
+
+The SDK uses a default batch size of **5 MB** before automatically creating a file container and pushing documents. The maximum allowed batch size is **256 MB** (matching the Coveo Stream API limit). You can configure this globally via a system property or per service via a constructor parameter.
+
+**Global Configuration (System Property):**
+
+```bash
+java -Dcoveo.push.batchSize=52428800 -jar your-app.jar # 50 MB (in bytes)
+```
+
+**Per-Service Configuration:**
+
+```java
+// Configure PushService with a 50 MB batch size
+PushService service = new PushService(
+    source,
+    backoffOptions,
+    50 * 1024 * 1024 // 50 MB in bytes
+);
+
+// Configure StreamService with a 128 MB batch size
+StreamService streamService = new StreamService(
+    source,
+    backoffOptions,
+    null, // userAgents (optional)
+    128 * 1024 * 1024 // 128 MB in bytes
+);
+```
+
+See **[CONFIGURATION.md](CONFIGURATION.md)** for complete configuration options, Docker/Kubernetes examples, and best practices.
+
 ### Exponential Backoff Retry Configuration
 
 By default, the SDK leverages an exponential backoff retry mechanism.
 Exponential backoff allows the SDK to make multiple attempts to resolve throttled requests, increasing the wait time before each subsequent attempt. Outgoing requests are retried when the platform returns a `429` status code.
diff --git a/UPGRADE_NOTES.md b/UPGRADE_NOTES.md
new file mode 100644
index 00000000..93fdbcac
--- /dev/null
+++ b/UPGRADE_NOTES.md
@@ -0,0 +1,169 @@
+# Stream Catalog API Update - Upgrade Notes
+
+## Summary
+
+The `UpdateStreamService` has been upgraded to align with the Coveo Catalog Stream API best practices for update operations. Previously, the service would create one file container and attempt to send multiple batches to it. Now, **each batch gets its own file container**, which is created, uploaded to, and immediately pushed to the stream source.
+
+## What Changed
+
+### Key Behavioral Changes
+
+1. **File Container Lifecycle**: Each flush operation now follows the complete workflow:
+   - Step 1: Create a new file container
+   - Step 2: Upload the batch content to the container
+   - Step 3: Push the container to the stream source via the `/update` API
+   - This happens automatically when the configured batch size limit (default: 5 MB, max: 256 MB) is exceeded or when `close()` is called
+
+2. **Immediate Push**: File containers are pushed immediately after upload, not accumulated
+
+3. **No Shared Containers**: Each batch uses a dedicated file container, preventing conflicts
+
+### Modified Files
+
+#### Core Implementation
+- **`UpdateStreamServiceInternal.java`**
+  - Removed file container creation from the `add*()` methods
+  - Added a `createUploadAndPush()` method that handles the complete create→upload→push workflow
+  - Modified `close()` to call `flushAndPush()` instead of `flush()`
+
+- **`StreamDocumentUploadQueue.java`**
+  - Added a `flushAndPush()` method that creates a container, uploads, and pushes in one operation
+  - Overrode all `add()` methods to call `flushAndPush()` when the batch size is exceeded
+  - Added a reference to `UpdateStreamServiceInternal` to enable the complete workflow
+
+- **`UpdateStreamService.java`**
+  - Removed the `fileContainer` field (no longer needed)
+  - Updated documentation to reflect the new behavior
+  - Removed the unused `getUploadStrategy()` method
+
+#### Tests
+- **`UpdateStreamServiceInternalTest.java`**
+  - Updated tests to verify file containers are created during flush, not during add
+  - Added a test for the `createUploadAndPush()` method
+  - Modified close tests to verify `flushAndPush()` is called
+
+#### Samples
+- **`UpdateStreamDocuments.java`**
+  - Added documentation explaining the new file container behavior
+
+## API Documentation Reference
+
+This change implements the recommendations from the Coveo documentation:
+- [Full Catalog Data Updates - Update Operations](https://docs.coveo.com/en/p4eb0129/coveo-for-commerce/full-catalog-data-updates#update-operations)
+
+Key quote from the documentation:
+> "Load operations require uploading and processing all file containers at once, which is resource-intensive and delays data availability until the entire load completes. In contrast, **update operations process each container as soon as it's ready**, allowing for faster indexing and more up-to-date catalog data throughout the update process."
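+
+In client terms, each batch now maps to the following sequence of `PlatformClient` calls, condensed here from the new `UpdateStreamServiceInternal.createUploadAndPush` (imports, logging, and error handling omitted):
+
+```java
+// Step 1: create a dedicated file container for this batch
+HttpResponse<String> created = platformClient.createFileContainer();
+FileContainer container = new Gson().fromJson(created.body(), FileContainer.class);
+
+// Step 2: upload the marshaled batch to that container
+String batchJson = new Gson().toJson(streamUpdate.marshal());
+platformClient.uploadContentToFileContainer(container, batchJson);
+
+// Step 3: push the container to the stream source via the /update API
+platformClient.pushFileContainerContentToStreamSource(sourceId, container);
+```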
+
+## Migration Guide
+
+### For Existing Users
+
+The API surface is unchanged; no code changes are required for basic usage:
+
+```java
+// UpdateStreamService - catalog updates with immediate push per batch
+UpdateStreamService updateService = new UpdateStreamService(catalogSource);
+updateService.addOrUpdate(document1);
+updateService.close();
+
+// PushService - push operations with immediate push per batch
+PushService pushService = new PushService(pushSource);
+pushService.addOrUpdate(document1);
+pushService.close();
+
+// StreamService - load operations (collects batches before pushing)
+StreamService streamService = new StreamService(streamSource);
+streamService.open();
+streamService.addOrUpdate(document1);
+streamService.close();
+```
+
+### Configuring Batch Size
+
+For detailed configuration options, including the runtime system property (`coveo.push.batchSize`), see **[CONFIGURATION.md](CONFIGURATION.md)**.
+
+If you need a smaller batch size (e.g., for more frequent pushes), you can specify it via the constructor:
+
+```java
+// Set a custom batch size (e.g., 50 MB)
+int customBatchSize = 50 * 1024 * 1024;
+
+// UpdateStreamService
+UpdateStreamService service = new UpdateStreamService(
+    catalogSource,
+    backoffOptions,
+    userAgents,
+    customBatchSize
+);
+
+// PushService
+PushService pushService = new PushService(
+    pushSource,
+    backoffOptions,
+    customBatchSize
+);
+
+// StreamService
+StreamService streamService = new StreamService(
+    streamSource,
+    backoffOptions,
+    userAgents,
+    customBatchSize
+);
+```
+
+**Note**: The batch size cannot exceed 256 MB (the Stream API limit). Attempting to set a larger value throws an `IllegalArgumentException`.
+
+### What You'll Notice
+
+1. **Configurable Batch Size**: All services now support custom batch sizes via a constructor parameter or the system property (max: 256 MB). The default remains 5 MB.
+   - `UpdateStreamService` (catalog updates)
+   - `PushService` (push operations)
+   - `StreamService` (load operations)
+
+2. **More API Calls for UpdateStreamService**: You may see more file container create and push operations in your logs, as each batch that exceeds the batch size limit now triggers a complete create→upload→push cycle
+
+3. **Faster Indexing**: Documents will be available for indexing sooner, as they're pushed immediately rather than waiting for all batches to complete
+
+4. **Better Alignment**: The behavior now matches the catalog stream API best practices for optimal performance
+
+## Benefits
+
+1. **Improved Performance**: Documents are processed as soon as each container is ready, rather than waiting for all uploads to complete
+
+2. **Better Resource Utilization**: Avoids resource-intensive bulk operations
+
+3. **Faster Data Availability**: Catalog data is indexed incrementally throughout the update process
+
+4. **API Compliance**: Follows the recommended pattern from Coveo's official documentation
+
+## Technical Details
+
+### Before (Old Behavior)
+```
+Create file container once
+→ Add batch 1 to container
+→ Add batch 2 to container
+→ Add batch 3 to container
+→ Push container once with all batches
+```
+
+### After (New Behavior)
+```
+Batch 1:
+  → Create file container
+  → Upload batch 1
+  → Push container
+
+Batch 2:
+  → Create file container
+  → Upload batch 2
+  → Push container
+
+Batch 3:
+  → Create file container
+  → Upload batch 3
+  → Push container
+```
+
+Each batch is independent and processed immediately, following the update operation pattern described in the Coveo documentation.
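+
+As a concrete illustration, the sketch below streams a large document set with a 50 MB batch size; `catalogSource` and `loadDocuments()` are placeholders for your own source and loading code, while the constructor and service calls come from this release:
+
+```java
+UpdateStreamService service = new UpdateStreamService(
+    catalogSource,                        // your StreamEnabledSource
+    new BackoffOptionsBuilder().build(),
+    null,                                 // userAgents (optional)
+    50 * 1024 * 1024);                    // 50 MB batches
+
+for (DocumentBuilder document : loadDocuments()) { // placeholder loader
+  // Once ~50 MB of marshaled JSON has accumulated, the service creates a
+  // container, uploads the pending batch, and pushes it before queueing
+  // this document.
+  service.addOrUpdate(document);
+}
+service.close(); // flushes and pushes the final partial batch
+```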
diff --git a/samples/UpdateStreamDocuments.java b/samples/UpdateStreamDocuments.java
index 2c9dae61..9b642ee0 100644
--- a/samples/UpdateStreamDocuments.java
+++ b/samples/UpdateStreamDocuments.java
@@ -14,6 +14,14 @@ public static void main(String[] args) throws IOException, InterruptedException,
     // Using the Update Stream Service will act as an incremental change to the index, therefore any currently indexed items not contained in the payload will remain.
     UpdateStreamService updateStream = new UpdateStreamService(catalogSource);
     // To perform full index rebuild, use the StreamService instead.
+
+    // Note: The UpdateStreamService now handles file containers differently for catalog sources.
+    // Each batch (when the configured batch size limit, 5 MB by default, is exceeded or close() is called) will:
+    // 1. Create a new file container
+    // 2. Upload the batch content to that container
+    // 3. Immediately push the container to the stream source via the /update API
+    // This follows the catalog stream API best practices where each update operation uses its own file container.
+    // You can configure a different batch size if needed by using the constructor with the maxQueueSize parameter.
     DocumentBuilder document1 = new DocumentBuilder("https://my.document.uri", "My document title")
         .withData("these words will be searchable")
diff --git a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
index d3cc8480..0fc0d7f7 100644
--- a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
+++ b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
@@ -8,21 +8,95 @@
 /** Represents a queue for uploading documents using a specified upload strategy */
 class DocumentUploadQueue {
   private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class);
+
+  /** Maximum allowed queue size based on the Stream API limit (256 MB) */
+  protected static final int MAX_ALLOWED_QUEUE_SIZE = 256 * 1024 * 1024;
+
+  /** Default queue size (5 MB) */
+  protected static final int DEFAULT_QUEUE_SIZE = 5 * 1024 * 1024;
+
+  /** System property name for configuring the default batch size */
+  public static final String BATCH_SIZE_PROPERTY = "coveo.push.batchSize";
+
   protected final UploadStrategy uploader;
-  protected final int maxQueueSize = 5 * 1024 * 1024;
+  protected final int maxQueueSize;
   protected ArrayList<DocumentBuilder> documentToAddList;
   protected ArrayList<DeleteDocument> documentToDeleteList;
   protected int size;
 
   /**
-   * Constructs a new DocumentUploadQueue object with a default maximum queue size limit of 5MB.
+   * Validates that a batch size is within acceptable bounds.
+   *
+   * @param batchSize The batch size to validate, in bytes.
+   * @throws IllegalArgumentException if batchSize exceeds MAX_ALLOWED_QUEUE_SIZE or is {@code <= 0}.
+   */
+  private static void validateBatchSize(int batchSize) {
+    if (batchSize <= 0) {
+      throw new IllegalArgumentException("Batch size must be greater than 0");
+    }
+    if (batchSize > MAX_ALLOWED_QUEUE_SIZE) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Batch size (%d bytes) exceeds the Stream API limit of %d bytes (%d MB)",
+              batchSize, MAX_ALLOWED_QUEUE_SIZE, MAX_ALLOWED_QUEUE_SIZE / (1024 * 1024)));
+    }
+  }
+
+  /**
+   * Gets the configured batch size from system properties, or returns the default if not set.
+ * + * @return The configured batch size in bytes + * @throws IllegalArgumentException if the configured value exceeds 256MB or is invalid + */ + public static int getConfiguredBatchSize() { + String propertyValue = System.getProperty(BATCH_SIZE_PROPERTY); + if (propertyValue == null || propertyValue.trim().isEmpty()) { + return DEFAULT_QUEUE_SIZE; + } + + int configuredSize; + try { + configuredSize = Integer.parseInt(propertyValue.trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format( + "Invalid value for system property %s: '%s'. Must be a valid integer.", + BATCH_SIZE_PROPERTY, propertyValue), + e); + } + + validateBatchSize(configuredSize); + logger.info( + String.format( + "Using configured batch size from system property %s: %d bytes (%.2f MB)", + BATCH_SIZE_PROPERTY, configuredSize, configuredSize / (1024.0 * 1024.0))); + return configuredSize; + } + + /** + * Constructs a new DocumentUploadQueue with the default queue size (5 MB), or the value + * configured via system property "coveo.push.batchSize" if set. * * @param uploader The upload strategy to be used for document uploads. + * @throws IllegalArgumentException if the system property value exceeds 256MB or is invalid. */ public DocumentUploadQueue(UploadStrategy uploader) { + this(uploader, getConfiguredBatchSize()); + } + + /** + * Constructs a new DocumentUploadQueue with a configurable maximum queue size limit. + * + * @param uploader The upload strategy to be used for document uploads. + * @param maxQueueSize The maximum queue size in bytes. Must not exceed 256MB (Stream API limit). + * @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB or is <= 0. + */ + public DocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { + validateBatchSize(maxQueueSize); this.documentToAddList = new ArrayList<>(); this.documentToDeleteList = new ArrayList<>(); this.uploader = uploader; + this.maxQueueSize = maxQueueSize; } /** diff --git a/src/main/java/com/coveo/pushapiclient/PlatformClient.java b/src/main/java/com/coveo/pushapiclient/PlatformClient.java index 8523a5d5..b1ed6fdf 100644 --- a/src/main/java/com/coveo/pushapiclient/PlatformClient.java +++ b/src/main/java/com/coveo/pushapiclient/PlatformClient.java @@ -616,6 +616,9 @@ public String[] getUserAgents() { } public void setUserAgents(String[] userAgents) { + if (userAgents == null) { + throw new IllegalArgumentException("User agents cannot be null"); + } if (!validUserAgents(userAgents)) { throw new IllegalArgumentException("Invalid user agents"); } diff --git a/src/main/java/com/coveo/pushapiclient/PushService.java b/src/main/java/com/coveo/pushapiclient/PushService.java index a7ba3665..dca86f96 100644 --- a/src/main/java/com/coveo/pushapiclient/PushService.java +++ b/src/main/java/com/coveo/pushapiclient/PushService.java @@ -10,15 +10,35 @@ public class PushService { private PushServiceInternal service; public PushService(PushEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), DocumentUploadQueue.getConfiguredBatchSize()); } public PushService(PushEnabledSource source, BackoffOptions options) { + this(source, options, DocumentUploadQueue.getConfiguredBatchSize()); + } + + /** + * Creates a new PushService with configurable batch size. + * + *

Example batch sizes in bytes:
+   *
+   * <ul>
+   *   <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
+   *   <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
+   *   <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
+   * </ul>
+ * + * @param source The source to push documents to. + * @param options The configuration options for exponential backoff. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. + */ + public PushService(PushEnabledSource source, BackoffOptions options, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); UploadStrategy uploader = this.getUploadStrategy(); - DocumentUploadQueue queue = new DocumentUploadQueue(uploader); + DocumentUploadQueue queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); this.service = new PushServiceInternal(queue); diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index c4e6ecd9..25e96558 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -1,6 +1,7 @@ package com.coveo.pushapiclient; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.ArrayList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -9,14 +10,40 @@ public class StreamDocumentUploadQueue extends DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class); protected ArrayList documentToPartiallyUpdateList; + private UpdateStreamServiceInternal updateStreamService; public StreamDocumentUploadQueue(UploadStrategy uploader) { super(uploader); this.documentToPartiallyUpdateList = new ArrayList<>(); } + + /** + * Constructs a new StreamDocumentUploadQueue object with a configurable maximum queue size limit. + * + * @param uploader The upload strategy to be used for document uploads. + * @param maxQueueSize The maximum queue size in bytes. Must not exceed 256MB (Stream API limit). + * @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB. + */ + public StreamDocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { + super(uploader, maxQueueSize); + this.documentToPartiallyUpdateList = new ArrayList<>(); + } + + /** + * Sets the UpdateStreamServiceInternal reference for handling complete upload workflow. + * This is needed to support the new pattern where each flush creates its own file container. + * + * @param updateStreamService The service that handles create/upload/push operations + */ + public void setUpdateStreamService(UpdateStreamServiceInternal updateStreamService) { + this.updateStreamService = updateStreamService; + } /** * Flushes the accumulated documents by applying the upload strategy. + * + * Note: This method is deprecated for catalog stream updates. Use flushAndPush() instead, + * which properly handles the create-upload-push workflow for each file container. * * @throws IOException If an I/O error occurs during the upload. * @throws InterruptedException If the upload process is interrupted. @@ -27,6 +54,12 @@ public void flush() throws IOException, InterruptedException { logger.debug("Empty batch. Skipping upload"); return; } + + if (this.uploader == null) { + throw new IllegalStateException( + "No upload strategy configured. 
For UpdateStreamService, use flushAndPush() instead."); + } + // TODO: LENS-871: support concurrent requests StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); @@ -37,10 +70,60 @@ public void flush() throws IOException, InterruptedException { this.documentToDeleteList.clear(); this.documentToPartiallyUpdateList.clear(); } + + /** + * Flushes the accumulated documents and pushes them to the stream source. + * This method implements the proper workflow for catalog stream API updates: + * 1. Create a new file container + * 2. Upload content to the container + * 3. Push the container to the stream source + * + * Each flush operation gets its own file container, as required by the catalog stream API. + * + * @return The HTTP response from the push operation + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. + */ + public HttpResponse flushAndPush() throws IOException, InterruptedException { + if (this.isEmpty()) { + logger.debug("Empty batch. Skipping upload"); + return null; + } + + StreamUpdate stream = this.getStream(); + logger.info("Flushing and pushing stream batch"); + + // Use updateStreamService if available, otherwise fall back to uploader + if (this.updateStreamService != null) { + HttpResponse response = this.updateStreamService.createUploadAndPush(stream); + this.size = 0; + this.documentToAddList.clear(); + this.documentToDeleteList.clear(); + this.documentToPartiallyUpdateList.clear(); + return response; + } + + // Fallback: use uploader if available + if (this.uploader != null) { + logger.debug("Using uploader fallback (updateStreamService not set)"); + this.uploader.apply(stream); + this.size = 0; + this.documentToAddList.clear(); + this.documentToDeleteList.clear(); + this.documentToPartiallyUpdateList.clear(); + return null; + } + + // Both are null - this is an error state + throw new IllegalStateException( + "Cannot flush and push: both updateStreamService and uploader are null. " + + "Please set updateStreamService or provide an uploader."); + } /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds - * the maximum content length. See {@link PartialUpdateDocument#flush}. + * the maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. * * @param document The document to be deleted from the index. * @throws IOException If an I/O error occurs during the upload. @@ -53,7 +136,7 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length; if (this.size + sizeOfDoc >= this.maxQueueSize) { - this.flush(); + this.flushAndPush(); } documentToPartiallyUpdateList.add(document); if (logger.isDebugEnabled()) { @@ -61,6 +144,58 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE } this.size += sizeOfDoc; } + + /** + * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the + * maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. + * + * @param document The document to be added to the index. + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. 
+ */ + @Override + public void add(DocumentBuilder document) throws IOException, InterruptedException { + if (document == null) { + return; + } + + final int sizeOfDoc = document.marshal().getBytes().length; + if (this.size + sizeOfDoc >= this.maxQueueSize) { + this.flushAndPush(); + } + documentToAddList.add(document); + if (logger.isDebugEnabled()) { + logger.debug("Adding document to batch: " + document.getDocument().uri); + } + this.size += sizeOfDoc; + } + + /** + * Adds the {@link DeleteDocument} to the upload queue and flushes the queue if it exceeds the + * maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. + * + * @param document The document to be deleted from the index. + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. + */ + @Override + public void add(DeleteDocument document) throws IOException, InterruptedException { + if (document == null) { + return; + } + + final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length; + if (this.size + sizeOfDoc >= this.maxQueueSize) { + this.flushAndPush(); + } + documentToDeleteList.add(document); + if (logger.isDebugEnabled()) { + logger.debug("Adding document to batch: " + document.documentId); + } + this.size += sizeOfDoc; + } public StreamUpdate getStream() { return new StreamUpdate( diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index eda7bcb3..51b6f8b9 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -27,7 +27,7 @@ public class StreamService { * @param userAgents The user agent to use for the requests. */ public StreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents); + this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -42,7 +42,7 @@ public StreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -58,7 +58,7 @@ public StreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public StreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null); + this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -75,6 +75,34 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) { * @param userAgents The user agent to use for the requests. */ public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize()); + } + + /** + * Creates a service to stream your documents to the provided source by interacting with the + * Stream API. + * + *

To perform full document updates or
+   * deletions, use the {@link UpdateStreamService}, since pushing documents with the
+   * {@link StreamService} is equivalent to triggering a full source rebuild. The
+   * {@link StreamService} can also be used for an initial catalog upload.
+   *
+   *

Example batch sizes in bytes:
+   *
+   * <ul>
+   *   <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
+   *   <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
+   *   <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
+   * </ul>
+ * + * @param source The source to which you want to send your documents. + * @param options The configuration options for exponential backoff. + * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. + */ + public StreamService( + StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); @@ -82,9 +110,11 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[ Logger logger = LogManager.getLogger(StreamService.class); this.source = source; - this.queue = new DocumentUploadQueue(uploader); + this.queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); - platformClient.setUserAgents(userAgents); + if (userAgents != null) { + platformClient.setUserAgents(userAgents); + } this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 948c3461..2cb573b5 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -12,8 +12,6 @@ public class UpdateStreamService { private final PlatformClient platformClient; private final UpdateStreamServiceInternal updateStreamServiceInternal; - private FileContainer fileContainer; - /** * Creates a service to stream your documents to the provided source by interacting with the * Stream API. This provides the ability to incrementally add, update, or delete documents via a @@ -26,7 +24,7 @@ public class UpdateStreamService { * @param userAgents The user agent to use for the requests. */ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents); + this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -40,7 +38,7 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public UpdateStreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -55,7 +53,25 @@ public UpdateStreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null); + this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize()); + } + + /** + * Creates a service to stream your documents to the provided source by interacting with the + * Stream API. This provides the ability to incrementally add, update, or delete documents via a + * stream. + * + *

To perform a full source rebuild, use the + * {@link StreamService}. + * + * @param source The source to push to + * @param options The backoff parameters + * @param userAgents The user-agents to append to the "User-Agent" HTTP header when performing + * requests against the Coveo Platform. + */ + public UpdateStreamService( + StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -66,21 +82,33 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { *

To perform a full source rebuild, use the
+   * {@link StreamService}.
+   *

Example batch sizes in bytes:
+   *
+   * <ul>
+   *   <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
+   *   <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
+   *   <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
+   * </ul>
+ * * @param source The source to which you want to send your documents. * @param options The configuration options for exponential backoff. * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max: + * 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive. */ public UpdateStreamService( - StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { Logger logger = LogManager.getLogger(UpdateStreamService.class); this.platformClient = new PlatformClient( source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options); - this.platformClient.setUserAgents(userAgents); + if (userAgents != null) { + this.platformClient.setUserAgents(userAgents); + } this.updateStreamServiceInternal = new UpdateStreamServiceInternal( source, - new StreamDocumentUploadQueue(this.getUploadStrategy()), + new StreamDocumentUploadQueue(null, maxQueueSize), // UploadStrategy no longer needed this.platformClient, logger); } @@ -91,9 +119,10 @@ public UpdateStreamService( * documents into it. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -118,7 +147,7 @@ public UpdateStreamService( * @throws IOException If the creation of the file container or adding the document fails. */ public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addOrUpdate(document); + updateStreamServiceInternal.addOrUpdate(document); } /** @@ -130,9 +159,10 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte * Partial item updates section. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -158,7 +188,7 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte */ public void addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addPartialUpdate(document); + updateStreamServiceInternal.addPartialUpdate(document); } /** @@ -167,9 +197,10 @@ public void addPartialUpdate(PartialUpdateDocument document) * it. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -194,7 +225,7 @@ public void addPartialUpdate(PartialUpdateDocument document) * @throws IOException If the creation of the file container or adding the document fails. */ public void delete(DeleteDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.delete(document); + updateStreamServiceInternal.delete(document); } /** @@ -214,11 +245,4 @@ public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { return updateStreamServiceInternal.close(); } - - private UploadStrategy getUploadStrategy() { - return (streamUpdate) -> { - String batchUpdateJson = new Gson().toJson(streamUpdate.marshal()); - return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson); - }; - } } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index 32f7ddc9..8a646099 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -23,54 +23,59 @@ public UpdateStreamServiceInternal( this.queue = queue; this.platformClient = platformClient; this.logger = logger; + // Set this instance on the queue so it can call createUploadAndPush + queue.setUpdateStreamService(this); } public FileContainer addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public FileContainer addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { - return this.pushFileContainer(this.getSourceId()); + HttpResponse lastResponse = null; + if (!queue.isEmpty()) { + lastResponse = queue.flushAndPush(); + } + return lastResponse; } - private FileContainer createFileContainer() throws IOException, InterruptedException { + /** + * Creates a new file container, uploads the content, and pushes it to the stream source. + * This method is called by the queue's flush operation to ensure each batch gets its own container. 
+ * + * @param streamUpdate The batch of documents to upload + * @return The HTTP response from pushing the file container + * @throws IOException If an I/O error occurs + * @throws InterruptedException If the operation is interrupted + */ + public HttpResponse createUploadAndPush(StreamUpdate streamUpdate) + throws IOException, InterruptedException { + // Step 1: Create a new file container this.logger.info("Creating new file container"); - HttpResponse response = this.platformClient.createFileContainer(); - return new Gson().fromJson(response.body(), FileContainer.class); - } - - private HttpResponse pushFileContainer(String sourceId) - throws NoOpenFileContainerException, IOException, InterruptedException { - if (this.fileContainer == null) { - throw new NoOpenFileContainerException( - "No open file container detected. A new container will automatically be created once you start adding, updating or deleting documents."); - } - queue.flush(); - this.logger.info("Pushing to file container " + this.fileContainer.fileId); - return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer); + HttpResponse createResponse = this.platformClient.createFileContainer(); + FileContainer container = new Gson().fromJson(createResponse.body(), FileContainer.class); + + // Step 2: Upload content to the file container + String batchUpdateJson = new Gson().toJson(streamUpdate.marshal()); + this.platformClient.uploadContentToFileContainer(container, batchUpdateJson); + + // Step 3: Push the file container to the stream source + this.logger.info("Pushing file container " + container.fileId + " to stream source"); + return this.platformClient.pushFileContainerContentToStreamSource(this.getSourceId(), container); } private String getSourceId() { diff --git a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java index f488f3a5..51a3541e 100644 --- a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java @@ -12,16 +12,16 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; public class DocumentUploadQueueTest { - @Mock private UploadStrategy uploadStrategy; + private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024; - @InjectMocks private DocumentUploadQueue queue; + @Mock private UploadStrategy uploadStrategy; + private DocumentUploadQueue queue; private AutoCloseable closeable; private DocumentBuilder documentToAdd; private DeleteDocument documentToDelete; @@ -29,20 +29,14 @@ public class DocumentUploadQueueTest { private int oneMegaByte = 1 * 1024 * 1024; private String generateStringFromBytes(int numBytes) { - // Check if the number of bytes is valid if (numBytes <= 0) { return ""; } - - // Create a byte array with the specified length byte[] bytes = new byte[numBytes]; - - // Fill the byte array with a pattern of ASCII characters - byte pattern = 65; // ASCII value for 'A' + byte pattern = 65; for (int i = 0; i < numBytes; i++) { bytes[i] = pattern; } - return new String(bytes); } @@ -53,6 +47,10 @@ private DocumentBuilder generateDocumentFromSize(int numBytes) { @Before public void setup() { + closeable = MockitoAnnotations.openMocks(this); + + queue = new DocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE); + String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte); documentToAdd = 
@@ -60,8 +58,6 @@ public void setup() { .withData(twoMegaByteData); documentToDelete = new DeleteDocument("https://my.document.uri?ref=3"); - - closeable = MockitoAnnotations.openMocks(this); } @After diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java new file mode 100644 index 00000000..2ebf689d --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java @@ -0,0 +1,225 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Integration tests for file container rotation when pushing large amounts of data. These tests + * verify the end-to-end flow from UpdateStreamService through to PlatformClient, using a small + * batch size to trigger rotation without needing large test data. + */ +public class FileContainerRotationIntegrationTest { + + private static final int SMALL_BATCH_SIZE = 1000; + private static final String SOURCE_ID = "test-source-id"; + private static final String ORG_ID = "test-org"; + private static final String API_KEY = "test-api-key"; + + private PlatformClient platformClient; + private StreamEnabledSource source; + private AtomicInteger containerCounter; + + @Before + public void setUp() throws IOException, InterruptedException { + platformClient = mock(PlatformClient.class); + source = mock(StreamEnabledSource.class); + containerCounter = new AtomicInteger(0); + + doReturn(SOURCE_ID).when(source).getId(); + doReturn(ORG_ID).when(source).getOrganizationId(); + doReturn(API_KEY).when(source).getApiKey(); + doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl(); + + doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer(); + doReturn(createGenericResponse()).when(platformClient).uploadContentToFileContainer(any(), anyString()); + doReturn(createGenericResponse()).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldCreateMultipleContainersWhenDataExceedsBatchSize() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.addOrUpdate(createDocument("doc4", 600)); + service.close(); + + verify(platformClient, times(4)).createFileContainer(); + verify(platformClient, times(4)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldCreateSingleContainerWhenDataFitsInOneBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 100)); + 
service.addOrUpdate(createDocument("doc2", 100)); + service.close(); + + verify(platformClient, times(1)).createFileContainer(); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleMixedOperationsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 400)); + service.delete(new DeleteDocument("doc2")); + service.addPartialUpdate(createPartialUpdate("doc3", 400)); + service.addOrUpdate(createDocument("doc4", 400)); + service.close(); + + verify(platformClient, times(3)).createFileContainer(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldUseUniqueContainerIdForEachBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.close(); + + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(FileContainer.class); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture()); + + assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId); + assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId); + assertEquals("container-3", containerCaptor.getAllValues().get(2).fileId); + } + + @Test + public void shouldPushImmediatelyWhenBatchSizeExceeded() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + verify(platformClient, times(0)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc2", 600)); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc3", 600)); + verify(platformClient, times(2)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.close(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 20; i++) { + service.addOrUpdate(createDocument("doc" + i, 200)); + } + service.close(); + + int expectedContainers = 10; + verify(platformClient, times(expectedContainers)).createFileContainer(); + verify(platformClient, times(expectedContainers)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception { + Map pushCountPerContainer = new HashMap<>(); + List containerCreationOrder = new ArrayList<>(); + + doAnswer(invocation -> { + HttpResponse response = createContainerResponse(); + String fileId = "container-" + containerCounter.get(); + containerCreationOrder.add(fileId); + pushCountPerContainer.put(fileId, 0); + return response; + }).when(platformClient).createFileContainer(); + + doAnswer(invocation -> { + FileContainer container = invocation.getArgument(1); + int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0); + pushCountPerContainer.put(container.fileId, currentCount + 1); + return createGenericResponse(); + 
}).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any()); + + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 10; i++) { + service.addOrUpdate(createDocument("doc" + i, 400)); + } + service.close(); + + for (Map.Entry entry : pushCountPerContainer.entrySet()) { + assertEquals( + "Container " + entry.getKey() + " should receive exactly 1 push, but received " + entry.getValue(), + Integer.valueOf(1), + entry.getValue()); + } + + assertTrue("Should have created multiple containers", containerCreationOrder.size() > 1); + } + + private UpdateStreamServiceInternal createServiceWithSmallBatchSize() { + StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE); + org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(getClass()); + return new UpdateStreamServiceInternal(source, queue, platformClient, logger); + } + + private DocumentBuilder createDocument(String id, int dataSize) { + return new DocumentBuilder("https://example.com/" + id, "Title " + id) + .withData(generateData(dataSize)); + } + + private PartialUpdateDocument createPartialUpdate(String id, int dataSize) { + return new PartialUpdateDocument( + "https://example.com/" + id, + PartialUpdateOperator.FIELDVALUEREPLACE, + "field", + generateData(dataSize)); + } + + private String generateData(int size) { + byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) { + bytes[i] = 65; + } + return new String(bytes); + } + + @SuppressWarnings("unchecked") + private HttpResponse createContainerResponse() { + HttpResponse response = mock(HttpResponse.class); + int id = containerCounter.incrementAndGet(); + doReturn(String.format( + "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", id, id)) + .when(response) + .body(); + return response; + } + + @SuppressWarnings("unchecked") + private HttpResponse createGenericResponse() { + HttpResponse response = mock(HttpResponse.class); + doReturn("{\"status\": \"ok\"}").when(response).body(); + return response; + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java new file mode 100644 index 00000000..222139af --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java @@ -0,0 +1,274 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for container rotation and batching behavior in the stream update + * workflow. Each batch that + * exceeds the configured limit should trigger creation of a new file container, + * upload, and + * immediate push. 
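+ * A deliberately small batch size (5000 bytes) keeps the fixtures tiny while
+ * still exercising the same rotation logic as production-sized batches.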
+ */ +public class StreamDocumentUploadQueueBatchingTest { + + private static final int SMALL_BATCH_SIZE = 5000; + + @Mock + private UpdateStreamServiceInternal updateStreamService; + @Mock + private HttpResponse httpResponse; + + private StreamDocumentUploadQueue queue; + private AutoCloseable closeable; + + @Before + public void setUp() throws Exception { + closeable = MockitoAnnotations.openMocks(this); + queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE); + queue.setUpdateStreamService(updateStreamService); + + when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + } + + @After + public void tearDown() throws Exception { + closeable.close(); + } + + @Test + public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush() + throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + + queue.add(doc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(doc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached() + throws IOException, InterruptedException { + DocumentBuilder smallDoc1 = new DocumentBuilder("https://doc.uri/1", "Small Doc 1"); + DocumentBuilder smallDoc2 = new DocumentBuilder("https://doc.uri/2", "Small Doc 2"); + + queue.add(smallDoc1); + queue.add(smallDoc2); + + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + assertFalse(queue.isEmpty()); + } + + @Test + public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch() + throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000)); + DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000)); + DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000)); + + queue.add(doc1); + queue.add(doc2); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(doc3); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(updateStreamService).createUploadAndPush(captor.capture()); + assertEquals(2, captor.getValue().getAddOrUpdate().size()); + } + + @Test + public void multipleBatchesShouldCreateMultipleContainers() throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000)); + DocumentBuilder doc4 = new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000)); + + queue.add(doc1); + queue.add(doc2); + queue.add(doc3); + queue.add(doc4); + + verify(updateStreamService, times(3)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); 
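+    // 10 bytes is far below the 5000-byte limit, so nothing is uploaded until
+    // flushAndPush() is invoked explicitly below.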
+ queue.add(doc); + assertFalse(queue.isEmpty()); + + queue.flushAndPush(); + + assertTrue(queue.isEmpty()); + } + + @Test + public void flushAndPushShouldReturnResponseFromService() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); + queue.add(doc); + + HttpResponse response = queue.flushAndPush(); + + assertEquals(httpResponse, response); + } + + @Test + public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { + HttpResponse response = queue.flushAndPush(); + + assertNull(response); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc"); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); + + queue.add(doc); + queue.add(deleteDoc); + queue.add(partialDoc); + + queue.flushAndPush(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(updateStreamService).createUploadAndPush(captor.capture()); + + StreamUpdate captured = captor.getValue(); + assertEquals(1, captured.getAddOrUpdate().size()); + assertEquals(1, captured.getDelete().size()); + assertEquals(1, captured.getPartialUpdate().size()); + } + + @Test + public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException { + queue = new StreamDocumentUploadQueue(null, 50); + queue.setUpdateStreamService(updateStreamService); + + DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1"); + DeleteDocument deleteDoc2 = new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds"); + + queue.add(deleteDoc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(deleteDoc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException { + PartialUpdateDocument partialDoc1 = new PartialUpdateDocument("https://doc.uri/1", + PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v"); + PartialUpdateDocument partialDoc2 = new PartialUpdateDocument( + "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE)); + + queue.add(partialDoc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500)); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000)); + + queue.add(doc); + queue.add(deleteDoc); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } 
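+  // The constructor tests below pin down the validation contract: batch sizes
+  // must be strictly positive and must not exceed 256 MB (the Stream API limit).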
+ + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectBatchSizeExceeding256MB() { + int exceeding256MB = 256 * 1024 * 1024 + 1; + new StreamDocumentUploadQueue(null, exceeding256MB); + } + + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectZeroBatchSize() { + new StreamDocumentUploadQueue(null, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectNegativeBatchSize() { + new StreamDocumentUploadQueue(null, -1); + } + + @Test + public void constructorShouldAcceptMaxAllowedBatchSize() { + int max256MB = 256 * 1024 * 1024; + StreamDocumentUploadQueue q = new StreamDocumentUploadQueue(null, max256MB); + assertNotNull(q); + } + + @Test + public void queueShouldUseSystemPropertyForDefaultBatchSize() { + String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "1048576"); + int configuredSize = DocumentUploadQueue.getConfiguredBatchSize(); + assertEquals(1048576, configuredSize); + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void systemPropertyExceeding256MBShouldThrow() { + String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "268435457"); + DocumentUploadQueue.getConfiguredBatchSize(); + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + private String generateData(int numBytes) { + if (numBytes <= 0) + return ""; + byte[] bytes = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + bytes[i] = 65; + } + return new String(bytes); + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java index 12cd2f54..ff6fc0ad 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java @@ -4,26 +4,31 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.ArrayList; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; public class StreamDocumentUploadQueueTest { - @Mock private UploadStrategy uploadStrategy; + private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024; - @InjectMocks private StreamDocumentUploadQueue queue; + @Mock private UploadStrategy uploadStrategy; + @Mock private UpdateStreamServiceInternal updateStreamService; + @Mock private HttpResponse httpResponse; + private StreamDocumentUploadQueue queue; private AutoCloseable closeable; private DocumentBuilder documentToAdd; private 
DeleteDocument documentToDelete; @@ -32,20 +37,14 @@ public class StreamDocumentUploadQueueTest { private int oneMegaByte = 1 * 1024 * 1024; private String generateStringFromBytes(int numBytes) { - // Check if the number of bytes is valid if (numBytes <= 0) { return ""; } - - // Create a byte array with the specified length byte[] bytes = new byte[numBytes]; - - // Fill the byte array with a pattern of ASCII characters - byte pattern = 65; // ASCII value for 'A' + byte pattern = 65; for (int i = 0; i < numBytes; i++) { bytes[i] = pattern; } - return new String(bytes); } @@ -63,7 +62,14 @@ private PartialUpdateDocument generatePartialUpdateDocumentFromSize(int numBytes } @Before - public void setup() { + public void setup() throws IOException, InterruptedException { + closeable = MockitoAnnotations.openMocks(this); + + queue = new StreamDocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE); + queue.setUpdateStreamService(updateStreamService); + + when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte); documentToAdd = @@ -78,8 +84,6 @@ public void setup() { PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); - - closeable = MockitoAnnotations.openMocks(this); } @After @@ -127,17 +131,10 @@ public void testShouldReturnBatch() throws IOException, InterruptedException { @Test public void testFlushShouldNotUploadDocumentsWhenRequiredSizeIsNotMet() throws IOException, InterruptedException { - // Adding 2MB document to the queue => queue has now 3MB of free space - // (5MB - 2MB = 3MB) queue.add(documentToAdd); - // Adding 2MB document to the queue => queue has now 1MB of free space - // (3MB - 2MB = 1MB) queue.add(documentToDelete); - // The maximum queue size has not been reached yet (1MB left of free space). - // Therefore, the accumulated documents will not be automatically flushed. - // Unless the user runs `.flush()` the queue will keep the 4MB of documents - verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class)); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); } @Test @@ -162,21 +159,14 @@ public void testShouldAutomaticallyFlushAccumulatedDocuments() } }); - // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, - // the queue size will reach 6MB, which exceeds the maximum queue size - // limit by 1MB. Therefore, the 2 first added documents will automatically be - // uploaded to the source. queue.add(firstBulkyDocument); queue.add(secondBulkyDocument); - verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class)); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); - // The 3rd document added to the queue will be included in a separate batch, - // which will not be uploaded unless the `flush()` method is called or until the - // queue size limit has been reached queue.add(thirdBulkyDocument); - verify(uploadStrategy, times(1)).apply(any(BatchUpdate.class)); - verify(uploadStrategy, times(1)).apply(firstBatch); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + verify(updateStreamService, times(1)).createUploadAndPush(firstBatch); } @Test @@ -212,21 +202,15 @@ public void testShouldManuallyFlushAccumulatedDocuments() emptyList, partialEmptyList); - // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, - // the queue size will reach 6MB, which exceeds the maximum queue size - // limit. 
Therefore, the 2 first added documents will automatically be uploaded
-    // to the source.
     queue.add(firstBulkyDocument);
     queue.add(secondBulkyDocument);
     queue.add(thirdBulkyDocument);
 
     queue.flush();
-    // Additional flush will have no effect if documents where already flushed
     queue.flush();
 
-    verify(uploadStrategy, times(2)).apply(any(StreamUpdate.class));
-    verify(uploadStrategy, times(1)).apply(firstBatch);
+    verify(updateStreamService, times(1)).createUploadAndPush(firstBatch);
     verify(uploadStrategy, times(1)).apply(secondBatch);
   }
 
diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
index 726769b8..5de3cf05 100644
--- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
+++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
@@ -1,7 +1,10 @@
 package com.coveo.pushapiclient;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -9,10 +12,12 @@
 import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
 import java.io.IOException;
 import java.net.http.HttpResponse;
+import java.util.ArrayList;
 import org.apache.logging.log4j.core.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -70,11 +76,11 @@ public void closeService() throws Exception {
   }
 
   @Test
-  public void addOrUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+  public void addOrUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException {
     service.addOrUpdate(documentA);
     service.addOrUpdate(documentB);
 
-    verify(this.platformClient, times(1)).createFileContainer();
+    verify(this.platformClient, times(0)).createFileContainer();
   }
 
   @Test
@@ -94,62 +100,129 @@ public void addOrUpdateAndPartialAndDeleteShouldAddDocumentsToQueue()
   }
 
   @Test
-  public void deleteShouldCreateFileContainer() throws IOException, InterruptedException {
+  public void deleteShouldNotCreateFileContainer() throws IOException, InterruptedException {
     service.delete(deleteDocumentA);
     service.delete(deleteDocumentB);
 
-    verify(this.platformClient, times(1)).createFileContainer();
+    verify(this.platformClient, times(0)).createFileContainer();
   }
 
   @Test
-  public void partialUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+  public void partialUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException {
     service.addPartialUpdate(partialUpdateDocumentA);
     service.addPartialUpdate(partialUpdateDocumentB);
 
-    verify(this.platformClient, times(1)).createFileContainer();
+    verify(this.platformClient, times(0)).createFileContainer();
   }
 
   @Test
-  public void closeShouldPushFileContainerOnAddOrUpdate()
+  public void closeShouldCallFlushAndPush()
       throws IOException, InterruptedException, NoOpenFileContainerException {
+    when(queue.isEmpty()).thenReturn(false);
+    when(queue.flushAndPush()).thenReturn(httpResponse);
+
     service.addOrUpdate(documentA);
     service.close();
 
-    verify(platformClient, times(1))
.pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + verify(queue, times(1)).flushAndPush(); } @Test - public void closeShouldPushFileContainerOnDelete() + public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty() throws IOException, InterruptedException, NoOpenFileContainerException { - service.delete(deleteDocumentA); + when(queue.isEmpty()).thenReturn(true); + service.close(); + verify(queue, times(0)).flushAndPush(); + } + + @Test + public void createUploadAndPushShouldCreateContainerUploadAndPush() + throws IOException, InterruptedException { + StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + + service.createUploadAndPush(streamUpdate); + + verify(platformClient, times(1)).createFileContainer(); + verify(platformClient, times(1)) + .uploadContentToFileContainer(any(FileContainer.class), any(String.class)); verify(platformClient, times(1)) .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); } @Test - public void closeShouldFlushBufferedDocuments() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.addOrUpdate(documentA); - service.close(); + public void createUploadAndPushShouldUseNewContainerForEachCall() + throws IOException, InterruptedException { + HttpResponse response1 = createMockHttpResponse("container-1"); + HttpResponse response2 = createMockHttpResponse("container-2"); + + when(platformClient.createFileContainer()).thenReturn(response1).thenReturn(response2); + + StreamUpdate streamUpdate1 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + StreamUpdate streamUpdate2 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + + service.createUploadAndPush(streamUpdate1); + service.createUploadAndPush(streamUpdate2); + + verify(platformClient, times(2)).createFileContainer(); - verify(queue, times(1)).flush(); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(FileContainer.class); + verify(platformClient, times(2)) + .pushFileContainerContentToStreamSource(eq(SOURCE_ID), containerCaptor.capture()); + + assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId); + assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId); + } + + @Test + public void createUploadAndPushShouldPerformOperationsInCorrectOrder() + throws IOException, InterruptedException { + StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + + service.createUploadAndPush(streamUpdate); + + org.mockito.InOrder inOrder = org.mockito.Mockito.inOrder(platformClient); + inOrder.verify(platformClient).createFileContainer(); + inOrder.verify(platformClient).uploadContentToFileContainer(any(FileContainer.class), any(String.class)); + inOrder.verify(platformClient).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); } @Test - public void shouldLogInfoOnCreateFileContainer() + public void closeOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException, NoOpenFileContainerException { - service.addOrUpdate(documentA); - verify(logger, times(1)).info("Creating new file container"); - service.close(); - verify(logger, times(1)).info("Pushing to file container file-id"); + when(queue.isEmpty()).thenReturn(true); + + HttpResponse result = service.close(); + + assertEquals(null, result); + verify(queue, times(0)).flushAndPush(); } - @Test(expected = NoOpenFileContainerException.class) - public void 
shouldThrowExceptionOnCloseIfNoOpenFileContainer() + @Test + public void closeOnNonEmptyQueueShouldReturnFlushAndPushResponse() throws IOException, InterruptedException, NoOpenFileContainerException { - service.close(); + when(queue.isEmpty()).thenReturn(false); + when(queue.flushAndPush()).thenReturn(httpResponse); + + HttpResponse result = service.close(); + + assertEquals(httpResponse, result); + } + + @Test + public void serviceShouldSetItselfOnQueueDuringConstruction() { + verify(queue, times(1)).setUpdateStreamService(service); + } + + @SuppressWarnings("unchecked") + private HttpResponse createMockHttpResponse(String fileId) { + HttpResponse response = mock(HttpResponse.class); + doReturn( + String.format( + "{\"uploadUri\": \"https://upload.uri/%s\", \"fileId\": \"%s\"}", fileId, fileId)) + .when(response) + .body(); + return response; } } From de2271dbd0c51fddba0238e68bd822a4802f874a Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Thu, 29 Jan 2026 12:45:25 -0500 Subject: [PATCH 2/4] docs: replace UPGRADE_NOTES with ConfigureBatchSize sample - Remove UPGRADE_NOTES.md (no breaking changes to document) - Add samples/ConfigureBatchSize.java showing 3 ways to configure batch size - Restore original UpdateStreamDocuments.java (remove verbose comments) - Update CONFIGURATION.md --- CONFIGURATION.md | 2 +- UPGRADE_NOTES.md | 169 ----------------------------- samples/ConfigureBatchSize.java | 45 ++++++++ samples/UpdateStreamDocuments.java | 8 -- 4 files changed, 46 insertions(+), 178 deletions(-) delete mode 100644 UPGRADE_NOTES.md create mode 100644 samples/ConfigureBatchSize.java diff --git a/CONFIGURATION.md b/CONFIGURATION.md index d5ae686e..e81ed327 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -167,7 +167,7 @@ spec: - Maximum throughput is needed **Keep default (5 MB) when:** -- You're unsure - it's a conservative default that works well +- You're unsure - Memory is a concern - You want predictable, frequent pushes diff --git a/UPGRADE_NOTES.md b/UPGRADE_NOTES.md deleted file mode 100644 index 93fdbcac..00000000 --- a/UPGRADE_NOTES.md +++ /dev/null @@ -1,169 +0,0 @@ -# Stream Catalog API Update - Upgrade Notes - -## Summary - -The `UpdateStreamService` has been upgraded to align with the Coveo Catalog Stream API best practices for update operations. Previously, the service would create one file container and attempt to send multiple batches to it. Now, **each batch gets its own file container**, which is created, uploaded to, and immediately pushed to the stream source. - -## What Changed - -### Key Behavioral Changes - -1. **File Container Lifecycle**: Each flush operation now follows the complete workflow: - - Step 1: Create a new file container - - Step 2: Upload batch content to the container - - Step 3: Push the container to the stream source via `/update` API - - This happens automatically when the 256MB batch limit is exceeded or when `close()` is called - -2. **Immediate Push**: File containers are pushed immediately after upload, not accumulated - -3. 
**No Shared Containers**: Each batch uses a dedicated file container, preventing conflicts - -### Modified Files - -#### Core Implementation -- **`UpdateStreamServiceInternal.java`** - - Removed file container creation from `add*()` methods - - Added `createUploadAndPush()` method that handles the complete create→upload→push workflow - - Modified `close()` to call `flushAndPush()` instead of `flush()` - -- **`StreamDocumentUploadQueue.java`** - - Added `flushAndPush()` method that creates a container, uploads, and pushes in one operation - - Overrode all `add()` methods to call `flushAndPush()` when batch size is exceeded - - Added reference to `UpdateStreamServiceInternal` to enable the complete workflow - -- **`UpdateStreamService.java`** - - Removed the `fileContainer` field (no longer needed) - - Updated documentation to reflect new behavior - - Removed unused `getUploadStrategy()` method - -#### Tests -- **`UpdateStreamServiceInternalTest.java`** - - Updated tests to verify file containers are created during flush, not during add - - Added test for `createUploadAndPush()` method - - Modified close tests to verify `flushAndPush()` is called - -#### Samples -- **`UpdateStreamDocuments.java`** - - Added documentation explaining the new file container behavior - -## API Documentation Reference - -This change implements the recommendations from the Coveo documentation: -- [Full Catalog Data Updates - Update Operations](https://docs.coveo.com/en/p4eb0129/coveo-for-commerce/full-catalog-data-updates#update-operations) - -Key quote from the documentation: -> "Load operations require uploading and processing all file containers at once, which is resource-intensive and delays data availability until the entire load completes. In contrast, **update operations process each container as soon as it's ready**, allowing for faster indexing and more up-to-date catalog data throughout the update process." - -## Migration Guide - -### For Existing Users - -The API remains the same - no code changes are required for basic usage: - -```java -// UpdateStreamService - catalog updates with immediate push per batch -UpdateStreamService updateService = new UpdateStreamService(catalogSource); -updateService.addOrUpdate(document1); -updateService.close(); - -// PushService - push operations with immediate push per batch -PushService pushService = new PushService(pushSource); -pushService.addOrUpdate(document1); -pushService.close(); - -// StreamService - load operations (collects batches before pushing) -StreamService streamService = new StreamService(streamSource); -streamService.open(); -streamService.addOrUpdate(document1); -streamService.close(); -``` - -### Configuring Batch Size - -For detailed configuration options including runtime system property configuration (`coveo.push.batchSize`), see **[CONFIGURATION.md](CONFIGURATION.md)**. - -If you need a smaller batch size (e.g., for more frequent pushes), you can specify it via constructor: - -```java -// Set custom batch size (e.g., 50MB) -int customBatchSize = 50 * 1024 * 1024; - -// UpdateStreamService -UpdateStreamService service = new UpdateStreamService( - catalogSource, - backoffOptions, - userAgents, - customBatchSize -); - -// PushService -PushService pushService = new PushService( - pushSource, - backoffOptions, - customBatchSize -); - -// StreamService -StreamService streamService = new StreamService( - streamSource, - backoffOptions, - userAgents, - customBatchSize -); -``` - -**Note**: Batch size cannot exceed 256MB (Stream API limit). 
Attempting to set a larger value will throw an `IllegalArgumentException`. - -### What You'll Notice - -1. **Configurable Batch Size**: All services now support custom batch sizes via constructor parameter or system property (max: 256MB). Default remains 5MB. - - `UpdateStreamService` (catalog updates) - - `PushService` (push operations) - - `StreamService` (load operations) - -2. **More API Calls for UpdateStreamService**: You may see more file container create and push operations in your logs, as each batch (when exceeding the batch size limit) now triggers a complete create→upload→push cycle - -4. **Faster Indexing**: Documents will be available for indexing sooner, as they're pushed immediately rather than waiting for all batches to complete - -5. **Better Alignment**: The behavior now matches the catalog stream API best practices for optimal performance - -## Benefits - -1. **Improved Performance**: Documents are processed as soon as each container is ready, rather than waiting for all uploads to complete - -2. **Better Resource Utilization**: Avoids resource-intensive bulk operations - -3. **Faster Data Availability**: Catalog data is indexed incrementally throughout the update process - -4. **API Compliance**: Follows the recommended pattern from Coveo's official documentation - -## Technical Details - -### Before (Old Behavior) -``` -Create file container once -→ Add batch 1 to container -→ Add batch 2 to container -→ Add batch 3 to container -→ Push container once with all batches -``` - -### After (New Behavior) -``` -Batch 1: - → Create file container - → Upload batch 1 - → Push container - -Batch 2: - → Create file container - → Upload batch 2 - → Push container - -Batch 3: - → Create file container - → Upload batch 3 - → Push container -``` - -Each batch is independent and processed immediately, following the update operation pattern described in the Coveo documentation. diff --git a/samples/ConfigureBatchSize.java b/samples/ConfigureBatchSize.java new file mode 100644 index 00000000..c07fe351 --- /dev/null +++ b/samples/ConfigureBatchSize.java @@ -0,0 +1,45 @@ +import com.coveo.pushapiclient.*; + +import java.io.IOException; + +/** + * Demonstrates how to configure the batch size for document uploads. + * + * The batch size controls how much data accumulates before automatically + * creating a file container and pushing to Coveo. Default is 5 MB, max is 256 MB. 
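+ *
+ * Values outside the valid range (zero, negative, or above 256 MB) make the
+ * service constructors throw an IllegalArgumentException.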
+ */ +public class ConfigureBatchSize { + + public static void main(String[] args) throws IOException, InterruptedException { + + PlatformUrl platformUrl = new PlatformUrlBuilder() + .withEnvironment(Environment.PRODUCTION) + .withRegion(Region.US) + .build(); + + CatalogSource catalogSource = CatalogSource.fromPlatformUrl( + "my_api_key", "my_org_id", "my_source_id", platformUrl); + + // Option 1: Use default batch size (5 MB) + UpdateStreamService defaultService = new UpdateStreamService(catalogSource); + + // Option 2: Configure batch size via constructor (50 MB) + int fiftyMegabytes = 50 * 1024 * 1024; + UpdateStreamService customService = new UpdateStreamService( + catalogSource, + new BackoffOptionsBuilder().build(), + null, + fiftyMegabytes); + + // Option 3: Configure globally via system property (affects all services) + // Run with: java -Dcoveo.push.batchSize=52428800 ConfigureBatchSize + // This sets 50 MB for all service instances that don't specify a size + + // Use the service + DocumentBuilder document = new DocumentBuilder("https://my.document.uri", "My document title") + .withData("these words will be searchable"); + + customService.addOrUpdate(document); + customService.close(); + } +} diff --git a/samples/UpdateStreamDocuments.java b/samples/UpdateStreamDocuments.java index 9b642ee0..2c9dae61 100644 --- a/samples/UpdateStreamDocuments.java +++ b/samples/UpdateStreamDocuments.java @@ -14,14 +14,6 @@ public static void main(String[] args) throws IOException, InterruptedException, // Using the Update Stream Service will act as an incremental change to the index, therefore any currently indexed items not contained in the payload will remain. UpdateStreamService updateStream = new UpdateStreamService(catalogSource); // To perform full index rebuild, use the StreamService instead. - - // Note: The UpdateStreamService now handles file containers differently for catalog sources. - // Each batch (when the 256MB limit is exceeded or close() is called) will: - // 1. Create a new file container - // 2. Upload the batch content to that container - // 3. Immediately push the container to the stream source via the /update API - // This follows the catalog stream API best practices where each update operation uses its own file container. - // You can configure a smaller batch size if needed by using the constructor with maxQueueSize parameter. 
DocumentBuilder document1 = new DocumentBuilder("https://my.document.uri", "My document title") .withData("these words will be searchable") From c6e17b9a1631fe208077215defce9b463567cd9a Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Thu, 29 Jan 2026 12:51:14 -0500 Subject: [PATCH 3/4] docs: update doc --- CONFIGURATION.md | 85 ++++--------------------------------- README.md | 107 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 77 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index e81ed327..8e4a1718 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -15,17 +15,20 @@ There are two ways to configure the batch size: Set the `coveo.push.batchSize` system property to configure the default batch size globally for all service instances: **Java Command Line:** + ```bash java -Dcoveo.push.batchSize=134217728 -jar your-application.jar ``` **Within Java Code:** + ```java // Set before creating any service instances System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB in bytes ``` **Maven/Gradle Build:** + ```xml @@ -41,6 +44,7 @@ test { ``` **Example Values:** + - `5242880` = 5 MB (default) - `268435456` = 256 MB (maximum) - `134217728` = 128 MB @@ -124,107 +128,36 @@ PushService pushService = new PushService(pushEnabledSource, backoffOptions, 64 StreamService streamService = new StreamService(streamEnabledSource, backoffOptions); ``` -#### Example 3: Docker/Container Environment - -```yaml -# docker-compose.yml -services: - app: - image: your-app - environment: - - JAVA_OPTS=-Dcoveo.push.batchSize=134217728 -``` - -#### Example 4: Kubernetes - -```yaml -# deployment.yaml -apiVersion: v1 -kind: Pod -metadata: - name: coveo-pusher -spec: - containers: - - name: app - image: your-app - env: - - name: JAVA_OPTS - value: "-Dcoveo.push.batchSize=134217728" -``` - ### When to Adjust Batch Size **Use smaller batches (32-64 MB) when:** + - Network bandwidth is limited - Memory is constrained - Processing many small documents - You want more frequent progress updates **Use larger batches (128-256 MB) when:** + - Network bandwidth is high - Processing large documents or files - You want to minimize API calls - Maximum throughput is needed **Keep default (5 MB) when:** + - You're unsure - Memory is a concern - You want predictable, frequent pushes ### Configuration Property Reference -| Property Name | Description | Default Value | Valid Range | -|--------------|-------------|---------------|-------------| +| Property Name | Description | Default Value | Valid Range | +| ---------------------- | --------------------------- | ---------------- | -------------- | | `coveo.push.batchSize` | Default batch size in bytes | `5242880` (5 MB) | 1 to 268435456 | -### Troubleshooting - -**Error: "exceeds the Stream API limit of 268435456 bytes"** -- Your configured value is too large -- Maximum allowed is 256 MB (268,435,456 bytes) -- Reduce the configured value - -**Error: "Invalid value for system property"** -- The value is not a valid integer -- Use numeric bytes value (e.g., `134217728` not `128MB`) - -**Service uses default despite system property:** -- Ensure property is set before creating service instances -- Verify property name is exactly `coveo.push.batchSize` -- Check that constructor isn't explicitly passing a value - -### Migration from Previous Versions - -Previous versions used a hardcoded 5 MB limit. 
If you're upgrading: - -**Option 1: Keep 5 MB behavior (not recommended)** -```java -System.setProperty("coveo.push.batchSize", "5242880"); // 5 MB -``` - -**Option 2: Use default 5 MB (recommended)** -```java -// No configuration needed - uses 5 MB default -UpdateStreamService service = new UpdateStreamService(catalogSource, backoffOptions); -``` - -**Option 3: Use larger batch size for throughput** -```java -System.setProperty("coveo.push.batchSize", "67108864"); // 64 MB -``` - -See [UPGRADE_NOTES.md](UPGRADE_NOTES.md) for complete migration guidance. - ## Additional Configuration -### API Client Configuration - -See the main [README.md](README.md) for: -- Platform client setup -- Authentication configuration -- API endpoint URLs -- Retry and backoff options - ### Environment Variables The following environment variables can be used for general configuration: diff --git a/README.md b/README.md index 592fd4b5..fcfb60f9 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,111 @@ To install the updated project files, build the Maven project: mvn install ``` +## Building from Source (Manual Build) + +If you want to build the library from source and share it with someone without publishing to Maven, follow these steps: + +### Prerequisites + +- Java 11 or higher +- [Apache Maven](https://maven.apache.org/install.html) 3.6+ + +### Step 1: Clone the Repository + +```bash +git clone https://github.com/coveo/push-api-client.java.git +cd push-api-client.java +``` + +### Step 2: Build the JAR + +```bash +mvn clean package -DskipTests +``` + +This will generate the following files in the `target/` directory: + +- `push-api-client.java-.jar` — The compiled library +- `push-api-client.java--sources.jar` — Source code (for IDE integration) + +### Step 3: Install to Local Maven Repository (Optional) + +If you want to use the library in another local Maven project: + +```bash +mvn clean install -DskipTests +``` + +This installs the JAR to your local `~/.m2/repository`, making it available to other projects on your machine. + +### Step 4: Share the JAR + +To share the built JAR with someone else: + +1. **Send the JAR file**: Share the `target/push-api-client.java-.jar` file directly. + +2. **Recipient adds JAR to their project**: + + **Option A — Install to their local Maven repository:** + + ```bash + mvn install:install-file \ + -Dfile=push-api-client.java-.jar \ + -DgroupId=com.coveo \ + -DartifactId=push-api-client.java \ + -Dversion= \ + -Dpackaging=jar + ``` + + Then add the dependency to their `pom.xml`: + + ```xml + + com.coveo + push-api-client.java + + + ``` + + **Option B — Use system scope (not recommended for production):** + + ```xml + + com.coveo + push-api-client.java + + system + ${project.basedir}/lib/push-api-client.java-.jar + + ``` + + **Option C — For Gradle projects:** + + Place the JAR in a `libs/` folder and add: + + ```groovy + dependencies { + implementation files('libs/push-api-client.java-.jar') + } + ``` + +### Running Tests + +To run the test suite: + +```bash +mvn test +``` + +### Validating Code Format + +Before contributing, ensure your code follows the project's formatting rules: + +```bash +mvn spotless:check # Check formatting +mvn spotless:apply # Auto-fix formatting issues +``` + ## Usage > See more examples in the `./samples` folder. @@ -119,7 +224,7 @@ PushService service = new PushService( ); ``` -See **[CONFIGURATION.md](CONFIGURATION.md)** for complete configuration options, Docker/Kubernetes examples, and best practices. 
+See **[CONFIGURATION.md](CONFIGURATION.md)** for complete configuration options, examples, and best practices.
 
 ### Exponential Backoff Retry Configuration
 

From 45059b9849d78f00c99610e3dffbd02af32db784 Mon Sep 17 00:00:00 2001
From: ylakhdar
Date: Thu, 29 Jan 2026 13:27:15 -0500
Subject: [PATCH 4/4] run spotless

---
 .../pushapiclient/DocumentUploadQueue.java    | 58 ++++++-----
 .../com/coveo/pushapiclient/PushService.java  |  1 +
 .../StreamDocumentUploadQueue.java            | 44 ++++-----
 .../coveo/pushapiclient/StreamService.java    | 13 ++-
 .../pushapiclient/UpdateStreamService.java    | 43 +++++---
 .../UpdateStreamServiceInternal.java          | 15 +--
 .../FileContainerRotationIntegrationTest.java | 62 +++++++-----
 ...StreamDocumentUploadQueueBatchingTest.java | 99 ++++++++++++-------
 .../StreamDocumentUploadQueueTest.java        |  1 -
 .../UpdateStreamServiceInternalTest.java      | 21 ++--
 10 files changed, 215 insertions(+), 142 deletions(-)

diff --git a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
index 87896c64..1862039f 100644
--- a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
+++ b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
@@ -8,26 +8,26 @@
 /** Represents a queue for uploading documents using a specified upload strategy */
 class DocumentUploadQueue {
   private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class);
-
+
   /** Maximum allowed queue size based on Stream API limit (256 MB) */
   protected static final int MAX_ALLOWED_QUEUE_SIZE = 256 * 1024 * 1024;
-
+
   /** Default queue size (5 MB) */
   protected static final int DEFAULT_QUEUE_SIZE = 5 * 1024 * 1024;
-
+
   /** System property name for configuring the default batch size */
   public static final String BATCH_SIZE_PROPERTY = "coveo.push.batchSize";
-
+
   protected UploadStrategy uploader;
   protected final int maxQueueSize;
   protected ArrayList<DocumentBuilder> documentToAddList;
   protected ArrayList<DeleteDocument> documentToDeleteList;
   protected int size;
-
+
   /**
-   * Validates batch size against constraints (> 0 and <= 256MB).
-   * Used by getConfiguredBatchSize and constructors to ensure consistent validation logic.
-   *
+   * Validates batch size against constraints (> 0 and <= 256MB). Used by getConfiguredBatchSize and
+   * constructors to ensure consistent validation logic.
+   *
    * @param sizeBytes The batch size in bytes to validate
    * @throws IllegalArgumentException if size exceeds MAX_ALLOWED_QUEUE_SIZE or is <= 0
    */
@@ -45,14 +45,15 @@ protected static void validateBatchSize(int sizeBytes) {
   /**
    * Gets the configured batch size from system properties, or returns the default if not set.
-   *
-   * The system property is read as bytes. When not set, returns DEFAULT_QUEUE_SIZE (5 MB).
-   *
-   * Example: Set a 50 MB batch size via system property:
+   *
+   * <p>The system property is read as bytes. When not set, returns DEFAULT_QUEUE_SIZE (5 MB).
+   *
+   * <p>Example: Set a 50 MB batch size via system property:
+   *
+   * <pre>
    *   java -Dcoveo.push.batchSize=52428800 -jar app.jar  // 50 * 1024 * 1024 bytes
    * </pre>
-   *
+   * @return The configured batch size in bytes (e.g., 52428800 for 50 MB)
    * @throws IllegalArgumentException if the configured value exceeds 256MB or is invalid
    */
@@ -61,28 +62,32 @@ public static int getConfiguredBatchSize() {
     if (propertyValue == null || propertyValue.trim().isEmpty()) {
       return DEFAULT_QUEUE_SIZE;
     }
-
+
     int configuredSize;
     try {
       configuredSize = Integer.parseInt(propertyValue.trim());
     } catch (NumberFormatException e) {
       throw new IllegalArgumentException(
-          String.format("Invalid value for system property %s: '%s'. Must be a valid integer.",
-              BATCH_SIZE_PROPERTY, propertyValue), e);
+          String.format(
+              "Invalid value for system property %s: '%s'. Must be a valid integer.",
+              BATCH_SIZE_PROPERTY, propertyValue),
+          e);
     }
-
+
     validateBatchSize(configuredSize);
-
-    logger.info(String.format("Using configured batch size from system property %s: %d bytes (%.2f MB)",
-        BATCH_SIZE_PROPERTY, configuredSize, configuredSize / (1024.0 * 1024.0)));
+
+    logger.info(
+        String.format(
+            "Using configured batch size from system property %s: %d bytes (%.2f MB)",
+            BATCH_SIZE_PROPERTY, configuredSize, configuredSize / (1024.0 * 1024.0)));
     return configuredSize;
   }
 
   /**
    * Constructs a new DocumentUploadQueue with the default batch size.
-   *
-   * Uses the configured batch size from system property "coveo.push.batchSize" if set,
-   * otherwise defaults to DEFAULT_QUEUE_SIZE (5 MB = 5242880 bytes).
+   *

Uses the configured batch size from system property "coveo.push.batchSize" if set, otherwise + * defaults to DEFAULT_QUEUE_SIZE (5 MB = 5242880 bytes). * * @param uploader The upload strategy to be used for document uploads. * @throws IllegalArgumentException if the system property value exceeds 256MB or is invalid. @@ -95,7 +100,8 @@ public DocumentUploadQueue(UploadStrategy uploader) { * Constructs a new DocumentUploadQueue object with a configurable maximum queue size limit. * * @param uploader The upload strategy to be used for document uploads. - * @param maxQueueSize The maximum queue size in bytes (e.g., 52428800 for 50 MB). Must not exceed 256MB (Stream API limit). + * @param maxQueueSize The maximum queue size in bytes (e.g., 52428800 for 50 MB). Must not exceed + * 256MB (Stream API limit). * @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB. */ public DocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { @@ -107,8 +113,8 @@ public DocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { } /** - * Default constructor for testing purposes (used by Mockito @InjectMocks). - * Initializes with default batch size; uploader is injected by Mockito. + * Default constructor for testing purposes (used by Mockito @InjectMocks). Initializes with + * default batch size; uploader is injected by Mockito. */ public DocumentUploadQueue() { this.documentToAddList = new ArrayList<>(); diff --git a/src/main/java/com/coveo/pushapiclient/PushService.java b/src/main/java/com/coveo/pushapiclient/PushService.java index dca86f96..62aa4f96 100644 --- a/src/main/java/com/coveo/pushapiclient/PushService.java +++ b/src/main/java/com/coveo/pushapiclient/PushService.java @@ -21,6 +21,7 @@ public PushService(PushEnabledSource source, BackoffOptions options) { * Creates a new PushService with configurable batch size. * *

Example batch sizes in bytes: + * *

    *
  • 5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880} *
  • 50 MB: {@code 50 * 1024 * 1024} = {@code 52428800} diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 25e96558..36f1dd2a 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -16,7 +16,7 @@ public StreamDocumentUploadQueue(UploadStrategy uploader) { super(uploader); this.documentToPartiallyUpdateList = new ArrayList<>(); } - + /** * Constructs a new StreamDocumentUploadQueue object with a configurable maximum queue size limit. * @@ -28,11 +28,11 @@ public StreamDocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { super(uploader, maxQueueSize); this.documentToPartiallyUpdateList = new ArrayList<>(); } - + /** - * Sets the UpdateStreamServiceInternal reference for handling complete upload workflow. - * This is needed to support the new pattern where each flush creates its own file container. - * + * Sets the UpdateStreamServiceInternal reference for handling complete upload workflow. This is + * needed to support the new pattern where each flush creates its own file container. + * * @param updateStreamService The service that handles create/upload/push operations */ public void setUpdateStreamService(UpdateStreamServiceInternal updateStreamService) { @@ -41,8 +41,8 @@ public void setUpdateStreamService(UpdateStreamServiceInternal updateStreamServi /** * Flushes the accumulated documents by applying the upload strategy. - * - * Note: This method is deprecated for catalog stream updates. Use flushAndPush() instead, + * + *

    Note: This method is deprecated for catalog stream updates. Use flushAndPush() instead, * which properly handles the create-upload-push workflow for each file container. * * @throws IOException If an I/O error occurs during the upload. @@ -54,12 +54,12 @@ public void flush() throws IOException, InterruptedException { logger.debug("Empty batch. Skipping upload"); return; } - + if (this.uploader == null) { throw new IllegalStateException( "No upload strategy configured. For UpdateStreamService, use flushAndPush() instead."); } - + // TODO: LENS-871: support concurrent requests StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); @@ -70,15 +70,13 @@ public void flush() throws IOException, InterruptedException { this.documentToDeleteList.clear(); this.documentToPartiallyUpdateList.clear(); } - + /** - * Flushes the accumulated documents and pushes them to the stream source. - * This method implements the proper workflow for catalog stream API updates: - * 1. Create a new file container - * 2. Upload content to the container - * 3. Push the container to the stream source - * - * Each flush operation gets its own file container, as required by the catalog stream API. + * Flushes the accumulated documents and pushes them to the stream source. This method implements + * the proper workflow for catalog stream API updates: 1. Create a new file container 2. Upload + * content to the container 3. Push the container to the stream source + * + *

    Each flush operation gets its own file container, as required by the catalog stream API. * * @return The HTTP response from the push operation * @throws IOException If an I/O error occurs during the upload. @@ -122,7 +120,7 @@ public HttpResponse flushAndPush() throws IOException, InterruptedExcept /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds - * the maximum content length. Each flush creates a new file container, uploads to it, and pushes + * the maximum content length. Each flush creates a new file container, uploads to it, and pushes * it to the stream source. * * @param document The document to be deleted from the index. @@ -144,11 +142,11 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE } this.size += sizeOfDoc; } - + /** * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the - * maximum content length. Each flush creates a new file container, uploads to it, and pushes - * it to the stream source. + * maximum content length. Each flush creates a new file container, uploads to it, and pushes it + * to the stream source. * * @param document The document to be added to the index. * @throws IOException If an I/O error occurs during the upload. @@ -173,8 +171,8 @@ public void add(DocumentBuilder document) throws IOException, InterruptedExcepti /** * Adds the {@link DeleteDocument} to the upload queue and flushes the queue if it exceeds the - * maximum content length. Each flush creates a new file container, uploads to it, and pushes - * it to the stream source. + * maximum content length. Each flush creates a new file container, uploads to it, and pushes it + * to the stream source. * * @param document The document to be deleted from the index. * @throws IOException If an I/O error occurs during the upload. diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index 51b6f8b9..9b035fee 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -27,7 +27,11 @@ public class StreamService { * @param userAgents The user agent to use for the requests. */ public StreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); + this( + source, + new BackoffOptionsBuilder().build(), + userAgents, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -42,7 +46,11 @@ public StreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); + this( + source, + new BackoffOptionsBuilder().build(), + null, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -88,6 +96,7 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[ * also be used for an initial catalog upload. * *

    Example batch sizes in bytes: + * *

      *
    • 5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880} *
    • 50 MB: {@code 50 * 1024 * 1024} = {@code 52428800} diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 2cb573b5..218a5031 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -1,7 +1,6 @@ package com.coveo.pushapiclient; import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException; -import com.google.gson.Gson; import java.io.IOException; import java.net.http.HttpResponse; import org.apache.logging.log4j.LogManager; @@ -24,7 +23,11 @@ public class UpdateStreamService { * @param userAgents The user agent to use for the requests. */ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); + this( + source, + new BackoffOptionsBuilder().build(), + userAgents, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -38,7 +41,11 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public UpdateStreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); + this( + source, + new BackoffOptionsBuilder().build(), + null, + DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -83,6 +90,7 @@ public UpdateStreamService( * {@StreamService} * *

diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
index 2cb573b5..218a5031 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
@@ -1,7 +1,6 @@
 package com.coveo.pushapiclient;
 
 import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
-import com.google.gson.Gson;
 import java.io.IOException;
 import java.net.http.HttpResponse;
 import org.apache.logging.log4j.LogManager;
@@ -24,7 +23,11 @@ public class UpdateStreamService {
    * @param userAgents The user agent to use for the requests.
    */
   public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
-    this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize());
+    this(
+        source,
+        new BackoffOptionsBuilder().build(),
+        userAgents,
+        DocumentUploadQueue.getConfiguredBatchSize());
   }
 
   /**
@@ -38,7 +41,11 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
    * @param source The source to which you want to send your documents.
    */
   public UpdateStreamService(StreamEnabledSource source) {
-    this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize());
+    this(
+        source,
+        new BackoffOptionsBuilder().build(),
+        null,
+        DocumentUploadQueue.getConfiguredBatchSize());
   }
 
   /**
@@ -83,6 +90,7 @@ public UpdateStreamService(
    * {@link StreamService}
    *
    * <p>Example batch sizes in bytes:
+   *
    * <ul>
    *   <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
    *   <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
@@ -108,7 +116,7 @@ public UpdateStreamService(
     this.updateStreamServiceInternal =
         new UpdateStreamServiceInternal(
             source,
-            new StreamDocumentUploadQueue(null, maxQueueSize), // UploadStrategy no longer needed
+            new StreamDocumentUploadQueue(null, maxQueueSize), // UploadStrategy no longer needed
             this.platformClient,
             logger);
   }
@@ -118,10 +126,11 @@ public UpdateStreamService(
    * open to receive the documents, this function will open a file container before uploading
    * documents into it.
    *
-   * <p>If called several times, the service will automatically batch documents and create new
-   * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor).
-   * Each batch is sent to its own file container and immediately pushed to the stream
-   * source, following the
+   * <p>If called several times, the service will automatically batch documents and create new file
+   * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable
+   * via constructor). Each batch is sent to its own file container and immediately pushed to the
+   * stream source, following the
    * catalog stream API best practices.
    *
    * <p>Once there are no more documents to add, it is important to call the {@link
@@ -158,10 +167,11 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte
    * href="https://docs.coveo.com/en/l62e0540/coveo-for-commerce/how-to-update-your-catalog#partial-item-updates">
    * Partial item updates</a> section.
    *

-   * <p>If called several times, the service will automatically batch documents and create new
-   * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor).
-   * Each batch is sent to its own file container and immediately pushed to the stream
-   * source, following the
+   * <p>If called several times, the service will automatically batch documents and create new file
+   * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable
+   * via constructor). Each batch is sent to its own file container and immediately pushed to the
+   * stream source, following the
    * catalog stream API best practices.
    *
    * <p>Once there are no more documents to add, it is important to call the {@link
@@ -196,10 +206,11 @@ public void addPartialUpdate(PartialUpdateDocument document)
    * receive the documents, this function will open a file container before uploading documents into
    * it.
    *

-   * <p>If called several times, the service will automatically batch documents and create new
-   * file containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable via constructor).
-   * Each batch is sent to its own file container and immediately pushed to the stream
-   * source, following the
+   * <p>If called several times, the service will automatically batch documents and create new file
+   * containers whenever the data payload exceeds the batch size limit (default: 5MB, configurable
+   * via constructor). Each batch is sent to its own file container and immediately pushed to the
+   * stream source, following the
    * catalog stream API best practices.
    *
    * <p>Once there are no more documents to add, it is important to call the {@link
diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
index 8a646099..b9fdfbd1 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
@@ -54,28 +54,29 @@ public HttpResponse<String> close()
   }
 
   /**
-   * Creates a new file container, uploads the content, and pushes it to the stream source.
-   * This method is called by the queue's flush operation to ensure each batch gets its own container.
-   *
+   * Creates a new file container, uploads the content, and pushes it to the stream source. This
+   * method is called by the queue's flush operation to ensure each batch gets its own container.
+   *
    * @param streamUpdate The batch of documents to upload
    * @return The HTTP response from pushing the file container
    * @throws IOException If an I/O error occurs
    * @throws InterruptedException If the operation is interrupted
    */
-  public HttpResponse<String> createUploadAndPush(StreamUpdate streamUpdate)
+  public HttpResponse<String> createUploadAndPush(StreamUpdate streamUpdate)
       throws IOException, InterruptedException {
     // Step 1: Create a new file container
     this.logger.info("Creating new file container");
     HttpResponse<String> createResponse = this.platformClient.createFileContainer();
     FileContainer container = new Gson().fromJson(createResponse.body(), FileContainer.class);
-
+
     // Step 2: Upload content to the file container
     String batchUpdateJson = new Gson().toJson(streamUpdate.marshal());
     this.platformClient.uploadContentToFileContainer(container, batchUpdateJson);
-
+
     // Step 3: Push the file container to the stream source
     this.logger.info("Pushing file container " + container.fileId + " to stream source");
-    return this.platformClient.pushFileContainerContentToStreamSource(this.getSourceId(), container);
+    return this.platformClient.pushFileContainerContentToStreamSource(
+        this.getSourceId(), container);
   }
 
   private String getSourceId() {
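The three-step flow above (create, upload, push) is exactly what the integration test below exercises: one `createUploadAndPush` call per batch, and a fresh container per call. A compact caller-side sketch of that rotation — the `internal` wiring (source, queue, platform client, logger) is assumed to already exist; in production the queue drives this from `flushAndPush()`:

```java
import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.ArrayList;

class RotationSketch {
  // Each call performs create -> upload -> push against its own file container.
  static void pushTwoBatches(UpdateStreamServiceInternal internal)
      throws IOException, InterruptedException {
    StreamUpdate first = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
    StreamUpdate second = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
    HttpResponse<String> r1 = internal.createUploadAndPush(first); // container #1
    HttpResponse<String> r2 = internal.createUploadAndPush(second); // container #2
  }
}
```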
diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
index 2ebf689d..33cbf4f1 100644
--- a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
+++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
@@ -49,8 +49,12 @@ public void setUp() throws IOException, InterruptedException {
     doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl();
 
     doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer();
-    doReturn(createGenericResponse()).when(platformClient).uploadContentToFileContainer(any(), anyString());
-    doReturn(createGenericResponse()).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any());
+    doReturn(createGenericResponse())
+        .when(platformClient)
+        .uploadContentToFileContainer(any(), anyString());
+    doReturn(createGenericResponse())
+        .when(platformClient)
+        .pushFileContainerContentToStreamSource(anyString(), any());
   }
 
   @Test
@@ -103,7 +107,8 @@ public void shouldUseUniqueContainerIdForEachBatch() throws Exception {
     service.close();
 
     ArgumentCaptor<FileContainer> containerCaptor = ArgumentCaptor.forClass(FileContainer.class);
-    verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture());
+    verify(platformClient, times(3))
+        .pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture());
 
     assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId);
     assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId);
@@ -138,7 +143,8 @@ public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception {
     int expectedContainers = 10;
     verify(platformClient, times(expectedContainers)).createFileContainer();
-    verify(platformClient, times(expectedContainers)).pushFileContainerContentToStreamSource(anyString(), any());
+    verify(platformClient, times(expectedContainers))
+        .pushFileContainerContentToStreamSource(anyString(), any());
   }
 
   @Test
@@ -146,20 +152,26 @@ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception {
     Map<String, Integer> pushCountPerContainer = new HashMap<>();
     List<String> containerCreationOrder = new ArrayList<>();
 
-    doAnswer(invocation -> {
-      HttpResponse<String> response = createContainerResponse();
-      String fileId = "container-" + containerCounter.get();
-      containerCreationOrder.add(fileId);
-      pushCountPerContainer.put(fileId, 0);
-      return response;
-    }).when(platformClient).createFileContainer();
-
-    doAnswer(invocation -> {
-      FileContainer container = invocation.getArgument(1);
-      int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0);
-      pushCountPerContainer.put(container.fileId, currentCount + 1);
-      return createGenericResponse();
-    }).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any());
+    doAnswer(
+            invocation -> {
+              HttpResponse<String> response = createContainerResponse();
+              String fileId = "container-" + containerCounter.get();
+              containerCreationOrder.add(fileId);
+              pushCountPerContainer.put(fileId, 0);
+              return response;
+            })
+        .when(platformClient)
+        .createFileContainer();
+
+    doAnswer(
+            invocation -> {
+              FileContainer container = invocation.getArgument(1);
+              int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0);
+              pushCountPerContainer.put(container.fileId, currentCount + 1);
+              return createGenericResponse();
+            })
+        .when(platformClient)
+        .pushFileContainerContentToStreamSource(anyString(), any());
 
     UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
@@ -170,7 +182,10 @@ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception {
     for (Map.Entry<String, Integer> entry : pushCountPerContainer.entrySet()) {
       assertEquals(
-          "Container " + entry.getKey() + " should receive exactly 1 push, but received " + entry.getValue(),
+          "Container "
+              + entry.getKey()
+              + " should receive exactly 1 push, but received "
+              + entry.getValue(),
           Integer.valueOf(1),
           entry.getValue());
     }
@@ -180,7 +195,8 @@ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception {
   private UpdateStreamServiceInternal createServiceWithSmallBatchSize() {
     StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE);
-    org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(getClass());
+    org.apache.logging.log4j.Logger logger =
+        org.apache.logging.log4j.LogManager.getLogger(getClass());
     return new UpdateStreamServiceInternal(source, queue, platformClient, logger);
   }
@@ -209,8 +225,10 @@ private String generateData(int size) {
   private HttpResponse<String> createContainerResponse() {
     HttpResponse<String> response = mock(HttpResponse.class);
     int id = containerCounter.incrementAndGet();
-    doReturn(String.format(
-        "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", id, id))
+    doReturn(
+            String.format(
+                "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}",
+                id, id))
         .when(response)
         .body();
     return response;
   }
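The batching tests below pick document sizes so that the 5000-byte `SMALL_BATCH_SIZE` overflows on the second `add`. For uniformly sized documents, the expected container count is a ceiling division; sketched as a hypothetical helper (not part of the library) for reasoning about the assertions that follow:

```java
final class BatchMathSketch {
  // With a 5000-byte limit and 3000-byte documents, one document fits per batch,
  // so two documents produce two containers once the queue is flushed/closed.
  static int expectedContainerCount(int docCount, int docSizeBytes, int batchSizeBytes) {
    int docsPerBatch = Math.max(1, batchSizeBytes / docSizeBytes);
    return (docCount + docsPerBatch - 1) / docsPerBatch; // ceiling division
  }
}
```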
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
index 222139af..5499667c 100644
--- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
@@ -12,7 +12,6 @@
 import java.io.IOException;
 import java.net.http.HttpResponse;
-import java.util.ArrayList;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -21,20 +20,16 @@ import org.mockito.MockitoAnnotations;
 
 /**
- * Tests for container rotation and batching behavior in the stream update
- * workflow. Each batch that
- * exceeds the configured limit should trigger creation of a new file container,
- * upload, and
+ * Tests for container rotation and batching behavior in the stream update workflow. Each batch that
+ * exceeds the configured limit should trigger creation of a new file container, upload, and
  * immediate push.
  */
 public class StreamDocumentUploadQueueBatchingTest {
 
   private static final int SMALL_BATCH_SIZE = 5000;
 
-  @Mock
-  private UpdateStreamServiceInternal updateStreamService;
-  @Mock
-  private HttpResponse<String> httpResponse;
+  @Mock private UpdateStreamServiceInternal updateStreamService;
+  @Mock private HttpResponse<String> httpResponse;
 
   private StreamDocumentUploadQueue queue;
   private AutoCloseable closeable;
@@ -56,8 +51,10 @@ public void tearDown() throws Exception {
   @Test
   public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush()
       throws IOException, InterruptedException {
-    DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
-    DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+    DocumentBuilder doc1 =
+        new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+    DocumentBuilder doc2 =
+        new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
 
     queue.add(doc1);
     verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
@@ -82,9 +79,12 @@ public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached()
   @Test
   public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch()
       throws IOException, InterruptedException {
-    DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000));
-    DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000));
-    DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000));
+    DocumentBuilder doc1 =
+        new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000));
+    DocumentBuilder doc2 =
+        new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000));
+    DocumentBuilder doc3 =
+        new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000));
 
     queue.add(doc1);
     queue.add(doc2);
@@ -99,11 +99,16 @@
   }
 
   @Test
-  public void multipleBatchesShouldCreateMultipleContainers() throws IOException, InterruptedException {
-    DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
-    DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
-    DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000));
-    DocumentBuilder doc4 = new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000));
+  public void multipleBatchesShouldCreateMultipleContainers()
+      throws IOException, InterruptedException {
+    DocumentBuilder doc1 =
+        new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+    DocumentBuilder doc2 =
+        new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+    DocumentBuilder doc3 =
+        new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000));
+    DocumentBuilder doc4 =
+        new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000));
 
     queue.add(doc1);
     queue.add(doc2);
@@ -115,7 +120,8 @@ public void multipleBatchesShouldCreateMultipleContainers() throws IOException,
 
   @Test
   public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException {
-    DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+    DocumentBuilder doc =
+        new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
 
     queue.add(doc);
     assertFalse(queue.isEmpty());
@@ -125,8 +131,10 @@ public void flushAndPushShouldClearQueueAfterBatch() throws IOException, Interru
   }
 
   @Test
-  public void flushAndPushShouldReturnResponseFromService() throws IOException, InterruptedException {
-    DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+  public void flushAndPushShouldReturnResponseFromService()
+      throws IOException, InterruptedException {
+    DocumentBuilder doc =
+        new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
 
     queue.add(doc);
     HttpResponse<String> response = queue.flushAndPush();
@@ -143,11 +151,13 @@ public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, Inter
   }
 
   @Test
-  public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOException, InterruptedException {
+  public void flushAndPushShouldPassCorrectStreamUpdateToService()
+      throws IOException, InterruptedException {
     DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc");
     DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
-    PartialUpdateDocument partialDoc = new PartialUpdateDocument(
-        "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value");
+    PartialUpdateDocument partialDoc =
+        new PartialUpdateDocument(
+            "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value");
 
     queue.add(doc);
     queue.add(deleteDoc);
@@ -165,12 +175,14 @@ public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOExcept
 
   @Test
-  public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException {
+  public void deleteDocumentsTriggerFlushWhenExceedingLimit()
+      throws IOException, InterruptedException {
     queue = new StreamDocumentUploadQueue(null, 50);
     queue.setUpdateStreamService(updateStreamService);
 
     DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1");
-    DeleteDocument deleteDoc2 = new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds");
+    DeleteDocument deleteDoc2 =
+        new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds");
 
     queue.add(deleteDoc1);
     verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
@@ -180,11 +192,17 @@ public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException,
 
   @Test
-  public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException {
-    PartialUpdateDocument partialDoc1 = new PartialUpdateDocument("https://doc.uri/1",
-        PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v");
-    PartialUpdateDocument partialDoc2 = new PartialUpdateDocument(
-        "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE));
+  public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit()
+      throws IOException, InterruptedException {
+    PartialUpdateDocument partialDoc1 =
+        new PartialUpdateDocument(
+            "https://doc.uri/1", PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v");
+    PartialUpdateDocument partialDoc2 =
+        new PartialUpdateDocument(
+            "https://doc.uri/2",
+            PartialUpdateOperator.FIELDVALUEREPLACE,
+            "field",
+            generateData(SMALL_BATCH_SIZE));
 
     queue.add(partialDoc1);
     verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
@@ -194,11 +212,17 @@ public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOExce
   }
 
   @Test
-  public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() throws IOException, InterruptedException {
-    DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500));
+  public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly()
+      throws IOException, InterruptedException {
+    DocumentBuilder doc =
+        new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500));
     DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
-    PartialUpdateDocument partialDoc = new PartialUpdateDocument(
-        "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000));
+    PartialUpdateDocument partialDoc =
+        new PartialUpdateDocument(
+            "https://doc.uri/3",
+            PartialUpdateOperator.FIELDVALUEREPLACE,
+            "field",
+            generateData(4000));
 
     queue.add(doc);
     queue.add(deleteDoc);
@@ -263,8 +287,7 @@ public void systemPropertyExceeding256MBShouldThrow() {
   }
 
   private String generateData(int numBytes) {
-    if (numBytes <= 0)
-      return "";
+    if (numBytes <= 0) return "";
     byte[] bytes = new byte[numBytes];
     for (int i = 0; i < numBytes; i++) {
       bytes[i] = 65;
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
index ff6fc0ad..9a4d0c11 100644
--- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
@@ -4,7 +4,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
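The system-property tests in the batching suite above (such as `systemPropertyExceeding256MBShouldThrow`) assert the bounds the services enforce when a batch size is resolved. Sketched as a standalone guard with a hypothetical method name — the actual check lives inside the library's batch-size resolution:

```java
final class BatchSizeGuardSketch {
  private static final long MAX_BATCH_SIZE_BYTES = 256L * 1024 * 1024; // 268435456

  // Mirrors the behavior the tests assert: non-positive values and values above
  // 256 MB are rejected with an IllegalArgumentException.
  static int validateBatchSize(long requestedBytes) {
    if (requestedBytes <= 0 || requestedBytes > MAX_BATCH_SIZE_BYTES) {
      throw new IllegalArgumentException(
          "Batch size must be between 1 and " + MAX_BATCH_SIZE_BYTES + " bytes");
    }
    return (int) requestedBytes;
  }
}
```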
diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
index 5de3cf05..c876316b 100644
--- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
+++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
@@ -13,7 +13,6 @@
 import java.io.IOException;
 import java.net.http.HttpResponse;
 import java.util.ArrayList;
-import java.util.ArrayList;
 import org.apache.logging.log4j.core.Logger;
 import org.junit.After;
 import org.junit.Before;
@@ -140,7 +139,8 @@ public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty()
   @Test
   public void createUploadAndPushShouldCreateContainerUploadAndPush()
       throws IOException, InterruptedException {
-    StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+    StreamUpdate streamUpdate =
+        new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
 
     service.createUploadAndPush(streamUpdate);
 
@@ -159,8 +159,10 @@ public void createUploadAndPushShouldUseNewContainerForEachCall()
 
     when(platformClient.createFileContainer()).thenReturn(response1).thenReturn(response2);
 
-    StreamUpdate streamUpdate1 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
-    StreamUpdate streamUpdate2 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+    StreamUpdate streamUpdate1 =
+        new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+    StreamUpdate streamUpdate2 =
+        new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
 
     service.createUploadAndPush(streamUpdate1);
     service.createUploadAndPush(streamUpdate2);
 
@@ -178,14 +180,19 @@ public void createUploadAndPushShouldUseNewContainerForEachCall()
   @Test
   public void createUploadAndPushShouldPerformOperationsInCorrectOrder()
       throws IOException, InterruptedException {
-    StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+    StreamUpdate streamUpdate =
+        new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
 
     service.createUploadAndPush(streamUpdate);
 
     org.mockito.InOrder inOrder = org.mockito.Mockito.inOrder(platformClient);
     inOrder.verify(platformClient).createFileContainer();
-    inOrder.verify(platformClient).uploadContentToFileContainer(any(FileContainer.class), any(String.class));
-    inOrder.verify(platformClient).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
+    inOrder
+        .verify(platformClient)
+        .uploadContentToFileContainer(any(FileContainer.class), any(String.class));
+    inOrder
+        .verify(platformClient)
+        .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
   }
 
   @Test