An IndexWriter creates and maintains an index. It is one of the fundamental classes defined in Lucene Core.

The source code in this post is based on Lucene 6.6.0.

A simple demonstration of IndexWriter:

// newDirectory() and newIndexWriterConfig() are helpers from LuceneTestCase
Directory directory = newDirectory();
IndexWriter indexWriter = new IndexWriter(directory, newIndexWriterConfig());
Document document = new Document();
document.add(new SortedDocValuesField("different_field", new BytesRef("1")));
indexWriter.addDocument(document);
indexWriter.close();

What addDocument does is add a document to the index, as we can see in IndexWriter.java.

IndexWriter

// IndexWriter.java

  public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
    return updateDocument(null, doc);
  }

// IndexWriter.java

  private final DocumentsWriter docWriter;
  private final Queue<Event> eventQueue;

  public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    try {
      boolean success = false;
      try {
        long seqNo = docWriter.updateDocument(doc, analyzer, term);
        if (seqNo < 0) {
          seqNo = - seqNo;
          processEvents(true, false);
        }
        success = true;
        return seqNo;
      } finally {
        if (!success) {
          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "hit exception updating document");
          }
        }
      }
    } catch (AbortingException | VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocument");

      // dead code but javac disagrees:
      return -1;
    }
  }

From the sequence diagram above, it’s easy to infer that DocumentsWriter takes over and does the remaining work.

A class diagram is also useful, so that we can see the relationships between the classes.

DocumentsWriter

// DocumentsWriter.java

final DocumentsWriterFlushControl flushControl;

long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
                       final Term delTerm) throws IOException, AbortingException {
    boolean hasEvents = preUpdate();

    final ThreadState perThread = flushControl.obtainAndLock();
    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;

    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:
      ensureOpen();
      ensureInitialized(perThread);
      assert perThread.isInitialized();
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
        seqNo = dwpt.updateDocuments(docs, analyzer, delTerm);
      } catch (AbortingException ae) {
        flushControl.doOnAbort(perThread);
        dwpt.abort();
        throw ae;
      } finally {
        // We don't know how many documents were actually
        // counted as indexed, so we must subtract here to
        // accumulate our separate counter:
        numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
      }
      final boolean isUpdate = delTerm != null;
      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
      perThread.lastSeqNo = seqNo;

    } finally {
      perThreadPool.release(perThread);
    }

    if (postUpdate(flushingDWPT, hasEvents)) {
      seqNo = -seqNo;
    }
    return seqNo;
  }

In short, updateDocuments in DocumentsWriter does three things:

  1. obtains a ThreadState from DocumentsWriterFlushControl
  2. delegates updateDocuments to the DocumentsWriterPerThread
  3. does some post-processing after the documents have been indexed.
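The three steps, together with the sign trick on seqNo seen in both listings, can be modelled in a few lines of plain Java. This is only a toy sketch: ToyDocumentsWriter, ToyThreadState and the flushEveryNDocs threshold are hypothetical names made up for illustration, and the "flush is due" rule stands in for whatever postUpdate really decides.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model only: hypothetical stand-ins, not Lucene classes.
class ToyDocumentsWriter {
    /** Stand-in for a ThreadState together with its DocumentsWriterPerThread. */
    static final class ToyThreadState {
        int numDocsInRAM = 0;
    }

    private final Deque<ToyThreadState> freeStates = new ArrayDeque<>();
    private final int flushEveryNDocs; // invented trigger for "there are events"
    private long nextSeqNo = 1;

    ToyDocumentsWriter(int flushEveryNDocs) {
        this.flushEveryNDocs = flushEveryNDocs;
    }

    /** Returns the sequence number, negated when the caller has events to process. */
    long updateDocuments(List<String> docs) {
        // 1. obtain a thread state (reuse a free one, or create a new one)
        ToyThreadState state = freeStates.isEmpty() ? new ToyThreadState() : freeStates.pop();
        long seqNo;
        boolean hasEvents;
        try {
            // 2. delegate the indexing work to the per-thread writer
            state.numDocsInRAM += docs.size();
            seqNo = nextSeqNo++;
            // 3. post work: decide whether follow-up work is pending
            hasEvents = state.numDocsInRAM >= flushEveryNDocs;
            if (hasEvents) {
                state.numDocsInRAM = 0;
            }
        } finally {
            // always hand the state back, even if indexing failed
            freeStates.push(state);
        }
        return hasEvents ? -seqNo : seqNo;
    }

    /** Caller side: a negative value means "process events". */
    static boolean hasEvents(long raw) {
        return raw < 0;
    }

    /** Caller side: the real sequence number is the absolute value. */
    static long seqNo(long raw) {
        return Math.abs(raw);
    }
}
```

Encoding the flag in the sign lets a single long carry both the sequence number and the "there is more to do" signal, which is why the caller in IndexWriter flips the sign before returning.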

DocumentsWriterPerThread

DocumentsWriterPerThread, as its name suggests, is a document writer for each thread. What we focus on here is its updateDocuments method.

// DocumentsWriterPerThread.java

final DocState docState;

public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
    testPoint("DocumentsWriterPerThread addDocuments start");
    assert deleteQueue != null;
    docState.analyzer = analyzer;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
    }
    int docCount = 0;
    boolean allDocsIndexed = false;
    try {
      
      for(Iterable<? extends IndexableField> doc : docs) {
        // Even on exception, the document is still added (but marked
        // deleted), so we don't need to un-reserve at that point.
        // Aborting exceptions will actually "lose" more than one
        // document, so the counter will be "wrong" in that case, but
        // it's very hard to fix (we can't easily distinguish aborting
        // vs non-aborting exceptions):
        reserveOneDoc();
        docState.doc = doc;
        docState.docID = numDocsInRAM;
        docCount++;

        boolean success = false;
        try {
          consumer.processDocument();
          success = true;
        } finally {
          if (!success) {
            // Incr here because finishDocument will not
            // be called (because an exc is being thrown):
            numDocsInRAM++;
          }
        }

        numDocsInRAM++;
      }
      allDocsIndexed = true;

      // Apply delTerm only after all indexing has
      // succeeded, but apply it only to docs prior to when
      // this batch started:
      long seqNo;
      if (delTerm != null) {
        seqNo = deleteQueue.add(delTerm, deleteSlice);
        assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
        deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
        return seqNo;
      } else {
        seqNo = deleteQueue.updateSlice(deleteSlice);
        if (seqNo < 0) {
          seqNo = -seqNo;
          deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
        } else {
          deleteSlice.reset();
        }
      }

      return seqNo;

    } finally {
      if (!allDocsIndexed && !aborted) {
        // the iterator threw an exception that is not aborting 
        // go and mark all docs from this block as deleted
        int docID = numDocsInRAM-1;
        final int endDocID = docID - docCount;
        while (docID > endDocID) {
          deleteDocID(docID);
          docID--;
        }
      }
      docState.clear();
    }
  }

In short, updateDocuments in DocumentsWriterPerThread:

  1. fills DocState
  2. calls processDocument on the DocConsumer
  3. updates and applies the deleteQueue slice
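The finally block in the listing is worth a closer look: when a block of documents fails partway, every doc ID already handed out for that block is marked deleted. The sketch below isolates just that arithmetic in plain Java. BlockRollbackSketch and docIdsToDelete are hypothetical names, and the method collects the IDs into a list instead of calling deleteDocID.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces only the doc ID arithmetic of the finally block.
class BlockRollbackSketch {
    /**
     * Doc IDs to mark deleted after a failed block: the last docCount IDs
     * handed out, walking down from numDocsInRAM - 1.
     */
    static List<Integer> docIdsToDelete(int numDocsInRAM, int docCount) {
        List<Integer> ids = new ArrayList<>();
        int docID = numDocsInRAM - 1;
        final int endDocID = docID - docCount;
        while (docID > endDocID) {
            ids.add(docID);
            docID--;
        }
        return ids;
    }

    public static void main(String[] args) {
        // 10 docs were buffered before the block started. Three docs of the
        // block received IDs 10, 11 and 12 (the third one threw), so
        // numDocsInRAM is 13 and docCount is 3 when the finally block runs.
        System.out.println(docIdsToDelete(13, 3)); // prints [12, 11, 10]
    }
}
```

Note that the failing document is included: numDocsInRAM is incremented even when processDocument throws, so the ID stays reserved and is deleted along with the rest of the block.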

DocConsumer


DefaultIndexingChain, a subclass of DocConsumer, is the default general-purpose indexing chain, which handles indexing of all field types. It doesn’t do all of that by itself, but with the help of two fields:

  • TermsHash: writes postings and term vectors
  • StoredFieldsConsumer: writes stored fields

// DefaultIndexingChain.java

final class DefaultIndexingChain extends DocConsumer {
  final TermsHash termsHash;
  final StoredFieldsConsumer storedFieldsConsumer;
}

Let’s see how they accomplish that.

// DefaultIndexingChain.java

public void processDocument() throws IOException, AbortingException {

  // How many indexed field names we've seen (collapses
  // multiple field instances by the same name):
  int fieldCount = 0;

  long fieldGen = nextFieldGen++;

  // NOTE: we need two passes here, in case there are
  // multi-valued fields, because we must process all
  // instances of a given field at once, since the
  // analyzer is free to reuse TokenStream across fields
  // (i.e., we cannot have more than one TokenStream
  // running "at once"):

  termsHash.startDocument();

  startStoredFields(docState.docID);

  boolean aborting = false;
  try {
    for (IndexableField field : docState.doc) {
      fieldCount = processField(field, fieldGen, fieldCount);
    }
  } catch (AbortingException ae) {
    aborting = true;
    throw ae;
  } finally {
    if (aborting == false) {
      // Finish each indexed field name seen in the document:
      for (int i=0;i<fieldCount;i++) {
        fields[i].finish();
      }
      finishStoredFields();
    }
  }

  try {
    termsHash.finishDocument();
  } catch (Throwable th) {
    // Must abort, on the possibility that on-disk term
    // vectors are now corrupt:
    throw AbortingException.wrap(th);
  }
}

// lite version
private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException, AbortingException {
  String fieldName = field.name();
  IndexableFieldType fieldType = field.fieldType();

  PerField fp = null;

  // Invert indexed fields:
  if (fieldType.indexOptions() != IndexOptions.NONE) {
    fp = getOrAddField(fieldName, fieldType, true);
    boolean first = fp.fieldGen != fieldGen;
    fp.invert(field, first);

    if (first) {
      fields[fieldCount++] = fp;
      fp.fieldGen = fieldGen;
    }
  } else {
    verifyUnIndexedFieldType(fieldName, fieldType);
  }

  // Add stored fields:
  if (fieldType.stored()) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    try {
      storedFieldsConsumer.writeField(fp.fieldInfo, field);
    } catch (Throwable th) {
      throw AbortingException.wrap(th);
    }
  }

  DocValuesType dvType = fieldType.docValuesType();
  if (dvType != DocValuesType.NONE) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    indexDocValue(fp, dvType, field);
  }
  if (fieldType.pointDimensionCount() != 0) {
    if (fp == null) {
      fp = getOrAddField(fieldName, fieldType, false);
    }
    indexPoint(fp, field);
  }
  
  return fieldCount;
}
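The fieldGen comparison in processField is how multiple instances of the same field name collapse into one entry per document. Here is a minimal stand-alone sketch of that idea in plain Java. FieldGenSketch and its PerField are simplified, hypothetical stand-ins that only track the generation, and a HashMap replaces Lucene's own field lookup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the fieldGen bookkeeping.
class FieldGenSketch {
    static final class PerField {
        // generation (document) in which this field name was last seen
        long fieldGen = -1;
    }

    private final Map<String, PerField> fieldsByName = new HashMap<>();
    private long nextFieldGen = 0;

    /** Returns how many distinct field names one document contains. */
    int countDistinctFields(List<String> fieldNames) {
        long fieldGen = nextFieldGen++; // one new generation per document
        int fieldCount = 0;
        for (String name : fieldNames) {
            PerField fp = fieldsByName.computeIfAbsent(name, n -> new PerField());
            // first time this name shows up in the current document?
            boolean first = fp.fieldGen != fieldGen;
            if (first) {
                fieldCount++;
                fp.fieldGen = fieldGen;
            }
        }
        return fieldCount;
    }
}
```

Because the generation changes with every document, the PerField objects can be reused across documents without resetting a "seen" flag on each of them.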

Then it is StoredFieldsConsumer’s turn to write the field with writeField.

StoredFieldsConsumer

When we unfold the code of StoredFieldsConsumer, it turns out to be just a wrapper around StoredFieldsWriter, the underlying handler.

// StoredFieldsConsumer.java

class StoredFieldsConsumer {
  final DocumentsWriterPerThread docWriter;
  StoredFieldsWriter writer;

  void writeField(FieldInfo info, IndexableField field) throws IOException {
    writer.writeField(info, field);
  }
}

StoredFieldsWriter

StoredFieldsWriter provides the codec API for writing stored fields:

  1. For every document, startDocument() is called, informing the Codec that a new document has started.
  2. writeField(FieldInfo, IndexableField) is called for each field in the document.
  3. After all documents have been written, finish(FieldInfos, int) is called for verification/sanity-checks.
  4. Finally, the writer is closed with close().
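The call order above can be tried out with a toy writer in plain Java. ToyStoredFieldsWriter is a hypothetical class that appends to a StringBuilder instead of an index file. It does not extend the real StoredFieldsWriter, and its method signatures are simplified.

```java
// Hypothetical toy writer that mimics the call order only; not a Lucene class.
class ToyStoredFieldsWriter {
    private final StringBuilder out = new StringBuilder();
    private int numDocsWritten = 0;
    private boolean closed = false;

    void startDocument() {
        out.append("doc ").append(numDocsWritten).append('\n');
        numDocsWritten++;
    }

    void writeField(String name, String value) {
        out.append("  field ").append(name).append('=').append(value).append('\n');
    }

    /** Sanity check: the caller's document count must match what was written. */
    void finish(int numDocs) {
        if (numDocsWritten != numDocs) {
            throw new IllegalStateException(
                "expected " + numDocs + " docs but saw " + numDocsWritten);
        }
        out.append("END\n");
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    String contents() {
        return out.toString();
    }

    /** Walks through the lifecycle for two single-field documents. */
    static String demo() {
        ToyStoredFieldsWriter w = new ToyStoredFieldsWriter();
        w.startDocument();
        w.writeField("title", "hello");
        w.startDocument();
        w.writeField("title", "world");
        w.finish(2);
        w.close();
        return w.contents();
    }
}
```

The finish check is the interesting part: it lets the writer detect a mismatch between the number of documents the caller believes it wrote and the number actually started, before the file is sealed.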

Take SimpleTextStoredFieldsWriter, a subclass of StoredFieldsWriter that writes plain-text stored fields, as an example.

// SimpleTextStoredFieldsWriter.java

public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
  @Override
  public void startDocument() throws IOException {
    write(DOC);
    write(Integer.toString(numDocsWritten));
    newLine();
    
    numDocsWritten++;
  }

  @Override
  public void writeField(FieldInfo info, IndexableField field) throws IOException {
    write(FIELD);
    write(Integer.toString(info.number));
    newLine();

    write(NAME);
    write(field.name());
    newLine();
    
    write(TYPE);
    // lite version: assumes an integer value; other value types are omitted
    final Number n = field.numericValue();

    write(VALUE);
    write(Integer.toString(n.intValue()));
    newLine();
  }

  @Override
  public void finish(FieldInfos fis, int numDocs) throws IOException {
    if (numDocsWritten != numDocs) {
      throw new RuntimeException("mergeFields produced an invalid result: docCount is " + numDocs 
          + " but only saw " + numDocsWritten + " file=" + out.toString() + "; now aborting this merge to prevent index corruption");
    }
    write(END);
    newLine();
    SimpleTextUtil.writeChecksum(out, scratch);
  }
}

Whether in startDocument, writeField, or finish, it is the private write method that gets called.

// SimpleTextStoredFieldsWriter.java

public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
  private IndexOutput out;

  private void write(BytesRef bytes) throws IOException {
    SimpleTextUtil.write(out, bytes);
  }
}

Allow me to end this post at IndexOutput, which deserves a post of its own.

Reference