Directory represents the storage location of an index: a flat list of files with no subdirectories. These files are called index files. An index file is written once; after that it is only opened for reading or deleted.
Directory
The Directory class is abstract. It declares the methods for listing, deleting, renaming and syncing files, and for opening input and output streams on them.
- IndexInput is returned by openInput when reading an existing file.
- IndexOutput is returned by createOutput when writing a new, empty file in the directory.
- Directory locking is implemented by an instance of LockFactory (an abstract class).
public abstract class Directory implements Closeable {
// Returns an array of strings, one for each entry in the directory, in sorted (UTF-16, Java's String.compareTo) order.
public abstract String[] listAll() throws IOException;
// Removes an existing file in the directory.
public abstract void deleteFile(String name) throws IOException;
// Returns the length of a file in the directory.
public abstract long fileLength(String name) throws IOException;
/** Creates a new, empty file in the directory with the given name.
Returns a stream writing this file. */
public abstract IndexOutput createOutput(String name, IOContext context) throws IOException;
/** Creates a new, empty file for writing in the directory, with a
* temporary file name including prefix and suffix, ending with the
* reserved extension .tmp. */
public abstract IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException;
/**
* Ensure that any writes to these files are moved to
* stable storage. Lucene uses this to properly commit
* changes to the index, to prevent a machine/OS crash
* from corrupting the index.
*/
public abstract void sync(Collection<String> names) throws IOException;
/**
* Renames {@code source} to {@code dest} as an atomic operation,
* where {@code dest} does not yet exist in the directory.
* <p>
* Notes: This method is used by IndexWriter to publish commits.
* It is ok if this operation is not truly atomic, for example
* both {@code source} and {@code dest} can be visible temporarily.
* It is just important that the contents of {@code dest} appear
* atomically, or an exception is thrown.
*/
public abstract void rename(String source, String dest) throws IOException;
/**
* Ensure that directory metadata, such as recent file renames, are made
* durable.
*/
public abstract void syncMetaData() throws IOException;
/** Returns a stream reading an existing file. */
public abstract IndexInput openInput(String name, IOContext context) throws IOException;
/** Returns a stream reading an existing file, computing checksum as it reads */
public ChecksumIndexInput openChecksumInput(String name, IOContext context) throws IOException {
return new BufferedChecksumIndexInput(openInput(name, context));
}
/**
* Returns an obtained {@link Lock}.
* @param name the name of the lock file
*/
public abstract Lock obtainLock(String name) throws IOException;
/** Closes the store. */
@Override
public abstract void close() throws IOException;
@Override
public String toString() {
return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode());
}
/**
* Copies the file <i>src</i> in <i>from</i> to this directory under the new
* file name <i>dest</i>.
*/
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
boolean success = false;
try (IndexInput is = from.openInput(src, context);
IndexOutput os = createOutput(dest, context)) {
os.copyBytes(is, is.length());
success = true;
} finally {
if (!success) {
IOUtils.deleteFilesIgnoringExceptions(this, dest);
}
}
}
protected void ensureOpen() throws AlreadyClosedException {}
}
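The write-once rules above can be tried without Lucene: a name is created once, after which it is only read or deleted, and names are listed in sorted order. The following is a minimal sketch that uses a hypothetical MapDirectory class backed by a ConcurrentHashMap. Its method names echo Directory, but it only illustrates the contract and is not Lucene's API.

```java
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy directory (not Lucene's API): each "file" is a byte[] stored under its name.
class MapDirectory {
  private final Map<String, byte[]> files = new ConcurrentHashMap<>();

  // Like Directory.listAll: names in sorted (String.compareTo) order.
  String[] listAll() {
    String[] names = files.keySet().toArray(new String[0]);
    Arrays.sort(names);
    return names;
  }

  // Like createOutput followed by close: an existing name is rejected, so files are write-once.
  void writeFile(String name, byte[] data) throws IOException {
    if (files.putIfAbsent(name, data.clone()) != null) {
      throw new FileAlreadyExistsException(name);
    }
  }

  // Like openInput: only existing files can be read.
  InputStream openInput(String name) throws IOException {
    byte[] data = files.get(name);
    if (data == null) {
      throw new FileNotFoundException(name);
    }
    return new ByteArrayInputStream(data);
  }

  // Like deleteFile: deleting a missing file is an error.
  void deleteFile(String name) throws IOException {
    if (files.remove(name) == null) {
      throw new FileNotFoundException(name);
    }
  }
}
```

Using putIfAbsent makes the existence check and the insert a single atomic step. Two concurrent writers therefore cannot both claim the same name.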
BaseDirectory
BaseDirectory is a base implementation for Directory subclasses that use a LockFactory for locking. It is still abstract.
public abstract class BaseDirectory extends Directory {
volatile protected boolean isOpen = true;
/** Holds the LockFactory instance (implements locking for
* this Directory instance). */
protected final LockFactory lockFactory;
/** Sole constructor. */
protected BaseDirectory(LockFactory lockFactory) {
super();
if (lockFactory == null) {
throw new NullPointerException("LockFactory must not be null, use an explicit instance!");
}
this.lockFactory = lockFactory;
}
@Override
public final Lock obtainLock(String name) throws IOException {
return lockFactory.obtainLock(this, name);
}
@Override
protected final void ensureOpen() throws AlreadyClosedException {
if (!isOpen) {
throw new AlreadyClosedException("this Directory is closed");
}
}
}
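BaseDirectory combines two ideas: a lock factory that must be supplied, and a volatile open flag that ensureOpen checks. Both can be sketched with plain JDK types. ToyDirectory, InMemoryLockFactory and ToyLock are hypothetical names. IllegalStateException stands in for Lucene's AlreadyClosedException and for the LockObtainFailedException that SingleInstanceLockFactory throws.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical lock handle; close() releases the lock and throws no checked exception.
interface ToyLock extends AutoCloseable {
  @Override
  void close();
}

// Hypothetical stand-in for a LockFactory: at most one holder per lock name,
// similar in spirit to SingleInstanceLockFactory.
class InMemoryLockFactory {
  private final Set<String> held = new HashSet<>();

  synchronized ToyLock obtainLock(String name) {
    if (!held.add(name)) {
      throw new IllegalStateException("lock already held: " + name);
    }
    return () -> release(name);
  }

  private synchronized void release(String name) {
    held.remove(name);
  }
}

// Hypothetical class mirroring BaseDirectory: a required lock factory plus a volatile open flag.
class ToyDirectory {
  protected volatile boolean isOpen = true;
  protected final InMemoryLockFactory lockFactory;

  ToyDirectory(InMemoryLockFactory lockFactory) {
    this.lockFactory = Objects.requireNonNull(lockFactory, "LockFactory must not be null");
  }

  // Locking is delegated entirely to the factory, as in BaseDirectory.obtainLock.
  final ToyLock obtainLock(String name) {
    return lockFactory.obtainLock(name);
  }

  // File operations start with this check.
  protected final void ensureOpen() {
    if (!isOpen) {
      throw new IllegalStateException("this Directory is closed");
    }
  }

  String[] listAll() {
    ensureOpen();
    return new String[0];
  }

  void close() {
    isOpen = false;
  }
}
```

The flag is volatile so that a close() on one thread becomes visible to ensureOpen() on other threads without extra locking.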
RAMDirectory
RAMDirectory extends BaseDirectory and provides a memory-resident Directory implementation. By default, its locking is implemented by SingleInstanceLockFactory.
public class RAMDirectory extends BaseDirectory implements Accountable {
// fileName -> RAMFile
protected final Map<String,RAMFile> fileMap = new ConcurrentHashMap<>();
protected final AtomicLong sizeInBytes = new AtomicLong();
/** Used to generate temp file names in {@link #createTempOutput}. */
private final AtomicLong nextTempFileCounter = new AtomicLong();
/** Constructs an empty {@link Directory}. */
public RAMDirectory() {
this(new SingleInstanceLockFactory());
}
/** Constructs an empty {@link Directory} with the given {@link LockFactory}. */
public RAMDirectory(LockFactory lockFactory) {
super(lockFactory);
}
/**
* Creates a new RAMDirectory instance from a different
* Directory implementation. This can be used to load
* a disk-based index into memory.
*/
public RAMDirectory(FSDirectory dir, IOContext context) throws IOException {
this(dir, false, context);
}
private RAMDirectory(FSDirectory dir, boolean closeDir, IOContext context) throws IOException {
this();
for (String file : dir.listAll()) {
if (!Files.isDirectory(dir.getDirectory().resolve(file))) {
copyFrom(dir, file, file, context);
}
}
if (closeDir) {
dir.close();
}
}
@Override
public final String[] listAll() {
ensureOpen();
Set<String> fileNames = fileMap.keySet();
List<String> names = new ArrayList<>(fileNames.size());
for (String name : fileNames) {
names.add(name);
}
String[] namesArray = names.toArray(new String[names.size()]);
Arrays.sort(namesArray);
return namesArray;
}
public final boolean fileNameExists(String name) {
ensureOpen();
return fileMap.containsKey(name);
}
/** Returns the length in bytes of a file in the directory. */
@Override
public final long fileLength(String name) throws IOException {
ensureOpen();
RAMFile file = fileMap.get(name);
if (file == null) {
throw new FileNotFoundException(name);
}
return file.getLength();
}
/**
* Return total size in bytes of all files in this directory. This is
* currently quantized to RAMOutputStream.BUFFER_SIZE.
*/
@Override
public final long ramBytesUsed() {
ensureOpen();
return sizeInBytes.get();
}
@Override
public Collection<Accountable> getChildResources() {
return Accountables.namedAccountables("file", fileMap);
}
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
RAMFile file = fileMap.remove(name);
if (file != null) {
file.directory = null;
sizeInBytes.addAndGet(-file.sizeInBytes);
} else {
throw new FileNotFoundException(name);
}
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
RAMFile file = newRAMFile();
if (fileMap.putIfAbsent(name, file) != null) {
throw new FileAlreadyExistsException(name);
}
return new RAMOutputStream(name, file, true);
}
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
ensureOpen();
// Make the file first...
RAMFile file = newRAMFile();
// ... then try to find a unique name for it:
while (true) {
String name = IndexFileNames.segmentFileName(prefix, suffix + "_" + Long.toString(nextTempFileCounter.getAndIncrement(), Character.MAX_RADIX), "tmp");
if (fileMap.putIfAbsent(name, file) == null) {
return new RAMOutputStream(name, file, true);
}
}
}
/**
* Returns a new {@link RAMFile} for storing data. This method can be
* overridden to return different {@link RAMFile} impls, that e.g. override
* {@link RAMFile#newBuffer(int)}.
*/
protected RAMFile newRAMFile() {
return new RAMFile(this);
}
@Override
public void sync(Collection<String> names) throws IOException {
}
@Override
public void rename(String source, String dest) throws IOException {
ensureOpen();
RAMFile file = fileMap.get(source);
if (file == null) {
throw new FileNotFoundException(source);
}
if (fileMap.putIfAbsent(dest, file) != null) {
throw new FileAlreadyExistsException(dest);
}
if (!fileMap.remove(source, file)) {
throw new IllegalStateException("file was unexpectedly replaced: " + source);
}
fileMap.remove(source);
}
@Override
public void syncMetaData() throws IOException {
// we are by definition not durable!
}
/** Returns a stream reading an existing file. */
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
RAMFile file = fileMap.get(name);
if (file == null) {
throw new FileNotFoundException(name);
}
return new RAMInputStream(name, file);
}
/** Closes the store to future operations, releasing associated memory. */
@Override
public void close() {
isOpen = false;
fileMap.clear();
}
}
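RAMDirectory relies on two ConcurrentMap idioms:
- putIfAbsent claims a name atomically, in createOutput, createTempOutput and rename.
- The two-argument remove(key, value) drops the source only if it still maps to the same file.

The sketch below isolates both in a hypothetical TempAndRename class over byte arrays. It assumes temp names have the form prefix_suffix_<counter>.tmp, with the counter in base 36 (Character.MAX_RADIX). That is what the segmentFileName call above should produce when prefix and suffix are non-empty.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical map-backed store showing the ConcurrentMap idioms RAMDirectory uses.
class TempAndRename {
  final ConcurrentMap<String, byte[]> fileMap = new ConcurrentHashMap<>();
  private final AtomicLong nextTempFileCounter = new AtomicLong();

  // Assumed name shape: prefix_suffix_<counter in base 36>.tmp
  String createTemp(String prefix, String suffix, byte[] data) {
    while (true) {
      String name = prefix + "_" + suffix + "_"
          + Long.toString(nextTempFileCounter.getAndIncrement(), Character.MAX_RADIX) + ".tmp";
      if (fileMap.putIfAbsent(name, data) == null) {
        return name; // claimed atomically; on a clash, try the next counter value
      }
    }
  }

  void rename(String source, String dest) throws IOException {
    byte[] file = fileMap.get(source);
    if (file == null) {
      throw new FileNotFoundException(source);
    }
    // Publish under dest first: dest appears atomically or not at all.
    if (fileMap.putIfAbsent(dest, file) != null) {
      throw new FileAlreadyExistsException(dest);
    }
    // Succeeds only if source still maps to the very same object.
    if (!fileMap.remove(source, file)) {
      throw new IllegalStateException("file was unexpectedly replaced: " + source);
    }
  }
}
```

During rename, both names briefly point to the same file. The Directory.rename contract above explicitly allows this, as long as dest appears atomically.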
RAMFile represents a file in RAM as a list of byte[] buffers.
public class RAMFile implements Accountable {
protected final ArrayList<byte[]> buffers = new ArrayList<>();
long length;
RAMDirectory directory;
protected long sizeInBytes;
RAMFile(RAMDirectory directory) {
this.directory = directory;
}
protected final byte[] addBuffer(int size) {
byte[] buffer = newBuffer(size);
synchronized(this) {
buffers.add(buffer);
sizeInBytes += size;
}
if (directory != null) {
directory.sizeInBytes.getAndAdd(size);
}
return buffer;
}
protected final synchronized byte[] getBuffer(int index) {
return buffers.get(index);
}
protected final synchronized int numBuffers() {
return buffers.size();
}
/**
* Expert: allocate a new buffer.
* Subclasses can allocate differently.
*/
protected byte[] newBuffer(int size) {
return new byte[size];
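}
/** Returns the number of bytes written to this file (used by RAMDirectory.fileLength). */
public synchronized long getLength() {
return length;
}
/** Required by Accountable: bytes allocated for this file's buffers. */
@Override
public synchronized long ramBytesUsed() {
return sizeInBytes;
}
}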
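A RAMFile grows one fixed-size buffer at a time. A file position therefore splits into a buffer index and an offset inside that buffer. The allocated size is always a whole number of buffers, which is why ramBytesUsed is described above as quantized to the buffer size. Here is a minimal sketch, assuming a hypothetical ChunkedFile class and a tiny buffer size so the arithmetic is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical chunked in-memory file in the style of RAMFile + RAMOutputStream:
// bytes live in fixed-size byte[] buffers; position p is at buffer p / size, offset p % size.
class ChunkedFile {
  private final int bufferSize;
  private final List<byte[]> buffers = new ArrayList<>();
  private long length;

  ChunkedFile(int bufferSize) {
    this.bufferSize = bufferSize;
  }

  void writeByte(byte b) {
    int offset = (int) (length % bufferSize);
    if (offset == 0) {
      buffers.add(new byte[bufferSize]); // last buffer is full (or none yet): allocate another
    }
    buffers.get(buffers.size() - 1)[offset] = b;
    length++;
  }

  byte readByte(long pos) {
    if (pos < 0 || pos >= length) {
      throw new IndexOutOfBoundsException("pos=" + pos);
    }
    return buffers.get((int) (pos / bufferSize))[(int) (pos % bufferSize)];
  }

  long length() { return length; }

  int numBuffers() { return buffers.size(); }

  // Allocated size, analogous to RAMFile.sizeInBytes: always a multiple of bufferSize.
  long sizeInBytes() { return (long) buffers.size() * bufferSize; }
}
```

Writing 10 bytes with a buffer size of 4 allocates three buffers. The logical length is 10, but 12 bytes are allocated.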
FSDirectory
FSDirectory
is another base class for Directory
implementation that store index files in the file system.
public abstract class FSDirectory extends BaseDirectory {
protected final Path directory; // The underlying filesystem directory
protected FSDirectory(Path path, LockFactory lockFactory) throws IOException {
super(lockFactory);
// If only read access is permitted, createDirectories fails even if the directory already exists.
if (!Files.isDirectory(path)) {
Files.createDirectories(path); // create directory, if it doesn't exist
}
directory = path.toRealPath();
}
/** Just like {@link #open(Path)}, but allows you to
* also specify a custom {@link LockFactory}. */
public static FSDirectory open(Path path, LockFactory lockFactory) throws IOException {
if (Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return new MMapDirectory(path, lockFactory);
} else if (Constants.WINDOWS) {
return new SimpleFSDirectory(path, lockFactory);
} else {
return new NIOFSDirectory(path, lockFactory);
}
}
private static String[] listAll(Path dir, Set<String> skipNames) throws IOException {
List<String> entries = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path path : stream) {
String name = path.getFileName().toString();
if (skipNames == null || skipNames.contains(name) == false) {
entries.add(name);
}
}
}
String[] array = entries.toArray(new String[entries.size()]);
// Directory.listAll javadocs state that we sort the results here, so we don't let filesystem
// specifics leak out of this abstraction:
Arrays.sort(array);
return array;
}
}
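The private listAll helper needs nothing Lucene-specific, so its behaviour is easy to try in isolation. Below is the same logic lifted into a hypothetical DirListing class and run against a temporary directory. Passing write.lock as skipNames in the usage is only an example value.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Standalone version of FSDirectory's listing logic: collect entry names, skip some, sort.
final class DirListing {
  static String[] listAll(Path dir, Set<String> skipNames) throws IOException {
    List<String> entries = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
      for (Path path : stream) {
        String name = path.getFileName().toString();
        if (skipNames == null || !skipNames.contains(name)) {
          entries.add(name);
        }
      }
    }
    // Directory iteration order is filesystem-specific; sorting hides it from callers.
    Collections.sort(entries);
    return entries.toArray(new String[0]);
  }
}
```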
In FSDirectory, a nested FSIndexOutput class is responsible for index output.
final class FSIndexOutput extends OutputStreamIndexOutput {
static final int CHUNK_SIZE = 8192;
FSIndexOutput(String name, OpenOption... options) throws IOException {
super("FSIndexOutput(path=\"" + directory.resolve(name) + "\")", name, new FilterOutputStream(Files.newOutputStream(directory.resolve(name), options)) {
// This implementation ensures that we never write more than CHUNK_SIZE bytes per call:
@Override
public void write(byte[] b, int offset, int length) throws IOException {
while (length > 0) {
final int chunk = Math.min(length, CHUNK_SIZE);
out.write(b, offset, chunk);
length -= chunk;
offset += chunk;
}
}
}, CHUNK_SIZE);
}
}
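The anonymous FilterOutputStream in FSIndexOutput changes only how bulk writes reach the underlying stream. The sketch below reproduces that loop in a hypothetical ChunkingOutputStream and pairs it with a hypothetical recording stream. With them you can check that a 20,000-byte write arrives in pieces of at most 8192 bytes.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Same loop as FSIndexOutput's FilterOutputStream: the wrapped stream never
// receives more than chunkSize bytes in a single write call.
class ChunkingOutputStream extends FilterOutputStream {
  private final int chunkSize;

  ChunkingOutputStream(OutputStream out, int chunkSize) {
    super(out);
    this.chunkSize = chunkSize;
  }

  @Override
  public void write(byte[] b, int offset, int length) throws IOException {
    while (length > 0) {
      final int chunk = Math.min(length, chunkSize);
      out.write(b, offset, chunk);
      length -= chunk;
      offset += chunk;
    }
  }
}

// Test helper: records the size of every write it receives.
class RecordingOutputStream extends OutputStream {
  final List<Integer> writeSizes = new ArrayList<>();

  @Override
  public void write(int b) { writeSizes.add(1); }

  @Override
  public void write(byte[] b, int off, int len) { writeSizes.add(len); }
}
```

Overriding the three-argument write is enough. FilterOutputStream's write(byte[]) delegates to it, so single-array writes are chunked as well.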
Since FSDirectory is abstract, there are currently three core subclasses that can be instantiated:
- SimpleFSDirectory, a straightforward implementation using Files.newByteChannel. However, it has poor concurrent performance (multiple threads will bottleneck) because it synchronizes when multiple threads read from the same file.
- NIOFSDirectory, which uses java.nio’s FileChannel positional I/O when reading. This avoids synchronization when several threads read from the same file.
- MMapDirectory, which uses memory-mapped I/O when reading. This is a good choice if you have plenty of virtual memory relative to your index size.
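FSDirectory.open(Path, LockFactory), shown earlier, picks one of these three classes. The decision can be restated as a pure function. OpenChoice is a hypothetical helper, and its boolean parameters stand in for Constants.JRE_IS_64BIT, MMapDirectory.UNMAP_SUPPORTED and Constants.WINDOWS.

```java
// Hypothetical restatement of the branch in FSDirectory.open(Path, LockFactory).
final class OpenChoice {
  static String choose(boolean jre64Bit, boolean unmapSupported, boolean windows) {
    if (jre64Bit && unmapSupported) {
      return "MMapDirectory";     // mmap wins whenever it is safe to use, on any OS
    } else if (windows) {
      return "SimpleFSDirectory"; // Windows falls back to the simple implementation
    } else {
      return "NIOFSDirectory";    // other platforms get positional reads
    }
  }
}
```

The order of the checks matters. A 64-bit Windows JVM that supports unmapping still gets MMapDirectory, because the Windows check only applies when mmap is ruled out.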
IndexInput
IndexInput extends DataInput, the abstract base class for reading Lucene’s low-level data types. DataInput provides methods such as:
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException;
public short readShort() throws IOException;
public int readInt() throws IOException;
public String readString() throws IOException;
public void skipBytes(final long numBytes) throws IOException;
IndexInput is the abstract base class for input from a file in a Directory. It is a random-access input stream used for all Lucene index input operations.
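public abstract class IndexInput extends DataInput implements Cloneable, Closeable {
// Returns the current position in this file
public abstract long getFilePointer();
// Sets current position in this file
public abstract void seek(long pos) throws IOException;
// Creates a slice of this index input, with the given description, offset, and length
public abstract IndexInput slice(String sliceDescription, long offset, long length) throws IOException;
// Creates a random-access slice of this index input, with the given offset and length.
// Not abstract: the default builds on slice() and seeks before every absolute read.
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
final IndexInput slice = slice("randomaccess", offset, length);
if (slice instanceof RandomAccessInput) {
return (RandomAccessInput) slice; // already supports absolute reads
}
return new RandomAccessInput() {
public byte readByte(long pos) throws IOException { slice.seek(pos); return slice.readByte(); }
public short readShort(long pos) throws IOException { slice.seek(pos); return slice.readShort(); }
public int readInt(long pos) throws IOException { slice.seek(pos); return slice.readInt(); }
};
}
}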
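IndexInput's position model can be sketched over a byte array. A file pointer is moved by seek, and a slice exposes a sub-range whose own positions start at 0. ArraySliceInput is a hypothetical class that supports only readByte. It shares the underlying array with its slices instead of copying it.

```java
import java.io.EOFException;

// Hypothetical byte[]-backed input: positions are relative to the slice start,
// and slice() creates a view over the same array without copying.
class ArraySliceInput {
  private final byte[] bytes;
  private final int offset; // start of this slice inside bytes
  private final int length; // number of bytes visible through this slice
  private int pos;          // current file pointer, relative to offset

  ArraySliceInput(byte[] bytes) {
    this(bytes, 0, bytes.length);
  }

  private ArraySliceInput(byte[] bytes, int offset, int length) {
    this.bytes = bytes;
    this.offset = offset;
    this.length = length;
  }

  long getFilePointer() { return pos; }

  long length() { return length; }

  void seek(long newPos) throws EOFException {
    if (newPos < 0 || newPos > length) {
      throw new EOFException("seek past EOF: " + newPos);
    }
    pos = (int) newPos;
  }

  byte readByte() throws EOFException {
    if (pos >= length) {
      throw new EOFException("read past EOF");
    }
    return bytes[offset + pos++];
  }

  ArraySliceInput slice(long sliceOffset, long sliceLength) {
    if (sliceOffset < 0 || sliceLength < 0 || sliceOffset + sliceLength > length) {
      throw new IllegalArgumentException("slice out of bounds");
    }
    return new ArraySliceInput(bytes, offset + (int) sliceOffset, (int) sliceLength);
  }
}
```

Each slice keeps its own pointer. Reading from a slice therefore leaves the parent's getFilePointer() unchanged.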
BufferedIndexInput
BufferedIndexInput
is a implementation class for buffered InputInput
, it also implemenets RandomAccessInput
interface.
/**
* Random Access Index API.
* Unlike {@link IndexInput}, this has no concept of file position, all reads
* are absolute. However, like IndexInput, it is only intended for use by a single thread.
*/
public interface RandomAccessInput {
public byte readByte(long pos) throws IOException;
public short readShort(long pos) throws IOException;
public int readInt(long pos) throws IOException;
}
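A RandomAccessInput reads at absolute positions, so no file pointer is involved. java.nio.ByteBuffer has absolute getters that behave the same way. They are big-endian by default, matching the byte order of BufferedIndexInput.readShort in this version of the code. Here is a minimal sketch, with ByteBufferRandomAccess as a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical RandomAccessInput-style reader: every call names its own position,
// and ByteBuffer's absolute getters do not move the buffer's position.
class ByteBufferRandomAccess {
  private final ByteBuffer buffer;

  ByteBufferRandomAccess(byte[] bytes) {
    this.buffer = ByteBuffer.wrap(bytes); // big-endian by default
  }

  byte readByte(long pos) { return buffer.get(Math.toIntExact(pos)); }

  short readShort(long pos) { return buffer.getShort(Math.toIntExact(pos)); }

  int readInt(long pos) { return buffer.getInt(Math.toIntExact(pos)); }
}
```

No read moves any state, so repeating a read gives the same value regardless of what was read in between.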
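SimpleFSDirectory and NIOFSDirectory each have their own nested IndexInput class (SimpleFSIndexInput and NIOFSIndexInput) that extends BufferedIndexInput, so it is worth looking at what BufferedIndexInput does. MMapDirectory is the exception: it reads through ByteBufferIndexInput instead.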
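public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {
/** Default buffer size */
public static final int BUFFER_SIZE = 1024;
/** Minimum buffer size allowed */
public static final int MIN_BUFFER_SIZE = 8;
private int bufferSize = BUFFER_SIZE;
protected byte[] buffer;
private long bufferStart = 0; // position in file of buffer
private int bufferLength = 0; // end of valid bytes
private int bufferPosition = 0; // next byte to read
/** Implemented by subclasses: reads bytes from the current file position (called by refill). */
protected abstract void readInternal(byte[] b, int offset, int length) throws IOException;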
/** Change the buffer size used by this IndexInput */
public final void setBufferSize(int newSize) {
assert buffer == null || bufferSize == buffer.length: "buffer=" + buffer + " bufferSize=" + bufferSize + " buffer.length=" + (buffer != null ? buffer.length : 0);
if (newSize != bufferSize) {
checkBufferSize(newSize);
bufferSize = newSize;
if (buffer != null) {
// Resize the existing buffer and carefully save as
// many bytes as possible starting from the current
// bufferPosition
byte[] newBuffer = new byte[newSize];
final int leftInBuffer = bufferLength-bufferPosition;
final int numToCopy;
if (leftInBuffer > newSize)
numToCopy = newSize;
else
numToCopy = leftInBuffer;
System.arraycopy(buffer, bufferPosition, newBuffer, 0, numToCopy);
bufferStart += bufferPosition;
bufferPosition = 0;
bufferLength = numToCopy;
newBuffer(newBuffer);
}
}
}
private void checkBufferSize(int bufferSize) {
if (bufferSize < MIN_BUFFER_SIZE)
throw new IllegalArgumentException("bufferSize must be at least MIN_BUFFER_SIZE (got " + bufferSize + ")");
}
@Override
public final void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
int available = bufferLength - bufferPosition;
if(len <= available){
// the buffer contains enough data to satisfy this request
if(len>0) // to allow b to be null if len is 0...
System.arraycopy(buffer, bufferPosition, b, offset, len);
bufferPosition+=len;
} else {
// the buffer does not have enough data. First serve all we've got.
if(available > 0){
System.arraycopy(buffer, bufferPosition, b, offset, available);
offset += available;
len -= available;
bufferPosition += available;
}
// and now, read the remaining 'len' bytes:
if (useBuffer && len<bufferSize){
// If the amount left to read is small enough, and
// we are allowed to use our buffer, do it in the usual
// buffered way: fill the buffer and copy from it:
refill();
if(bufferLength<len){
// Throw an exception when refill() could not read len bytes:
System.arraycopy(buffer, 0, b, offset, bufferLength);
throw new EOFException("read past EOF: " + this);
} else {
System.arraycopy(buffer, 0, b, offset, len);
bufferPosition=len;
}
} else {
// The amount left to read is larger than the buffer
// or we've been asked to not use our buffer -
// there's no performance reason not to read it all
// at once. Note that unlike the previous code of
// this function, there is no need to do a seek
// here, because there's no need to reread what we
// had in the buffer.
long after = bufferStart+bufferPosition+len;
if(after > length())
throw new EOFException("read past EOF: " + this);
readInternal(b, offset, len);
bufferStart = after;
bufferPosition = 0;
bufferLength = 0; // trigger refill() on read
}
}
}
@Override
public final short readShort() throws IOException {
if (2 <= (bufferLength-bufferPosition)) {
return (short) (((buffer[bufferPosition++] & 0xFF) << 8) | (buffer[bufferPosition++] & 0xFF));
} else {
return super.readShort();
}
}
}
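The core idea of BufferedIndexInput is that reads are served from an in-memory buffer, and only refill() goes to the underlying file through readInternal. The following is a simplified sketch with a hypothetical MiniBufferedInput that reads from a byte array and counts readInternal calls. It leaves out seeking, bulk reads and setBufferSize.

```java
import java.io.EOFException;

// Hypothetical miniature of BufferedIndexInput: readByte() is served from a buffer,
// and only refill() touches the "file" through readInternal().
class MiniBufferedInput {
  private final byte[] file;      // stands in for the underlying file
  private final byte[] buffer;
  private long bufferStart = 0;   // file position of buffer[0]
  private int bufferLength = 0;   // number of valid bytes in buffer
  private int bufferPosition = 0; // next byte to read from buffer
  int readInternalCalls = 0;

  MiniBufferedInput(byte[] file, int bufferSize) {
    this.file = file;
    this.buffer = new byte[bufferSize];
  }

  byte readByte() throws EOFException {
    if (bufferPosition >= bufferLength) {
      refill();
    }
    return buffer[bufferPosition++];
  }

  private void refill() throws EOFException {
    long start = bufferStart + bufferPosition;
    long end = Math.min(start + buffer.length, file.length);
    if (end <= start) {
      throw new EOFException("read past EOF");
    }
    readInternal(start, buffer, (int) (end - start));
    bufferStart = start;
    bufferLength = (int) (end - start);
    bufferPosition = 0;
  }

  // The only method that does "I/O"; in Lucene the subclass implements readInternal.
  private void readInternal(long pos, byte[] dest, int len) {
    readInternalCalls++;
    System.arraycopy(file, (int) pos, dest, 0, len);
  }
}
```

Reading 100 bytes one at a time through a 32-byte buffer costs only four readInternal calls instead of 100.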
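readInt, readLong and the other typed readers work like readShort. When enough bytes remain in the buffer array, they read them directly and assemble the target type with bit operations. Otherwise they fall back to the inherited DataInput implementation.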
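The bit operations are masks and shifts. Each byte is masked with 0xFF so its sign bit does not spread, then shifted into place, most significant byte first. Here is a sketch with a hypothetical BigEndian helper. Building readLong from two ints is an assumption about how the non-buffered DataInput path composes it.

```java
// Hypothetical helpers showing how readShort/readInt/readLong assemble values
// from a byte[] buffer in big-endian order.
final class BigEndian {
  static short readShort(byte[] b, int p) {
    return (short) (((b[p] & 0xFF) << 8) | (b[p + 1] & 0xFF));
  }

  static int readInt(byte[] b, int p) {
    return ((b[p] & 0xFF) << 24) | ((b[p + 1] & 0xFF) << 16)
        | ((b[p + 2] & 0xFF) << 8) | (b[p + 3] & 0xFF);
  }

  static long readLong(byte[] b, int p) {
    // High int first; the low int is widened without sign extension.
    return (((long) readInt(b, p)) << 32) | (readInt(b, p + 4) & 0xFFFFFFFFL);
  }
}
```

Without the & 0xFF masks, a byte such as (byte) 0xFE would widen to 0xFFFFFFFE and corrupt the higher bits.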
IndexOutput
IndexOutput extends DataOutput, the abstract base class for writing Lucene’s low-level data types.
public abstract class DataOutput {
public abstract void writeByte(byte b) throws IOException;
public void writeInt(int i) throws IOException {
writeByte((byte)(i >> 24));
writeByte((byte)(i >> 16));
writeByte((byte)(i >> 8));
writeByte((byte) i);
}
}
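DataOutput requires only writeByte from subclasses; wider types are built on top of it. The sketch below uses a hypothetical ByteSink over ByteArrayOutputStream and follows the writeInt shown above. Its writeLong, written as two ints, is an assumption about the rest of DataOutput. The result is read back with java.io.DataInputStream, which also uses big-endian order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical DataOutput-like writer: only writeByte touches storage.
class ByteSink {
  private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

  void writeByte(byte b) { bytes.write(b); }

  // Most significant byte first, as in DataOutput.writeInt above.
  void writeInt(int i) {
    writeByte((byte) (i >> 24));
    writeByte((byte) (i >> 16));
    writeByte((byte) (i >> 8));
    writeByte((byte) i);
  }

  void writeLong(long l) {
    writeInt((int) (l >> 32)); // high word first
    writeInt((int) l);
  }

  // Like IndexOutput.getFilePointer: where the next write will land.
  long getFilePointer() { return bytes.size(); }

  byte[] toByteArray() { return bytes.toByteArray(); }
}
```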
IndexOutput, derived from DataOutput, is the abstract base class for output to a file in a Directory. It is used for all Lucene index output operations.
public abstract class IndexOutput extends DataOutput implements Closeable {
// Returns the current position in this file, where the next write will occur.
public abstract long getFilePointer();
}
RAMOutputStream
Directory implementations are paired with an IndexOutput implementation; for example, RAMDirectory writes through RAMOutputStream.
public class RAMOutputStream extends IndexOutput implements Accountable {
static final int BUFFER_SIZE = 1024;
private final RAMFile file;
private byte[] currentBuffer;
private int currentBufferIndex;
private int bufferPosition;
private long bufferStart;
private int bufferLength;
private final Checksum crc; // null unless checksumming was requested
/** Copy the current contents of this buffer to the provided {@link DataOutput}. */
public void writeTo(DataOutput out) throws IOException {
flush();
final long end = file.length;
long pos = 0;
int buffer = 0;
while (pos < end) {
int length = BUFFER_SIZE;
long nextPos = pos + length;
if (nextPos > end) { // at the last buffer
length = (int)(end - pos);
}
out.writeBytes(file.getBuffer(buffer++), length);
pos = nextPos;
}
}
@Override
public void writeByte(byte b) throws IOException {
if (bufferPosition == bufferLength) {
currentBufferIndex++;
switchCurrentBuffer();
}
if (crc != null) {
crc.update(b);
}
currentBuffer[bufferPosition++] = b;
}
@Override
public long getFilePointer() {
return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
}
}
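The writeTo loop copies every buffer in full except the last, which contributes only the bytes up to the file length. Below it is restated as a hypothetical WriteToPlan helper that returns the chunk lengths instead of writing them. The buffer size is a parameter here, whereas RAMOutputStream uses its BUFFER_SIZE constant. The helper also mirrors getFilePointer, where a negative buffer index means nothing has been written yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of RAMOutputStream.writeTo and getFilePointer.
final class WriteToPlan {
  static List<Integer> chunkLengths(long fileLength, int bufferSize) {
    List<Integer> lengths = new ArrayList<>();
    long pos = 0;
    while (pos < fileLength) {
      int length = bufferSize;
      long nextPos = pos + length;
      if (nextPos > fileLength) { // at the last buffer
        length = (int) (fileLength - pos);
      }
      lengths.add(length);
      pos = nextPos;
    }
    return lengths;
  }

  // Before the first buffer exists (index < 0) the pointer is 0.
  static long filePointer(int currentBufferIndex, long bufferStart, int bufferPosition) {
    return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
  }
}
```

A 2500-byte file with 1024-byte buffers is copied as 1024 + 1024 + 452 bytes.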