IncrementalWriteEvalFilter.java
@@ -22,16 +22,17 @@
 class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
 
     private static final String EVAL_SCRIPT = """
-        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(fieldName)], null, cts.documentQuery(uris));
+        const tuples = cts.valueTuples([cts.uriReference(), cts.fieldReference(hashKeyName)], null, cts.documentQuery(uris));
         const response = {};
         for (var tuple of tuples) {
             response[tuple[0]] = tuple[1];
         }
         response
         """;
 
-    IncrementalWriteEvalFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                               Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }
 
     @Override
@@ -45,7 +46,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
 
         try {
             JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-                .addVariable("fieldName", fieldName)
+                .addVariable("hashKeyName", hashKeyName)
                .addVariable("uris", new JacksonHandle(uris))
                .evalAs(JsonNode.class);
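
For context on the hunk above: the eval filter makes one server round trip per batch, passing the batch URIs and the metadata key name, and gets back a JSON object mapping each URI to its stored hash. A minimal sketch of invoking the same script directly, assuming an existing DatabaseClient named client (the URIs and returned values are illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.marklogic.client.io.JacksonHandle;

// Sketch only: "client" is an assumed DatabaseClient; the URIs are illustrative.
ArrayNode uris = new ObjectMapper().createArrayNode().add("/1.json").add("/2.json");
JsonNode response = client.newServerEval()
    .javascript(EVAL_SCRIPT)
    .addVariable("hashKeyName", "incrementalWriteHash")
    .addVariable("uris", new JacksonHandle(uris))
    .evalAs(JsonNode.class);
// Expected shape: {"/1.json": "<hex hash>", "/2.json": "<hex hash>"}
response.fields().forEachRemaining(entry ->
    System.out.println(entry.getKey() + " -> " + entry.getValue().asText()));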
IncrementalWriteFilter.java
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
@@ -39,16 +40,25 @@ public static Builder newBuilder() {
 
     public static class Builder {
 
-        private String fieldName = "incrementalWriteHash";
+        private String hashKeyName = "incrementalWriteHash";
+        private String timestampKeyName = "incrementalWriteTimestamp";
         private boolean canonicalizeJson = true;
         private boolean useEvalQuery = false;
         private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
 
         /**
-         * @param fieldName the name of the MarkLogic field that will hold the hash value; defaults to "incrementalWriteHash".
+         * @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
         */
-        public Builder fieldName(String fieldName) {
-            this.fieldName = fieldName;
+        public Builder hashKeyName(String keyName) {
+            this.hashKeyName = keyName;
            return this;
        }
 
+        /**
+         * @param keyName the name of the MarkLogic metadata key that will hold the timestamp value; defaults to "incrementalWriteTimestamp".
+         */
+        public Builder timestampKeyName(String keyName) {
+            this.timestampKeyName = keyName;
+            return this;
+        }
+
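Taken together, the renamed and added builder methods compose as below; the key names shown are the defaults from the fields above, and useEvalQuery/onDocumentsSkipped are the options that appear elsewhere in this diff:

IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
    .hashKeyName("incrementalWriteHash")           // default
    .timestampKeyName("incrementalWriteTimestamp") // default
    .useEvalQuery(false)                           // Optic query unless set to true
    .onDocumentsSkipped(skipped ->
        System.out.println(skipped.length + " unchanged documents skipped"))
    .build();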
@@ -79,29 +89,32 @@ public Builder onDocumentsSkipped(Consumer<DocumentWriteOperation[]> skippedDocu
 
         public IncrementalWriteFilter build() {
             if (useEvalQuery) {
-                return new IncrementalWriteEvalFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+                return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
             }
-            return new IncrementalWriteOpticFilter(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+            return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
         }
     }
 
-    protected final String fieldName;
+    protected final String hashKeyName;
+    private final String timestampKeyName;
     private final boolean canonicalizeJson;
     private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
 
     // Hardcoding this for now, with a good general purpose hashing function.
     // See https://xxhash.com for benchmarks.
     private final LongHashFunction hashFunction = LongHashFunction.xx3();
 
-    public IncrementalWriteFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        this.fieldName = fieldName;
+    public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        this.hashKeyName = hashKeyName;
+        this.timestampKeyName = timestampKeyName;
         this.canonicalizeJson = canonicalizeJson;
        this.skippedDocumentsConsumer = skippedDocumentsConsumer;
    }
 
     protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
         final DocumentWriteSet newWriteSet = context.getDatabaseClient().newDocumentManager().newWriteSet();
         final List<DocumentWriteOperation> skippedDocuments = new ArrayList<>();
+        final String timestamp = Instant.now().toString();
 
         for (DocumentWriteOperation doc : context.getDocumentWriteSet()) {
             if (!DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(doc.getOperationType())) {
@@ -117,14 +130,14 @@
 
             if (existingHash != null) {
                 if (!existingHash.equals(contentHash)) {
-                    newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+                    newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
                 } else if (skippedDocumentsConsumer != null) {
                     skippedDocuments.add(doc);
                 } else {
                     // No consumer, so skip the document silently.
                 }
             } else {
-                newWriteSet.add(addHashToMetadata(doc, fieldName, contentHash));
+                newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
             }
         }
 
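Note the asymmetry the hunk above preserves: unchanged documents are only collected when a consumer was registered; with no consumer they are dropped silently. A sketch of a consumer that surfaces the skips, wired in via the builder's onDocumentsSkipped (the logging is illustrative):

import com.marklogic.client.document.DocumentWriteOperation;
import java.util.function.Consumer;

Consumer<DocumentWriteOperation[]> logSkips = skipped -> {
    for (DocumentWriteOperation doc : skipped) {
        System.out.println("Skipped unchanged document: " + doc.getUri());
    }
};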
@@ -173,7 +186,8 @@ private String computeHash(String content) {
         return Long.toHexString(hash);
     }
 
-    protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String fieldName, String hash) {
+    protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation op, String hashKeyName, String hash,
+                                                              String timestampKeyName, String timestamp) {
         DocumentMetadataHandle newMetadata = new DocumentMetadataHandle();
         if (op.getMetadata() != null) {
             DocumentMetadataHandle originalMetadata = (DocumentMetadataHandle) op.getMetadata();
@@ -183,7 +197,10 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
             newMetadata.setProperties(originalMetadata.getProperties());
             newMetadata.getMetadataValues().putAll(originalMetadata.getMetadataValues());
         }
-        newMetadata.getMetadataValues().put(fieldName, hash);
+
+        newMetadata.getMetadataValues().put(hashKeyName, hash);
+        newMetadata.getMetadataValues().put(timestampKeyName, timestamp);
+
         return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
     }
 }
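
The stored hash is the hex form of a 64-bit XXH3 value, per the hashFunction field above. A sketch of the mapping computeHash appears to perform, assuming the elided body hashes the (optionally canonicalized) content string; hashChars is the zero-allocation-hashing call assumed here:

import net.openhft.hashing.LongHashFunction;

// Assumption: computeHash hashes the content string and hex-encodes the result.
long hash = LongHashFunction.xx3().hashChars("<doc1/>");
String stored = Long.toHexString(hash); // e.g. "6ff24ad31cfa7e20" (illustrative)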
IncrementalWriteOpticFilter.java
@@ -19,8 +19,9 @@
  */
 class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
 
-    IncrementalWriteOpticFilter(String fieldName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
-        super(fieldName, canonicalizeJson, skippedDocumentsConsumer);
+    IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer) {
+        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer);
     }
 
     @Override
@@ -38,7 +39,7 @@ public DocumentWriteSet apply(Context context) {
 
         Map<String, String> existingHashes = rowTemplate.query(op ->
             op.fromLexicons(Map.of(
                 "uri", op.cts.uriReference(),
-                "hash", op.cts.fieldReference(super.fieldName)
+                "hash", op.cts.fieldReference(super.hashKeyName)
             )).where(
                 op.cts.documentQuery(op.xs.stringSeq(uris))
             ),
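
The Optic filter issues the same uri-to-hash lookup as a row query over the URI lexicon and the hash field. A sketch of the equivalent plan through a plain RowManager, assuming a DatabaseClient named client (the rowTemplate call above appears to wrap this same plan; the URIs are illustrative):

import com.marklogic.client.expression.PlanBuilder;
import com.marklogic.client.row.RowManager;
import java.util.Map;

// Sketch only: "client" is an assumed DatabaseClient.
RowManager rowManager = client.newRowManager();
PlanBuilder op = rowManager.newPlanBuilder();
rowManager.resultRows(
        op.fromLexicons(Map.of(
            "uri", op.cts.uriReference(),
            "hash", op.cts.fieldReference("incrementalWriteHash")))
        .where(op.cts.documentQuery(op.xs.stringSeq("/1.json", "/2.json"))))
    .forEach(row -> System.out.println(row.getString("uri") + " -> " + row.getString("hash")));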
Unit test for IncrementalWriteFilter.addHashToMetadata
@@ -9,6 +9,8 @@
 import com.marklogic.client.io.StringHandle;
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -32,7 +34,8 @@ void addHashToMetadata() {
         DocumentWriteOperation doc1 = new DocumentWriteOperationImpl("/1.xml", metadata, new StringHandle("<doc1/>"));
         DocumentWriteOperation doc2 = new DocumentWriteOperationImpl("/2.xml", metadata, new StringHandle("<doc2/>"));
 
-        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123");
+        final String timestamp = Instant.now().toString();
+        doc2 = IncrementalWriteFilter.addHashToMetadata(doc2, "theField", "abc123", "theTimestamp", timestamp);
 
         assertEquals(metadata, doc1.getMetadata(), "doc1 should still have the original metadata object");
 
@@ -44,5 +47,6 @@
 
         assertEquals("value1", metadata2.getMetadataValues().get("meta1"), "metadata value should be preserved");
         assertEquals("abc123", metadata2.getMetadataValues().get("theField"), "hash field should be added");
+        assertEquals(timestamp, metadata2.getMetadataValues().get("theTimestamp"), "timestamp should be added");
     }
 }
Integration test for the incremental write filters
@@ -153,7 +153,7 @@ void invalidJsonWithFormat() {
     @Test
     void noRangeIndexForField() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .build();
 
         writeTenDocuments();
@@ -168,7 +168,7 @@
     @Test
     void noRangeIndexForFieldWithEval() {
         filter = IncrementalWriteFilter.newBuilder()
-            .fieldName("non-existent-field")
+            .hashKeyName("non-existent-field")
             .useEvalQuery(true)
             .build();
 
@@ -218,8 +218,6 @@ private void verifyDocumentsHasHashInMetadataKey() {
         while (page.hasNext()) {
             DocumentRecord doc = page.next();
             DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
-            assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
-                "Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");
 
             String hash = metadata.getMetadataValues().get("incrementalWriteHash");
             try {
@@ -228,6 +226,9 @@
             } catch (NumberFormatException e) {
                 fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
             }
+
+            String timestamp = metadata.getMetadataValues().get("incrementalWriteTimestamp");
+            assertNotNull(timestamp, "Document " + doc.getUri() + " should have an incrementalWriteTimestamp value.");
         }
     }