From ad22dcb5394cb671ec2bc185c3e946c3184e560c Mon Sep 17 00:00:00 2001 From: Rob Rudin Date: Wed, 10 Dec 2025 15:19:38 -0500 Subject: [PATCH] MLE-26420 Can now perform incremental writes IncrementalWriteFilter is the entry point, with a Builder for customizing its behavior. --- marklogic-client-api/build.gradle | 5 + .../filter/IncrementalWriteEvalFilter.java | 57 +++++ .../filter/IncrementalWriteFilter.java | 187 +++++++++++++++ .../filter/IncrementalWriteOpticFilter.java | 55 +++++ .../filter/IncrementalWriteFilterTest.java | 48 ++++ .../filter/IncrementalWriteTest.java | 226 ++++++++++++++++++ .../ml-config/databases/content-database.json | 16 ++ 7 files changed, 594 insertions(+) create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java diff --git a/marklogic-client-api/build.gradle b/marklogic-client-api/build.gradle index 23b984883..b292e3c79 100644 --- a/marklogic-client-api/build.gradle +++ b/marklogic-client-api/build.gradle @@ -37,6 +37,11 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${jacksonVersion}" + // Dependencies for hash generation. Can be safely omitted if not using the incremental write feature. But neither + // has any transitive dependencies, and thus their impact on the dependency tree is minimal. + implementation "io.github.erdtman:java-json-canonicalization:1.1" + implementation "net.openhft:zero-allocation-hashing:0.27ea1" + // Only used by extras (which some examples then depend on) compileOnly 'org.jdom:jdom2:2.0.6.1' compileOnly 'org.dom4j:dom4j:2.2.0' diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java new file mode 100644 index 000000000..50257bb14 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.marklogic.client.datamovement.DocumentWriteSetFilter; +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.io.JacksonHandle; + +import java.util.function.Consumer; + +/** + * Uses server-side JavaScript code to get the existing hash values for a set of URIs. 
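+ * This variant is selected via {@link IncrementalWriteFilter.Builder#useEvalQuery(boolean)} and requires the
+ * calling user to have the privileges needed to evaluate ad hoc server-side code. A rough usage sketch, assuming
+ * the other Builder defaults are acceptable:
+ *
+ * <pre>{@code
+ * IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
+ *     .useEvalQuery(true)
+ *     .build();
+ * }</pre>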
+ * + * @since 8.1.0 + */ +class IncrementalWriteEvalFilter extends IncrementalWriteFilter { + + private static final String EVAL_SCRIPT = """ + const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris)); + const response = {}; + for (var tuple of tuples) { + response[tuple[0]] = tuple[1]; + } + response + """; + + IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) { + super(fieldName, canonicalizeJson, skippedDocumentsConsumer); + } + + @Override + public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) { + ArrayNode uris = new ObjectMapper().createArrayNode(); + for (DocumentWriteOperation doc : context.getDocumentWriteSet()) { + if (DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) { + uris.add(doc.getUri()); + } + } + + JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT) + .addVariable("fieldName", fieldName) + .addVariable("uris", new JacksonHandle(uris)) + .evalAs(JsonNode.class); + + return filterDocuments(context, uri -> { + if (response.has(uri)) { + return response.get(uri).asText(); + } + return null; + }); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java new file mode 100644 index 000000000..03c1c465a --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.datamovement.DocumentWriteSetFilter; +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.impl.DocumentWriteOperationImpl; +import com.marklogic.client.impl.HandleAccessor; +import com.marklogic.client.io.BaseHandle; +import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.io.Format; +import net.openhft.hashing.LongHashFunction; +import org.erdtman.jcs.JsonCanonicalizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A DocumentWriteSetFilter that skips writing documents whose content has not changed since the last write + * based on a hash value stored in a MarkLogic field. + * + * @since 8.1.0 + */ +public abstract class IncrementalWriteFilter implements DocumentWriteSetFilter { + + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private String fieldName = "incrementalWriteHash"; + private boolean canonicalizeJson = true; + private boolean useEvalQuery = false; + private Consumer skippedDocumentsConsumer; + + /** + * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash". 
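+		 *                  The field is expected to exist on the target database along with a string range index, so
+		 *                  that existing hash values can be looked up by URI; the content-database.json changes in
+		 *                  this patch show an example configuration for the default field name.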
+		 */
+		public Builder fieldName(String fieldName) {
+			this.fieldName = fieldName;
+			return this;
+		}
+
+		/**
+		 * @param canonicalizeJson whether to canonicalize JSON content before hashing; defaults to true.
+		 *                         Delegates to https://github.com/erdtman/java-json-canonicalization for canonicalization.
+		 */
+		public Builder canonicalizeJson(boolean canonicalizeJson) {
+			this.canonicalizeJson = canonicalizeJson;
+			return this;
+		}
+
+		/**
+		 * @param useEvalQuery if true, server-side JavaScript is evaluated instead of an Optic query to retrieve existing hash values; defaults to false.
+		 */
+		public Builder useEvalQuery(boolean useEvalQuery) {
+			this.useEvalQuery = useEvalQuery;
+			return this;
+		}
+
+		/**
+		 * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
+		 */
+		public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+			this.skippedDocumentsConsumer = skippedDocumentsConsumer;
+			return this;
+		}
+
+		public IncrementalWriteFilter build() {
+			if (useEvalQuery) {
+				return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+			}
+			return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+		}
+	}
+
+	protected final String fieldName;
+	private final boolean canonicalizeJson;
+	private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
+
+	// Hardcoding this for now, with a good general-purpose hashing function.
+	// See https://xxhash.com for benchmarks.
+	private final LongHashFunction hashFunction = LongHashFunction.xx3();
+
+	public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+		this.fieldName = fieldName;
+		this.canonicalizeJson = canonicalizeJson;
+		this.skippedDocumentsConsumer = skippedDocumentsConsumer;
+	}
+
+	protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
+		final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
+		final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();
+
+		for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
+			if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
+				newWriteSet.add(doc);
+				continue;
+			}
+
+			final String contentHash = computeHash(serializeContent(doc));
+			final String existingHash = hashRetriever.apply(doc.getUri());
+			if (logger.isTraceEnabled()) {
+				logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
+			}
+
+			if (existingHash != null) {
+				if (!existingHash.equals(contentHash)) {
+					newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+				} else if (skippedDocumentsConsumer != null) {
+					skippedDocuments.add(doc);
+				} else {
+					// No consumer, so skip the document silently.
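+					// Either way, an unchanged document is left out of the new write set; the consumer only
+					// controls whether callers are notified about skipped documents.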
+ } + } else { + newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash)); + } + } + + if (!skippedDocuments.isEmpty()) { + skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0])); + } + + return newWriteSet; + } + + private String serializeContent(DocumentWriteOperation doc) { + String content = HandleAccessor.contentAsString(doc.getContent()); + + Format format = null; + if (doc.getContent() instanceof BaseHandle baseHandle) { + format = baseHandle.getFormat(); + } + + if (canonicalizeJson && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) { + JsonCanonicalizer jc; + try { + jc = new JsonCanonicalizer(content); + return jc.getEncodedString(); + } catch (IOException e) { + // Going to improve this in the next PR, as I think we can throw an exception if Format = JSON. + logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}", + doc.getUri(), e.getMessage()); + } + } + + return content; + } + + private boolean isPossiblyJsonContent(String content) { + // This isn't 100% reliable, as the content could be text that just happens to start with { or [, and so + // we'll still need to catch an exception if we try to canonicalize non-JSON content. + String trimmed = content.trim(); + return trimmed.startsWith("{") || trimmed.startsWith("["); + } + + private String computeHash(String content) { + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); + long hash = hashFunction.hashBytes(bytes); + return Long.toHexString(hash); + } + + protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) { + DocumentMetadataHandle newMetadata = new DocumentMetadataHandle(); + if (op.getMetadata() != null) { + DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata(); + newMetadata.setPermissions(originalMetadata.getPermissions()); + newMetadata.setCollections(originalMetadata.getCollections()); + newMetadata.setQuality(originalMetadata.getQuality()); + newMetadata.setProperties(originalMetadata.getProperties()); + newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues()); + } + newMetadata.getMetadataValues().put(fieldName, hash); + return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI()); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java new file mode 100644 index 000000000..592186eb1 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.row.RowTemplate; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +/** + * Uses an Optic query to get the existing hash values for a set of URIs. 
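+ * This is the default implementation created by {@link IncrementalWriteFilter.Builder}. A rough sketch of how
+ * either implementation is intended to be plugged into a WriteBatcher, based on the tests in this patch
+ * (the writeBatcher and logger variables are illustrative):
+ *
+ * <pre>{@code
+ * IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
+ *     .onDocumentsSkipped(docs -> logger.info("Skipped {} unchanged documents", docs.length))
+ *     .build();
+ * writeBatcher.withDocumentWriteSetFilter(filter);
+ * }</pre>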
+ * + * @since 8.1.0 + */ +class IncrementalWriteOpticFilter extends IncrementalWriteFilter { + + IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) { + super(fieldName, canonicalizeJson, skippedDocumentsConsumer); + } + + @Override + public DocumentWriteSet apply(Context context) { + final String[] uris = context.getDocumentWriteSet().stream() + .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType())) + .map(DocumentWriteOperation::getUri) + .toArray(String[]::new); + + // It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using + // a serialized query instead. That doesn't allow a user to override the query though. + Map existingHashes = new RowTemplate(context.getDatabaseClient()).query(op -> + op.fromLexicons(Map.of( + "uri", op.cts.uriReference(), + "hash", op.cts.fieldReference(super.fieldName) + )).where( + op.cts.documentQuery(op.xs.stringSeq(uris)) + ), + + rows -> { + Map map = new HashMap<>(); + rows.forEach(row -> { + String uri = row.getString("uri"); + String existingHash = row.getString("hash"); + map.put(uri, existingHash); + }); + return map; + } + ); + + return filterDocuments(context, uri -> existingHashes.get(uri)); + } +} diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java new file mode 100644 index 000000000..f1fe81518 --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.impl.DocumentWriteOperationImpl; +import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.io.StringHandle; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests that make no connection to MarkLogic. + */ +class IncrementalWriteFilterTest { + + /** + * Verifies that when a hash is added, a new metadata object is created so that a doc-specific hash field can be + * added without affecting any other document that might be sharing the same metadata object. 
+ */ + @Test + void addHashToMetadata() { + DocumentMetadataHandle metadata = new DocumentMetadataHandle() + .withCollections("c1") + .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ) + .withQuality(2) + .withProperty("prop1", "value1") + .withMetadataValue("meta1", "value1"); + + DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle("")); + DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("")); + + doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123"); + + assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object"); + + DocumentMetadataHandle metadata2 = (DocumentMetadataHandle) doc2.getMetadata(); + assertEquals("c1", metadata2.getCollections().iterator().next(), "collection should be preserved"); + assertEquals(DocumentMetadataHandle.Capability.READ, metadata2.getPermissions().get("rest-reader").iterator().next(), "permission should be preserved"); + assertEquals(2, metadata2.getQuality(), "quality should be preserved"); + assertEquals("value1", metadata2.getProperties().get("prop1"), "property should be preserved"); + + assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved"); + assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added"); + } +} diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java new file mode 100644 index 000000000..89c1417ee --- /dev/null +++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.marklogic.client.DatabaseClient; +import com.marklogic.client.datamovement.DataMovementManager; +import com.marklogic.client.datamovement.WriteBatcher; +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.impl.DocumentWriteOperationImpl; +import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.io.JacksonHandle; +import com.marklogic.client.io.StringHandle; +import com.marklogic.client.test.AbstractClientTest; +import com.marklogic.client.test.Common; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class IncrementalWriteTest extends AbstractClientTest { + + private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle() + .withCollections("incremental-test") + .withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE); + + AtomicInteger writtenCount = new AtomicInteger(); + AtomicInteger skippedCount = new AtomicInteger(); + ObjectMapper objectMapper = new ObjectMapper(); + + IncrementalWriteFilter filter; + + @BeforeEach + void setup() { + // Need a user with eval privileges so that the eval filter can be tested. 
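+		// The Optic-based filter should not require eval privileges; a single eval-capable client is simply
+		// reused here so both filter variants can be tested against the same setup.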
+ Common.client = Common.newEvalClient(); + } + + @Test + void opticFilter() { + filter = IncrementalWriteFilter.newBuilder() + .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length)) + .build(); + + runTest(); + } + + @Test + void evalFilter() { + filter = IncrementalWriteFilter.newBuilder() + .useEvalQuery(true) + .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length)) + .build(); + + runTest(); + } + + @Test + void filterRemovesAllDocuments() { + new WriteBatcherTemplate(Common.client).runWriteJob( + writeBatcher -> writeBatcher + .withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet()) + .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)), + + writeBatcher -> { + for (int i = 1; i <= 10; i++) { + writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle("")); + } + } + ); + + assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " + + "This test is verifying that no error will occur either when the filter doesn't return any documents."); + assertCollectionSize("incremental-test", 0); + } + + @Test + void jsonKeysOutOfOrder() { + filter = IncrementalWriteFilter.newBuilder() + .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length)) + .build(); + + List docs = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + ObjectNode doc = objectMapper.createObjectNode(); + doc.put("number", i); + doc.put("text", "hello"); + docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc))); + } + + writeDocs(docs); + assertEquals(10, writtenCount.get()); + assertEquals(0, skippedCount.get()); + + docs = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + ObjectNode doc = objectMapper.createObjectNode(); + doc.put("text", "hello"); + doc.put("number", i); + docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc))); + } + + writeDocs(docs); + assertEquals(10, writtenCount.get()); + assertEquals(10, skippedCount.get(), "Since JSON canonicalization is enabled by default, the documents " + + "should be recognized as unchanged even though their keys are in a different order."); + } + + @Test + void jsonKeysOutOfOrderWithNoCanonicalization() { + filter = IncrementalWriteFilter.newBuilder() + .canonicalizeJson(false) + .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length)) + .build(); + + List docs = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + ObjectNode doc = objectMapper.createObjectNode(); + doc.put("number", i); + doc.put("text", "hello"); + docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc))); + } + + writeDocs(docs); + assertEquals(10, writtenCount.get()); + assertEquals(0, skippedCount.get()); + + docs = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + ObjectNode doc = objectMapper.createObjectNode(); + doc.put("text", "hello"); + doc.put("number", i); + docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc))); + } + + writeDocs(docs); + assertEquals(20, writtenCount.get(), "Since JSON canonicalization is disabled, all documents should be " + + "written again since their keys are in a different order."); + assertEquals(0, skippedCount.get()); + } + + private void runTest() { + writeTenDocuments(); + assertEquals(10, writtenCount.get()); + assertEquals(0, skippedCount.get(), 
"No docs should have been skipped on the first write."); + + writeTenDocuments(); + assertEquals(10, skippedCount.get(), "All docs should have been skipped since their content hasn't changed."); + assertEquals(10, writtenCount.get(), "The count of written should still be 10 since all docs should have been skipped on the second write."); + + modifyFiveDocuments(); + assertEquals(10, skippedCount.get()); + assertEquals(15, writtenCount.get(), "5 documents should have been modified, with their hashes being updated."); + + writeTenDocuments(); + assertEquals(15, skippedCount.get(), "The 5 unmodified documents should have been skipped."); + assertEquals(20, writtenCount.get(), "The 5 modified documents should have been overwritten since their content changed."); + } + + private void writeTenDocuments() { + new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher + .withThreadCount(1).withBatchSize(5) + .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)) + .withDocumentWriteSetFilter(filter), + + writeBatcher -> { + for (int i = 1; i <= 10; i++) { + // Consistent URIs are required for incremental writes to work. + String uri = "/incremental/test/doc-" + i + ".xml"; + String content = "This is document number " + i + ""; + writeBatcher.add(uri, METADATA, new StringHandle(content)); + } + } + ); + } + + private void modifyFiveDocuments() { + new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher + .withThreadCount(1).withBatchSize(5) + .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)) + .withDocumentWriteSetFilter(filter), + + writeBatcher -> { + for (int i = 6; i <= 10; i++) { + String uri = "/incremental/test/doc-" + i + ".xml"; + String content = "This is modified content"; + writeBatcher.add(uri, METADATA, new StringHandle(content)); + } + } + ); + } + + private void writeDocs(List docs) { + new WriteBatcherTemplate(Common.client).runWriteJob( + writeBatcher -> writeBatcher + .withDocumentWriteSetFilter(filter) + .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)), + + writeBatcher -> docs.forEach(writeBatcher::add) + ); + } + + // Experimenting with a template that gets rid of some annoying DMSDK boilerplate. 
+ private record WriteBatcherTemplate(DatabaseClient databaseClient) { + + public void runWriteJob(Consumer writeBatcherConfigurer, Consumer writeBatcherUser) { + try (DataMovementManager dmm = databaseClient.newDataMovementManager()) { + WriteBatcher writeBatcher = dmm.newWriteBatcher(); + writeBatcherConfigurer.accept(writeBatcher); + + dmm.startJob(writeBatcher); + writeBatcherUser.accept(writeBatcher); + writeBatcher.flushAndWait(); + writeBatcher.awaitCompletion(); + dmm.stopJob(writeBatcher); + } + } + } +} diff --git a/test-app/src/main/ml-config/databases/content-database.json b/test-app/src/main/ml-config/databases/content-database.json index 99b1c48c3..4e869464f 100644 --- a/test-app/src/main/ml-config/databases/content-database.json +++ b/test-app/src/main/ml-config/databases/content-database.json @@ -188,6 +188,15 @@ { "field-name": "int2", "include-root": false + }, + { + "field-name": "incrementalWriteHash", + "metadata": "", + "stemmed-searches": "off", + "word-searches": false, + "fast-phrase-searches": false, + "fast-case-sensitive-searches": false, + "fast-diacritic-sensitive-searches": false } ], "range-field-index": [ @@ -204,6 +213,13 @@ "collation": "", "range-value-positions": false, "invalid-values": "reject" + }, + { + "scalar-type": "string", + "collation": "http://marklogic.com/collation/", + "field-name": "incrementalWriteHash", + "range-value-positions": false, + "invalid-values": "reject" } ], "geospatial-element-index": [