Skip to content

Conversation

@tanghaodong25
Copy link
Contributor

What changes were proposed in this pull request?

How was this PR tested?

  • Tests have Added for the changes
  • Production environment verified

@@ -0,0 +1,139 @@
package org.apache.geaflow.store.lucene;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues mentioned above have been fixed.

@@ -0,0 +1,126 @@
package org.apache.geaflow.store.lucene;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

license is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues mentioned above have been fixed.

@Override
public K searchVectorIndex(boolean isVertex, String fieldName, float[] vector, int topK) {
try {
// 打开索引读取器
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add English comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues mentioned above have been fixed.

<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>9.8.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to manage the dependency in the root pom.xml


// 根据K的类型选择不同的Field
if (key instanceof Float) {
doc.add(new FloatField(KEY_FIELD_NAME, (float) key, Field.Store.YES));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store keys as StringField to avoid using deleted IntField/LongField

 String keyAsString = String.valueOf(key);
doc.add(new StringField(KEY_FIELD_NAME, keyAsString, Field.Store.YES));
doc.add(new KnnVectorField(fieldName, vector));

If you need native numeric storage and use numericValue() when retrieving, you can use StoredField instead and use doc.getField(...).numericValue() when retrieving the value.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues mentioned above have been addressed and fixed.

@kitalkuyo-gita
Copy link
Contributor

kitalkuyo-gita commented Oct 24, 2025

@tanghaodong25
In general, there are many areas that can be improved in the current code. I wrote a sample for reference only.
The addition and deletion of luence needs to ensure atomicity. So I maintain an update variable here.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.QueryBuilder;

/**
 * Lucene Search Engine in-memory version. Common search, sorted search, update, and delete interfaces are encapsulated.
 * Note: All update operations only update the cache. Search interfaces will not observe changes immediately,
 * as an asynchronous thread periodically commits the updates.
 */
public class LuceneSearchEngine<T> {

	private Analyzer analyzer;
	private QueryBuilder parser;
	private ByteBuffersDirectory directory;
	private IndexWriter iwriter;
	private Function<T, Document> buildDoc;
	private AtomicReference<DirectoryReader> reader;
	private AtomicReference<IndexSearcher> searcher;

	private ScheduledThreadPoolExecutor executor;
	private AtomicBoolean update;

	private static final int COMMIT_DELAY = 20;

	/**
	 * Requires providing a conversion method from the entity class to a Document.
	 * For fuzzy matching, indexed fields using {@link org.apache.lucene.document.TextField}
	 * need to be added to the Document.
	 * {@link org.apache.lucene.document.StringField} only supports single-term exact matching;
	 * it is highly recommended to add a unique ID value of this type for delete and update operations.
	 *
	 * @param elements
	 * @param buildDoc
	 * @throws IOException
	 */
	public LuceneSearchEngine(Collection<T> elements, Function<T, Document> buildDoc) throws IOException{
		this.buildDoc = buildDoc;
		directory = new ByteBuffersDirectory();
		analyzer = new StandardAnalyzer();
		parser = new QueryBuilder(analyzer);
		IndexWriterConfig config = new IndexWriterConfig(analyzer);
		this.iwriter = new IndexWriter(directory, config);
		update = new AtomicBoolean(false);
		addElement(elements);
		iwriter.commit();
		reader = new AtomicReference<DirectoryReader>(DirectoryReader.open(directory));
		searcher = new AtomicReference<IndexSearcher>(new IndexSearcher(reader.get()));
		executor = new ScheduledThreadPoolExecutor(1, new PoolThreadFactory("LuceneSearchEngine", true, 1));
		executor.scheduleWithFixedDelay(() ->{
			if(update.compareAndSet(true, false)){
				try {
					iwriter.commit();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}, 0, COMMIT_DELAY, TimeUnit.SECONDS);
	}

	public void addElement(Collection<T> elements){
		try {
			for (T element : elements) {
				Document document = this.buildDoc.apply(element);
				iwriter.addDocument(document);
			}
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private IndexSearcher chackAndGetIndexSearcher(){
		DirectoryReader directoryReader = reader.get();
		try {
			DirectoryReader newDirectoryReader = DirectoryReader.openIfChanged(directoryReader);
			if(newDirectoryReader != null && reader.compareAndSet(directoryReader, newDirectoryReader)){
				directoryReader = reader.get();
				searcher.set(new IndexSearcher(directoryReader));
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return searcher.get();
	}

	/**
	 * The matching pattern depends on the analyzer's parsing result. If it contains Chinese characters,
	 * it is essentially term-by-term matching; do not use it to query fields of type StringField.
	 * @param field The name of the field to match
	 * @param queryText The content to match
	 * @param page The page number, starting from 1
	 * @param pageSize The size of each page
	 * @param sort The sorting parameter, no sorting performed if null
	 * @return List<Document> The matched Document objects
	 *
	 */
	public List<Document> search(String field, String queryText, int page, int pageSize, Sort sort){
		Query query = parser.createPhraseQuery(field, queryText);
		if(query == null){
			return Collections.emptyList();
		}
		IndexSearcher isearcher = chackAndGetIndexSearcher();
		try {
			ScoreDoc startDoc = null;
			TopDocs topDocs = null;
			if(page > 1){
				if(sort != null){
					topDocs = isearcher.search(query, (page - 1) * pageSize, sort);
				}else {
					topDocs = isearcher.search(query, (page - 1) * pageSize);
				}
				if(topDocs.scoreDocs.length != 0){
					startDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
				}
			}
			if(sort != null){
				topDocs = isearcher.searchAfter(startDoc, query, pageSize, sort);
			}else {
				topDocs = isearcher.searchAfter(startDoc, query, pageSize);
			}
			List<Document> result = new ArrayList<Document>(topDocs.scoreDocs.length);
			for(ScoreDoc scoreDocs : topDocs.scoreDocs){
				result.add(isearcher.doc(scoreDocs.doc));
			}
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	public List<Document> wildCardSearch(String field, String queryText, int page, int pageSize, Sort sort){
		IndexSearcher isearcher = chackAndGetIndexSearcher();
		try {
			 Term t1 = new Term(field, queryText);
			 WildcardQuery query = new WildcardQuery(t1);

			ScoreDoc startDoc = null;
			TopDocs topDocs = null;
			if(page > 1){
				if(sort != null){
					topDocs = isearcher.search(query, (page - 1) * pageSize, sort);
				}else {
					topDocs = isearcher.search(query, (page - 1) * pageSize);
				}
				if(topDocs.scoreDocs.length != 0){
					startDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
				}
			}
			if(sort != null){
				topDocs = isearcher.searchAfter(startDoc, query, pageSize, sort);
			}else {
				topDocs = isearcher.searchAfter(startDoc, query, pageSize);
			}
			List<Document> result = new ArrayList<Document>(topDocs.scoreDocs.length);
			for(ScoreDoc scoreDocs : topDocs.scoreDocs){
				result.add(isearcher.doc(scoreDocs.doc));
			}
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * Enforces non-analyzed exact matching to prevent accidental deletion of wrong values.
	 */
	public void deleleElement(String field, String queryText){
		try {
			Query query = new TermQuery(new Term(field, queryText));
			iwriter.deleteDocuments(query);
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}


	/**
	 * Enforces non-analyzed exact matching to prevent accidental update of wrong values.
	 * @param field The name of the field to specify for matching the unique ID.
	 */
	public void updateElement(Collection<T> elements, String field){
		try {
			for (T element : elements) {
				Document document = buildDoc.apply(element);
				Term query = new Term(field, document.get(field));
				iwriter.updateDocument(query, document);
			}
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void close(){
		if(reader.get() != null){
			try {
				reader.get().close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(iwriter != null){
			try {
				iwriter.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(directory != null){
			try {
				directory.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(executor != null){
			executor.shutdownNow();
		}
	}

	private static class PoolThreadFactory implements ThreadFactory{
		private final ThreadGroup group;
		private final String namePrefix;
		private final Boolean daemon;


		public PoolThreadFactory(String namePrefix, Boolean daemon, int index) {
			SecurityManager s = System.getSecurityManager();
			this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			this.namePrefix = String.format("%s-%d", namePrefix, index);
			this.daemon = daemon;
		}
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(group, r, namePrefix, 0);
			// The newly created thread is initially marked as being a daemon thread if and only if
			// the thread creating it is currently marked as a daemon thread.
			if (this.daemon != null) {
				t.setDaemon(this.daemon.booleanValue());
			}

			if (t.getPriority() != Thread.NORM_PRIORITY) {
				t.setPriority(Thread.NORM_PRIORITY);
			}
			return t;
		}

	}

}

@tanghaodong25
Copy link
Contributor Author

kitalkuyo-gita

Thanks for the thoughtful feedback! I really appreciate the sample and your guidance on ensuring atomicity for addition and deletion in lucene with the update variable. I’ll revise the code accordingly and push an updated version soon.

@tanghaodong25
Copy link
Contributor Author

@tanghaodong25 In general, there are many areas that can be improved in the current code. I wrote a sample for reference only. The addition and deletion of luence needs to ensure atomicity. So I maintain an update variable here.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.QueryBuilder;

/**
 * Lucene Search Engine in-memory version. Common search, sorted search, update, and delete interfaces are encapsulated.
 * Note: All update operations only update the cache. Search interfaces will not observe changes immediately,
 * as an asynchronous thread periodically commits the updates.
 */
public class LuceneSearchEngine<T> {

	private Analyzer analyzer;
	private QueryBuilder parser;
	private ByteBuffersDirectory directory;
	private IndexWriter iwriter;
	private Function<T, Document> buildDoc;
	private AtomicReference<DirectoryReader> reader;
	private AtomicReference<IndexSearcher> searcher;

	private ScheduledThreadPoolExecutor executor;
	private AtomicBoolean update;

	private static final int COMMIT_DELAY = 20;

	/**
	 * Requires providing a conversion method from the entity class to a Document.
	 * For fuzzy matching, indexed fields using {@link org.apache.lucene.document.TextField}
	 * need to be added to the Document.
	 * {@link org.apache.lucene.document.StringField} only supports single-term exact matching;
	 * it is highly recommended to add a unique ID value of this type for delete and update operations.
	 *
	 * @param elements
	 * @param buildDoc
	 * @throws IOException
	 */
	public LuceneSearchEngine(Collection<T> elements, Function<T, Document> buildDoc) throws IOException{
		this.buildDoc = buildDoc;
		directory = new ByteBuffersDirectory();
		analyzer = new StandardAnalyzer();
		parser = new QueryBuilder(analyzer);
		IndexWriterConfig config = new IndexWriterConfig(analyzer);
		this.iwriter = new IndexWriter(directory, config);
		update = new AtomicBoolean(false);
		addElement(elements);
		iwriter.commit();
		reader = new AtomicReference<DirectoryReader>(DirectoryReader.open(directory));
		searcher = new AtomicReference<IndexSearcher>(new IndexSearcher(reader.get()));
		executor = new ScheduledThreadPoolExecutor(1, new PoolThreadFactory("LuceneSearchEngine", true, 1));
		executor.scheduleWithFixedDelay(() ->{
			if(update.compareAndSet(true, false)){
				try {
					iwriter.commit();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}, 0, COMMIT_DELAY, TimeUnit.SECONDS);
	}

	public void addElement(Collection<T> elements){
		try {
			for (T element : elements) {
				Document document = this.buildDoc.apply(element);
				iwriter.addDocument(document);
			}
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private IndexSearcher chackAndGetIndexSearcher(){
		DirectoryReader directoryReader = reader.get();
		try {
			DirectoryReader newDirectoryReader = DirectoryReader.openIfChanged(directoryReader);
			if(newDirectoryReader != null && reader.compareAndSet(directoryReader, newDirectoryReader)){
				directoryReader = reader.get();
				searcher.set(new IndexSearcher(directoryReader));
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return searcher.get();
	}

	/**
	 * The matching pattern depends on the analyzer's parsing result. If it contains Chinese characters,
	 * it is essentially term-by-term matching; do not use it to query fields of type StringField.
	 * @param field The name of the field to match
	 * @param queryText The content to match
	 * @param page The page number, starting from 1
	 * @param pageSize The size of each page
	 * @param sort The sorting parameter, no sorting performed if null
	 * @return List<Document> The matched Document objects
	 *
	 */
	public List<Document> search(String field, String queryText, int page, int pageSize, Sort sort){
		Query query = parser.createPhraseQuery(field, queryText);
		if(query == null){
			return Collections.emptyList();
		}
		IndexSearcher isearcher = chackAndGetIndexSearcher();
		try {
			ScoreDoc startDoc = null;
			TopDocs topDocs = null;
			if(page > 1){
				if(sort != null){
					topDocs = isearcher.search(query, (page - 1) * pageSize, sort);
				}else {
					topDocs = isearcher.search(query, (page - 1) * pageSize);
				}
				if(topDocs.scoreDocs.length != 0){
					startDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
				}
			}
			if(sort != null){
				topDocs = isearcher.searchAfter(startDoc, query, pageSize, sort);
			}else {
				topDocs = isearcher.searchAfter(startDoc, query, pageSize);
			}
			List<Document> result = new ArrayList<Document>(topDocs.scoreDocs.length);
			for(ScoreDoc scoreDocs : topDocs.scoreDocs){
				result.add(isearcher.doc(scoreDocs.doc));
			}
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	public List<Document> wildCardSearch(String field, String queryText, int page, int pageSize, Sort sort){
		IndexSearcher isearcher = chackAndGetIndexSearcher();
		try {
			 Term t1 = new Term(field, queryText);
			 WildcardQuery query = new WildcardQuery(t1);

			ScoreDoc startDoc = null;
			TopDocs topDocs = null;
			if(page > 1){
				if(sort != null){
					topDocs = isearcher.search(query, (page - 1) * pageSize, sort);
				}else {
					topDocs = isearcher.search(query, (page - 1) * pageSize);
				}
				if(topDocs.scoreDocs.length != 0){
					startDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
				}
			}
			if(sort != null){
				topDocs = isearcher.searchAfter(startDoc, query, pageSize, sort);
			}else {
				topDocs = isearcher.searchAfter(startDoc, query, pageSize);
			}
			List<Document> result = new ArrayList<Document>(topDocs.scoreDocs.length);
			for(ScoreDoc scoreDocs : topDocs.scoreDocs){
				result.add(isearcher.doc(scoreDocs.doc));
			}
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * Enforces non-analyzed exact matching to prevent accidental deletion of wrong values.
	 */
	public void deleleElement(String field, String queryText){
		try {
			Query query = new TermQuery(new Term(field, queryText));
			iwriter.deleteDocuments(query);
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}


	/**
	 * Enforces non-analyzed exact matching to prevent accidental update of wrong values.
	 * @param field The name of the field to specify for matching the unique ID.
	 */
	public void updateElement(Collection<T> elements, String field){
		try {
			for (T element : elements) {
				Document document = buildDoc.apply(element);
				Term query = new Term(field, document.get(field));
				iwriter.updateDocument(query, document);
			}
			update.set(true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public void close(){
		if(reader.get() != null){
			try {
				reader.get().close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(iwriter != null){
			try {
				iwriter.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(directory != null){
			try {
				directory.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		if(executor != null){
			executor.shutdownNow();
		}
	}

	private static class PoolThreadFactory implements ThreadFactory{
		private final ThreadGroup group;
		private final String namePrefix;
		private final Boolean daemon;


		public PoolThreadFactory(String namePrefix, Boolean daemon, int index) {
			SecurityManager s = System.getSecurityManager();
			this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			this.namePrefix = String.format("%s-%d", namePrefix, index);
			this.daemon = daemon;
		}
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(group, r, namePrefix, 0);
			// The newly created thread is initially marked as being a daemon thread if and only if
			// the thread creating it is currently marked as a daemon thread.
			if (this.daemon != null) {
				t.setDaemon(this.daemon.booleanValue());
			}

			if (t.getPriority() != Thread.NORM_PRIORITY) {
				t.setPriority(Thread.NORM_PRIORITY);
			}
			return t;
		}

	}

}

Concurrency is not considered for now, and comments have been added. For deletion scenarios, we still need to ensure consistency with the graph data, which will be addressed later in a unified review.

@tanghaodong25
Copy link
Contributor Author

@kitalkuyo-gita Please take another look and let me know if this resolves your concerns? cc @DukeWangYu

Copy link
Contributor

@qingwen220 qingwen220 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@qingwen220 qingwen220 merged commit 45b1969 into apache:master Dec 31, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants