From 8ea32e4d92e2b44ee482b52a0f3e0ddca32804bd Mon Sep 17 00:00:00 2001
From: Rob Rudin
Date: Wed, 10 Dec 2025 15:19:38 -0500
Subject: [PATCH] MLE-26420 Can now perform incremental writes

Added DocumentWriteSetFilter as a generic interface for modifying a
DocumentWriteSet before it's written. IncrementalWriteFilter is then the
entry point, with a Builder for customizing its behavior.

Also started moving some tests into "com.marklogic.client.datamovement" so
we can have unit tests that verify protected methods.
---
 marklogic-client-api/build.gradle             |   5 +
 .../datamovement/DocumentWriteSetFilter.java  |  39 +++
 .../client/datamovement/WriteBatcher.java     |  13 +
 .../filter/IncrementalWriteEvalFilter.java    |  57 +++++
 .../filter/IncrementalWriteFilter.java        | 178 ++++++++++++++
 .../filter/IncrementalWriteOpticFilter.java   |  55 +++++
 .../datamovement/impl/BatchWriteSet.java      |  24 +-
 .../client/datamovement/impl/BatchWriter.java |  13 +-
 .../datamovement/impl/WriteBatcherImpl.java   |  15 +-
 .../okhttp/RetryIOExceptionInterceptor.java   |   5 +-
 .../WriteNakedPropertiesTest.java             |  13 +-
 .../filter/IncrementalWriteFilterTest.java    |  48 ++++
 .../filter/IncrementalWriteTest.java          | 226 ++++++++++++++++++
 .../datamovement/IncrementalWriteTest.java    |  68 ------
 .../ml-config/databases/content-database.json |  22 +-
 15 files changed, 691 insertions(+), 90 deletions(-)
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
 rename marklogic-client-api/src/test/java/com/marklogic/client/{test => }/datamovement/WriteNakedPropertiesTest.java (80%)
 create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
 create mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
 delete mode 100644 marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java

diff --git a/marklogic-client-api/build.gradle b/marklogic-client-api/build.gradle
index 23b984883..b292e3c79 100644
--- a/marklogic-client-api/build.gradle
+++ b/marklogic-client-api/build.gradle
@@ -37,6 +37,11 @@ dependencies {
 	implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
 	implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${jacksonVersion}"
 
+	// Dependencies for hash generation. Can be safely omitted if not using the incremental write feature. But neither
+	// has any transitive dependencies, and thus their impact on the dependency tree is minimal.
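+	// For example, a consumer that never uses incremental writes could exclude both groups when depending on
+	// this library (illustrative snippet only; "X.Y.Z" is a placeholder version, not part of this change):
+	//   implementation("com.marklogic:marklogic-client-api:X.Y.Z") {
+	//     exclude group: "io.github.erdtman"
+	//     exclude group: "net.openhft"
+	//   }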
+ implementation "io.github.erdtman:java-json-canonicalization:1.1" + implementation "net.openhft:zero-allocation-hashing:0.27ea1" + // Only used by extras (which some examples then depend on) compileOnly 'org.jdom:jdom2:2.0.6.1' compileOnly 'org.dom4j:dom4j:2.2.0' diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java new file mode 100644 index 000000000..7f00d8161 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/DocumentWriteSetFilter.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement; + +import com.marklogic.client.DatabaseClient; +import com.marklogic.client.document.DocumentWriteSet; + +import java.util.function.Function; + +/** + * A filter that can modify a DocumentWriteSet before it is written to the database. + * + * @since 8.1.0 + */ +public interface DocumentWriteSetFilter extends Function { + + interface Context { + /** + * @return the DocumentWriteSet to be written + */ + DocumentWriteSet getDocumentWriteSet(); + + /** + * @return the batch number + */ + long getBatchNumber(); + + /** + * @return the DatabaseClient being used for this batch + */ + DatabaseClient getDatabaseClient(); + + /** + * @return the temporal collection name, or null if not writing to a temporal collection + */ + String getTemporalCollection(); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java index 656b029cb..0facc145f 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java @@ -357,4 +357,17 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle, * @param writeBatch the information about the batch that failed */ void retryWithFailureListeners(WriteBatch writeBatch); + + /** + * Sets a filter to modify or replace the DocumentWriteSet before it is written. + * The filter can return either the modified DocumentWriteSet or a new one. + * If the filter returns null or an empty DocumentWriteSet, no write will occur. + * + * @param filter the function to apply before writing + * @return this instance for method chaining + * @since 8.1.0 + */ + default WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) { + return this; + } } diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java new file mode 100644 index 000000000..5b22dcdc7 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. 
+ */ +package com.marklogic.client.datamovement.filter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.marklogic.client.datamovement.DocumentWriteSetFilter; +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.io.JacksonHandle; + +import java.util.function.Consumer; + +/** + * Uses server-side JavaScript code to get the existing hash values for a set of URIs. + * + * @since 8.1.0 + */ +class IncrementalWriteEvalFilter extends IncrementalWriteFilter { + + private static final String EVAL_SCRIPT = """ + const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris)); + const response = {}; + for (var tuple of tuples) { + response[tuple[0]] = tuple[1]; + } + response + """; + + IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer skippedDocumentsConsumer) { + super(fieldName, canonicalizeJson, skippedDocumentsConsumer); + } + + @Override + public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) { + ArrayNode uris = new ObjectMapper().createArrayNode(); + context.getDocumentWriteSet().stream().forEach(op -> { + if (DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType())) { + uris.add(op.getUri()); + } + }); + + JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT) + .addVariable("fieldName", fieldName) + .addVariable("uris", new JacksonHandle(uris)) + .evalAs(JsonNode.class); + + return filterDocuments(context, uri -> { + if (response.has(uri)) { + return response.get(uri).asText(); + } + return null; + }); + } +} diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java new file mode 100644 index 000000000..e14b4b0b3 --- /dev/null +++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. + */ +package com.marklogic.client.datamovement.filter; + +import com.marklogic.client.datamovement.DocumentWriteSetFilter; +import com.marklogic.client.document.DocumentWriteOperation; +import com.marklogic.client.document.DocumentWriteSet; +import com.marklogic.client.impl.DocumentWriteOperationImpl; +import com.marklogic.client.impl.HandleAccessor; +import com.marklogic.client.io.BaseHandle; +import com.marklogic.client.io.DocumentMetadataHandle; +import com.marklogic.client.io.Format; +import com.marklogic.client.io.marker.AbstractWriteHandle; +import net.openhft.hashing.LongHashFunction; +import org.erdtman.jcs.JsonCanonicalizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A DocumentWriteSetFilter that skips writing documents whose content has not changed since the last write + * based on a hash value stored in a MarkLogic field. 
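+ * <p>
+ * A minimal usage sketch (the field name and skipped-documents handling shown here are illustrative
+ * choices, not requirements):
+ * <pre>{@code
+ * IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
+ *     .fieldName("incrementalWriteHash")
+ *     .onDocumentsSkipped(docs -> System.out.println("Skipped: " + docs.length))
+ *     .build();
+ * writeBatcher.withDocumentWriteSetFilter(filter);
+ * }</pre>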
+ *
+ * @since 8.1.0
+ */
+public abstract class IncrementalWriteFilter implements DocumentWriteSetFilter {
+
+	protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+	public static Builder newBuilder() {
+		return new Builder();
+	}
+
+	public static class Builder {
+
+		private String fieldName = "incrementalWriteHash";
+		private boolean canonicalizeJson = true;
+		private boolean useEvalQuery = false;
+		private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
+
+		/**
+		 * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
+		 */
+		public Builder fieldName(String fieldName) {
+			this.fieldName = fieldName;
+			return this;
+		}
+
+		/**
+		 * @param canonicalizeJson whether to canonicalize JSON content before hashing; defaults to true.
+		 *                         Delegates to https://github.com/erdtman/java-json-canonicalization for canonicalization.
+		 */
+		public Builder canonicalizeJson(boolean canonicalizeJson) {
+			this.canonicalizeJson = canonicalizeJson;
+			return this;
+		}
+
+		/**
+		 * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
+		 */
+		public Builder useEvalQuery(boolean useEvalQuery) {
+			this.useEvalQuery = useEvalQuery;
+			return this;
+		}
+
+		/**
+		 * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
+		 */
+		public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+			this.skippedDocumentsConsumer = skippedDocumentsConsumer;
+			return this;
+		}
+
+		public IncrementalWriteFilter build() {
+			if (useEvalQuery) {
+				return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+			}
+			return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+		}
+	}
+
+	protected final String fieldName;
+	private final boolean canonicalizeJson;
+	private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
+
+	// Hardcoding this for now, with a good general-purpose hashing function.
+	// See https://xxhash.com for benchmarks.
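+	// Note (editorial assumption): XXH3 is a fast, non-cryptographic 64-bit hash, so a hash collision would
+	// cause a changed document to be treated as unchanged; this is vanishingly unlikely but not impossible.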
+	private final LongHashFunction hashFunction = LongHashFunction.xx3();
+
+	public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+		this.fieldName = fieldName;
+		this.canonicalizeJson = canonicalizeJson;
+		this.skippedDocumentsConsumer = skippedDocumentsConsumer;
+	}
+
+	protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
+		final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
+		final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();
+
+		for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
+			if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
+				newWriteSet.add(doc);
+				continue;
+			}
+
+			final String content = serializeContent(doc.getContent());
+			final String contentHash = computeHash(content);
+			final String existingHash = hashRetriever.apply(doc.getUri());
+			if (logger.isTraceEnabled()) {
+				logger.trace("URI: {}, existing hash: {}, new hash: {}", doc.getUri(), existingHash, contentHash);
+			}
+
+			if (existingHash != null) {
+				if (!existingHash.equals(contentHash)) {
+					newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+				} else if (skippedDocumentsConsumer != null) {
+					skippedDocuments.add(doc);
+				}
+			} else {
+				newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+			}
+		}
+
+		if (!skippedDocuments.isEmpty()) {
+			skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
+		}
+
+		return newWriteSet;
+	}
+
+	private String serializeContent(AbstractWriteHandle contentHandle) {
+		String content = HandleAccessor.contentAsString(contentHandle);
+
+		Format format = null;
+		if (contentHandle instanceof BaseHandle baseHandle) {
+			format = baseHandle.getFormat();
+		}
+
+		if (canonicalizeJson && (Format.JSON.equals(format) || content.startsWith("{"))) {
+			JsonCanonicalizer jc;
+			try {
+				jc = new JsonCanonicalizer(content);
+			} catch (IOException e) {
+				throw new RuntimeException("Unable to parse JSON content, cause: " + e.getMessage(), e);
+			}
+			return jc.getEncodedString();
+		}
+
+		return content;
+	}
+
+	private String computeHash(String content) {
+		byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
+		long hash = hashFunction.hashBytes(bytes);
+		return Long.toHexString(hash);
+	}
+
+	protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
+		DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
+		if (op.getMetadata() != null) {
+			DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();
+			newMetadata.setPermissions(originalMetadata.getPermissions());
+			newMetadata.setCollections(originalMetadata.getCollections());
+			newMetadata.setQuality(originalMetadata.getQuality());
+			newMetadata.setProperties(originalMetadata.getProperties());
+			newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
+		}
+		newMetadata.getMetadataValues().put(fieldName, hash);
+		return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
+	}
+}

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
new file mode 100644
index 000000000..592186eb1
--- /dev/null
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.filter;
+
+import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.DocumentWriteSet;
+import com.marklogic.client.row.RowTemplate;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Uses an Optic query to get the existing hash values for a set of URIs.
+ *
+ * @since 8.1.0
+ */
+class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
+
+	IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+		super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+	}
+
+	@Override
+	public DocumentWriteSet apply(Context context) {
+		final String[] uris = context.getDocumentWriteSet().stream()
+			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
+			.map(DocumentWriteOperation::getUri)
+			.toArray(String[]::new);
+
+		// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
+		// a serialized query instead. That doesn't allow a user to override the query though.
+		Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
+			op.fromLexicons(Map.of(
+				"uri", op.cts.uriReference(),
+				"hash", op.cts.fieldReference(super.fieldName)
+			)).where(
+				op.cts.documentQuery(op.xs.stringSeq(uris))
+			),
+
+			rows -> {
+				Map<String, String> map = new HashMap<>();
+				rows.forEach(row -> {
+					String uri = row.getString("uri");
+					String existingHash = row.getString("hash");
+					map.put(uri, existingHash);
+				});
+				return map;
+			}
+		);
+
+		return filterDocuments(context, uri -> existingHashes.get(uri));
+	}
+}

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
index 0c08fdd7b..f6e91c91a 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java
@@ -4,6 +4,7 @@
 package com.marklogic.client.datamovement.impl;
 
 import com.marklogic.client.DatabaseClient;
+import com.marklogic.client.datamovement.DocumentWriteSetFilter;
 import com.marklogic.client.datamovement.WriteBatch;
 import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.datamovement.WriteEvent;
@@ -16,15 +17,17 @@
  * Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the
  * DocumentWriteSet is empty when this class is constructed.
  */
-class BatchWriteSet {
+class BatchWriteSet implements DocumentWriteSetFilter.Context {
 
 	private final WriteBatcher batcher;
-	private final DocumentWriteSet documentWriteSet;
 	private final long batchNumber;
 	private final DatabaseClient client;
 	private final ServerTransform transform;
 	private final String temporalCollection;
 
+	// Can be overridden after creation
+	private DocumentWriteSet documentWriteSet;
+
 	private long itemsSoFar;
 	private Runnable onSuccess;
 	private Consumer<Throwable> onFailure;
@@ -38,10 +41,21 @@ class BatchWriteSet {
 		this.batchNumber = batchNumber;
 	}
 
+	/**
+	 * Must be called if a DocumentWriteSetFilter modified the DocumentWriteSet owned by this class.
+	 *
+	 * @since 8.1.0
+	 */
+	void updateWithFilteredDocumentWriteSet(DocumentWriteSet filteredDocumentWriteSet) {
+		this.documentWriteSet = filteredDocumentWriteSet;
+	}
+
+	@Override
 	public DocumentWriteSet getDocumentWriteSet() {
 		return documentWriteSet;
 	}
 
+	@Override
 	public long getBatchNumber() {
 		return batchNumber;
 	}
@@ -50,6 +64,11 @@ public void setItemsSoFar(long itemsSoFar) {
 		this.itemsSoFar = itemsSoFar;
 	}
 
+	@Override
+	public DatabaseClient getDatabaseClient() {
+		return client;
+	}
+
 	public DatabaseClient getClient() {
 		return client;
 	}
@@ -58,6 +77,7 @@ public ServerTransform getTransform() {
 		return transform;
 	}
 
+	@Override
 	public String getTemporalCollection() {
 		return temporalCollection;
 	}

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
index 2173034dd..a2ebe835d 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java
@@ -3,6 +3,7 @@
  */
 package com.marklogic.client.datamovement.impl;
 
+import com.marklogic.client.datamovement.DocumentWriteSetFilter;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
 import com.marklogic.client.document.XMLDocumentManager;
@@ -13,7 +14,7 @@
 import java.io.Closeable;
 import java.util.function.Consumer;
 
-record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {
+record BatchWriter(BatchWriteSet batchWriteSet, DocumentWriteSetFilter filter) implements Runnable {
 
 	private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
 
@@ -28,6 +29,16 @@ public void run() {
 		logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
 		DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
+		if (filter != null) {
+			documentWriteSet = filter.apply(batchWriteSet);
+			if (documentWriteSet == null || documentWriteSet.isEmpty()) {
+				logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber());
+				closeAllHandles();
+				return;
+			}
+			batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet);
+		}
+
 		writeDocuments(documentWriteSet);
 
 		// This seems like it should be part of a finally block - but it's able to throw an exception. Which implies

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
index 154068522..1b376fb85 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java
@@ -126,6 +126,7 @@ public class WriteBatcherImpl
 	private boolean initialized = false;
 	private CompletableThreadPoolExecutor threadPool = null;
 	private DocumentMetadataHandle defaultMetadata;
+	private DocumentWriteSetFilter documentWriteSetFilter;
 
 	public WriteBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
 		super(moveMgr);
@@ -200,7 +201,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
 				writeSet.getDocumentWriteSet().add(doc);
 			}
 			if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) {
-				threadPool.submit( new BatchWriter(writeSet) );
+				threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
 			}
 		}
 		return this;
@@ -308,7 +309,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
 		for (WriteEvent doc : batch.getItems()) {
 			writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
 		}
-		BatchWriter runnable = new BatchWriter(writeSet);
+		BatchWriter runnable = new BatchWriter(writeSet, documentWriteSetFilter);
 		runnable.run();
 	}
 	@Override
@@ -379,7 +380,7 @@ private void flush(boolean waitForCompletion) {
 			DocumentWriteOperation doc = iter.next();
 			writeSet.getDocumentWriteSet().add(doc);
 		}
-		threadPool.submit( new BatchWriter(writeSet) );
+		threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
 	}
 
 	if (waitForCompletion) awaitCompletion();
@@ -597,7 +598,7 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
 			for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
 				writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
 			}
-			BatchWriter retryWriterTask = new BatchWriter(writeSet);
+			BatchWriter retryWriterTask = new BatchWriter(writeSet, documentWriteSetFilter);
 			Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask);
 			threadPool.replaceTask(writerTask, fretryWriterTask);
 			// jump to the next task
@@ -846,4 +847,10 @@ public void addAll(Stream<? extends DocumentWriteOperation> operations) {
 	public DocumentMetadataHandle getDocumentMetadata() {
 		return defaultMetadata;
 	}
+
+	@Override
+	public WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
+		this.documentWriteSetFilter = filter;
+		return this;
+	}
 }

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java b/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java
index 656e399c5..b2f57e0c3 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java
@@ -3,6 +3,7 @@
  */
 package com.marklogic.client.impl.okhttp;
 
+import com.marklogic.client.MarkLogicIOException;
 import okhttp3.Interceptor;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -47,7 +48,7 @@ public Response intercept(Chain chain) throws IOException {
 		for (int attempt = 0; attempt <= maxRetries; attempt++) {
 			try {
 				return chain.proceed(request);
-			} catch (IOException e) {
+			} catch (MarkLogicIOException | IOException e) {
 				if (attempt == maxRetries || !isRetryableIOException(e)) {
 					logger.warn("Not retryable: {}; {}", e.getClass(), e.getMessage());
 					throw e;
@@ -65,7 +66,7 @@ public Response intercept(Chain chain) throws IOException {
 		throw new IllegalStateException("Unexpected end of retry loop");
 	}
 
-	private boolean isRetryableIOException(IOException e) {
+	private boolean isRetryableIOException(Exception e) {
 		return e instanceof ConnectException ||
 			e instanceof SocketTimeoutException ||
 			e instanceof UnknownHostException ||

diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java
similarity index 80%
rename from marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java
rename to marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java
index e97f87158..fb7b58d89 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java
@@ -1,14 +1,12 @@
 /*
  * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
  */
-package com.marklogic.client.test.datamovement;
+package com.marklogic.client.datamovement;
 
 import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.datamovement.DataMovementManager;
-import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.test.AbstractClientTest;
 import com.marklogic.client.test.Common;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import javax.xml.namespace.QName;
@@ -16,12 +14,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class WriteNakedPropertiesTest {
-
-	@BeforeEach
-	void setup() {
-		Common.newRestAdminClient().newXMLDocumentManager().delete("/naked.xml");
-	}
+class WriteNakedPropertiesTest extends AbstractClientTest {
 
 	@Test
 	void test() {

diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
new file mode 100644
index 000000000..f1fe81518
--- /dev/null
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.filter;
+
+import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.impl.DocumentWriteOperationImpl;
+import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.StringHandle;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests that make no connection to MarkLogic.
+ */
+class IncrementalWriteFilterTest {
+
+	/**
+	 * Verifies that when a hash is added, a new metadata object is created so that a doc-specific hash field can be
+	 * added without affecting any other document that might be sharing the same metadata object.
+	 */
+	@Test
+	void addHashToMetadata() {
+		DocumentMetadataHandle metadata = new DocumentMetadataHandle()
+			.withCollections("c1")
+			.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ)
+			.withQuality(2)
+			.withProperty("prop1", "value1")
+			.withMetadataValue("meta1", "value1");
+
+		DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle(""));
+		DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle(""));
+
+		doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");
+
+		assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");
+
+		DocumentMetadataHandle metadata2 = (DocumentMetadataHandle) doc2.getMetadata();
+		assertEquals("c1", metadata2.getCollections().iterator().next(), "collection should be preserved");
+		assertEquals(DocumentMetadataHandle.Capability.READ, metadata2.getPermissions().get("rest-reader").iterator().next(), "permission should be preserved");
+		assertEquals(2, metadata2.getQuality(), "quality should be preserved");
+		assertEquals("value1", metadata2.getProperties().get("prop1"), "property should be preserved");
+
+		assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
+		assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
+	}
+}

diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
new file mode 100644
index 000000000..89c1417ee
--- /dev/null
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.filter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.marklogic.client.DatabaseClient;
+import com.marklogic.client.datamovement.DataMovementManager;
+import com.marklogic.client.datamovement.WriteBatcher;
+import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.impl.DocumentWriteOperationImpl;
+import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.JacksonHandle;
+import com.marklogic.client.io.StringHandle;
+import com.marklogic.client.test.AbstractClientTest;
+import com.marklogic.client.test.Common;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class IncrementalWriteTest extends AbstractClientTest {
+
+	private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle()
+		.withCollections("incremental-test")
+		.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);
+
+	AtomicInteger writtenCount = new AtomicInteger();
+	AtomicInteger skippedCount = new AtomicInteger();
+	ObjectMapper objectMapper = new ObjectMapper();
+
+	IncrementalWriteFilter filter;
+
+	@BeforeEach
+	void setup() {
+		// Need a user with eval privileges so that the eval filter can be tested.
+		Common.client = Common.newEvalClient();
+	}
+
+	@Test
+	void opticFilter() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+			.build();
+
+		runTest();
+	}
+
+	@Test
+	void evalFilter() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.useEvalQuery(true)
+			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+			.build();
+
+		runTest();
+	}
+
+	@Test
+	void filterRemovesAllDocuments() {
+		new WriteBatcherTemplate(Common.client).runWriteJob(
+			writeBatcher -> writeBatcher
+				.withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet())
+				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
+
+			writeBatcher -> {
+				for (int i = 1; i <= 10; i++) {
+					writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle(""));
+				}
+			}
+		);
+
+		assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " +
+			"This test is verifying that no error will occur either when the filter doesn't return any documents.");
+		assertCollectionSize("incremental-test", 0);
+	}
+
+	@Test
+	void jsonKeysOutOfOrder() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+			.build();
+
+		List<DocumentWriteOperation> docs = new ArrayList<>();
+		for (int i = 1; i <= 10; i++) {
+			ObjectNode doc = objectMapper.createObjectNode();
+			doc.put("number", i);
+			doc.put("text", "hello");
+			docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc)));
+		}
+
+		writeDocs(docs);
+		assertEquals(10, writtenCount.get());
+		assertEquals(0, skippedCount.get());
+
+		docs = new ArrayList<>();
+		for (int i = 1; i <= 10; i++) {
+			ObjectNode doc = objectMapper.createObjectNode();
+			doc.put("text", "hello");
+			doc.put("number", i);
+			docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc)));
+		}
+
+		writeDocs(docs);
+		assertEquals(10, writtenCount.get());
+		assertEquals(10, skippedCount.get(), "Since JSON canonicalization is enabled by default, the documents " +
+			"should be recognized as unchanged even though their keys are in a different order.");
+	}
+
+	@Test
+	void jsonKeysOutOfOrderWithNoCanonicalization() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.canonicalizeJson(false)
+			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+			.build();
+
+		List<DocumentWriteOperation> docs = new ArrayList<>();
+		for (int i = 1; i <= 10; i++) {
+			ObjectNode doc = objectMapper.createObjectNode();
+			doc.put("number", i);
+			doc.put("text", "hello");
+			docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc)));
+		}
+
+		writeDocs(docs);
+		assertEquals(10, writtenCount.get());
+		assertEquals(0, skippedCount.get());
+
+		docs = new ArrayList<>();
+		for (int i = 1; i <= 10; i++) {
+			ObjectNode doc = objectMapper.createObjectNode();
+			doc.put("text", "hello");
+			doc.put("number", i);
+			docs.add(new DocumentWriteOperationImpl("/incremental/test/doc-" + i + ".json", METADATA, new JacksonHandle(doc)));
+		}
+
+		writeDocs(docs);
+		assertEquals(20, writtenCount.get(), "Since JSON canonicalization is disabled, all documents should be " +
+			"written again since their keys are in a different order.");
+		assertEquals(0, skippedCount.get());
+	}
+
+	private void runTest() {
+		writeTenDocuments();
+		assertEquals(10, writtenCount.get());
+		assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");
+
+		writeTenDocuments();
+		assertEquals(10, skippedCount.get(), "All docs should have been skipped since their content hasn't changed.");
+		assertEquals(10, writtenCount.get(), "The written count should still be 10 since all docs should have been skipped on the second write.");
+
+		modifyFiveDocuments();
+		assertEquals(10, skippedCount.get());
+		assertEquals(15, writtenCount.get(), "5 documents should have been modified, with their hashes being updated.");
+
+		writeTenDocuments();
+		assertEquals(15, skippedCount.get(), "The 5 unmodified documents should have been skipped.");
+		assertEquals(20, writtenCount.get(), "The 5 modified documents should have been overwritten since their content changed.");
+	}
+
+	private void writeTenDocuments() {
+		new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
+				.withThreadCount(1).withBatchSize(5)
+				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
+				.withDocumentWriteSetFilter(filter),
+
+			writeBatcher -> {
+				for (int i = 1; i <= 10; i++) {
+					// Consistent URIs are required for incremental writes to work.
+					String uri = "/incremental/test/doc-" + i + ".xml";
+					String content = "<doc>This is document number " + i + "</doc>";
+					writeBatcher.add(uri, METADATA, new StringHandle(content));
+				}
+			}
+		);
+	}
+
+	private void modifyFiveDocuments() {
+		new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
+				.withThreadCount(1).withBatchSize(5)
+				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
+				.withDocumentWriteSetFilter(filter),
+
+			writeBatcher -> {
+				for (int i = 6; i <= 10; i++) {
+					String uri = "/incremental/test/doc-" + i + ".xml";
+					String content = "<doc>This is modified content</doc>";
+					writeBatcher.add(uri, METADATA, new StringHandle(content));
+				}
+			}
+		);
+	}
+
+	private void writeDocs(List<DocumentWriteOperation> docs) {
+		new WriteBatcherTemplate(Common.client).runWriteJob(
+			writeBatcher -> writeBatcher
+				.withDocumentWriteSetFilter(filter)
+				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
+
+			writeBatcher -> docs.forEach(writeBatcher::add)
+		);
+	}
+
+	// Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
+	private record WriteBatcherTemplate(DatabaseClient databaseClient) {
+
+		public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
+			try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
+				WriteBatcher writeBatcher = dmm.newWriteBatcher();
+				writeBatcherConfigurer.accept(writeBatcher);
+
+				dmm.startJob(writeBatcher);
+				writeBatcherUser.accept(writeBatcher);
+				writeBatcher.flushAndWait();
+				writeBatcher.awaitCompletion();
+				dmm.stopJob(writeBatcher);
+			}
+		}
+	}
+}

diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java
deleted file mode 100644
index 83a81e1c5..000000000
--- a/marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/IncrementalWriteTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
- */
-package com.marklogic.client.test.datamovement;
-
-import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.datamovement.DataMovementManager;
-import com.marklogic.client.datamovement.WriteBatcher;
-import com.marklogic.client.io.DocumentMetadataHandle;
-import com.marklogic.client.io.StringHandle;
-import com.marklogic.client.test.AbstractClientTest;
-import com.marklogic.client.test.Common;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class IncrementalWriteTest extends AbstractClientTest {
-
-	private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle()
-		.withCollections("incremental-test")
-		.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);
-
-	@Test
-	void test() {
-		AtomicInteger writtenCount = new AtomicInteger();
-
-		try (DatabaseClient client = Common.newClient()) {
-			WriteBatcherTemplate template = new WriteBatcherTemplate(client);
-
-			template.runWriteJob(writeBatcher -> writeBatcher
-				.withThreadCount(1)
-				.withBatchSize(10)
-				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
-
-				writeBatcher -> {
-					for (int i = 1; i <= 20; i++) {
-						String uri = "/incremental/test/doc-" + i + ".xml";
-						String content = "<number>" + i + "</number><text>This is document number " + i + "</text>";
-						writeBatcher.add(uri, METADATA, new StringHandle(content));
-					}
-				}
-			);
-		}
-
-		assertEquals(20, writtenCount.get());
-	}
-
-	// Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
-	private record WriteBatcherTemplate(DatabaseClient databaseClient) {
-
-		public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
-			try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
-				WriteBatcher writeBatcher = dmm.newWriteBatcher();
-				writeBatcherConfigurer.accept(writeBatcher);
-
-				dmm.startJob(writeBatcher);
-
-				writeBatcherUser.accept(writeBatcher);
-				writeBatcher.awaitCompletion();
-
-				dmm.stopJob(writeBatcher);
-			}
-		}
-	}
-}

diff --git a/test-app/src/main/ml-config/databases/content-database.json b/test-app/src/main/ml-config/databases/content-database.json
index ec7a36f01..e1f3f1328 100644
--- a/test-app/src/main/ml-config/databases/content-database.json
+++ b/test-app/src/main/ml-config/databases/content-database.json
@@ -188,7 +188,16 @@
     {
       "field-name": "int2",
      "include-root": false
-    }
+    },
+    {
+      "field-name": "incrementalWriteHash",
+      "metadata": "",
+      "stemmed-searches": "off",
+      "word-searches": false,
+      "fast-phrase-searches": false,
+      "fast-case-sensitive-searches": false,
+      "fast-diacritic-sensitive-searches": false
+    }
   ],
   "range-field-index": [
     {
@@ -204,7 +213,14 @@
       "collation": "",
       "range-value-positions": false,
       "invalid-values": "reject"
-    }
+    },
+    {
+      "scalar-type": "string",
+      "collation": "http://marklogic.com/collation/",
+      "field-name": "incrementalWriteHash",
+      "range-value-positions": false,
+      "invalid-values": "reject"
+    }
   ],
   "geospatial-element-index": [
     {
@@ -300,4 +316,4 @@
       "location": "rdfs.rules"
     }
   ]
-}
\ No newline at end of file
+}