Commit a96d910

MLE-26420 Improved error handling for incremental writes

1 parent f6cfa96
2 files changed: 44 additions & 54 deletions

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 3 additions & 1 deletion
@@ -149,7 +149,9 @@ private String serializeContent(DocumentWriteOperation doc) {
             jc = new JsonCanonicalizer(content);
             return jc.getEncodedString();
         } catch (IOException e) {
-            // Going to improve this in the next PR, as I think we can throw an exception if Format = JSON.
+            // If the Format is actually JSON, then the write to MarkLogic should ultimately fail, which is the
+            // error message the user would want to see via a batch failure listener. So in all cases, if we cannot
+            // canonicalize something that appears to be JSON, we log a warning and return the original content for hashing.
             logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
                 doc.getUri(), e.getMessage());
         }
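
The fallback described in the new comment amounts to the following pattern. This is a minimal sketch rather than the actual IncrementalWriteFilter code; the class name, the logger, and the org.erdtman.jcs package for JsonCanonicalizer are assumptions.

// Sketch only: canonicalize JSON when possible, otherwise fall back to the original content.
// The package for JsonCanonicalizer and the class/logger names are assumptions.
import java.io.IOException;
import org.erdtman.jcs.JsonCanonicalizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CanonicalizationFallbackSketch {

    private static final Logger logger = LoggerFactory.getLogger(CanonicalizationFallbackSketch.class);

    // Returns canonicalized JSON when possible; otherwise logs a warning and returns
    // the original content so a hash can still be computed from something stable.
    String contentForHashing(String uri, String content) {
        try {
            return new JsonCanonicalizer(content).getEncodedString();
        } catch (IOException e) {
            // If the content really is JSON, the eventual write to MarkLogic will fail and the user
            // will see that server error via a batch failure listener, so a warning is enough here.
            logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
                uri, e.getMessage());
            return content;
        }
    }
}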

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 41 additions & 53 deletions
@@ -5,12 +5,10 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.datamovement.DataMovementManager;
-import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.Format;
 import com.marklogic.client.io.JacksonHandle;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.test.AbstractClientTest;
@@ -21,9 +19,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
 
 class IncrementalWriteTest extends AbstractClientTest {
 
@@ -33,23 +31,26 @@ class IncrementalWriteTest extends AbstractClientTest {
 
     AtomicInteger writtenCount = new AtomicInteger();
     AtomicInteger skippedCount = new AtomicInteger();
+    AtomicReference<Throwable> batchFailure = new AtomicReference<>();
     ObjectMapper objectMapper = new ObjectMapper();
 
+    List<DocumentWriteOperation> docs = new ArrayList<>();
     IncrementalWriteFilter filter;
 
     @BeforeEach
    void setup() {
        // Need a user with eval privileges so that the eval filter can be tested.
        Common.client = Common.newEvalClient();
-    }
 
-    @Test
-    void opticFilter() {
+        // Default filter implementation, should be suitable for most tests.
         filter = IncrementalWriteFilter.newBuilder()
            .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
            .build();
+    }
 
-        runTest();
+    @Test
+    void opticFilter() {
+        verifyIncrementalWriteWorks();
     }
 
     @Test
@@ -59,35 +60,11 @@ void evalFilter() {
             .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
             .build();
 
-        runTest();
-    }
-
-    @Test
-    void filterRemovesAllDocuments() {
-        new WriteBatcherTemplate(Common.client).runWriteJob(
-            writeBatcher -> writeBatcher
-                .withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet())
-                .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
-
-            writeBatcher -> {
-                for (int i = 1; i <= 10; i++) {
-                    writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle("<doc/>"));
-                }
-            }
-        );
-
-        assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " +
-            "This test is verifying that no error will occur either when the filter doesn't return any documents.");
-        assertCollectionSize("incremental-test", 0);
+        verifyIncrementalWriteWorks();
     }
 
     @Test
     void jsonKeysOutOfOrder() {
-        filter = IncrementalWriteFilter.newBuilder()
-            .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
-            .build();
-
-        List<DocumentWriteOperation> docs = new ArrayList<>();
         for (int i = 1; i <= 10; i++) {
             ObjectNode doc = objectMapper.createObjectNode();
             doc.put("number", i);
@@ -146,7 +123,34 @@ void jsonKeysOutOfOrderWithNoCanonicalization() {
         assertEquals(0, skippedCount.get());
     }
 
-    private void runTest() {
+    @Test
+    void invalidJsonWithNoFormat() {
+        docs.add(new DocumentWriteOperationImpl("/aaa.txt", METADATA, new StringHandle("{\"not actually json")));
+        writeDocs(docs);
+
+        assertEquals(1, writtenCount.get(), "When the format is not specified and the content looks like JSON " +
+            "because it starts with a '{', the JSON canonicalization should fail and log a warning. The " +
+            "document should still be written with a hash generated based on the text in the document.");
+
+        assertNull(batchFailure.get(), "No failure should have been thrown since the format on the content is " +
+            "not JSON, and thus the content should be hashed as text.");
+    }
+
+    @Test
+    void invalidJsonWithFormat() {
+        docs.add(new DocumentWriteOperationImpl("/aaa.json", METADATA, new StringHandle("not actually json").withFormat(Format.JSON)));
+        writeDocs(docs);
+
+        assertNotNull(batchFailure.get(), "A failure should have been thrown by the server since the content is not " +
+            "JSON. But the failure to canonicalize should still be logged, as the user will be far more interested " +
+            "in the error from the server.");
+
+        String message = batchFailure.get().getMessage();
+        assertTrue(message.contains("failed to apply resource at documents"),
+            "Expecting the server to throw an error. Actual message: " + message);
+    }
+
+    private void verifyIncrementalWriteWorks() {
         writeTenDocuments();
         assertEquals(10, writtenCount.get());
         assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");
@@ -201,26 +205,10 @@ private void writeDocs(List<DocumentWriteOperation> docs) {
         new WriteBatcherTemplate(Common.client).runWriteJob(
             writeBatcher -> writeBatcher
                 .withDocumentWriteSetFilter(filter)
-                .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
+                .onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
+                .onBatchFailure((batch, failure) -> batchFailure.set(failure)),
 
             writeBatcher -> docs.forEach(writeBatcher::add)
         );
     }
-
-    // Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
-    private record WriteBatcherTemplate(DatabaseClient databaseClient) {
-
-        public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
-            try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
-                WriteBatcher writeBatcher = dmm.newWriteBatcher();
-                writeBatcherConfigurer.accept(writeBatcher);
-
-                dmm.startJob(writeBatcher);
-                writeBatcherUser.accept(writeBatcher);
-                writeBatcher.flushAndWait();
-                writeBatcher.awaitCompletion();
-                dmm.stopJob(writeBatcher);
-            }
-        }
-    }
 }
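
For reference, wiring the filter and the new batch failure listener into a plain DMSDK job, without the test's WriteBatcherTemplate helper, would look roughly like the sketch below. It assumes an existing DatabaseClient, the withDocumentWriteSetFilter API introduced on this branch, and that DataMovementManager is AutoCloseable (as the removed template implied); the class and document URI are illustrative and error handling is omitted.

// Sketch only: standard WriteBatcher wiring with the incremental-write filter and
// success/failure listeners, mirroring the pattern used in writeDocs() above.
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.datamovement.filter.IncrementalWriteFilter;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;

class IncrementalWriteJobSketch {

    void run(DatabaseClient client) {
        IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
            .onDocumentsSkipped(docs -> System.out.println("Skipped " + docs.length + " unchanged documents"))
            .build();

        try (DataMovementManager dmm = client.newDataMovementManager()) {
            WriteBatcher writeBatcher = dmm.newWriteBatcher()
                .withDocumentWriteSetFilter(filter)
                .onBatchSuccess(batch -> System.out.println("Wrote " + batch.getItems().length + " documents"))
                .onBatchFailure((batch, failure) -> failure.printStackTrace());

            dmm.startJob(writeBatcher);
            writeBatcher.add("/incremental/test/doc-1.json",
                new StringHandle("{\"number\":1}").withFormat(Format.JSON));
            writeBatcher.flushAndWait();
            dmm.stopJob(writeBatcher);
        }
    }
}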
