An IndexWriter creates and maintains an index. It is the basic class defined in Lucene Core. The source code in this post is based on Lucene 6.6.0.
A simple demonstration of IndexWriter:
// a minimal, self-contained example; RAMDirectory keeps the index in memory
Directory directory = new RAMDirectory();
IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig());
Document document = new Document();
document.add(new SortedDocValuesField("different_field", new BytesRef("1")));
indexWriter.addDocument(document);
indexWriter.close();
What addDocument does is add a document to the index, as we can see in IndexWriter.java.
IndexWriter
// IndexWriter.java
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
  return updateDocument(null, doc);
}

// IndexWriter.java
private final DocumentsWriter docWriter;
private final Queue<Event> eventQueue;

public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
  ensureOpen();
  try {
    boolean success = false;
    try {
      long seqNo = docWriter.updateDocument(doc, analyzer, term);
      if (seqNo < 0) {
        seqNo = -seqNo;
        processEvents(true, false);
      }
      success = true;
      return seqNo;
    } finally {
      if (!success) {
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "hit exception updating document");
        }
      }
    }
  } catch (AbortingException | VirtualMachineError tragedy) {
    tragicEvent(tragedy, "updateDocument");
    // dead code but javac disagrees:
    return -1;
  }
}
From the calls above, it is easy to infer that DocumentsWriter takes over and does the remaining work. It also helps to keep the relationships between the classes involved in mind.
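Roughly, the call chain that the rest of this post walks through is:

IndexWriter.addDocument / updateDocument
  -> DocumentsWriter.updateDocument(s)
    -> DocumentsWriterPerThread.updateDocuments
      -> DefaultIndexingChain.processDocument   (the DocConsumer)
        -> StoredFieldsConsumer.writeField
          -> StoredFieldsWriter.writeField      (e.g. SimpleTextStoredFieldsWriter)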
DocumentsWriter
// DocumentsWriter.java
final DocumentsWriterFlushControl flushControl;

long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
                     final Term delTerm) throws IOException, AbortingException {
  boolean hasEvents = preUpdate();

  final ThreadState perThread = flushControl.obtainAndLock();
  final DocumentsWriterPerThread flushingDWPT;
  long seqNo;

  try {
    // This must happen after we've pulled the ThreadState because IW.close
    // waits for all ThreadStates to be released:
    ensureOpen();
    ensureInitialized(perThread);
    assert perThread.isInitialized();
    final DocumentsWriterPerThread dwpt = perThread.dwpt;
    final int dwptNumDocs = dwpt.getNumDocsInRAM();
    try {
      seqNo = dwpt.updateDocuments(docs, analyzer, delTerm);
    } catch (AbortingException ae) {
      flushControl.doOnAbort(perThread);
      dwpt.abort();
      throw ae;
    } finally {
      // We don't know how many documents were actually
      // counted as indexed, so we must subtract here to
      // accumulate our separate counter:
      numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
    }
    final boolean isUpdate = delTerm != null;
    flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

    assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
    perThread.lastSeqNo = seqNo;
  } finally {
    perThreadPool.release(perThread);
  }

  if (postUpdate(flushingDWPT, hasEvents)) {
    seqNo = -seqNo;
  }
  return seqNo;
}
In short, updateDocuments in DocumentsWriter:
- obtains a ThreadState from DocumentsWriterFlushControl
- delegates updateDocuments to the DocumentsWriterPerThread held by that ThreadState
- does some post-update work (event processing and flushing) after the documents are added
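The delTerm threaded through these calls is the term passed to IndexWriter.updateDocument: updating a document deletes whatever matched the term before adding the new version. A hedged usage example (the "id" and "body" field names are my own, not from the post):

// illustrative usage; the field names are assumptions
Document newDoc = new Document();
newDoc.add(new StringField("id", "42", Field.Store.YES));
newDoc.add(new TextField("body", "updated content", Field.Store.NO));
// deletes documents matching id:42, then adds newDoc -- that delete is the delTerm above
indexWriter.updateDocument(new Term("id", "42"), newDoc);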
DocumentsWriterPerThread
DocumentsWriterPerThread, as its name suggests, is a document writer for a single thread. What we focus on here is its updateDocuments method.
// DocumentsWriterPerThread.java
final DocState docState;

public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
  testPoint("DocumentsWriterPerThread addDocuments start");
  assert deleteQueue != null;
  docState.analyzer = analyzer;
  if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
    infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
  }
  int docCount = 0;
  boolean allDocsIndexed = false;
  try {
    for(Iterable<? extends IndexableField> doc : docs) {
      // Even on exception, the document is still added (but marked
      // deleted), so we don't need to un-reserve at that point.
      // Aborting exceptions will actually "lose" more than one
      // document, so the counter will be "wrong" in that case, but
      // it's very hard to fix (we can't easily distinguish aborting
      // vs non-aborting exceptions):
      reserveOneDoc();
      docState.doc = doc;
      docState.docID = numDocsInRAM;
      docCount++;

      boolean success = false;
      try {
        consumer.processDocument();
        success = true;
      } finally {
        if (!success) {
          // Incr here because finishDocument will not
          // be called (because an exc is being thrown):
          numDocsInRAM++;
        }
      }

      numDocsInRAM++;
    }
    allDocsIndexed = true;

    // Apply delTerm only after all indexing has
    // succeeded, but apply it only to docs prior to when
    // this batch started:
    long seqNo;
    if (delTerm != null) {
      seqNo = deleteQueue.add(delTerm, deleteSlice);
      assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
      deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
      return seqNo;
    } else {
      seqNo = deleteQueue.updateSlice(deleteSlice);
      if (seqNo < 0) {
        seqNo = -seqNo;
        deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
      } else {
        deleteSlice.reset();
      }
    }

    return seqNo;
  } finally {
    if (!allDocsIndexed && !aborted) {
      // the iterator threw an exception that is not aborting
      // go and mark all docs from this block as deleted
      int docID = numDocsInRAM-1;
      final int endDocID = docID - docCount;
      while (docID > endDocID) {
        deleteDocID(docID);
        docID--;
      }
    }
    docState.clear();
  }
}
In short, updateDocuments in DocumentsWriterPerThread:
- fills the DocState
- calls processDocument on the DocConsumer for each document
- updates and applies the deleteQueue
DocConsumer
DefaultIndexingChain, a subclass of DocConsumer, is the default general-purpose indexing chain, which handles indexing all types of fields. It doesn't do all of that by itself, but with the help of two fields:
- TermsHash: writes postings and term vectors
- StoredFieldsConsumer: writes stored fields
// DefaultIndexingChain.java
final class DefaultIndexingChain extends DocConsumer {
  final TermsHash termsHash;
  final StoredFieldsConsumer storedFieldsConsumer;
}
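For intuition, here is a small example (my own; the field names are not from the Lucene sources) of which component ends up handling which field:

// illustrative only
Document doc = new Document();
// indexed and tokenized: its postings and term vectors go through TermsHash
doc.add(new TextField("title", "lucene index writer", Field.Store.NO));
// stored only, never inverted: handled by StoredFieldsConsumer
doc.add(new StoredField("raw_json", "{\"id\": 1}"));
indexWriter.addDocument(doc);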
Let’s see how they accomplish that.
// DefaultIndexingChain.java
public void processDocument() throws IOException, AbortingException {
  // How many indexed field names we've seen (collapses
  // multiple field instances by the same name):
  int fieldCount = 0;

  long fieldGen = nextFieldGen++;

  // NOTE: we need two passes here, in case there are
  // multi-valued fields, because we must process all
  // instances of a given field at once, since the
  // analyzer is free to reuse TokenStream across fields
  // (i.e., we cannot have more than one TokenStream
  // running "at once"):
  termsHash.startDocument();

  startStoredFields(docState.docID);

  boolean aborting = false;
  try {
    for (IndexableField field : docState.doc) {
      fieldCount = processField(field, fieldGen, fieldCount);
    }
  } catch (AbortingException ae) {
    aborting = true;
    throw ae;
  } finally {
    if (aborting == false) {
      // Finish each indexed field name seen in the document:
      for (int i=0;i<fieldCount;i++) {
        fields[i].finish();
      }
      finishStoredFields();
    }
  }

  try {
    termsHash.finishDocument();
  } catch (Throwable th) {
    // Must abort, on the possibility that on-disk term
    // vectors are now corrupt:
    throw AbortingException.wrap(th);
  }
}
// DefaultIndexingChain.java (lite version)
private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException, AbortingException {
  String fieldName = field.name();
  IndexableFieldType fieldType = field.fieldType();

  PerField fp = null;

  // Invert indexed fields:
  if (fieldType.indexOptions() != IndexOptions.NONE) {
    fp = getOrAddField(fieldName, fieldType, true);
    boolean first = fp.fieldGen != fieldGen;
    fp.invert(field, first);

    if (first) {
      fields[fieldCount++] = fp;
      fp.fieldGen = fieldGen;
    }
  } else {
    verifyUnIndexedFieldType(fieldName, fieldType);
  }

  // Add stored fields:
  if (fieldType.stored()) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    if (fieldType.stored()) {
      String value = field.stringValue();
      try {
        storedFieldsConsumer.writeField(fp.fieldInfo, field);
      } catch (Throwable th) {
        throw AbortingException.wrap(th);
      }
    }
  }

  DocValuesType dvType = fieldType.docValuesType();
  if (dvType != DocValuesType.NONE) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    indexDocValue(fp, dvType, field);
  }

  if (fieldType.pointDimensionCount() != 0) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    indexPoint(fp, field);
  }

  return fieldCount;
}
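Each field may take one or more of these branches. A hedged illustration (field names and values are my own):

// illustrative only: typical field types and the branches of processField they hit
Document doc = new Document();
doc.add(new TextField("body", "hello world", Field.Store.YES));      // invert + stored fields
doc.add(new SortedDocValuesField("category", new BytesRef("news"))); // doc-values branch
doc.add(new IntPoint("year", 2017));                                 // points branch
indexWriter.addDocument(doc);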
Then it is StoredFieldsConsumer's turn to write the field with writeField.
StoredFieldsConsumer
When we unfold the code of StoredFieldsConsumer, it turns out to be just a thin wrapper around StoredFieldsWriter, the underlying handler.
// StoredFieldsConsumer.java
class StoredFieldsConsumer {
  final DocumentsWriterPerThread docWriter;
  StoredFieldsWriter writer;

  void writeField(FieldInfo info, IndexableField field) throws IOException {
    writer.writeField(info, field);
  }
}
StoredFieldsWriter
StoredFieldsWriter provides the Codec API for writing stored fields:
- For every document, startDocument() is called, informing the Codec that a new document has started.
- writeField(FieldInfo, IndexableField) is called for each field in the document.
- After all documents have been written, finish(FieldInfos, int) is called for verification/sanity-checks.
- Finally, the writer is closed with close().
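To make the contract concrete, here is a minimal sketch of a caller driving that lifecycle. The writeAll helper and its arguments are my own, not Lucene code:

// hypothetical driver, only to illustrate the call order described above
void writeAll(StoredFieldsWriter writer, List<Document> docs, FieldInfos fieldInfos) throws IOException {
  for (Document doc : docs) {
    writer.startDocument();                          // a new document begins
    for (IndexableField field : doc.getFields()) {
      if (field.fieldType().stored()) {
        FieldInfo info = fieldInfos.fieldInfo(field.name());
        writer.writeField(info, field);              // one call per stored field
      }
    }
  }
  writer.finish(fieldInfos, docs.size());            // verification / sanity check
  writer.close();
}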
Take SimpleTextStoredFieldsWriter, a subclass of StoredFieldsWriter that writes plain-text stored fields, as an example.
// SimpleTextStoredFieldsWriter.java (trimmed: writeField only shows the numeric case)
public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {

  @Override
  public void startDocument() throws IOException {
    write(DOC);
    write(Integer.toString(numDocsWritten));
    newLine();

    numDocsWritten++;
  }

  @Override
  public void writeField(FieldInfo info, IndexableField field) throws IOException {
    write(FIELD);
    write(Integer.toString(info.number));
    newLine();

    write(NAME);
    write(field.name());
    newLine();

    write(TYPE);
    final Number n = field.numericValue();
    write(VALUE);
    write(Integer.toString(n.intValue()));
    newLine();
  }

  @Override
  public void finish(FieldInfos fis, int numDocs) throws IOException {
    if (numDocsWritten != numDocs) {
      throw new RuntimeException("mergeFields produced an invalid result: docCount is " + numDocs
          + " but only saw " + numDocsWritten + " file=" + out.toString() + "; now aborting this merge to prevent index corruption");
    }
    write(END);
    newLine();
    SimpleTextUtil.writeChecksum(out, scratch);
  }
}
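For a document with a single numeric field, the text produced by these methods looks roughly like this (my own illustration; the exact labels and indentation come from the writer's BytesRef constants such as DOC, FIELD, NAME, TYPE and VALUE):

doc 0
  field 0
    name price
    type int
    value 42
END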
Whether in startDocument, writeField, or finish, the private write method is what actually gets called.
// SimpleTextStoredFieldsWriter.java
public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
  private IndexOutput out;

  private void write(BytesRef bytes) throws IOException {
    SimpleTextUtil.write(out, bytes);
  }
}
Allow me to finish this post here, as IndexOutput deserves to be discussed in a separate post.