response = this.updateStreamService.createUploadAndPush(stream);
+
+ this.size = 0;
+ this.documentToAddList.clear();
+ this.documentToDeleteList.clear();
+ this.documentToPartiallyUpdateList.clear();
+
+ return response;
+ }
/**
* Adds the {@link PartialUpdateDocument} to the upload queue and flushes the queue if it exceeds
- * the maximum content length. See {@link PartialUpdateDocument#flush}.
+ * the maximum content length. Each flush creates a new file container, uploads to it, and pushes
+ * it to the stream source.
*
* @param document The document containing the partial update to apply.
* @throws IOException If an I/O error occurs during the upload.
@@ -53,7 +120,7 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE
final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
if (this.size + sizeOfDoc >= this.maxQueueSize) {
- this.flush();
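+ // The batch is full: flush it into its own file container and push before queuing this document.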
+ this.flushAndPush();
}
documentToPartiallyUpdateList.add(document);
if (logger.isDebugEnabled()) {
@@ -61,6 +128,58 @@ public void add(PartialUpdateDocument document) throws IOException, InterruptedE
}
this.size += sizeOfDoc;
}
+
+ /**
+ * Adds a {@link DocumentBuilder} to the upload queue and flushes the queue if it exceeds the
+ * maximum content length. Each flush creates a new file container, uploads to it, and pushes
+ * it to the stream source.
+ *
+ * @param document The document to be added to the index.
+ * @throws IOException If an I/O error occurs during the upload.
+ * @throws InterruptedException If the upload process is interrupted.
+ */
+ @Override
+ public void add(DocumentBuilder document) throws IOException, InterruptedException {
+ if (document == null) {
+ return;
+ }
+
+ final int sizeOfDoc = document.marshal().getBytes().length;
+ if (this.size + sizeOfDoc >= this.maxQueueSize) {
+ this.flushAndPush();
+ }
+ documentToAddList.add(document);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding document to batch: " + document.getDocument().uri);
+ }
+ this.size += sizeOfDoc;
+ }
+
+ /**
+ * Adds the {@link DeleteDocument} to the upload queue and flushes the queue if it exceeds the
+ * maximum content length. Each flush creates a new file container, uploads to it, and pushes
+ * it to the stream source.
+ *
+ * @param document The document to be deleted from the index.
+ * @throws IOException If an I/O error occurs during the upload.
+ * @throws InterruptedException If the upload process is interrupted.
+ */
+ @Override
+ public void add(DeleteDocument document) throws IOException, InterruptedException {
+ if (document == null) {
+ return;
+ }
+
+ final int sizeOfDoc = document.marshalJsonObject().toString().getBytes().length;
+ if (this.size + sizeOfDoc >= this.maxQueueSize) {
+ this.flushAndPush();
+ }
+ documentToDeleteList.add(document);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding document to batch: " + document.documentId);
+ }
+ this.size += sizeOfDoc;
+ }
public StreamUpdate getStream() {
return new StreamUpdate(
diff --git a/src/main/java/com/coveo/pushapiclient/StreamService.java b/src/main/java/com/coveo/pushapiclient/StreamService.java
index eda7bcb3..13085a63 100644
--- a/src/main/java/com/coveo/pushapiclient/StreamService.java
+++ b/src/main/java/com/coveo/pushapiclient/StreamService.java
@@ -27,7 +27,7 @@ public class StreamService {
* @param userAgents The user agent to use for the requests.
*/
public StreamService(StreamEnabledSource source, String[] userAgents) {
- this(source, new BackoffOptionsBuilder().build(), userAgents);
+ this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -42,7 +42,7 @@ public StreamService(StreamEnabledSource source, String[] userAgents) {
* @param source The source to which you want to send your documents.
*/
public StreamService(StreamEnabledSource source) {
- this(source, new BackoffOptionsBuilder().build());
+ this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -58,7 +58,7 @@ public StreamService(StreamEnabledSource source) {
* @param options The configuration options for exponential backoff.
*/
public StreamService(StreamEnabledSource source, BackoffOptions options) {
- this(source, options, null);
+ this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -75,6 +75,25 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) {
* @param userAgents The user agent to use for the requests.
*/
public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize());
+ }
+
+ /**
+ * Creates a service to stream your documents to the provided source by interacting with the
+ * Stream API.
+ *
+ * <p>To perform full document updates or
+ * deletions, use the {@link UpdateStreamService}, since pushing documents with the
+ * {@link StreamService} is equivalent to triggering a full source rebuild. The {@link StreamService}
+ * can also be used for an initial catalog upload.
+ *
+ * @param source The source to which you want to send your documents.
+ * @param options The configuration options for exponential backoff.
+ * @param userAgents The user agent to use for the requests.
+ * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 256MB, max: 256MB).
+ * @throws IllegalArgumentException if maxQueueSize exceeds 256MB.
+ */
+ public StreamService(StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) {
String apiKey = source.getApiKey();
String organizationId = source.getOrganizationId();
PlatformUrl platformUrl = source.getPlatformUrl();
@@ -82,9 +101,11 @@ public StreamService(StreamEnabledSource source, BackoffOptions options, String[
Logger logger = LogManager.getLogger(StreamService.class);
this.source = source;
- this.queue = new DocumentUploadQueue(uploader);
+ this.queue = new DocumentUploadQueue(uploader, maxQueueSize);
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options);
- platformClient.setUserAgents(userAgents);
+ if (userAgents != null) {
+ platformClient.setUserAgents(userAgents);
+ }
this.service = new StreamServiceInternal(this.source, this.queue, this.platformClient, logger);
}
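
For reference, here is a minimal sketch of the new four-argument constructor in use. The helper method and the 64 MB figure are illustrative assumptions; only the constructor signature, the null-tolerant userAgents argument, and the 256 MB cap come from this diff.

```java
// Sketch only: assumes `source` is an already-configured StreamEnabledSource.
// Passing a maxQueueSize above 256 MB makes the constructor throw IllegalArgumentException.
public static StreamService buildService(StreamEnabledSource source) {
  int maxQueueSize = 64 * 1024 * 1024; // rotate file containers every 64 MB
  return new StreamService(source, new BackoffOptionsBuilder().build(), null, maxQueueSize);
}
```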
diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
index 948c3461..62106b2a 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamService.java
@@ -12,8 +12,6 @@ public class UpdateStreamService {
private final PlatformClient platformClient;
private final UpdateStreamServiceInternal updateStreamServiceInternal;
- private FileContainer fileContainer;
-
/**
* Creates a service to stream your documents to the provided source by interacting with the
* Stream API. This provides the ability to incrementally add, update, or delete documents via a
@@ -26,7 +24,7 @@ public class UpdateStreamService {
* @param userAgents The user agent to use for the requests.
*/
public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
- this(source, new BackoffOptionsBuilder().build(), userAgents);
+ this(source, new BackoffOptionsBuilder().build(), userAgents, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -40,7 +38,7 @@ public UpdateStreamService(StreamEnabledSource source, String[] userAgents) {
* @param source The source to which you want to send your documents.
*/
public UpdateStreamService(StreamEnabledSource source) {
- this(source, new BackoffOptionsBuilder().build());
+ this(source, new BackoffOptionsBuilder().build(), null, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -55,7 +53,25 @@ public UpdateStreamService(StreamEnabledSource source) {
* @param options The configuration options for exponential backoff.
*/
public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) {
- this(source, options, null);
+ this(source, options, null, DocumentUploadQueue.getConfiguredBatchSize());
+ }
+
+ /**
+ * Creates a service to stream your documents to the provided source by interacting with the
+ * Stream API. This provides the ability to incrementally add, update, or delete documents via a
+ * stream.
+ *
+ *
+ * <p>To perform a full source rebuild, use the
+ * {@link StreamService}.
+ *
+ * @param source The source to push to
+ * @param options The backoff parameters
+ * @param userAgents The user-agents to append to the "User-Agent" HTTP header when performing
+ * requests against the Coveo Platform.
+ */
+ public UpdateStreamService(
+ StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ this(source, options, userAgents, DocumentUploadQueue.getConfiguredBatchSize());
}
/**
@@ -69,18 +85,22 @@ public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) {
* @param source The source to which you want to send your documents.
* @param options The configuration options for exponential backoff.
* @param userAgents The user agent to use for the requests.
+ * @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 256MB, max: 256MB).
+ * @throws IllegalArgumentException if maxQueueSize exceeds 256MB.
*/
public UpdateStreamService(
- StreamEnabledSource source, BackoffOptions options, String[] userAgents) {
+ StreamEnabledSource source, BackoffOptions options, String[] userAgents, int maxQueueSize) {
Logger logger = LogManager.getLogger(UpdateStreamService.class);
this.platformClient =
new PlatformClient(
source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options);
- this.platformClient.setUserAgents(userAgents);
+ if (userAgents != null) {
+ this.platformClient.setUserAgents(userAgents);
+ }
this.updateStreamServiceInternal =
new UpdateStreamServiceInternal(
source,
- new StreamDocumentUploadQueue(this.getUploadStrategy()),
+ new StreamDocumentUploadQueue(null, maxQueueSize), // UploadStrategy no longer needed
this.platformClient,
logger);
}
@@ -91,9 +111,10 @@ public UpdateStreamService(
* documents into it.
*
* <p>If called several times, the service will automatically batch documents and create new
- * stream chunks whenever the data payload exceeds the batch size limit set for the
- * Stream API.
+ * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor).
+ * Each batch is sent to its own file container and immediately pushed to the stream
+ * source, following the
+ * catalog stream API best practices.
*
* <p>Once there are no more documents to add, it is important to call the {@link
* UpdateStreamService#close} function in order to send any buffered documents and push the file
@@ -118,7 +139,7 @@ public UpdateStreamService(
* @throws IOException If the creation of the file container or adding the document fails.
*/
public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.addOrUpdate(document);
+ updateStreamServiceInternal.addOrUpdate(document);
}
/**
@@ -130,9 +151,10 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte
* Partial item updates section.
*
* <p>If called several times, the service will automatically batch documents and create new
- * stream chunks whenever the data payload exceeds the batch size limit set for the
- * Stream API.
+ * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor).
+ * Each batch is sent to its own file container and immediately pushed to the stream
+ * source, following the
+ * catalog stream API best practices.
*
* <p>Once there are no more documents to add, it is important to call the {@link
* UpdateStreamService#close} function in order to send any buffered documents and push the file
@@ -158,7 +180,7 @@ public void addOrUpdate(DocumentBuilder document) throws IOException, Interrupte
*/
public void addPartialUpdate(PartialUpdateDocument document)
throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.addPartialUpdate(document);
+ updateStreamServiceInternal.addPartialUpdate(document);
}
/**
@@ -167,9 +189,10 @@ public void addPartialUpdate(PartialUpdateDocument document)
* it.
*
* <p>If called several times, the service will automatically batch documents and create new
- * stream chunks whenever the data payload exceeds the batch size limit set for the
- * Stream API.
+ * file containers whenever the data payload exceeds the batch size limit (default: 256MB, configurable via constructor).
+ * Each batch is sent to its own file container and immediately pushed to the stream
+ * source, following the
+ * catalog stream API best practices.
*
* <p>Once there are no more documents to add, it is important to call the {@link
* UpdateStreamService#close} function in order to send any buffered documents and push the file
@@ -194,7 +217,7 @@ public void addPartialUpdate(PartialUpdateDocument document)
* @throws IOException If the creation of the file container or adding the document fails.
*/
public void delete(DeleteDocument document) throws IOException, InterruptedException {
- fileContainer = updateStreamServiceInternal.delete(document);
+ updateStreamServiceInternal.delete(document);
}
/**
@@ -214,11 +237,4 @@ public HttpResponse close()
throws IOException, InterruptedException, NoOpenFileContainerException {
return updateStreamServiceInternal.close();
}
-
- private UploadStrategy getUploadStrategy() {
- return (streamUpdate) -> {
- String batchUpdateJson = new Gson().toJson(streamUpdate.marshal());
- return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson);
- };
- }
}
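
To make the rotation flow concrete, here is a hedged end-to-end sketch. The URIs, the loop, and the `HttpResponse<String>` return type are assumptions; `addOrUpdate`, `close`, and the auto-rotation behavior are the ones described above.

```java
// Sketch only: push many updates and let the queue rotate file containers.
public static void pushAll(StreamEnabledSource source)
    throws IOException, InterruptedException, NoOpenFileContainerException {
  UpdateStreamService service =
      new UpdateStreamService(source, new BackoffOptionsBuilder().build(), null, 64 * 1024 * 1024);
  for (int i = 0; i < 1000; i++) {
    // Whenever the buffered payload would exceed the limit, the current batch is
    // uploaded to a fresh file container and immediately pushed to the stream source.
    service.addOrUpdate(new DocumentBuilder("https://example.com/doc/" + i, "Doc " + i));
  }
  // Send whatever is still buffered; close() now returns null when nothing was queued.
  HttpResponse<String> response = service.close();
  if (response != null) {
    System.out.println(response.body());
  }
}
```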
diff --git a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
index 32f7ddc9..8a646099 100644
--- a/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
+++ b/src/main/java/com/coveo/pushapiclient/UpdateStreamServiceInternal.java
@@ -23,54 +23,59 @@ public UpdateStreamServiceInternal(
this.queue = queue;
this.platformClient = platformClient;
this.logger = logger;
+ // Set this instance on the queue so it can call createUploadAndPush
+ queue.setUpdateStreamService(this);
}
public FileContainer addOrUpdate(DocumentBuilder document)
throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
queue.add(document);
return this.fileContainer;
}
public FileContainer addPartialUpdate(PartialUpdateDocument document)
throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
queue.add(document);
return this.fileContainer;
}
public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException {
- if (this.fileContainer == null) {
- this.fileContainer = this.createFileContainer();
- }
queue.add(document);
return this.fileContainer;
}
public HttpResponse<String> close()
throws IOException, InterruptedException, NoOpenFileContainerException {
- return this.pushFileContainer(this.getSourceId());
+ HttpResponse<String> lastResponse = null;
+ if (!queue.isEmpty()) {
+ lastResponse = queue.flushAndPush();
+ }
+ return lastResponse;
}
- private FileContainer createFileContainer() throws IOException, InterruptedException {
+ /**
+ * Creates a new file container, uploads the content, and pushes it to the stream source.
+ * This method is called by the queue's flush operation to ensure each batch gets its own container.
+ *
+ * @param streamUpdate The batch of documents to upload
+ * @return The HTTP response from pushing the file container
+ * @throws IOException If an I/O error occurs
+ * @throws InterruptedException If the operation is interrupted
+ */
+ public HttpResponse<String> createUploadAndPush(StreamUpdate streamUpdate)
+ throws IOException, InterruptedException {
+ // Step 1: Create a new file container
this.logger.info("Creating new file container");
- HttpResponse<String> response = this.platformClient.createFileContainer();
- return new Gson().fromJson(response.body(), FileContainer.class);
- }
-
- private HttpResponse pushFileContainer(String sourceId)
- throws NoOpenFileContainerException, IOException, InterruptedException {
- if (this.fileContainer == null) {
- throw new NoOpenFileContainerException(
- "No open file container detected. A new container will automatically be created once you start adding, updating or deleting documents.");
- }
- queue.flush();
- this.logger.info("Pushing to file container " + this.fileContainer.fileId);
- return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer);
+ HttpResponse<String> createResponse = this.platformClient.createFileContainer();
+ FileContainer container = new Gson().fromJson(createResponse.body(), FileContainer.class);
+
+ // Step 2: Upload content to the file container
+ String batchUpdateJson = new Gson().toJson(streamUpdate.marshal());
+ this.platformClient.uploadContentToFileContainer(container, batchUpdateJson);
+
+ // Step 3: Push the file container to the stream source
+ this.logger.info("Pushing file container " + container.fileId + " to stream source");
+ return this.platformClient.pushFileContainerContentToStreamSource(this.getSourceId(), container);
}
private String getSourceId() {
diff --git a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java
index f488f3a5..51a3541e 100644
--- a/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java
+++ b/src/test/java/com/coveo/pushapiclient/DocumentUploadQueueTest.java
@@ -12,16 +12,16 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class DocumentUploadQueueTest {
- @Mock private UploadStrategy uploadStrategy;
+ private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024;
- @InjectMocks private DocumentUploadQueue queue;
+ @Mock private UploadStrategy uploadStrategy;
+ private DocumentUploadQueue queue;
private AutoCloseable closeable;
private DocumentBuilder documentToAdd;
private DeleteDocument documentToDelete;
@@ -29,20 +29,14 @@ public class DocumentUploadQueueTest {
private int oneMegaByte = 1 * 1024 * 1024;
private String generateStringFromBytes(int numBytes) {
- // Check if the number of bytes is valid
if (numBytes <= 0) {
return "";
}
-
- // Create a byte array with the specified length
byte[] bytes = new byte[numBytes];
-
- // Fill the byte array with a pattern of ASCII characters
- byte pattern = 65; // ASCII value for 'A'
+ byte pattern = 65;
for (int i = 0; i < numBytes; i++) {
bytes[i] = pattern;
}
-
return new String(bytes);
}
@@ -53,6 +47,10 @@ private DocumentBuilder generateDocumentFromSize(int numBytes) {
@Before
public void setup() {
+ closeable = MockitoAnnotations.openMocks(this);
+
+ queue = new DocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE);
+
String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte);
documentToAdd =
@@ -60,8 +58,6 @@ public void setup() {
.withData(twoMegaByteData);
documentToDelete = new DeleteDocument("https://my.document.uri?ref=3");
-
- closeable = MockitoAnnotations.openMocks(this);
}
@After
diff --git a/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
new file mode 100644
index 00000000..2ebf689d
--- /dev/null
+++ b/src/test/java/com/coveo/pushapiclient/FileContainerRotationIntegrationTest.java
@@ -0,0 +1,225 @@
+package com.coveo.pushapiclient;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Integration tests for file container rotation when pushing large amounts of data. These tests
+ * verify the end-to-end flow from UpdateStreamService through to PlatformClient, using a small
+ * batch size to trigger rotation without needing large test data.
+ */
+public class FileContainerRotationIntegrationTest {
+
+ private static final int SMALL_BATCH_SIZE = 1000;
+ private static final String SOURCE_ID = "test-source-id";
+ private static final String ORG_ID = "test-org";
+ private static final String API_KEY = "test-api-key";
+
+ private PlatformClient platformClient;
+ private StreamEnabledSource source;
+ private AtomicInteger containerCounter;
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ platformClient = mock(PlatformClient.class);
+ source = mock(StreamEnabledSource.class);
+ containerCounter = new AtomicInteger(0);
+
+ doReturn(SOURCE_ID).when(source).getId();
+ doReturn(ORG_ID).when(source).getOrganizationId();
+ doReturn(API_KEY).when(source).getApiKey();
+ doReturn(new PlatformUrl(Environment.PRODUCTION, Region.US)).when(source).getPlatformUrl();
+
+ doAnswer(invocation -> createContainerResponse()).when(platformClient).createFileContainer();
+ doReturn(createGenericResponse()).when(platformClient).uploadContentToFileContainer(any(), anyString());
+ doReturn(createGenericResponse()).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldCreateMultipleContainersWhenDataExceedsBatchSize() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ service.addOrUpdate(createDocument("doc2", 600));
+ service.addOrUpdate(createDocument("doc3", 600));
+ service.addOrUpdate(createDocument("doc4", 600));
+ service.close();
+
+ verify(platformClient, times(4)).createFileContainer();
+ verify(platformClient, times(4)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldCreateSingleContainerWhenDataFitsInOneBatch() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 100));
+ service.addOrUpdate(createDocument("doc2", 100));
+ service.close();
+
+ verify(platformClient, times(1)).createFileContainer();
+ verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldHandleMixedOperationsWithRotation() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 400));
+ service.delete(new DeleteDocument("doc2"));
+ service.addPartialUpdate(createPartialUpdate("doc3", 400));
+ service.addOrUpdate(createDocument("doc4", 400));
+ service.close();
+
+ verify(platformClient, times(3)).createFileContainer();
+ verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldUseUniqueContainerIdForEachBatch() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ service.addOrUpdate(createDocument("doc2", 600));
+ service.addOrUpdate(createDocument("doc3", 600));
+ service.close();
+
+ ArgumentCaptor<FileContainer> containerCaptor = ArgumentCaptor.forClass(FileContainer.class);
+ verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), containerCaptor.capture());
+
+ assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId);
+ assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId);
+ assertEquals("container-3", containerCaptor.getAllValues().get(2).fileId);
+ }
+
+ @Test
+ public void shouldPushImmediatelyWhenBatchSizeExceeded() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ service.addOrUpdate(createDocument("doc1", 600));
+ verify(platformClient, times(0)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.addOrUpdate(createDocument("doc2", 600));
+ verify(platformClient, times(1)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.addOrUpdate(createDocument("doc3", 600));
+ verify(platformClient, times(2)).pushFileContainerContentToStreamSource(anyString(), any());
+
+ service.close();
+ verify(platformClient, times(3)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldHandleLargeNumberOfDocumentsWithRotation() throws Exception {
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ for (int i = 0; i < 20; i++) {
+ service.addOrUpdate(createDocument("doc" + i, 200));
+ }
+ service.close();
+
+ int expectedContainers = 10;
+ verify(platformClient, times(expectedContainers)).createFileContainer();
+ verify(platformClient, times(expectedContainers)).pushFileContainerContentToStreamSource(anyString(), any());
+ }
+
+ @Test
+ public void shouldNeverPushMultipleBatchesToSameContainer() throws Exception {
+ Map<String, Integer> pushCountPerContainer = new HashMap<>();
+ List<String> containerCreationOrder = new ArrayList<>();
+
+ doAnswer(invocation -> {
+ HttpResponse<String> response = createContainerResponse();
+ String fileId = "container-" + containerCounter.get();
+ containerCreationOrder.add(fileId);
+ pushCountPerContainer.put(fileId, 0);
+ return response;
+ }).when(platformClient).createFileContainer();
+
+ doAnswer(invocation -> {
+ FileContainer container = invocation.getArgument(1);
+ int currentCount = pushCountPerContainer.getOrDefault(container.fileId, 0);
+ pushCountPerContainer.put(container.fileId, currentCount + 1);
+ return createGenericResponse();
+ }).when(platformClient).pushFileContainerContentToStreamSource(anyString(), any());
+
+ UpdateStreamServiceInternal service = createServiceWithSmallBatchSize();
+
+ for (int i = 0; i < 10; i++) {
+ service.addOrUpdate(createDocument("doc" + i, 400));
+ }
+ service.close();
+
+ for (Map.Entry<String, Integer> entry : pushCountPerContainer.entrySet()) {
+ assertEquals(
+ "Container " + entry.getKey() + " should receive exactly 1 push, but received " + entry.getValue(),
+ Integer.valueOf(1),
+ entry.getValue());
+ }
+
+ assertTrue("Should have created multiple containers", containerCreationOrder.size() > 1);
+ }
+
+ private UpdateStreamServiceInternal createServiceWithSmallBatchSize() {
+ StreamDocumentUploadQueue queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE);
+ org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger(getClass());
+ return new UpdateStreamServiceInternal(source, queue, platformClient, logger);
+ }
+
+ private DocumentBuilder createDocument(String id, int dataSize) {
+ return new DocumentBuilder("https://example.com/" + id, "Title " + id)
+ .withData(generateData(dataSize));
+ }
+
+ private PartialUpdateDocument createPartialUpdate(String id, int dataSize) {
+ return new PartialUpdateDocument(
+ "https://example.com/" + id,
+ PartialUpdateOperator.FIELDVALUEREPLACE,
+ "field",
+ generateData(dataSize));
+ }
+
+ private String generateData(int size) {
+ byte[] bytes = new byte[size];
+ for (int i = 0; i < size; i++) {
+ bytes[i] = 65;
+ }
+ return new String(bytes);
+ }
+
+ @SuppressWarnings("unchecked")
+ private HttpResponse<String> createContainerResponse() {
+ HttpResponse<String> response = mock(HttpResponse.class);
+ int id = containerCounter.incrementAndGet();
+ doReturn(String.format(
+ "{\"uploadUri\": \"https://upload.uri/container-%d\", \"fileId\": \"container-%d\"}", id, id))
+ .when(response)
+ .body();
+ return response;
+ }
+
+ @SuppressWarnings("unchecked")
+ private HttpResponse<String> createGenericResponse() {
+ HttpResponse<String> response = mock(HttpResponse.class);
+ doReturn("{\"status\": \"ok\"}").when(response).body();
+ return response;
+ }
+}
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
new file mode 100644
index 00000000..222139af
--- /dev/null
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueBatchingTest.java
@@ -0,0 +1,274 @@
+package com.coveo.pushapiclient;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for container rotation and batching behavior in the stream update workflow. Each batch
+ * that exceeds the configured limit should trigger creation of a new file container, upload, and
+ * immediate push.
+ */
+public class StreamDocumentUploadQueueBatchingTest {
+
+ private static final int SMALL_BATCH_SIZE = 5000;
+
+ @Mock private UpdateStreamServiceInternal updateStreamService;
+ @Mock private HttpResponse<String> httpResponse;
+
+ private StreamDocumentUploadQueue queue;
+ private AutoCloseable closeable;
+
+ @Before
+ public void setUp() throws Exception {
+ closeable = MockitoAnnotations.openMocks(this);
+ queue = new StreamDocumentUploadQueue(null, SMALL_BATCH_SIZE);
+ queue.setUpdateStreamService(updateStreamService);
+
+ when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ closeable.close();
+ }
+
+ @Test
+ public void addingDocumentsThatExceedBatchSizeShouldTriggerFlushAndPush()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+ DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+
+ queue.add(doc1);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+
+ queue.add(doc2);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void addMultipleSmallDocumentsShouldNotTriggerFlushUntilLimitReached()
+ throws IOException, InterruptedException {
+ DocumentBuilder smallDoc1 = new DocumentBuilder("https://doc.uri/1", "Small Doc 1");
+ DocumentBuilder smallDoc2 = new DocumentBuilder("https://doc.uri/2", "Small Doc 2");
+
+ queue.add(smallDoc1);
+ queue.add(smallDoc2);
+
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+ assertFalse(queue.isEmpty());
+ }
+
+ @Test
+ public void accumulatedDocumentsExceedingLimitShouldFlushPreviousBatch()
+ throws IOException, InterruptedException {
+ DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(2000));
+ DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(2000));
+ DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(2000));
+
+ queue.add(doc1);
+ queue.add(doc2);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+
+ queue.add(doc3);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+
+ ArgumentCaptor<StreamUpdate> captor = ArgumentCaptor.forClass(StreamUpdate.class);
+ verify(updateStreamService).createUploadAndPush(captor.capture());
+ assertEquals(2, captor.getValue().getAddOrUpdate().size());
+ }
+
+ @Test
+ public void multipleBatchesShouldCreateMultipleContainers() throws IOException, InterruptedException {
+ DocumentBuilder doc1 = new DocumentBuilder("https://doc.uri/1", "Doc 1").withData(generateData(3000));
+ DocumentBuilder doc2 = new DocumentBuilder("https://doc.uri/2", "Doc 2").withData(generateData(3000));
+ DocumentBuilder doc3 = new DocumentBuilder("https://doc.uri/3", "Doc 3").withData(generateData(3000));
+ DocumentBuilder doc4 = new DocumentBuilder("https://doc.uri/4", "Doc 4").withData(generateData(3000));
+
+ queue.add(doc1);
+ queue.add(doc2);
+ queue.add(doc3);
+ queue.add(doc4);
+
+ verify(updateStreamService, times(3)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void flushAndPushShouldClearQueueAfterBatch() throws IOException, InterruptedException {
+ DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+ queue.add(doc);
+ assertFalse(queue.isEmpty());
+
+ queue.flushAndPush();
+
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void flushAndPushShouldReturnResponseFromService() throws IOException, InterruptedException {
+ DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(10));
+ queue.add(doc);
+
+ HttpResponse<String> response = queue.flushAndPush();
+
+ assertEquals(httpResponse, response);
+ }
+
+ @Test
+ public void flushAndPushOnEmptyQueueShouldReturnNull() throws IOException, InterruptedException {
+ HttpResponse<String> response = queue.flushAndPush();
+
+ assertNull(response);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void flushAndPushShouldPassCorrectStreamUpdateToService() throws IOException, InterruptedException {
+ DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc");
+ DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
+ PartialUpdateDocument partialDoc = new PartialUpdateDocument(
+ "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", "value");
+
+ queue.add(doc);
+ queue.add(deleteDoc);
+ queue.add(partialDoc);
+
+ queue.flushAndPush();
+
+ ArgumentCaptor<StreamUpdate> captor = ArgumentCaptor.forClass(StreamUpdate.class);
+ verify(updateStreamService).createUploadAndPush(captor.capture());
+
+ StreamUpdate captured = captor.getValue();
+ assertEquals(1, captured.getAddOrUpdate().size());
+ assertEquals(1, captured.getDelete().size());
+ assertEquals(1, captured.getPartialUpdate().size());
+ }
+
+ @Test
+ public void deleteDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException {
+ queue = new StreamDocumentUploadQueue(null, 50);
+ queue.setUpdateStreamService(updateStreamService);
+
+ DeleteDocument deleteDoc1 = new DeleteDocument("https://doc.uri/1");
+ DeleteDocument deleteDoc2 = new DeleteDocument("https://doc.uri/with/very/long/path/that/exceeds");
+
+ queue.add(deleteDoc1);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+
+ queue.add(deleteDoc2);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void partialUpdateDocumentsTriggerFlushWhenExceedingLimit() throws IOException, InterruptedException {
+ PartialUpdateDocument partialDoc1 = new PartialUpdateDocument("https://doc.uri/1",
+ PartialUpdateOperator.FIELDVALUEREPLACE, "f", "v");
+ PartialUpdateDocument partialDoc2 = new PartialUpdateDocument(
+ "https://doc.uri/2", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(SMALL_BATCH_SIZE));
+
+ queue.add(partialDoc1);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+
+ queue.add(partialDoc2);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test
+ public void mixedDocumentTypesShouldAccumulateAndFlushCorrectly() throws IOException, InterruptedException {
+ DocumentBuilder doc = new DocumentBuilder("https://doc.uri/1", "Doc").withData(generateData(1500));
+ DeleteDocument deleteDoc = new DeleteDocument("https://doc.uri/2");
+ PartialUpdateDocument partialDoc = new PartialUpdateDocument(
+ "https://doc.uri/3", PartialUpdateOperator.FIELDVALUEREPLACE, "field", generateData(4000));
+
+ queue.add(doc);
+ queue.add(deleteDoc);
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
+
+ queue.add(partialDoc);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void constructorShouldRejectBatchSizeExceeding256MB() {
+ int exceeding256MB = 256 * 1024 * 1024 + 1;
+ new StreamDocumentUploadQueue(null, exceeding256MB);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void constructorShouldRejectZeroBatchSize() {
+ new StreamDocumentUploadQueue(null, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void constructorShouldRejectNegativeBatchSize() {
+ new StreamDocumentUploadQueue(null, -1);
+ }
+
+ @Test
+ public void constructorShouldAcceptMaxAllowedBatchSize() {
+ int max256MB = 256 * 1024 * 1024;
+ StreamDocumentUploadQueue q = new StreamDocumentUploadQueue(null, max256MB);
+ assertNotNull(q);
+ }
+
+ @Test
+ public void queueShouldUseSystemPropertyForDefaultBatchSize() {
+ String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ try {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "1048576");
+ int configuredSize = DocumentUploadQueue.getConfiguredBatchSize();
+ assertEquals(1048576, configuredSize);
+ } finally {
+ if (originalValue != null) {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue);
+ } else {
+ System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ }
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void systemPropertyExceeding256MBShouldThrow() {
+ String originalValue = System.getProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ try {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, "268435457");
+ DocumentUploadQueue.getConfiguredBatchSize();
+ } finally {
+ if (originalValue != null) {
+ System.setProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY, originalValue);
+ } else {
+ System.clearProperty(DocumentUploadQueue.BATCH_SIZE_PROPERTY);
+ }
+ }
+ }
+
+ private String generateData(int numBytes) {
+ if (numBytes <= 0)
+ return "";
+ byte[] bytes = new byte[numBytes];
+ for (int i = 0; i < numBytes; i++) {
+ bytes[i] = 65;
+ }
+ return new String(bytes);
+ }
+}
diff --git a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
index 12cd2f54..ff6fc0ad 100644
--- a/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
+++ b/src/test/java/com/coveo/pushapiclient/StreamDocumentUploadQueueTest.java
@@ -4,26 +4,31 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.net.http.HttpResponse;
import java.util.ArrayList;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
public class StreamDocumentUploadQueueTest {
- @Mock private UploadStrategy uploadStrategy;
+ private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024;
- @InjectMocks private StreamDocumentUploadQueue queue;
+ @Mock private UploadStrategy uploadStrategy;
+ @Mock private UpdateStreamServiceInternal updateStreamService;
+ @Mock private HttpResponse<String> httpResponse;
+ private StreamDocumentUploadQueue queue;
private AutoCloseable closeable;
private DocumentBuilder documentToAdd;
private DeleteDocument documentToDelete;
@@ -32,20 +37,14 @@ public class StreamDocumentUploadQueueTest {
private int oneMegaByte = 1 * 1024 * 1024;
private String generateStringFromBytes(int numBytes) {
- // Check if the number of bytes is valid
if (numBytes <= 0) {
return "";
}
-
- // Create a byte array with the specified length
byte[] bytes = new byte[numBytes];
-
- // Fill the byte array with a pattern of ASCII characters
- byte pattern = 65; // ASCII value for 'A'
+ byte pattern = 65;
for (int i = 0; i < numBytes; i++) {
bytes[i] = pattern;
}
-
return new String(bytes);
}
@@ -63,7 +62,14 @@ private PartialUpdateDocument generatePartialUpdateDocumentFromSize(int numBytes
}
@Before
- public void setup() {
+ public void setup() throws IOException, InterruptedException {
+ closeable = MockitoAnnotations.openMocks(this);
+
+ queue = new StreamDocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE);
+ queue.setUpdateStreamService(updateStreamService);
+
+ when(updateStreamService.createUploadAndPush(any(StreamUpdate.class))).thenReturn(httpResponse);
+
String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte);
documentToAdd =
@@ -78,8 +84,6 @@ public void setup() {
PartialUpdateOperator.FIELDVALUEREPLACE,
"field",
"value");
-
- closeable = MockitoAnnotations.openMocks(this);
}
@After
@@ -127,17 +131,10 @@ public void testShouldReturnBatch() throws IOException, InterruptedException {
@Test
public void testFlushShouldNotUploadDocumentsWhenRequiredSizeIsNotMet()
throws IOException, InterruptedException {
- // Adding 2MB document to the queue => queue has now 3MB of free space
- // (5MB - 2MB = 3MB)
queue.add(documentToAdd);
- // Adding 2MB document to the queue => queue has now 1MB of free space
- // (3MB - 2MB = 1MB)
queue.add(documentToDelete);
- // The maximum queue size has not been reached yet (1MB left of free space).
- // Therefore, the accumulated documents will not be automatically flushed.
- // Unless the user runs `.flush()` the queue will keep the 4MB of documents
- verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
}
@Test
@@ -162,21 +159,14 @@ public void testShouldAutomaticallyFlushAccumulatedDocuments()
}
});
- // Adding 3 documents of 2MB to the queue. After adding the first 2 documents,
- // the queue size will reach 6MB, which exceeds the maximum queue size
- // limit by 1MB. Therefore, the 2 first added documents will automatically be
- // uploaded to the source.
queue.add(firstBulkyDocument);
queue.add(secondBulkyDocument);
- verify(uploadStrategy, times(0)).apply(any(BatchUpdate.class));
+ verify(updateStreamService, times(0)).createUploadAndPush(any(StreamUpdate.class));
- // The 3rd document added to the queue will be included in a separate batch,
- // which will not be uploaded unless the `flush()` method is called or until the
- // queue size limit has been reached
queue.add(thirdBulkyDocument);
- verify(uploadStrategy, times(1)).apply(any(BatchUpdate.class));
- verify(uploadStrategy, times(1)).apply(firstBatch);
+ verify(updateStreamService, times(1)).createUploadAndPush(any(StreamUpdate.class));
+ verify(updateStreamService, times(1)).createUploadAndPush(firstBatch);
}
@Test
@@ -212,21 +202,15 @@ public void testShouldManuallyFlushAccumulatedDocuments()
emptyList,
partialEmptyList);
- // Adding 3 documents of 2MB to the queue. After adding the first 2 documents,
- // the queue size will reach 6MB, which exceeds the maximum queue size
- // limit. Therefore, the 2 first added documents will automatically be uploaded
- // to the source.
queue.add(firstBulkyDocument);
queue.add(secondBulkyDocument);
queue.add(thirdBulkyDocument);
queue.flush();
- // Additional flush will have no effect if documents where already flushed
queue.flush();
- verify(uploadStrategy, times(2)).apply(any(StreamUpdate.class));
- verify(uploadStrategy, times(1)).apply(firstBatch);
+ verify(updateStreamService, times(1)).createUploadAndPush(firstBatch);
verify(uploadStrategy, times(1)).apply(secondBatch);
}
diff --git a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
index 726769b8..5de3cf05 100644
--- a/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
+++ b/src/test/java/com/coveo/pushapiclient/UpdateStreamServiceInternalTest.java
@@ -1,7 +1,10 @@
package com.coveo.pushapiclient;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -9,10 +12,13 @@
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
import java.io.IOException;
import java.net.http.HttpResponse;
+import java.util.ArrayList;
import org.apache.logging.log4j.core.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -70,11 +76,11 @@ public void closeService() throws Exception {
}
@Test
- public void addOrUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void addOrUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException {
service.addOrUpdate(documentA);
service.addOrUpdate(documentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(this.platformClient, times(0)).createFileContainer();
}
@Test
@@ -94,62 +100,129 @@ public void addOrUpdateAndPartialAndDeleteShouldAddDocumentsToQueue()
}
@Test
- public void deleteShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void deleteShouldNotCreateFileContainer() throws IOException, InterruptedException {
service.delete(deleteDocumentA);
service.delete(deleteDocumentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(this.platformClient, times(0)).createFileContainer();
}
@Test
- public void partialUpdateShouldCreateFileContainer() throws IOException, InterruptedException {
+ public void partialUpdateShouldNotCreateFileContainer() throws IOException, InterruptedException {
service.addPartialUpdate(partialUpdateDocumentA);
service.addPartialUpdate(partialUpdateDocumentB);
- verify(this.platformClient, times(1)).createFileContainer();
+ verify(this.platformClient, times(0)).createFileContainer();
}
@Test
- public void closeShouldPushFileContainerOnAddOrUpdate()
+ public void closeShouldCallFlushAndPush()
throws IOException, InterruptedException, NoOpenFileContainerException {
+ when(queue.isEmpty()).thenReturn(false);
+ when(queue.flushAndPush()).thenReturn(httpResponse);
+
service.addOrUpdate(documentA);
service.close();
- verify(platformClient, times(1))
- .pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
+ verify(queue, times(1)).flushAndPush();
}
@Test
- public void closeShouldPushFileContainerOnDelete()
+ public void closeShouldNotCallFlushAndPushWhenQueueIsEmpty()
throws IOException, InterruptedException, NoOpenFileContainerException {
- service.delete(deleteDocumentA);
+ when(queue.isEmpty()).thenReturn(true);
+
service.close();
+ verify(queue, times(0)).flushAndPush();
+ }
+
+ @Test
+ public void createUploadAndPushShouldCreateContainerUploadAndPush()
+ throws IOException, InterruptedException {
+ StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+
+ service.createUploadAndPush(streamUpdate);
+
+ verify(platformClient, times(1)).createFileContainer();
+ verify(platformClient, times(1))
+ .uploadContentToFileContainer(any(FileContainer.class), any(String.class));
verify(platformClient, times(1))
.pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
}
@Test
- public void closeShouldFlushBufferedDocuments()
- throws IOException, InterruptedException, NoOpenFileContainerException {
- service.addOrUpdate(documentA);
- service.close();
+ public void createUploadAndPushShouldUseNewContainerForEachCall()
+ throws IOException, InterruptedException {
+ HttpResponse<String> response1 = createMockHttpResponse("container-1");
+ HttpResponse<String> response2 = createMockHttpResponse("container-2");
+
+ when(platformClient.createFileContainer()).thenReturn(response1).thenReturn(response2);
+
+ StreamUpdate streamUpdate1 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+ StreamUpdate streamUpdate2 = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+
+ service.createUploadAndPush(streamUpdate1);
+ service.createUploadAndPush(streamUpdate2);
+
+ verify(platformClient, times(2)).createFileContainer();
- verify(queue, times(1)).flush();
+ ArgumentCaptor<FileContainer> containerCaptor = ArgumentCaptor.forClass(FileContainer.class);
+ verify(platformClient, times(2))
+ .pushFileContainerContentToStreamSource(eq(SOURCE_ID), containerCaptor.capture());
+
+ assertEquals("container-1", containerCaptor.getAllValues().get(0).fileId);
+ assertEquals("container-2", containerCaptor.getAllValues().get(1).fileId);
+ }
+
+ @Test
+ public void createUploadAndPushShouldPerformOperationsInCorrectOrder()
+ throws IOException, InterruptedException {
+ StreamUpdate streamUpdate = new StreamUpdate(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+
+ service.createUploadAndPush(streamUpdate);
+
+ org.mockito.InOrder inOrder = org.mockito.Mockito.inOrder(platformClient);
+ inOrder.verify(platformClient).createFileContainer();
+ inOrder.verify(platformClient).uploadContentToFileContainer(any(FileContainer.class), any(String.class));
+ inOrder.verify(platformClient).pushFileContainerContentToStreamSource(eq(SOURCE_ID), any(FileContainer.class));
}
@Test
- public void shouldLogInfoOnCreateFileContainer()
+ public void closeOnEmptyQueueShouldReturnNull()
throws IOException, InterruptedException, NoOpenFileContainerException {
- service.addOrUpdate(documentA);
- verify(logger, times(1)).info("Creating new file container");
- service.close();
- verify(logger, times(1)).info("Pushing to file container file-id");
+ when(queue.isEmpty()).thenReturn(true);
+
+ HttpResponse<String> result = service.close();
+
+ assertNull(result);
+ verify(queue, times(0)).flushAndPush();
}
- @Test(expected = NoOpenFileContainerException.class)
- public void shouldThrowExceptionOnCloseIfNoOpenFileContainer()
+ @Test
+ public void closeOnNonEmptyQueueShouldReturnFlushAndPushResponse()
throws IOException, InterruptedException, NoOpenFileContainerException {
- service.close();
+ when(queue.isEmpty()).thenReturn(false);
+ when(queue.flushAndPush()).thenReturn(httpResponse);
+
+ HttpResponse<String> result = service.close();
+
+ assertEquals(httpResponse, result);
+ }
+
+ @Test
+ public void serviceShouldSetItselfOnQueueDuringConstruction() {
+ verify(queue, times(1)).setUpdateStreamService(service);
+ }
+
+ @SuppressWarnings("unchecked")
+ private HttpResponse<String> createMockHttpResponse(String fileId) {
+ HttpResponse<String> response = mock(HttpResponse.class);
+ doReturn(
+ String.format(
+ "{\"uploadUri\": \"https://upload.uri/%s\", \"fileId\": \"%s\"}", fileId, fileId))
+ .when(response)
+ .body();
+ return response;
}
}