From 45282f1d1e406a6cc7846914877dd6f3dfa4b402 Mon Sep 17 00:00:00 2001 From: achouan-coveo Date: Thu, 15 Jan 2026 14:43:40 -0500 Subject: [PATCH 1/6] feat: add configurable batch size with 256MB default and runtime configuration - Increased default batch size from 5MB to 256MB (Stream API limit) - Added system property 'coveo.push.batchSize' for runtime configuration - Made batch size configurable via constructor parameter in UpdateStreamService, PushService, and StreamService - Added validation to prevent exceeding 256MB API limit - Updated UpdateStreamService to create new file container per batch and push immediately - Enhanced StreamDocumentUploadQueue with flushAndPush() for per-batch workflow Configuration: - System property: -Dcoveo.push.batchSize= - Constructor parameter: new UpdateStreamService(source, options, maxQueueSize) - Priority: constructor parameter > system property > default (256MB) Documentation: - Added CONFIGURATION.md with comprehensive configuration guide - Added UPGRADE_NOTES.md with migration guidance and behavioral changes - Updated README.md with configuration section Breaking changes: - UpdateStreamService now pushes each batch immediately instead of accumulating multiple batches in one container - Default batch size increased 51x (5MB -> 256MB) which may affect push frequency Closes: Update calls to stream catalog API handled per-batch with configurable size --- CONFIGURATION.md | 233 ++++++++++++++++++ README.md | 23 ++ UPGRADE_NOTES.md | 171 +++++++++++++ samples/UpdateStreamDocuments.java | 8 + .../pushapiclient/DocumentUploadQueue.java | 70 +++++- .../com/coveo/pushapiclient/PushService.java | 16 +- .../StreamDocumentUploadQueue.java | 117 ++++++++- .../coveo/pushapiclient/StreamService.java | 12 +- .../pushapiclient/UpdateStreamService.java | 48 ++-- .../UpdateStreamServiceInternal.java | 53 ++-- .../UpdateStreamServiceInternalTest.java | 59 ++--- 11 files changed, 716 insertions(+), 94 deletions(-) create mode 100644 
CONFIGURATION.md create mode 100644 UPGRADE_NOTES.md diff --git a/CONFIGURATION.md b/CONFIGURATION.md new file mode 100644 index 00000000..b8453c97 --- /dev/null +++ b/CONFIGURATION.md @@ -0,0 +1,233 @@ +# Configuration Guide + +This document describes the available configuration options for the Coveo Push API Java Client. + +## Batch Size Configuration + +The batch size controls how much data is accumulated before creating a file container and pushing to Coveo. The default is **256 MB** (matching the Stream API limit). + +### Configuration Methods + +There are two ways to configure the batch size: + +#### 1. System Property (Runtime Configuration) + +Set the `coveo.push.batchSize` system property to configure the default batch size globally for all service instances: + +**Java Command Line:** +```bash +java -Dcoveo.push.batchSize=134217728 -jar your-application.jar +``` + +**Within Java Code:** +```java +// Set before creating any service instances +System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB in bytes +``` + +**Maven/Gradle Build:** +```xml +<!-- pom.xml: maven-surefire-plugin configuration --> +<configuration> + <argLine>-Dcoveo.push.batchSize=134217728</argLine> +</configuration> +``` + +```groovy +// build.gradle +test { + systemProperty 'coveo.push.batchSize', '134217728' +} +``` + +**Example Values:** +- `268435456` = 256 MB (default) +- `134217728` = 128 MB +- `67108864` = 64 MB +- `33554432` = 32 MB +- `10485760` = 10 MB + +#### 2. 
Constructor Parameter (Per-Instance Configuration) + +Pass the `maxQueueSize` parameter when creating service instances: + +```java +// UpdateStreamService with custom 128 MB batch size +UpdateStreamService service = new UpdateStreamService( + catalogSource, + backoffOptions, + userAgents, + 128 * 1024 * 1024 // 128 MB in bytes +); + +// PushService with custom batch size +PushService pushService = new PushService( + pushSource, + backoffOptions, + 128 * 1024 * 1024 // 128 MB +); + +// StreamService with custom batch size +StreamService streamService = new StreamService( + streamSource, + backoffOptions, + userAgents, + 128 * 1024 * 1024 // 128 MB +); +``` + +### Configuration Priority + +When both methods are used: + +1. **Constructor parameter** takes precedence (if specified) +2. **System property** is used as default (if set) +3. **Built-in default** of 256 MB is used otherwise + +### Validation Rules + +All batch size values are validated: + +- ✅ **Maximum:** 256 MB (268,435,456 bytes) - API limit +- ✅ **Minimum:** Greater than 0 +- ❌ Values exceeding 256 MB will throw `IllegalArgumentException` +- ❌ Invalid or negative values will throw `IllegalArgumentException` + +### Examples + +#### Example 1: Using System Property + +```java +// Configure globally via system property +System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB + +// All services will use 128 MB by default +UpdateStreamService updateService = new UpdateStreamService(catalogSource); +PushService pushService = new PushService(pushSource); +StreamService streamService = new StreamService(streamSource); +``` + +#### Example 2: Override Per Service + +```java +// Set global default to 128 MB +System.setProperty("coveo.push.batchSize", "134217728"); + +// Update service uses global default (128 MB) +UpdateStreamService updateService = new UpdateStreamService(catalogSource); + +// Push service overrides with 64 MB +PushService pushService = new PushService(pushSource, backoffOptions, 64 * 1024 * 1024); + +// Stream service uses global default (128 MB) +StreamService streamService = new StreamService(streamSource); +``` 
+ +#### Example 3: Docker/Container Environment + +```yaml +# docker-compose.yml +services: + app: + image: your-app + environment: + - JAVA_OPTS=-Dcoveo.push.batchSize=134217728 +``` + +#### Example 4: Kubernetes + +```yaml +# deployment.yaml +apiVersion: v1 +kind: Pod +metadata: + name: coveo-pusher +spec: + containers: + - name: app + image: your-app + env: + - name: JAVA_OPTS + value: "-Dcoveo.push.batchSize=134217728" +``` + +### When to Adjust Batch Size + +**Use smaller batches (32-64 MB) when:** +- Network bandwidth is limited +- Memory is constrained +- Processing many small documents +- You want more frequent progress updates + +**Use larger batches (128-256 MB) when:** +- Network bandwidth is high +- Processing large documents or files +- You want to minimize API calls +- Maximum throughput is needed + +**Keep default (256 MB) when:** +- You're unsure - it's optimized for most scenarios +- Processing mixed document sizes +- You want to maximize batch efficiency + +### Configuration Property Reference + +| Property Name | Description | Default Value | Valid Range | +|--------------|-------------|---------------|-------------| +| `coveo.push.batchSize` | Default batch size in bytes | `268435456` (256 MB) | 1 to 268435456 | + +### Troubleshooting + +**Error: "exceeds the Stream API limit of 268435456 bytes"** +- Your configured value is too large +- Maximum allowed is 256 MB (268,435,456 bytes) +- Reduce the configured value + +**Error: "Invalid value for system property"** +- The value is not a valid integer +- Use numeric bytes value (e.g., `134217728` not `128MB`) + +**Service uses default despite system property:** +- Ensure property is set before creating service instances +- Verify property name is exactly `coveo.push.batchSize` +- Check that constructor isn't explicitly passing a value + +### 
Migration from Previous Versions + +Previous versions used a hardcoded 5 MB limit. If you're upgrading: + +**Option 1: Keep 5 MB behavior (not recommended)** +```java +System.setProperty("coveo.push.batchSize", "5242880"); // 5 MB +``` + +**Option 2: Use new 256 MB default (recommended)** +```java +// No configuration needed - automatic +UpdateStreamService service = new UpdateStreamService(platformClient, sourceId); +``` + +**Option 3: Choose custom size** +```java +System.setProperty("coveo.push.batchSize", "67108864"); // 64 MB +``` + +See [UPGRADE_NOTES.md](UPGRADE_NOTES.md) for complete migration guidance. + +## Additional Configuration + +### API Client Configuration + +See the main [README.md](README.md) for: +- Platform client setup +- Authentication configuration +- API endpoint URLs +- Retry and backoff options + +### Environment Variables + +The following environment variables can be used for general configuration: + +- `COVEO_API_KEY` - API key for authentication +- `COVEO_ORGANIZATION_ID` - Organization identifier +- `COVEO_PLATFORM_URL` - Custom platform URL (if needed) + +Refer to the Coveo Platform documentation for complete environment configuration options. diff --git a/README.md b/README.md index 5f9cad34..8c885f66 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,29 @@ public class PushOneDocument { } ``` +## Configuration + +### Batch Size Configuration + +The SDK uses a default batch size of **256 MB** (matching the Coveo Stream API limit) before automatically creating a file container and pushing documents. You can configure this globally via system property or per-service via constructor. 
+ +**Global Configuration (System Property):** +```bash +java -Dcoveo.push.batchSize=134217728 -jar your-app.jar # 128 MB +``` + +**Per-Service Configuration:** +```java +// Configure UpdateStreamService with 128 MB batch size +UpdateStreamService service = new UpdateStreamService( + catalogSource, + backoffOptions, + userAgents, + 128 * 1024 * 1024 // 128 MB in bytes +); +``` + +See **[CONFIGURATION.md](CONFIGURATION.md)** for complete configuration options, Docker/Kubernetes examples, and best practices. + ### Exponential Backoff Retry Configuration By default, the SDK leverages an exponential backoff retry mechanism. Exponential backoff allows for the SDK to make multiple attempts to resolve throttled requests, increasing the amount of time to wait for each subsequent attempt. Outgoing requests will retry when a `429` status code is returned from the platform. diff --git a/UPGRADE_NOTES.md b/UPGRADE_NOTES.md new file mode 100644 index 00000000..a4e8c9bb --- /dev/null +++ b/UPGRADE_NOTES.md @@ -0,0 +1,171 @@ +# Stream Catalog API Update - Upgrade Notes + +## Summary + +The `UpdateStreamService` has been upgraded to align with the Coveo Catalog Stream API best practices for update operations. Previously, the service would create one file container and attempt to send multiple batches to it. Now, **each batch gets its own file container**, which is created, uploaded to, and immediately pushed to the stream source. + +## What Changed + +### Key Behavioral Changes + +1. **File Container Lifecycle**: Each flush operation now follows the complete workflow: + - Step 1: Create a new file container + - Step 2: Upload batch content to the container + - Step 3: Push the container to the stream source via `/update` API + - This happens automatically when the 256MB batch limit is exceeded or when `close()` is called + +2. **Immediate Push**: File containers are pushed immediately after upload, not accumulated + +3. 
**No Shared Containers**: Each batch uses a dedicated file container, preventing conflicts + +### Modified Files + +#### Core Implementation +- **`UpdateStreamServiceInternal.java`** + - Removed file container creation from `add*()` methods + - Added `createUploadAndPush()` method that handles the complete create→upload→push workflow + - Modified `close()` to call `flushAndPush()` instead of `flush()` + +- **`StreamDocumentUploadQueue.java`** + - Added `flushAndPush()` method that creates a container, uploads, and pushes in one operation + - Overrode all `add()` methods to call `flushAndPush()` when batch size is exceeded + - Added reference to `UpdateStreamServiceInternal` to enable the complete workflow + +- **`UpdateStreamService.java`** + - Removed the `fileContainer` field (no longer needed) + - Updated documentation to reflect new behavior + - Removed unused `getUploadStrategy()` method + +#### Tests +- **`UpdateStreamServiceInternalTest.java`** + - Updated tests to verify file containers are created during flush, not during add + - Added test for `createUploadAndPush()` method + - Modified close tests to verify `flushAndPush()` is called + +#### Samples +- **`UpdateStreamDocuments.java`** + - Added documentation explaining the new file container behavior + +## API Documentation Reference + +This change implements the recommendations from the Coveo documentation: +- [Full Catalog Data Updates - Update Operations](https://docs.coveo.com/en/p4eb0129/coveo-for-commerce/full-catalog-data-updates#update-operations) + +Key quote from the documentation: +> "Load operations require uploading and processing all file containers at once, which is resource-intensive and delays data availability until the entire load completes. In contrast, **update operations process each container as soon as it's ready**, allowing for faster indexing and more up-to-date catalog data throughout the update process." 
+ +## Migration Guide + +### For Existing Users + +The API remains the same - no code changes are required for basic usage: + +```java +// UpdateStreamService - catalog updates with immediate push per batch +UpdateStreamService updateService = new UpdateStreamService(catalogSource); +updateService.addOrUpdate(document1); +updateService.close(); + +// PushService - push operations with immediate push per batch +PushService pushService = new PushService(pushSource); +pushService.addOrUpdate(document1); +pushService.close(); + +// StreamService - load operations (collects batches before pushing) +StreamService streamService = new StreamService(streamSource); +streamService.open(); +streamService.addOrUpdate(document1); +streamService.close(); +``` + +### Configuring Batch Size + +For detailed configuration options including runtime system property configuration (`coveo.push.batchSize`), see **[CONFIGURATION.md](CONFIGURATION.md)**. + +If you need a smaller batch size (e.g., for more frequent pushes), you can specify it via constructor: + +```java +// Set custom batch size (e.g., 50MB) +int customBatchSize = 50 * 1024 * 1024; + +// UpdateStreamService +UpdateStreamService service = new UpdateStreamService( + catalogSource, + backoffOptions, + userAgents, + customBatchSize +); + +// PushService +PushService pushService = new PushService( + pushSource, + backoffOptions, + customBatchSize +); + +// StreamService +StreamService streamService = new StreamService( + streamSource, + backoffOptions, + userAgents, + customBatchSize +); +``` + +**Note**: Batch size cannot exceed 256MB (Stream API limit). Attempting to set a larger value will throw an `IllegalArgumentException`. + +### What You'll Notice + +1. **Larger Default Batch Size**: The default batch size has increased from 5MB to 256MB (matching the Stream API limit) for all services: + - `UpdateStreamService` (catalog updates) + - `PushService` (push operations) + - `StreamService` (load operations) + +2. 
**Configurable Batch Size**: All services now support custom batch sizes via constructor parameter (max: 256MB) + +3. **More API Calls for UpdateStreamService**: You may see more file container create and push operations in your logs, as each batch (when exceeding 256MB) now triggers a complete create→upload→push cycle + +4. **Faster Indexing**: Documents will be available for indexing sooner, as they're pushed immediately rather than waiting for all batches to complete + +5. **Better Alignment**: The behavior now matches the catalog stream API best practices for optimal performance + +## Benefits + +1. **Improved Performance**: Documents are processed as soon as each container is ready, rather than waiting for all uploads to complete + +2. **Better Resource Utilization**: Avoids resource-intensive bulk operations + +3. **Faster Data Availability**: Catalog data is indexed incrementally throughout the update process + +4. **API Compliance**: Follows the recommended pattern from Coveo's official documentation + +## Technical Details + +### Before (Old Behavior) +``` +Create file container once +→ Add batch 1 to container +→ Add batch 2 to container +→ Add batch 3 to container +→ Push container once with all batches +``` + +### After (New Behavior) +``` +Batch 1: + → Create file container + → Upload batch 1 + → Push container + +Batch 2: + → Create file container + → Upload batch 2 + → Push container + +Batch 3: + → Create file container + → Upload batch 3 + → Push container +``` + +Each batch is independent and processed immediately, following the update operation pattern described in the Coveo documentation. 
diff --git a/samples/UpdateStreamDocuments.java b/samples/UpdateStreamDocuments.java index 2c9dae61..9b642ee0 100644 --- a/samples/UpdateStreamDocuments.java +++ b/samples/UpdateStreamDocuments.java @@ -14,6 +14,14 @@ public static void main(String[] args) throws IOException, InterruptedException, // Using the Update Stream Service will act as an incremental change to the index, therefore any currently indexed items not contained in the payload will remain. UpdateStreamService updateStream = new UpdateStreamService(catalogSource); // To perform full index rebuild, use the StreamService instead. + + // Note: The UpdateStreamService now handles file containers differently for catalog sources. + // Each batch (when the 256MB limit is exceeded or close() is called) will: + // 1. Create a new file container + // 2. Upload the batch content to that container + // 3. Immediately push the container to the stream source via the /update API + // This follows the catalog stream API best practices where each update operation uses its own file container. + // You can configure a smaller batch size if needed by using the constructor with maxQueueSize parameter. 
DocumentBuilder document1 = new DocumentBuilder("https://my.document.uri", "My document title") .withData("these words will be searchable") diff --git a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java index d3cc8480..2b8b63c1 100644 --- a/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java @@ -8,21 +8,87 @@ /** Represents a queue for uploading documents using a specified upload strategy */ class DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class); + + /** Maximum allowed queue size based on Stream API limit (256 MB) */ + protected static final int MAX_ALLOWED_QUEUE_SIZE = 256 * 1024 * 1024; + + /** Default queue size (256 MB to match API limit) */ + protected static final int DEFAULT_QUEUE_SIZE = 256 * 1024 * 1024; + + /** System property name for configuring the default batch size */ + public static final String BATCH_SIZE_PROPERTY = "coveo.push.batchSize"; + protected final UploadStrategy uploader; - protected final int maxQueueSize = 5 * 1024 * 1024; + protected final int maxQueueSize; protected ArrayList documentToAddList; protected ArrayList documentToDeleteList; protected int size; + + /** + * Gets the configured batch size from system properties, or returns the default if not set. 
+ * + * @return The configured batch size in bytes + * @throws IllegalArgumentException if the configured value exceeds 256MB or is invalid + */ + public static int getConfiguredBatchSize() { + String propertyValue = System.getProperty(BATCH_SIZE_PROPERTY); + if (propertyValue == null || propertyValue.trim().isEmpty()) { + return DEFAULT_QUEUE_SIZE; + } + + try { + int configuredSize = Integer.parseInt(propertyValue.trim()); + if (configuredSize > MAX_ALLOWED_QUEUE_SIZE) { + throw new IllegalArgumentException( + String.format("System property %s (%d bytes) exceeds the Stream API limit of %d bytes (256 MB)", + BATCH_SIZE_PROPERTY, configuredSize, MAX_ALLOWED_QUEUE_SIZE)); + } + if (configuredSize <= 0) { + throw new IllegalArgumentException( + String.format("System property %s must be greater than 0, got: %d", + BATCH_SIZE_PROPERTY, configuredSize)); + } + logger.info(String.format("Using configured batch size from system property %s: %d bytes (%.2f MB)", + BATCH_SIZE_PROPERTY, configuredSize, configuredSize / (1024.0 * 1024.0))); + return configuredSize; + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format("Invalid value for system property %s: '%s'. Must be a valid integer.", + BATCH_SIZE_PROPERTY, propertyValue), e); + } + } /** - * Constructs a new DocumentUploadQueue object with a default maximum queue size limit of 5MB. + * Constructs a new DocumentUploadQueue object with the default maximum queue size limit of 256MB, + * or the value configured via system property "coveo.push.batchSize" if set. * * @param uploader The upload strategy to be used for document uploads. + * @throws IllegalArgumentException if the system property value exceeds 256MB or is invalid. */ public DocumentUploadQueue(UploadStrategy uploader) { + this(uploader, getConfiguredBatchSize()); + } + + /** + * Constructs a new DocumentUploadQueue object with a configurable maximum queue size limit. 
+ * + * @param uploader The upload strategy to be used for document uploads. + * @param maxQueueSize The maximum queue size in bytes. Must not exceed 256MB (Stream API limit). + * @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB. + */ + public DocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { + if (maxQueueSize > MAX_ALLOWED_QUEUE_SIZE) { + throw new IllegalArgumentException( + String.format("maxQueueSize (%d bytes) exceeds the Stream API limit of %d bytes (256 MB)", + maxQueueSize, MAX_ALLOWED_QUEUE_SIZE)); + } + if (maxQueueSize <= 0) { + throw new IllegalArgumentException("maxQueueSize must be greater than 0"); + } this.documentToAddList = new ArrayList<>(); this.documentToDeleteList = new ArrayList<>(); this.uploader = uploader; + this.maxQueueSize = maxQueueSize; } /** diff --git a/src/main/java/com/coveo/pushapiclient/PushService.java b/src/main/java/com/coveo/pushapiclient/PushService.java index a7ba3665..8b7f2a7b 100644 --- a/src/main/java/com/coveo/pushapiclient/PushService.java +++ b/src/main/java/com/coveo/pushapiclient/PushService.java @@ -10,15 +10,27 @@ public class PushService { private PushServiceInternal service; public PushService(PushEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), DocumentUploadQueue.DEFAULT_QUEUE_SIZE); } public PushService(PushEnabledSource source, BackoffOptions options) { + this(source, options, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); + } + + /** + * Creates a new PushService with configurable batch size. + * + * @param source The source to push documents to. + * @param options The configuration options for exponential backoff. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 256MB, max: 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB. 
+ */ + public PushService(PushEnabledSource source, BackoffOptions options, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); UploadStrategy uploader = this.getUploadStrategy(); - DocumentUploadQueue queue = new DocumentUploadQueue(uploader); + DocumentUploadQueue queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); this.service = new PushServiceInternal(queue); diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index c4e6ecd9..91f3c19c 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -1,6 +1,7 @@ package com.coveo.pushapiclient; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.ArrayList; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -9,14 +10,40 @@ public class StreamDocumentUploadQueue extends DocumentUploadQueue { private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class); protected ArrayList documentToPartiallyUpdateList; + private UpdateStreamServiceInternal updateStreamService; public StreamDocumentUploadQueue(UploadStrategy uploader) { super(uploader); this.documentToPartiallyUpdateList = new ArrayList<>(); } + + /** + * Constructs a new StreamDocumentUploadQueue object with a configurable maximum queue size limit. + * + * @param uploader The upload strategy to be used for document uploads. + * @param maxQueueSize The maximum queue size in bytes. Must not exceed 256MB (Stream API limit). + * @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB. 
+ */ + public StreamDocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) { + super(uploader, maxQueueSize); + this.documentToPartiallyUpdateList = new ArrayList<>(); + } + + /** + * Sets the UpdateStreamServiceInternal reference for handling complete upload workflow. + * This is needed to support the new pattern where each flush creates its own file container. + * + * @param updateStreamService The service that handles create/upload/push operations + */ + public void setUpdateStreamService(UpdateStreamServiceInternal updateStreamService) { + this.updateStreamService = updateStreamService; + } /** * Flushes the accumulated documents by applying the upload strategy. + * + * Note: This method is deprecated for catalog stream updates. Use flushAndPush() instead, + * which properly handles the create-upload-push workflow for each file container. * * @throws IOException If an I/O error occurs during the upload. * @throws InterruptedException If the upload process is interrupted. @@ -37,10 +64,44 @@ public void flush() throws IOException, InterruptedException { this.documentToDeleteList.clear(); this.documentToPartiallyUpdateList.clear(); } + + /** + * Flushes the accumulated documents and pushes them to the stream source. + * This method implements the proper workflow for catalog stream API updates: + * 1. Create a new file container + * 2. Upload content to the container + * 3. Push the container to the stream source + * + * Each flush operation gets its own file container, as required by the catalog stream API. + * + * @return The HTTP response from the push operation + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. + */ + public HttpResponse flushAndPush() throws IOException, InterruptedException { + if (this.isEmpty()) { + logger.debug("Empty batch. 
Skipping upload"); + return null; + } + + StreamUpdate stream = this.getStream(); + logger.info("Creating file container, uploading, and pushing stream batch"); + + // Use the new createUploadAndPush method that handles the complete workflow + HttpResponse response = this.updateStreamService.createUploadAndPush(stream); + + this.size = 0; + this.documentToAddList.clear(); + this.documentToDeleteList.clear(); + this.documentToPartiallyUpdateList.clear(); + + return response; + } /** * Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds - * the maximum content length. See {@link PartialUpdateDocument#flush}. + * the maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. * * @param document The document to be deleted from the index. * @throws IOException If an I/O error occurs during the upload. @@ -53,7 +114,7 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length; if (this.size + sizeOfDoc >= this.maxQueueSize) { - this.flush(); + this.flushAndPush(); } documentToPartiallyUpdateList.add(document); if (logger.isDebugEnabled()) { @@ -61,6 +122,58 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE } this.size += sizeOfDoc; } + + /** + * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the + * maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. + * + * @param document The document to be added to the index. + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. 
+ */ + @Override + public void add(DocumentBuilder document) throws IOException, InterruptedException { + if (document == null) { + return; + } + + final int sizeOfDoc = document.marshal().getBytes().length; + if (this.size + sizeOfDoc >= this.maxQueueSize) { + this.flushAndPush(); + } + documentToAddList.add(document); + if (logger.isDebugEnabled()) { + logger.debug("Adding document to batch: " + document.getDocument().uri); + } + this.size += sizeOfDoc; + } + + /** + * Adds the {@link DeleteDocument} to the upload queue and flushes the queue if it exceeds the + * maximum content length. Each flush creates a new file container, uploads to it, and pushes + * it to the stream source. + * + * @param document The document to be deleted from the index. + * @throws IOException If an I/O error occurs during the upload. + * @throws InterruptedException If the upload process is interrupted. + */ + @Override + public void add(DeleteDocument document) throws IOException, InterruptedException { + if (document == null) { + return; + } + + final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length; + if (this.size + sizeOfDoc >= this.maxQueueSize) { + this.flushAndPush(); + } + documentToDeleteList.add(document); + if (logger.isDebugEnabled()) { + logger.debug("Adding document to batch: " + document.documentId); + } + this.size += sizeOfDoc; + } public StreamUpdate getStream() { return new StreamUpdate( diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index eda7bcb3..ebada08f 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -27,7 +27,7 @@ public class StreamService { * @param userAgents The user agent to use for the requests. 
*/ public StreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents); + this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -42,7 +42,7 @@ public StreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -58,7 +58,7 @@ public StreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public StreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null); + this(source, options, null, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -73,8 +73,10 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) { * @param source The source to which you want to send your documents. * @param options The configuration options for exponential backoff. * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 256MB, max: 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB. 
*/ - public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { String apiKey = source.getApiKey(); String organizationId = source.getOrganizationId(); PlatformUrl platformUrl = source.getPlatformUrl(); @@ -82,7 +84,7 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[ Logger logger = LogManager.getLogger(StreamService.class); this.source = source; - this.queue = new DocumentUploadQueue(uploader); + this.queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); platformClient.setUserAgents(userAgents); this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 948c3461..14737477 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -12,8 +12,6 @@ public class UpdateStreamService { private final PlatformClient platformClient; private final UpdateStreamServiceInternal updateStreamServiceInternal; - private FileContainer fileContainer; - /** * Creates a service to stream your documents to the provided source by interacting with the * Stream API. This provides the ability to incrementally add, update, or delete documents via a @@ -26,7 +24,7 @@ public class UpdateStreamService { * @param userAgents The user agent to use for the requests. 
*/ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents); + this(source, new BackoffOptionsBuilder().build(), userAgents, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -40,7 +38,7 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public UpdateStreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build()); + this(source, new BackoffOptionsBuilder().build(), null, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -55,7 +53,7 @@ public UpdateStreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null); + this(source, options, null, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); } /** @@ -69,9 +67,11 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { * @param source The source to which you want to send your documents. * @param options The configuration options for exponential backoff. * @param userAgents The user agent to use for the requests. + * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 256MB, max: 256MB). + * @throws IllegalArgumentException if maxQueueSize exceeds 256MB. 
*/ public UpdateStreamService( - StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) { Logger logger = LogManager.getLogger(UpdateStreamService.class); this.platformClient = new PlatformClient( @@ -80,7 +80,7 @@ public UpdateStreamService( this.updateStreamServiceInternal = new UpdateStreamServiceInternal( source, - new StreamDocumentUploadQueue(this.getUploadStrategy()), + new StreamDocumentUploadQueue(null, maxQueueSize), // UploadStrategy no longer needed this.platformClient, logger); } @@ -91,9 +91,10 @@ public UpdateStreamService( * documents into it. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -118,7 +119,7 @@ public UpdateStreamService( * @throws IOException If the creation of the file container or adding the document fails. */ public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addOrUpdate(document); + updateStreamServiceInternal.addOrUpdate(document); } /** @@ -130,9 +131,10 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte * Partial item updates section. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -158,7 +160,7 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte */ public void addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.addPartialUpdate(document); + updateStreamServiceInternal.addPartialUpdate(document); } /** @@ -167,9 +169,10 @@ public void addPartialUpdate(PartialUpdateDocument document) * it. * *

If called several times, the service will automatically batch documents and create new - * stream chunks whenever the data payload exceeds the batch size limit set for the - * Stream API. + * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor). + * Each batch is sent to its own file container and immediately pushed to the stream + * source, following the + * catalog stream API best practices. * *

Once there are no more documents to add, it is important to call the {@link * UpdateStreamService#close} function in order to send any buffered documents and push the file @@ -194,7 +197,7 @@ public void addPartialUpdate(PartialUpdateDocument document) * @throws IOException If the creation of the file container or adding the document fails. */ public void delete(DeleteDocument document) throws IOException, InterruptedException { - fileContainer = updateStreamServiceInternal.delete(document); + updateStreamServiceInternal.delete(document); } /** @@ -214,11 +217,4 @@ public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { return updateStreamServiceInternal.close(); } - - private UploadStrategy getUploadStrategy() { - return (streamUpdate) -> { - String batchUpdateJson = new Gson().toJson(streamUpdate.marshal()); - return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson); - }; - } } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java index 32f7ddc9..8a646099 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java @@ -23,54 +23,59 @@ public UpdateStreamServiceInternal( this.queue = queue; this.platformClient = platformClient; this.logger = logger; + // Set this instance on the queue so it can call createUploadAndPush + queue.setUpdateStreamService(this); } public FileContainer addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public FileContainer addPartialUpdate(PartialUpdateDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = 
this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException { - if (this.fileContainer == null) { - this.fileContainer = this.createFileContainer(); - } queue.add(document); return this.fileContainer; } public HttpResponse close() throws IOException, InterruptedException, NoOpenFileContainerException { - return this.pushFileContainer(this.getSourceId()); + HttpResponse lastResponse = null; + if (!queue.isEmpty()) { + lastResponse = queue.flushAndPush(); + } + return lastResponse; } - private FileContainer createFileContainer() throws IOException, InterruptedException { + /** + * Creates a new file container, uploads the content, and pushes it to the stream source. + * This method is called by the queue's flush operation to ensure each batch gets its own container. + * + * @param streamUpdate The batch of documents to upload + * @return The HTTP response from pushing the file container + * @throws IOException If an I/O error occurs + * @throws InterruptedException If the operation is interrupted + */ + public HttpResponse createUploadAndPush(StreamUpdate streamUpdate) + throws IOException, InterruptedException { + // Step 1: Create a new file container this.logger.info("Creating new file container"); - HttpResponse response = this.platformClient.createFileContainer(); - return new Gson().fromJson(response.body(), FileContainer.class); - } - - private HttpResponse pushFileContainer(String sourceId) - throws NoOpenFileContainerException, IOException, InterruptedException { - if (this.fileContainer == null) { - throw new NoOpenFileContainerException( - "No open file container detected. 
A new container will automatically be created once you start adding, updating or deleting documents."); - } - queue.flush(); - this.logger.info("Pushing to file container " + this.fileContainer.fileId); - return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer); + HttpResponse createResponse = this.platformClient.createFileContainer(); + FileContainer container = new Gson().fromJson(createResponse.body(), FileContainer.class); + + // Step 2: Upload content to the file container + String batchUpdateJson = new Gson().toJson(streamUpdate.marshal()); + this.platformClient.uploadContentToFileContainer(container, batchUpdateJson); + + // Step 3: Push the file container to the stream source + this.logger.info("Pushing file container " + container.fileId + " to stream source"); + return this.platformClient.pushFileContainerContentToStreamSource(this.getSourceId(), container); } private String getSourceId() { diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index 726769b8..005a3fb4 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -70,11 +70,12 @@ public void closeService() throws Exception { } @Test - public void addOrUpdateShouldCreateFileContainer() throws IOException, InterruptedException { + public void addOrUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException { service.addOrUpdate(documentA); service.addOrUpdate(documentB); - verify(this.platformClient, times(1)).createFileContainer(); + // File containers are now created during flush, not during add + verify(this.platformClient, times(0)).createFileContainer(); } @Test @@ -94,62 +95,54 @@ public void addOrUpdateAndPartialAndDeleteShouldAddDocumentsToQueue() } @Test - public void deleteShouldCreateFileContainer() throws 
IOException, InterruptedException { + public void deleteShouldNotCreateFileContainer() throws IOException, InterruptedException { service.delete(deleteDocumentA); service.delete(deleteDocumentB); - verify(this.platformClient, times(1)).createFileContainer(); + // File containers are now created during flush, not during add + verify(this.platformClient, times(0)).createFileContainer(); } @Test - public void partialUpdateShouldCreateFileContainer() throws IOException, InterruptedException { + public void partialUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException { service.addPartialUpdate(partialUpdateDocumentA); service.addPartialUpdate(partialUpdateDocumentB); - verify(this.platformClient, times(1)).createFileContainer(); + // File containers are now created during flush, not during add + verify(this.platformClient, times(0)).createFileContainer(); } @Test - public void closeShouldPushFileContainerOnAddOrUpdate() + public void closeShouldCallFlushAndPush() throws IOException, InterruptedException, NoOpenFileContainerException { + when(queue.isEmpty()).thenReturn(false); + when(queue.flushAndPush()).thenReturn(httpResponse); + service.addOrUpdate(documentA); service.close(); - verify(platformClient, times(1)) - .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + verify(queue, times(1)).flushAndPush(); } @Test - public void closeShouldPushFileContainerOnDelete() + public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty() throws IOException, InterruptedException, NoOpenFileContainerException { - service.delete(deleteDocumentA); - service.close(); - - verify(platformClient, times(1)) - .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); - } - - @Test - public void closeShouldFlushBufferedDocuments() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.addOrUpdate(documentA); + when(queue.isEmpty()).thenReturn(true); + service.close(); - 
verify(queue, times(1)).flush(); + verify(queue, times(0)).flushAndPush(); } @Test - public void shouldLogInfoOnCreateFileContainer() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.addOrUpdate(documentA); - verify(logger, times(1)).info("Creating new file container"); - service.close(); - verify(logger, times(1)).info("Pushing to file container file-id"); - } + public void createUploadAndPushShouldCreateContainerUploadAndPush() + throws IOException, InterruptedException { + StreamUpdate streamUpdate = new StreamUpdate(null, null, null); + + service.createUploadAndPush(streamUpdate); - @Test(expected = NoOpenFileContainerException.class) - public void shouldThrowExceptionOnCloseIfNoOpenFileContainer() - throws IOException, InterruptedException, NoOpenFileContainerException { - service.close(); + verify(platformClient, times(1)).createFileContainer(); + verify(platformClient, times(1)).uploadContentToFileContainer(any(FileContainer.class), any(String.class)); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); } } From 8280d47062fe468a3ffbc76e4c708931a5d8413e Mon Sep 17 00:00:00 2001 From: achouan-coveo Date: Tue, 20 Jan 2026 19:44:05 -0500 Subject: [PATCH 2/6] Update docs and add UpdateStreamService constructor Updated CONFIGURATION.md and README.md to reflect new constructor signatures and usage examples for UpdateStreamService, PushService, and StreamService. Added a new constructor to UpdateStreamService that accepts userAgents as an optional parameter for enhanced flexibility. 
--- CONFIGURATION.md | 28 ++++++++++--------- README.md | 3 +- .../pushapiclient/UpdateStreamService.java | 18 ++++++++++++ 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index b8453c97..cbe0a0d4 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -54,22 +54,24 @@ Pass the `maxQueueSize` parameter when creating service instances: ```java // UpdateStreamService with custom 128 MB batch size UpdateStreamService service = new UpdateStreamService( - platformClient, - sourceId, + catalogSource, + backoffOptions, + null, // userAgents (optional) 128 * 1024 * 1024 // 128 MB in bytes ); // PushService with custom batch size PushService pushService = new PushService( - platformClient, - sourceId, + pushEnabledSource, + backoffOptions, 128 * 1024 * 1024 // 128 MB ); // StreamService with custom batch size StreamService streamService = new StreamService( - platformClient, - sourceId, + streamEnabledSource, + backoffOptions, + null, // userAgents (optional) 128 * 1024 * 1024 // 128 MB ); ``` @@ -100,9 +102,9 @@ All batch size values are validated: System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB // All services will use 128 MB by default -UpdateStreamService updateService = new UpdateStreamService(platformClient, sourceId); -PushService pushService = new PushService(platformClient, sourceId); -StreamService streamService = new StreamService(platformClient, sourceId); +UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions); +PushService pushService = new PushService(pushEnabledSource, backoffOptions); +StreamService streamService = new StreamService(streamEnabledSource, backoffOptions); ``` #### Example 2: Override Per Service @@ -112,13 +114,13 @@ StreamService streamService = new StreamService(platformClient, sourceId); System.setProperty("coveo.push.batchSize", "134217728"); // Update service uses global default (128 MB) -UpdateStreamService updateService = new 
UpdateStreamService(platformClient, sourceId); +UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions); // Push service overrides with 64 MB -PushService pushService = new PushService(platformClient, sourceId, 64 * 1024 * 1024); +PushService pushService = new PushService(pushEnabledSource, backoffOptions, 64 * 1024 * 1024); // Stream service uses global default (128 MB) -StreamService streamService = new StreamService(platformClient, sourceId); +StreamService streamService = new StreamService(streamEnabledSource, backoffOptions); ``` #### Example 3: Docker/Container Environment @@ -202,7 +204,7 @@ System.setProperty("coveo.push.batchSize", "5242880"); // 5 MB **Option 2: Use new 256 MB default (recommended)** ```java // No configuration needed - automatic -UpdateStreamService service = new UpdateStreamService(platformClient, sourceId); +UpdateStreamService service = new UpdateStreamService(catalogSource, backoffOptions); ``` **Option 3: Choose custom size** diff --git a/README.md b/README.md index 8c885f66..08e2db44 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,8 @@ java -Dcoveo.push.batchSize=134217728 -jar your-app.jar # 128 MB // Configure UpdateStreamService with 128 MB batch size UpdateStreamService service = new UpdateStreamService( catalogSource, - backoffOptions, + backoffOptions, + null, // userAgents (optional) 128 * 1024 * 1024 // 128 MB in bytes ); ``` diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 14737477..c1df5602 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -56,6 +56,24 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { this(source, options, null, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); } + /** + * Creates a service to stream your documents to the provided source by 
interacting with the + * Stream API. This provides the ability to incrementally add, update, or delete documents via a + * stream. + * + *

To perform a full source rebuild, use the + * {@link StreamService}. + * + * @param source The source to push to + * @param options The backoff parameters + * @param userAgents The user-agents to append to the "User-Agent" HTTP header when performing + * requests against the Coveo Platform. + */ + public UpdateStreamService( + StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + this(source, options, userAgents, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); + } + /** * Creates a service to stream your documents to the provided source by interacting with the * Stream API. This provides the ability to incrementally add, update, or delete documents via a From 68e46e19249c5d8c03c63b196f1815931015bf62 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Wed, 21 Jan 2026 15:37:07 -0500 Subject: [PATCH 3/6] tests: test new file container rotation logic --- .../DocumentUploadQueueTest.java | 20 +- .../FileContainerRotationIntegrationTest.java | 225 +++++++++++++++ ...StreamDocumentUploadQueueBatchingTest.java | 272 ++++++++++++++++++ .../StreamDocumentUploadQueueTest.java | 60 ++-- .../UpdateStreamServiceInternalTest.java | 98 ++++++- 5 files changed, 616 insertions(+), 59 deletions(-) create mode 100644 src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java create mode 100644 src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java diff --git a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java index f488f3a5..51a3541e 100644 --- a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java @@ -12,16 +12,16 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; public class DocumentUploadQueueTest { - @Mock private UploadStrategy 
uploadStrategy; + private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024; - @InjectMocks private DocumentUploadQueue queue; + @Mock private UploadStrategy uploadStrategy; + private DocumentUploadQueue queue; private AutoCloseable closeable; private DocumentBuilder documentToAdd; private DeleteDocument documentToDelete; @@ -29,20 +29,14 @@ public class DocumentUploadQueueTest { private int oneMegaByte = 1 * 1024 * 1024; private String generateStringFromBytes(int numBytes) { - // Check if the number of bytes is valid if (numBytes <= 0) { return ""; } - - // Create a byte array with the specified length byte[] bytes = new byte[numBytes]; - - // Fill the byte array with a pattern of ASCII characters - byte pattern = 65; // ASCII value for 'A' + byte pattern = 65; for (int i = 0; i < numBytes; i++) { bytes[i] = pattern; } - return new String(bytes); } @@ -53,6 +47,10 @@ private DocumentBuilder generateDocumentFromSize(int numBytes) { @Before public void setup() { + closeable = MockitoAnnotations.openMocks(this); + + queue = new DocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE); + String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte); documentToAdd = @@ -60,8 +58,6 @@ public void setup() { .withData(twoMegaByteData); documentToDelete = new DeleteDocument("https://my.document.uri?ref=3"); - - closeable = MockitoAnnotations.openMocks(this); } @After diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java new file mode 100644 index 00000000..2ebf689d --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java @@ -0,0 +1,225 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; 
+import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +/** + * Integration tests for file container rotation when pushing large amounts of data. These tests + * verify the end-to-end flow from UpdateStreamService through to PlatformClient, using a small + * batch size to trigger rotation without needing large test data. + */ +public class FileContainerRotationIntegrationTest { + + private static final int SMALL_BATCH_SIZE = 1000; + private static final String SOURCE_ID = "test-source-id"; + private static final String ORG_ID = "test-org"; + private static final String API_KEY = "test-api-key"; + + private PlatformClient platformClient; + private StreamEnabledSource source; + private AtomicInteger containerCounter; + + @Before + public void setUp() throws IOException, InterruptedException { + platformClient = mock(PlatformClient.class); + source = mock(StreamEnabledSource.class); + containerCounter = new AtomicInteger(0); + + doReturn(SOURCE_ID).when(source).getId(); + doReturn(ORG_ID).when(source).getOrganizationId(); + doReturn(API_KEY).when(source).getApiKey(); + doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl(); + + doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer(); + doReturn(createGenericResponse()).when(platformClient).uploadContentToFileContainer(any(), anyString()); + doReturn(createGenericResponse()).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void 
shouldCreateMultipleContainersWhenDataExceedsBatchSize() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.addOrUpdate(createDocument("doc4", 600)); + service.close(); + + verify(platformClient, times(4)).createFileContainer(); + verify(platformClient, times(4)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldCreateSingleContainerWhenDataFitsInOneBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 100)); + service.addOrUpdate(createDocument("doc2", 100)); + service.close(); + + verify(platformClient, times(1)).createFileContainer(); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleMixedOperationsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 400)); + service.delete(new DeleteDocument("doc2")); + service.addPartialUpdate(createPartialUpdate("doc3", 400)); + service.addOrUpdate(createDocument("doc4", 400)); + service.close(); + + verify(platformClient, times(3)).createFileContainer(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldUseUniqueContainerIdForEachBatch() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + service.addOrUpdate(createDocument("doc2", 600)); + service.addOrUpdate(createDocument("doc3", 600)); + service.close(); + + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(FileContainer.class); + verify(platformClient, 
times(3)).pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture()); + + assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId); + assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId); + assertEquals("container-3", containerCaptor.getAllValues().get(2).fileId); + } + + @Test + public void shouldPushImmediatelyWhenBatchSizeExceeded() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + service.addOrUpdate(createDocument("doc1", 600)); + verify(platformClient, times(0)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc2", 600)); + verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.addOrUpdate(createDocument("doc3", 600)); + verify(platformClient, times(2)).pushFileContainerContentToStreamSource(anyString(), any()); + + service.close(); + verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception { + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 20; i++) { + service.addOrUpdate(createDocument("doc" + i, 200)); + } + service.close(); + + int expectedContainers = 10; + verify(platformClient, times(expectedContainers)).createFileContainer(); + verify(platformClient, times(expectedContainers)).pushFileContainerContentToStreamSource(anyString(), any()); + } + + @Test + public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception { + Map pushCountPerContainer = new HashMap<>(); + List containerCreationOrder = new ArrayList<>(); + + doAnswer(invocation -> { + HttpResponse response = createContainerResponse(); + String fileId = "container-" + containerCounter.get(); + containerCreationOrder.add(fileId); + pushCountPerContainer.put(fileId, 0); + return response; + 
}).when(platformClient).createFileContainer(); + + doAnswer(invocation -> { + FileContainer container = invocation.getArgument(1); + int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0); + pushCountPerContainer.put(container.fileId, currentCount + 1); + return createGenericResponse(); + }).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any()); + + UpdateStreamServiceInternal service = createServiceWithSmallBatchSize(); + + for (int i = 0; i < 10; i++) { + service.addOrUpdate(createDocument("doc" + i, 400)); + } + service.close(); + + for (Map.Entry entry : pushCountPerContainer.entrySet()) { + assertEquals( + "Container " + entry.getKey() + " should receive exactly 1 push, but received " + entry.getValue(), + Integer.valueOf(1), + entry.getValue()); + } + + assertTrue("Should have created multiple containers", containerCreationOrder.size() > 1); + } + + private UpdateStreamServiceInternal createServiceWithSmallBatchSize() { + StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE); + org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(getClass()); + return new UpdateStreamServiceInternal(source, queue, platformClient, logger); + } + + private DocumentBuilder createDocument(String id, int dataSize) { + return new DocumentBuilder("https://example.com/" + id, "Title " + id) + .withData(generateData(dataSize)); + } + + private PartialUpdateDocument createPartialUpdate(String id, int dataSize) { + return new PartialUpdateDocument( + "https://example.com/" + id, + PartialUpdateOperator.FIELDVALUEREPLACE, + "field", + generateData(dataSize)); + } + + private String generateData(int size) { + byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) { + bytes[i] = 65; + } + return new String(bytes); + } + + @SuppressWarnings("unchecked") + private HttpResponse createContainerResponse() { + HttpResponse response = mock(HttpResponse.class); + int id = 
containerCounter.incrementAndGet(); + doReturn(String.format( + "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", id, id)) + .when(response) + .body(); + return response; + } + + @SuppressWarnings("unchecked") + private HttpResponse createGenericResponse() { + HttpResponse response = mock(HttpResponse.class); + doReturn("{\"status\": \"ok\"}").when(response).body(); + return response; + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java new file mode 100644 index 00000000..3e278d6c --- /dev/null +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java @@ -0,0 +1,272 @@ +package com.coveo.pushapiclient; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for container rotation and batching behavior in the stream update workflow. Each batch that + * exceeds the configured limit should trigger creation of a new file container, upload, and + * immediate push. 
+ */ +public class StreamDocumentUploadQueueBatchingTest { + + private static final int SMALL_BATCH_SIZE = 5000; + + @Mock private UpdateStreamServiceInternal updateStreamService; + @Mock private HttpResponse httpResponse; + + private StreamDocumentUploadQueue queue; + private AutoCloseable closeable; + + @Before + public void setUp() throws Exception { + closeable = MockitoAnnotations.openMocks(this); + queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE); + queue.setUpdateStreamService(updateStreamService); + + when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + } + + @After + public void tearDown() throws Exception { + closeable.close(); + } + + @Test + public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush() + throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + + queue.add(doc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(doc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached() + throws IOException, InterruptedException { + DocumentBuilder smallDoc1 = new DocumentBuilder("https://doc.uri/1", "Small Doc 1"); + DocumentBuilder smallDoc2 = new DocumentBuilder("https://doc.uri/2", "Small Doc 2"); + + queue.add(smallDoc1); + queue.add(smallDoc2); + + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + assertFalse(queue.isEmpty()); + } + + @Test + public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch() + throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000)); + 
DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000)); + DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000)); + + queue.add(doc1); + queue.add(doc2); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(doc3); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(updateStreamService).createUploadAndPush(captor.capture()); + assertEquals(2, captor.getValue().getAddOrUpdate().size()); + } + + @Test + public void multipleBatchesShouldCreateMultipleContainers() throws IOException, InterruptedException { + DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000)); + DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000)); + DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000)); + DocumentBuilder doc4 = new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000)); + + queue.add(doc1); + queue.add(doc2); + queue.add(doc3); + queue.add(doc4); + + verify(updateStreamService, times(3)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); + queue.add(doc); + assertFalse(queue.isEmpty()); + + queue.flushAndPush(); + + assertTrue(queue.isEmpty()); + } + + @Test + public void flushAndPushShouldReturnResponseFromService() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10)); + queue.add(doc); + + HttpResponse response = queue.flushAndPush(); + + assertEquals(httpResponse, response); + 
} + + @Test + public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException { + HttpResponse response = queue.flushAndPush(); + + assertNull(response); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc"); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = + new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); + + queue.add(doc); + queue.add(deleteDoc); + queue.add(partialDoc); + + queue.flushAndPush(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(StreamUpdate.class); + verify(updateStreamService).createUploadAndPush(captor.capture()); + + StreamUpdate captured = captor.getValue(); + assertEquals(1, captured.getAddOrUpdate().size()); + assertEquals(1, captured.getDelete().size()); + assertEquals(1, captured.getPartialUpdate().size()); + } + + @Test + public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException { + queue = new StreamDocumentUploadQueue(null, 50); + queue.setUpdateStreamService(updateStreamService); + + DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1"); + DeleteDocument deleteDoc2 = new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds"); + + queue.add(deleteDoc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(deleteDoc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException { + PartialUpdateDocument partialDoc1 = + new PartialUpdateDocument("https://doc.uri/1", 
PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v"); + PartialUpdateDocument partialDoc2 = + new PartialUpdateDocument( + "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE)); + + queue.add(partialDoc1); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc2); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test + public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() throws IOException, InterruptedException { + DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500)); + DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); + PartialUpdateDocument partialDoc = + new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000)); + + queue.add(doc); + queue.add(deleteDoc); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); + + queue.add(partialDoc); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectBatchSizeExceeding256MB() { + int exceeding256MB = 256 * 1024 * 1024 + 1; + new StreamDocumentUploadQueue(null, exceeding256MB); + } + + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectZeroBatchSize() { + new StreamDocumentUploadQueue(null, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void constructorShouldRejectNegativeBatchSize() { + new StreamDocumentUploadQueue(null, -1); + } + + @Test + public void constructorShouldAcceptMaxAllowedBatchSize() { + int max256MB = 256 * 1024 * 1024; + StreamDocumentUploadQueue q = new StreamDocumentUploadQueue(null, max256MB); + assertNotNull(q); + } + + @Test + public void queueShouldUseSystemPropertyForDefaultBatchSize() { + String 
originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "1048576"); + int configuredSize = DocumentUploadQueue.getConfiguredBatchSize(); + assertEquals(1048576, configuredSize); + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void systemPropertyExceeding256MBShouldThrow() { + String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + try { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "268435457"); + DocumentUploadQueue.getConfiguredBatchSize(); + } finally { + if (originalValue != null) { + System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue); + } else { + System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY); + } + } + } + + private String generateData(int numBytes) { + if (numBytes <= 0) return ""; + byte[] bytes = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + bytes[i] = 65; + } + return new String(bytes); + } +} diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java index 12cd2f54..ff6fc0ad 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java @@ -4,26 +4,31 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.http.HttpResponse; import java.util.ArrayList; import 
org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; public class StreamDocumentUploadQueueTest { - @Mock private UploadStrategy uploadStrategy; + private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024; - @InjectMocks private StreamDocumentUploadQueue queue; + @Mock private UploadStrategy uploadStrategy; + @Mock private UpdateStreamServiceInternal updateStreamService; + @Mock private HttpResponse httpResponse; + private StreamDocumentUploadQueue queue; private AutoCloseable closeable; private DocumentBuilder documentToAdd; private DeleteDocument documentToDelete; @@ -32,20 +37,14 @@ public class StreamDocumentUploadQueueTest { private int oneMegaByte = 1 * 1024 * 1024; private String generateStringFromBytes(int numBytes) { - // Check if the number of bytes is valid if (numBytes <= 0) { return ""; } - - // Create a byte array with the specified length byte[] bytes = new byte[numBytes]; - - // Fill the byte array with a pattern of ASCII characters - byte pattern = 65; // ASCII value for 'A' + byte pattern = 65; for (int i = 0; i < numBytes; i++) { bytes[i] = pattern; } - return new String(bytes); } @@ -63,7 +62,14 @@ private PartialUpdateDocument generatePartialUpdateDocumentFromSize(int numBytes } @Before - public void setup() { + public void setup() throws IOException, InterruptedException { + closeable = MockitoAnnotations.openMocks(this); + + queue = new StreamDocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE); + queue.setUpdateStreamService(updateStreamService); + + when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse); + String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte); documentToAdd = @@ -78,8 +84,6 @@ public void setup() { PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); - - closeable = 
MockitoAnnotations.openMocks(this); } @After @@ -127,17 +131,10 @@ public void testShouldReturnBatch() throws IOException, InterruptedException { @Test public void testFlushShouldNotUploadDocumentsWhenRequiredSizeIsNotMet() throws IOException, InterruptedException { - // Adding 2MB document to the queue => queue has now 3MB of free space - // (5MB - 2MB = 3MB) queue.add(documentToAdd); - // Adding 2MB document to the queue => queue has now 1MB of free space - // (3MB - 2MB = 1MB) queue.add(documentToDelete); - // The maximum queue size has not been reached yet (1MB left of free space). - // Therefore, the accumulated documents will not be automatically flushed. - // Unless the user runs `.flush()` the queue will keep the 4MB of documents - verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class)); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); } @Test @@ -162,21 +159,14 @@ public void testShouldAutomaticallyFlushAccumulatedDocuments() } }); - // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, - // the queue size will reach 6MB, which exceeds the maximum queue size - // limit by 1MB. Therefore, the 2 first added documents will automatically be - // uploaded to the source. 
queue.add(firstBulkyDocument); queue.add(secondBulkyDocument); - verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class)); + verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); - // The 3rd document added to the queue will be included in a separate batch, - // which will not be uploaded unless the `flush()` method is called or until the - // queue size limit has been reached queue.add(thirdBulkyDocument); - verify(uploadStrategy, times(1)).apply(any(BatchUpdate.class)); - verify(uploadStrategy, times(1)).apply(firstBatch); + verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class)); + verify(updateStreamService, times(1)).createUploadAndPush(firstBatch); } @Test @@ -212,21 +202,15 @@ public void testShouldManuallyFlushAccumulatedDocuments() emptyList, partialEmptyList); - // Adding 3 documents of 2MB to the queue. After adding the first 2 documents, - // the queue size will reach 6MB, which exceeds the maximum queue size - // limit. Therefore, the 2 first added documents will automatically be uploaded - // to the source. 
queue.add(firstBulkyDocument); queue.add(secondBulkyDocument); queue.add(thirdBulkyDocument); queue.flush(); - // Additional flush will have no effect if documents where already flushed queue.flush(); - verify(uploadStrategy, times(2)).apply(any(StreamUpdate.class)); - verify(uploadStrategy, times(1)).apply(firstBatch); + verify(updateStreamService, times(1)).createUploadAndPush(firstBatch); verify(uploadStrategy, times(1)).apply(secondBatch); } diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java index 005a3fb4..5de3cf05 100644 --- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java +++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java @@ -1,7 +1,10 @@ package com.coveo.pushapiclient; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -9,10 +12,13 @@ import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException; import java.io.IOException; import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.ArrayList; import org.apache.logging.log4j.core.Logger; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -74,7 +80,6 @@ public void addOrUpdateShouldNotCreateFileContainer() throws IOException, Interr service.addOrUpdate(documentA); service.addOrUpdate(documentB); - // File containers are now created during flush, not during add verify(this.platformClient, times(0)).createFileContainer(); } @@ -99,7 +104,6 @@ public void 
deleteShouldNotCreateFileContainer() throws IOException, Interrupted service.delete(deleteDocumentA); service.delete(deleteDocumentB); - // File containers are now created during flush, not during add verify(this.platformClient, times(0)).createFileContainer(); } @@ -108,7 +112,6 @@ public void partialUpdateShouldNotCreateFileContainer() throws IOException, Inte service.addPartialUpdate(partialUpdateDocumentA); service.addPartialUpdate(partialUpdateDocumentB); - // File containers are now created during flush, not during add verify(this.platformClient, times(0)).createFileContainer(); } @@ -117,7 +120,7 @@ public void closeShouldCallFlushAndPush() throws IOException, InterruptedException, NoOpenFileContainerException { when(queue.isEmpty()).thenReturn(false); when(queue.flushAndPush()).thenReturn(httpResponse); - + service.addOrUpdate(documentA); service.close(); @@ -128,7 +131,7 @@ public void closeShouldCallFlushAndPush() public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty() throws IOException, InterruptedException, NoOpenFileContainerException { when(queue.isEmpty()).thenReturn(true); - + service.close(); verify(queue, times(0)).flushAndPush(); @@ -137,12 +140,89 @@ public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty() @Test public void createUploadAndPushShouldCreateContainerUploadAndPush() throws IOException, InterruptedException { - StreamUpdate streamUpdate = new StreamUpdate(null, null, null); - + StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + service.createUploadAndPush(streamUpdate); verify(platformClient, times(1)).createFileContainer(); - verify(platformClient, times(1)).uploadContentToFileContainer(any(FileContainer.class), any(String.class)); - verify(platformClient, times(1)).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + verify(platformClient, times(1)) + .uploadContentToFileContainer(any(FileContainer.class), any(String.class)); + 
verify(platformClient, times(1)) + .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + } + + @Test + public void createUploadAndPushShouldUseNewContainerForEachCall() + throws IOException, InterruptedException { + HttpResponse response1 = createMockHttpResponse("container-1"); + HttpResponse response2 = createMockHttpResponse("container-2"); + + when(platformClient.createFileContainer()).thenReturn(response1).thenReturn(response2); + + StreamUpdate streamUpdate1 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + StreamUpdate streamUpdate2 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + + service.createUploadAndPush(streamUpdate1); + service.createUploadAndPush(streamUpdate2); + + verify(platformClient, times(2)).createFileContainer(); + + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(FileContainer.class); + verify(platformClient, times(2)) + .pushFileContainerContentToStreamSource(eq(SOURCE_ID), containerCaptor.capture()); + + assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId); + assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId); + } + + @Test + public void createUploadAndPushShouldPerformOperationsInCorrectOrder() + throws IOException, InterruptedException { + StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + + service.createUploadAndPush(streamUpdate); + + org.mockito.InOrder inOrder = org.mockito.Mockito.inOrder(platformClient); + inOrder.verify(platformClient).createFileContainer(); + inOrder.verify(platformClient).uploadContentToFileContainer(any(FileContainer.class), any(String.class)); + inOrder.verify(platformClient).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class)); + } + + @Test + public void closeOnEmptyQueueShouldReturnNull() + throws IOException, InterruptedException, NoOpenFileContainerException { + 
when(queue.isEmpty()).thenReturn(true); + + HttpResponse result = service.close(); + + assertEquals(null, result); + verify(queue, times(0)).flushAndPush(); + } + + @Test + public void closeOnNonEmptyQueueShouldReturnFlushAndPushResponse() + throws IOException, InterruptedException, NoOpenFileContainerException { + when(queue.isEmpty()).thenReturn(false); + when(queue.flushAndPush()).thenReturn(httpResponse); + + HttpResponse result = service.close(); + + assertEquals(httpResponse, result); + } + + @Test + public void serviceShouldSetItselfOnQueueDuringConstruction() { + verify(queue, times(1)).setUpdateStreamService(service); + } + + @SuppressWarnings("unchecked") + private HttpResponse createMockHttpResponse(String fileId) { + HttpResponse response = mock(HttpResponse.class); + doReturn( + String.format( + "{\"uploadUri\": \"https://upload.uri/%s\", \"fileId\": \"%s\"}", fileId, fileId)) + .when(response) + .body(); + return response; } } From 6d05dfe4403d3709f4dd74e0ae73397a1daf226e Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Mon, 26 Jan 2026 14:15:44 -0500 Subject: [PATCH 4/6] fix issues --- .../StreamDocumentUploadQueue.java | 6 +++++ .../coveo/pushapiclient/StreamService.java | 23 ++++++++++++++++--- .../pushapiclient/UpdateStreamService.java | 8 +++---- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java index 91f3c19c..2ca26766 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java +++ b/src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java @@ -54,6 +54,12 @@ public void flush() throws IOException, InterruptedException { logger.debug("Empty batch. Skipping upload"); return; } + + if (this.uploader == null) { + throw new IllegalStateException( + "No upload strategy configured. 
For UpdateStreamService, use flushAndPush() instead."); + } + // TODO: LENS-871: support concurrent requests StreamUpdate stream = this.getStream(); logger.info("Uploading document Stream"); diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index ebada08f..e1484bb4 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -27,7 +27,7 @@ public class StreamService { * @param userAgents The user agent to use for the requests. */ public StreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -42,7 +42,7 @@ public StreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public StreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -58,7 +58,24 @@ public StreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. */ public StreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null, DocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize()); + } + + /** + * Creates a service to stream your documents to the provided source by interacting with the + * Stream API. + * + *

To perform full document updates or + * deletions, use the {@UpdateStreamService}, since pushing documents with the + * {@StreamService} is equivalent to triggering a full source rebuild. The {@StreamService} can + * also be used for an initial catalog upload. + * + * @param source The source to which you want to send your documents. + * @param options The configuration options for exponential backoff. + * @param userAgents The user agent to use for the requests. + */ + public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) { + this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index c1df5602..27cc163c 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -24,7 +24,7 @@ public class UpdateStreamService { * @param userAgents The user agent to use for the requests. */ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { - this(source, new BackoffOptionsBuilder().build(), userAgents, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -38,7 +38,7 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) { * @param source The source to which you want to send your documents. */ public UpdateStreamService(StreamEnabledSource source) { - this(source, new BackoffOptionsBuilder().build(), null, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -53,7 +53,7 @@ public UpdateStreamService(StreamEnabledSource source) { * @param options The configuration options for exponential backoff. 
*/ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { - this(source, options, null, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize()); } /** @@ -71,7 +71,7 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) { */ public UpdateStreamService( StreamEnabledSource source, BackoffOptions options, String[] userAgents) { - this(source, options, userAgents, StreamDocumentUploadQueue.DEFAULT_QUEUE_SIZE); + this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize()); } /** From 66b789e80522d8914cac98e83121b75e138a7ba0 Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Mon, 26 Jan 2026 14:18:09 -0500 Subject: [PATCH 5/6] fix null user agent --- src/main/java/com/coveo/pushapiclient/PlatformClient.java | 3 +++ src/main/java/com/coveo/pushapiclient/StreamService.java | 4 +++- .../java/com/coveo/pushapiclient/UpdateStreamService.java | 4 +++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/coveo/pushapiclient/PlatformClient.java b/src/main/java/com/coveo/pushapiclient/PlatformClient.java index 8523a5d5..b1ed6fdf 100644 --- a/src/main/java/com/coveo/pushapiclient/PlatformClient.java +++ b/src/main/java/com/coveo/pushapiclient/PlatformClient.java @@ -616,6 +616,9 @@ public String[] getUserAgents() { } public void setUserAgents(String[] userAgents) { + if (userAgents == null) { + throw new IllegalArgumentException("User agents cannot be null"); + } if (!validUserAgents(userAgents)) { throw new IllegalArgumentException("Invalid user agents"); } diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java index e1484bb4..13085a63 100644 --- a/src/main/java/com/coveo/pushapiclient/StreamService.java +++ b/src/main/java/com/coveo/pushapiclient/StreamService.java @@ -103,7 +103,9 @@ public StreamService(StreamEnabledSource source, 
BackoffOptions options, String[ this.source = source; this.queue = new DocumentUploadQueue(uploader, maxQueueSize); this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options); - platformClient.setUserAgents(userAgents); + if (userAgents != null) { + platformClient.setUserAgents(userAgents); + } this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger); } diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java index 27cc163c..62106b2a 100644 --- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java +++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java @@ -94,7 +94,9 @@ public UpdateStreamService( this.platformClient = new PlatformClient( source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options); - this.platformClient.setUserAgents(userAgents); + if (userAgents != null) { + this.platformClient.setUserAgents(userAgents); + } this.updateStreamServiceInternal = new UpdateStreamServiceInternal( source, From 682bb0d163005999e5062f8d2c250c52c904690e Mon Sep 17 00:00:00 2001 From: ylakhdar Date: Thu, 29 Jan 2026 09:45:41 -0500 Subject: [PATCH 6/6] format --- ...StreamDocumentUploadQueueBatchingTest.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java index 3e278d6c..222139af 100644 --- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java +++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java @@ -21,16 +21,20 @@ import org.mockito.MockitoAnnotations; /** - * Tests for container rotation and batching behavior in the stream update workflow. 
Each batch that - * exceeds the configured limit should trigger creation of a new file container, upload, and + * Tests for container rotation and batching behavior in the stream update + * workflow. Each batch that + * exceeds the configured limit should trigger creation of a new file container, + * upload, and * immediate push. */ public class StreamDocumentUploadQueueBatchingTest { private static final int SMALL_BATCH_SIZE = 5000; - @Mock private UpdateStreamServiceInternal updateStreamService; - @Mock private HttpResponse httpResponse; + @Mock + private UpdateStreamServiceInternal updateStreamService; + @Mock + private HttpResponse httpResponse; private StreamDocumentUploadQueue queue; private AutoCloseable closeable; @@ -142,9 +146,8 @@ public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, Inter public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOException, InterruptedException { DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc"); DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); - PartialUpdateDocument partialDoc = - new PartialUpdateDocument( - "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); + PartialUpdateDocument partialDoc = new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value"); queue.add(doc); queue.add(deleteDoc); @@ -178,11 +181,10 @@ public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException, @Test public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException { - PartialUpdateDocument partialDoc1 = - new PartialUpdateDocument("https://doc.uri/1", PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v"); - PartialUpdateDocument partialDoc2 = - new PartialUpdateDocument( - "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE)); + PartialUpdateDocument partialDoc1 = new 
PartialUpdateDocument("https://doc.uri/1", + PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v"); + PartialUpdateDocument partialDoc2 = new PartialUpdateDocument( + "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE)); queue.add(partialDoc1); verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class)); @@ -195,9 +197,8 @@ public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOExce public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() throws IOException, InterruptedException { DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500)); DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2"); - PartialUpdateDocument partialDoc = - new PartialUpdateDocument( - "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000)); + PartialUpdateDocument partialDoc = new PartialUpdateDocument( + "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000)); queue.add(doc); queue.add(deleteDoc); @@ -262,7 +263,8 @@ public void systemPropertyExceeding256MBShouldThrow() { } private String generateData(int numBytes) { - if (numBytes <= 0) return ""; + if (numBytes <= 0) + return ""; byte[] bytes = new byte[numBytes]; for (int i = 0; i < numBytes; i++) { bytes[i] = 65;