diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
index 50257bb14..46aba85c1 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
@@ -6,6 +6,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.marklogic.client.FailedRequestException;
 import com.marklogic.client.datamovement.DocumentWriteSetFilter;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
@@ -42,16 +43,21 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
             }
         }
 
-        JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-            .addVariable("fieldName", fieldName)
-            .addVariable("uris", new JacksonHandle(uris))
-            .evalAs(JsonNode.class);
-
-        return filterDocuments(context, uri -> {
-            if (response.has(uri)) {
-                return response.get(uri).asText();
-            }
-            return null;
-        });
+        try {
+            JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
+                .addVariable("fieldName", fieldName)
+                .addVariable("uris", new JacksonHandle(uris))
+                .evalAs(JsonNode.class);
+
+            return filterDocuments(context, uri -> {
+                if (response.has(uri)) {
+                    return response.get(uri).asText();
+                }
+                return null;
+            });
+        } catch (FailedRequestException e) {
+            String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
+            throw new FailedRequestException(message, e.getFailedRequest());
+        }
     }
 }
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
index 18dc50387..8fe27d05d 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
@@ -109,7 +109,7 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<String, String>
-        Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
-            op.fromLexicons(Map.of(
-                "uri", op.cts.uriReference(),
-                "hash", op.cts.fieldReference(super.fieldName)
-            )).where(
-                op.cts.documentQuery(op.xs.stringSeq(uris))
-            ),
-
-            rows -> {
-                Map<String, String> map = new HashMap<>();
-                rows.forEach(row -> {
-                    String uri = row.getString("uri");
-                    String existingHash = row.getString("hash");
-                    map.put(uri, existingHash);
-                });
-                return map;
-            }
-        );
-
-        return filterDocuments(context, uri -> existingHashes.get(uri));
+        RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+
+        try {
+            Map<String, String> existingHashes = rowTemplate.query(op ->
+                op.fromLexicons(Map.of(
+                    "uri", op.cts.uriReference(),
+                    "hash", op.cts.fieldReference(super.fieldName)
+                )).where(
+                    op.cts.documentQuery(op.xs.stringSeq(uris))
+                ),
+
+                rows -> {
+                    Map<String, String> map = new HashMap<>();
+                    rows.forEach(row -> {
+                        String uri = row.getString("uri");
+                        String existingHash = row.getString("hash");
+                        map.put(uri, existingHash);
+                    });
+                    return map;
+                }
+            );
+
+            return filterDocuments(context, uri -> existingHashes.get(uri));
+        } catch (FailedRequestException e) {
+            String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
+            throw new FailedRequestException(message, e.getFailedRequest());
+        }
     }
 }
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
index 438784f8e..388473553 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
@@ -5,7 +5,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.*;
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.Format;
@@ -150,8 +150,40 @@ void invalidJsonWithFormat() {
             "Expecting the server to throw an error. Actual message: " + message);
     }
 
+    @Test
+    void noRangeIndexForField() {
+        filter = IncrementalWriteFilter.newBuilder()
+            .fieldName("non-existent-field")
+            .build();
+
+        writeTenDocuments();
+
+        assertNotNull(batchFailure.get());
+        String message = batchFailure.get().getMessage();
+        assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
+            "When the user tries to use the incremental write feature without the required range index, we should " +
+                "fail with a helpful error message. Actual message: " + message);
+    }
+
+    @Test
+    void noRangeIndexForFieldWithEval() {
+        filter = IncrementalWriteFilter.newBuilder()
+            .fieldName("non-existent-field")
+            .useEvalQuery(true)
+            .build();
+
+        writeTenDocuments();
+
+        assertNotNull(batchFailure.get());
+        String message = batchFailure.get().getMessage();
+        assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
+            "When the user tries to use the incremental write feature without the required range index, we should " +
+                "fail with a helpful error message. Actual message: " + message);
+    }
+
     private void verifyIncrementalWriteWorks() {
         writeTenDocuments();
+        verifyDocumentsHasHashInMetadataKey();
         assertEquals(10, writtenCount.get());
         assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");
 
@@ -169,36 +201,44 @@ private void verifyIncrementalWriteWorks() {
     }
 
     private void writeTenDocuments() {
-        new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
-            .withThreadCount(1).withBatchSize(5)
-            .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
-            .withDocumentWriteSetFilter(filter),
-
-            writeBatcher -> {
-                for (int i = 1; i <= 10; i++) {
-                    // Consistent URIs are required for incremental writes to work.
-                    String uri = "/incremental/test/doc-" + i + ".xml";
-                    String content = "<doc>This is document number " + i + "</doc>";
-                    writeBatcher.add(uri, METADATA, new StringHandle(content));
-                }
+        docs = new ArrayList<>();
+        for (int i = 1; i <= 10; i++) {
+            // Consistent URIs are required for incremental writes to work.
+            String uri = "/incremental/test/doc-" + i + ".xml";
+            String content = "<doc>This is document number " + i + "</doc>";
+            docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
+        }
+        writeDocs(docs);
+    }
+
+    private void verifyDocumentsHasHashInMetadataKey() {
+        GenericDocumentManager mgr = Common.client.newDocumentManager();
+        mgr.setMetadataCategories(DocumentManager.Metadata.METADATAVALUES);
+        DocumentPage page = mgr.search(Common.client.newQueryManager().newStructuredQueryBuilder().collection("incremental-test"), 1);
+        while (page.hasNext()) {
+            DocumentRecord doc = page.next();
+            DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
+            assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
+                "Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");
+
+            String hash = metadata.getMetadataValues().get("incrementalWriteHash");
+            try {
+                // Can use Java's support for parsing unsigned longs in base 16 to verify the hash is valid.
+                Long.parseUnsignedLong(hash, 16);
+            } catch (NumberFormatException e) {
+                fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
             }
-        );
+        }
     }
 
     private void modifyFiveDocuments() {
-        new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
-            .withThreadCount(1).withBatchSize(5)
-            .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
-            .withDocumentWriteSetFilter(filter),
-
-            writeBatcher -> {
-                for (int i = 6; i <= 10; i++) {
-                    String uri = "/incremental/test/doc-" + i + ".xml";
-                    String content = "<doc>This is modified content</doc>";
-                    writeBatcher.add(uri, METADATA, new StringHandle(content));
-                }
-            }
-        );
+        docs = new ArrayList<>();
+        for (int i = 6; i <= 10; i++) {
+            String uri = "/incremental/test/doc-" + i + ".xml";
+            String content = "<doc>This is modified content</doc>";
+            docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
+        }
+        writeDocs(docs);
     }
 
     private void writeDocs(List<DocumentWriteOperation> docs) {
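
Usage note (not part of the patch): the tests above exercise the filter through a WriteBatcher, so a rough sketch of how an application might wire this up follows. It is based only on the API surface visible in this diff (IncrementalWriteFilter.newBuilder, fieldName, useEvalQuery, withDocumentWriteSetFilter) plus the standard Data Movement SDK; the DatabaseClient setup, the declared filter type, and the field name "incrementalWriteHash" with its backing string range index are assumptions for illustration, not confirmed by the patch.

// Sketch only. Assumes "client" is a configured DatabaseClient and that a
// MarkLogic field with a string range index exists to hold the hashes; the
// field name "incrementalWriteHash" here is illustrative.
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;
import com.marklogic.client.io.StringHandle;

public class IncrementalWriteExample {

    public static void run(DatabaseClient client) {
        // The exact return type of build() is assumed to be DocumentWriteSetFilter,
        // matching the withDocumentWriteSetFilter call seen in the test code above.
        DocumentWriteSetFilter filter = IncrementalWriteFilter.newBuilder()
            .fieldName("incrementalWriteHash")
            // .useEvalQuery(true) // alternative strategy exercised by IncrementalWriteEvalFilter
            .build();

        DataMovementManager dmm = client.newDataMovementManager();
        WriteBatcher batcher = dmm.newWriteBatcher()
            .withThreadCount(1).withBatchSize(5)
            .withDocumentWriteSetFilter(filter)
            // With this patch, a missing range index surfaces as a batch failure whose
            // message begins "Unable to query for existing incremental write hashes".
            .onBatchFailure((batch, throwable) -> throwable.printStackTrace());

        dmm.startJob(batcher);
        // Consistent URIs let unchanged documents be skipped on subsequent runs.
        batcher.add("/incremental/test/doc-1.xml", new StringHandle("<doc>example</doc>"));
        batcher.flushAndWait();
        dmm.stopJob(batcher);
    }
}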