Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions src/main/java/com/coveo/pushapiclient/DocumentUploadQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,118 @@
/** Represents a queue for uploading documents using a specified upload strategy */
class DocumentUploadQueue {
private static final Logger logger = LogManager.getLogger(DocumentUploadQueue.class);
protected final UploadStrategy uploader;
protected final int maxQueueSize = 5 * 1024 * 1024;

/** Maximum allowed queue size based on Stream API limit (256 MB) */
protected static final int MAX_ALLOWED_QUEUE_SIZE = 256 * 1024 * 1024;

/** Default queue size (5 MB) */
protected static final int DEFAULT_QUEUE_SIZE = 5 * 1024 * 1024;

/** System property name for configuring the default batch size */
public static final String BATCH_SIZE_PROPERTY = "coveo.push.batchSize";

protected UploadStrategy uploader;
protected final int maxQueueSize;
protected ArrayList<DocumentBuilder> documentToAddList;
protected ArrayList<DeleteDocument> documentToDeleteList;
protected int size;

/**
* Constructs a new DocumentUploadQueue object with a default maximum queue size limit of 5MB.
* Validates batch size against constraints (> 0 and <= 256MB). Used by getConfiguredBatchSize and
* constructors to ensure consistent validation logic.
*
* @param sizeBytes The batch size in bytes to validate
* @throws IllegalArgumentException if size exceeds MAX_ALLOWED_QUEUE_SIZE or is <= 0
*/
protected static void validateBatchSize(int sizeBytes) {
if (sizeBytes > MAX_ALLOWED_QUEUE_SIZE) {
throw new IllegalArgumentException(
String.format(
"Batch size (%d bytes) exceeds the Stream API limit of %d bytes (%d MB)",
sizeBytes, MAX_ALLOWED_QUEUE_SIZE, MAX_ALLOWED_QUEUE_SIZE / (1024 * 1024)));
}
if (sizeBytes <= 0) {
throw new IllegalArgumentException("Batch size must be greater than 0");
}
}

/**
* Gets the configured batch size from system properties, or returns the default if not set.
*
* <p>The system property is read as bytes. When not set, returns DEFAULT_QUEUE_SIZE (5 MB).
*
* <p>Example: Set a 50 MB batch size via system property:
*
* <pre>
* java -Dcoveo.push.batchSize=52428800 -jar app.jar // 50 * 1024 * 1024 bytes
* </pre>
*
* @return The configured batch size in bytes (e.g., 52428800 for 50 MB)
* @throws IllegalArgumentException if the configured value exceeds 256MB or is invalid
*/
public static int getConfiguredBatchSize() {
String propertyValue = System.getProperty(BATCH_SIZE_PROPERTY);
if (propertyValue == null || propertyValue.trim().isEmpty()) {
return DEFAULT_QUEUE_SIZE;
}

int configuredSize;
try {
configuredSize = Integer.parseInt(propertyValue.trim());
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
String.format(
"Invalid value for system property %s: '%s'. Must be a valid integer.",
BATCH_SIZE_PROPERTY, propertyValue),
e);
}

validateBatchSize(configuredSize);

logger.info(
String.format(
"Using configured batch size from system property %s: %d bytes (%.2f MB)",
BATCH_SIZE_PROPERTY, configuredSize, configuredSize / (1024.0 * 1024.0)));
return configuredSize;
}

/**
* Constructs a new DocumentUploadQueue with the default batch size.
*
* <p>Uses the configured batch size from system property "coveo.push.batchSize" if set, otherwise
* defaults to DEFAULT_QUEUE_SIZE (5 MB = 5242880 bytes).
*
* @param uploader The upload strategy to be used for document uploads.
* @throws IllegalArgumentException if the system property value exceeds 256MB or is invalid.
*/
public DocumentUploadQueue(UploadStrategy uploader) {
this(uploader, getConfiguredBatchSize());
}

/**
* Constructs a new DocumentUploadQueue object with a configurable maximum queue size limit.
*
* @param uploader The upload strategy to be used for document uploads.
* @param maxQueueSize The maximum queue size in bytes (e.g., 52428800 for 50 MB). Must not exceed
* 256MB (Stream API limit).
* @throws IllegalArgumentException if maxQueueSize exceeds the API limit of 256MB.
*/
public DocumentUploadQueue(UploadStrategy uploader, int maxQueueSize) {
validateBatchSize(maxQueueSize);
this.documentToAddList = new ArrayList<>();
this.documentToDeleteList = new ArrayList<>();
this.uploader = uploader;
this.maxQueueSize = maxQueueSize;
}

/**
* Default constructor for testing purposes (used by Mockito @InjectMocks). Initializes with
* default batch size; uploader is injected by Mockito.
*/
public DocumentUploadQueue() {
this.documentToAddList = new ArrayList<>();
this.documentToDeleteList = new ArrayList<>();
this.maxQueueSize = DEFAULT_QUEUE_SIZE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,31 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class DocumentUploadQueueTest {

@Mock private UploadStrategy uploadStrategy;
private static final int TEST_BATCH_SIZE = 5 * 1024 * 1024;

@InjectMocks private DocumentUploadQueue queue;
@Mock private UploadStrategy uploadStrategy;

private DocumentUploadQueue queue;
private AutoCloseable closeable;
private DocumentBuilder documentToAdd;
private DeleteDocument documentToDelete;

private int oneMegaByte = 1 * 1024 * 1024;

private String generateStringFromBytes(int numBytes) {
// Check if the number of bytes is valid
if (numBytes <= 0) {
return "";
}

// Create a byte array with the specified length
byte[] bytes = new byte[numBytes];

// Fill the byte array with a pattern of ASCII characters
byte pattern = 65; // ASCII value for 'A'
byte pattern = 65;
for (int i = 0; i < numBytes; i++) {
bytes[i] = pattern;
}

return new String(bytes);
}

Expand All @@ -53,15 +47,17 @@ private DocumentBuilder generateDocumentFromSize(int numBytes) {

@Before
public void setup() {
closeable = MockitoAnnotations.openMocks(this);

queue = new DocumentUploadQueue(uploadStrategy, TEST_BATCH_SIZE);

String twoMegaByteData = generateStringFromBytes(2 * oneMegaByte);

documentToAdd =
new DocumentBuilder("https://my.document.uri?ref=1", "My new document")
.withData(twoMegaByteData);

documentToDelete = new DeleteDocument("https://my.document.uri?ref=3");

closeable = MockitoAnnotations.openMocks(this);
}

@After
Expand Down