@@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.marklogic.client.FailedRequestException;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
@@ -42,16 +43,21 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
}
}

JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
.addVariable("fieldName", fieldName)
.addVariable("uris", new JacksonHandle(uris))
.evalAs(JsonNode.class);

return filterDocuments(context, uri -> {
if (response.has(uri)) {
return response.get(uri).asText();
}
return null;
});
try {
JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
.addVariable("fieldName", fieldName)
.addVariable("uris", new JacksonHandle(uris))
.evalAs(JsonNode.class);

return filterDocuments(context, uri -> {
if (response.has(uri)) {
return response.get(uri).asText();
}
return null;
});
} catch (FailedRequestException e) {
String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
throw new FailedRequestException(message, e.getFailedRequest());
}
}
}
@@ -109,7 +109,7 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<Strin
continue;
}

final String contentHash = serializeContent(doc);
final String contentHash = computeHash(serializeContent(doc));
final String existingHash = hashRetriever.apply(doc.getUri());
if (logger.isTraceEnabled()) {
logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
@@ -3,6 +3,7 @@
*/
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.FailedRequestException;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.DocumentWriteSet;
import com.marklogic.client.row.RowTemplate;
@@ -31,25 +32,32 @@ public DocumentWriteSet apply(Context context) {

// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
// a serialized query instead. That doesn't allow a user to override the query though.
Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
op.fromLexicons(Map.of(
"uri", op.cts.uriReference(),
"hash", op.cts.fieldReference(super.fieldName)
)).where(
op.cts.documentQuery(op.xs.stringSeq(uris))
),

rows -> {
Map<String, String> map = new HashMap<>();
rows.forEach(row -> {
String uri = row.getString("uri");
String existingHash = row.getString("hash");
map.put(uri, existingHash);
});
return map;
}
);

return filterDocuments(context, uri -> existingHashes.get(uri));
RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());

try {
Map<String, String> existingHashes = rowTemplate.query(op ->
op.fromLexicons(Map.of(
"uri", op.cts.uriReference(),
"hash", op.cts.fieldReference(super.fieldName)
)).where(
op.cts.documentQuery(op.xs.stringSeq(uris))
),

rows -> {
Map<String, String> map = new HashMap<>();
rows.forEach(row -> {
String uri = row.getString("uri");
String existingHash = row.getString("hash");
map.put(uri, existingHash);
});
return map;
}
);

return filterDocuments(context, uri -> existingHashes.get(uri));
} catch (FailedRequestException e) {
String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
throw new FailedRequestException(message, e.getFailedRequest());
}
}
}
@@ -5,7 +5,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.client.document.DocumentWriteOperation;
import com.marklogic.client.document.*;
import com.marklogic.client.impl.DocumentWriteOperationImpl;
import com.marklogic.client.io.DocumentMetadataHandle;
import com.marklogic.client.io.Format;
@@ -150,8 +150,40 @@ void invalidJsonWithFormat() {
"Expecting the server to throw an error. Actual message: " + message);
}

@Test
void noRangeIndexForField() {
filter = IncrementalWriteFilter.newBuilder()
.fieldName("non-existent-field")
.build();

writeTenDocuments();

assertNotNull(batchFailure.get());
String message = batchFailure.get().getMessage();
assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
"When the user tries to use the incremental write feature without the required range index, we should " +
"fail with a helpful error message. Actual message: " + message);
}

@Test
void noRangeIndexForFieldWithEval() {
filter = IncrementalWriteFilter.newBuilder()
.fieldName("non-existent-field")
.useEvalQuery(true)
.build();

writeTenDocuments();

assertNotNull(batchFailure.get());
String message = batchFailure.get().getMessage();
assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
"When the user tries to use the incremental write feature without the required range index, we should " +
"fail with a helpful error message. Actual message: " + message);
}

private void verifyIncrementalWriteWorks() {
writeTenDocuments();
verifyDocumentsHasHashInMetadataKey();
assertEquals(10, writtenCount.get());
assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");

@@ -169,36 +201,44 @@ private void verifyIncrementalWriteWorks() {
}

private void writeTenDocuments() {
new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
.withThreadCount(1).withBatchSize(5)
.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
.withDocumentWriteSetFilter(filter),

writeBatcher -> {
for (int i = 1; i <= 10; i++) {
// Consistent URIs are required for incremental writes to work.
String uri = "/incremental/test/doc-" + i + ".xml";
String content = "<doc>This is document number " + i + "</doc>";
writeBatcher.add(uri, METADATA, new StringHandle(content));
}
docs = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
// Consistent URIs are required for incremental writes to work.
String uri = "/incremental/test/doc-" + i + ".xml";
String content = "<doc>This is document number " + i + "</doc>";
docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
}
writeDocs(docs);
}

private void verifyDocumentsHasHashInMetadataKey() {
GenericDocumentManager mgr = Common.client.newDocumentManager();
mgr.setMetadataCategories(DocumentManager.Metadata.METADATAVALUES);
DocumentPage page = mgr.search(Common.client.newQueryManager().newStructuredQueryBuilder().collection("incremental-test"), 1);
while (page.hasNext()) {
DocumentRecord doc = page.next();
DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
Comment on lines +217 to +221
Copilot AI Dec 31, 2025

The page size of 1 means this will iterate one document at a time from the server. For verifying 10 documents, consider using a larger page size (e.g., 10 or 100) to reduce the number of round trips to the server and improve test performance.

Suggested change
DocumentPage page = mgr.search(Common.client.newQueryManager().newStructuredQueryBuilder().collection("incremental-test"), 1);
while (page.hasNext()) {
DocumentRecord doc = page.next();
DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
mgr.setPageLength(10);
DocumentPage page = mgr.search(Common.client.newQueryManager().newStructuredQueryBuilder().collection("incremental-test"), 1);
while (page.hasNext()) {
DocumentRecord doc = page.next();
DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());

"Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");

String hash = metadata.getMetadataValues().get("incrementalWriteHash");
try {
// Can use Java's support for parsing unsigned longs in base 16 to verify the hash is valid.
Long.parseUnsignedLong(hash, 16);
} catch (NumberFormatException e) {
fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
}
);
}
}

private void modifyFiveDocuments() {
new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
.withThreadCount(1).withBatchSize(5)
.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
.withDocumentWriteSetFilter(filter),

writeBatcher -> {
for (int i = 6; i <= 10; i++) {
String uri = "/incremental/test/doc-" + i + ".xml";
String content = "<doc>This is modified content</doc>";
writeBatcher.add(uri, METADATA, new StringHandle(content));
}
}
);
docs = new ArrayList<>();
for (int i = 6; i <= 10; i++) {
String uri = "/incremental/test/doc-" + i + ".xml";
String content = "<doc>This is modified content</doc>";
docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
}
writeDocs(docs);
}

private void writeDocs(List<DocumentWriteOperation> docs) {