Index: core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java (revision 7ab107a398867320f3d7429dbccc893501639284)
+++ core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java (revision )
@@ -22,16 +22,6 @@
*/
package org.infinispan.loaders.file;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
import org.infinispan.io.ExposedByteArrayOutputStream;
@@ -45,6 +35,12 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.*;
+
/**
* A filesystem-based implementation of a {@link org.infinispan.loaders.bucket.BucketBasedCacheStore}. This file store
* stores stuff in the following format: /{location}/cache name/bucket_number.bucket
@@ -58,530 +54,535 @@
@CacheLoaderMetadata(configurationClass = FileCacheStoreConfig.class)
public class FileCacheStore extends BucketBasedCacheStore {
- static final Log log = LogFactory.getLog(FileCacheStore.class);
- private static final boolean trace = log.isTraceEnabled();
- private int streamBufferSize;
+ static final Log log = LogFactory.getLog(FileCacheStore.class);
+ private static final boolean trace = log.isTraceEnabled();
+ private int streamBufferSize;
- FileCacheStoreConfig config;
- File root;
- FileSync fileSync;
+ FileCacheStoreConfig config;
+ File root;
+ FileSync fileSync;
- /**
- * @return root directory where all files for this {@link org.infinispan.loaders.CacheStore CacheStore} are written.
- */
- public File getRoot() {
- return root;
- }
+ /**
+ * @return root directory where all files for this {@link org.infinispan.loaders.CacheStore CacheStore} are written.
+ */
+ public File getRoot() {
+ return root;
+ }
- @Override
- public void init(CacheLoaderConfig config, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
- super.init(config, cache, m);
- this.config = (FileCacheStoreConfig) config;
- }
+ @Override
+ public void init(CacheLoaderConfig config, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
+ super.init(config, cache, m);
+ this.config = (FileCacheStoreConfig) config;
+ }
- @Override
- protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
- try {
- File[] listFiles;
- if (root != null && (listFiles = root.listFiles()) != null) {
- for (File bucketFile : listFiles) {
- Bucket bucket = loadBucket(bucketFile);
- if (handler.handle(bucket)) {
- break;
- }
- }
- }
- } catch (InterruptedException ie) {
- if (log.isDebugEnabled()) {
- log.debug("Interrupted, so stop looping over buckets.");
- }
- Thread.currentThread().interrupt();
- }
- }
+ @Override
+ protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
+ try {
+ File[] listFiles;
+ if (root != null && (listFiles = root.listFiles()) != null) {
+ for (File bucketFile : listFiles) {
+ Bucket bucket = loadBucket(bucketFile);
+ if (handler.handle(bucket)) {
+ break;
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted, so stop looping over buckets.");
+ }
+ Thread.currentThread().interrupt();
+ }
+ }
- @Override
- protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
- try {
- int numFiles = objectInput.readInt();
- byte[] buffer = new byte[streamBufferSize];
- int bytesRead, totalBytesRead = 0;
- for (int i = 0; i < numFiles; i++) {
- String fName = (String) objectInput.readObject();
- int numBytes = objectInput.readInt();
- FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() + File.separator + fName);
- BufferedOutputStream bos = new BufferedOutputStream(fos, streamBufferSize);
+ @Override
+ protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+ try {
+ int numFiles = objectInput.readInt();
+ byte[] buffer = new byte[streamBufferSize];
+ int bytesRead, totalBytesRead = 0;
+ for (int i = 0; i < numFiles; i++) {
+ String fName = (String) objectInput.readObject();
+ int numBytes = objectInput.readInt();
+ FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() + File.separator + fName);
+ BufferedOutputStream bos = new BufferedOutputStream(fos, streamBufferSize);
- try {
- while (numBytes > totalBytesRead) {
- if (numBytes - totalBytesRead > streamBufferSize) {
- bytesRead = objectInput.read(buffer, 0, streamBufferSize);
- } else {
- bytesRead = objectInput.read(buffer, 0, numBytes - totalBytesRead);
- }
+ try {
+ while (numBytes > totalBytesRead) {
+ if (numBytes - totalBytesRead > streamBufferSize) {
+ bytesRead = objectInput.read(buffer, 0, streamBufferSize);
+ } else {
+ bytesRead = objectInput.read(buffer, 0, numBytes - totalBytesRead);
+ }
- if (bytesRead == -1) {
- break;
- }
- totalBytesRead += bytesRead;
- bos.write(buffer, 0, bytesRead);
- }
- bos.flush();
- fos.flush();
- totalBytesRead = 0;
- } finally {
- safeClose(bos);
- safeClose(fos);
- }
- }
- } catch (IOException e) {
- throw new CacheLoaderException("I/O error", e);
- } catch (ClassNotFoundException e) {
- throw new CacheLoaderException("Unexpected exception", e);
- }
- }
+ if (bytesRead == -1) {
+ break;
+ }
+ totalBytesRead += bytesRead;
+ bos.write(buffer, 0, bytesRead);
+ }
+ bos.flush();
+ fos.flush();
+ totalBytesRead = 0;
+ } finally {
+ safeClose(bos);
+ safeClose(fos);
+ }
+ }
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O error", e);
+ } catch (ClassNotFoundException e) {
+ throw new CacheLoaderException("Unexpected exception", e);
+ }
+ }
- @Override
- protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
- try {
- File[] files = root.listFiles();
- if (files == null)
- throw new CacheLoaderException("Root not directory or IO error occurred");
+ @Override
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+ try {
+ File[] files = root.listFiles();
+ if (files == null)
+ throw new CacheLoaderException("Root not directory or IO error occurred");
- objectOutput.writeInt(files.length);
- byte[] buffer = new byte[streamBufferSize];
- for (File file : files) {
- int bytesRead, totalBytesRead = 0;
- BufferedInputStream bis = null;
- FileInputStream fileInStream = null;
- try {
- if (trace) {
- log.tracef("Opening file in %s", file);
- }
- fileInStream = new FileInputStream(file);
- int sz = fileInStream.available();
- bis = new BufferedInputStream(fileInStream);
- objectOutput.writeObject(file.getName());
- objectOutput.writeInt(sz);
+ objectOutput.writeInt(files.length);
+ byte[] buffer = new byte[streamBufferSize];
+ for (File file : files) {
+ int bytesRead, totalBytesRead = 0;
+ BufferedInputStream bis = null;
+ FileInputStream fileInStream = null;
+ try {
+ if (trace) {
+ log.tracef("Opening file in %s", file);
+ }
+ fileInStream = new FileInputStream(file);
+ int sz = fileInStream.available();
+ bis = new BufferedInputStream(fileInStream);
+ objectOutput.writeObject(file.getName());
+ objectOutput.writeInt(sz);
- while (sz > totalBytesRead) {
- bytesRead = bis.read(buffer, 0, streamBufferSize);
- if (bytesRead == -1) {
- break;
- }
- totalBytesRead += bytesRead;
- objectOutput.write(buffer, 0, bytesRead);
- }
- } finally {
- Util.close(bis);
- Util.close(fileInStream);
- }
- }
- } catch (IOException e) {
- throw new CacheLoaderException("I/O exception while generating stream", e);
- }
- }
+ while (sz > totalBytesRead) {
+ bytesRead = bis.read(buffer, 0, streamBufferSize);
+ if (bytesRead == -1) {
+ break;
+ }
+ totalBytesRead += bytesRead;
+ objectOutput.write(buffer, 0, bytesRead);
+ }
+ } finally {
+ Util.close(bis);
+ Util.close(fileInStream);
+ }
+ }
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O exception while generating stream", e);
+ }
+ }
- @Override
- protected void clearLockSafe() throws CacheLoaderException {
- File[] toDelete = root.listFiles();
- if (toDelete == null) {
- return;
- }
- for (File f : toDelete) {
- if (!deleteFile(f)) {
- log.problemsRemovingFile(f);
- }
- }
- }
+ @Override
+ protected void clearLockSafe() throws CacheLoaderException {
+ File[] toDelete = root.listFiles();
+ if (toDelete == null) {
+ return;
+ }
+ for (File f : toDelete) {
+ if (!deleteFile(f)) {
+ log.problemsRemovingFile(f);
+ }
+ }
+ }
- @Override
- protected boolean supportsMultiThreadedPurge() {
- return true;
- }
+ @Override
+ protected boolean supportsMultiThreadedPurge() {
+ return true;
+ }
- @Override
- protected void purgeInternal() throws CacheLoaderException {
- if (trace) {
- log.trace("purgeInternal()");
- }
- if (acquireGlobalLock(false)) {
- try {
- File[] files = root.listFiles();
- if (files == null)
- throw new CacheLoaderException("Root not directory or IO error occurred");
+ @Override
+ protected void purgeInternal() throws CacheLoaderException {
+ if (trace) {
+ log.trace("purgeInternal()");
+ }
+ if (acquireGlobalLock(false)) {
+ try {
+ File[] files = root.listFiles();
+ if (files == null)
+ throw new CacheLoaderException("Root not directory or IO error occurred");
- for (final File bucketFile : files) {
- if (multiThreadedPurge) {
- purgerService.execute(new Runnable() {
- @Override
- public void run() {
- Bucket bucket;
- try {
- if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) {
- updateBucket(bucket);
- }
- } catch (InterruptedException ie) {
- if (log.isDebugEnabled()) {
- log.debug("Interrupted, so finish work.");
- }
- } catch (CacheLoaderException e) {
- log.problemsPurgingFile(bucketFile, e);
- }
- }
- });
- } else {
- Bucket bucket;
- if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) {
- updateBucket(bucket);
- }
- }
- }
- } catch (InterruptedException ie) {
- if (log.isDebugEnabled()) {
- log.debug("Interrupted, so stop loading and finish with purging.");
- }
- Thread.currentThread().interrupt();
- } finally {
- releaseGlobalLock(false);
- if (trace) {
- log.trace("Exit purgeInternal()");
- }
- }
- } else {
- log.unableToAcquireLockToPurgeStore();
- }
- }
+ for (final File bucketFile : files) {
+ if (multiThreadedPurge) {
+ purgerService.execute(new Runnable() {
+ @Override
+ public void run() {
+ Bucket bucket;
+ try {
+ if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) {
+ updateBucket(bucket);
+ }
+ } catch (InterruptedException ie) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted, so finish work.");
+ }
+ } catch (CacheLoaderException e) {
+ log.problemsPurgingFile(bucketFile, e);
+ }
+ }
+ });
+ } else {
+ Bucket bucket;
+ if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) {
+ updateBucket(bucket);
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted, so stop loading and finish with purging.");
+ }
+ Thread.currentThread().interrupt();
+ } finally {
+ releaseGlobalLock(false);
+ if (trace) {
+ log.trace("Exit purgeInternal()");
+ }
+ }
+ } else {
+ log.unableToAcquireLockToPurgeStore();
+ }
+ }
- @Override
- protected Bucket loadBucket(Integer hash) throws CacheLoaderException {
- try {
- return loadBucket(new File(root, String.valueOf(hash)));
- } catch (InterruptedException ie) {
- if (log.isDebugEnabled()) {
- log.debug("Interrupted, so stop loading bucket and return null.");
- }
- Thread.currentThread().interrupt();
- return null;
- }
- }
+ @Override
+ protected Bucket loadBucket(Integer hash) throws CacheLoaderException {
+ try {
+ return loadBucket(new File(root, String.valueOf(hash)));
+ } catch (InterruptedException ie) {
+ if (log.isDebugEnabled()) {
+ log.debug("Interrupted, so stop loading bucket and return null.");
+ }
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ }
- protected Bucket loadBucket(File bucketFile) throws CacheLoaderException, InterruptedException {
- Bucket bucket = null;
- if (bucketFile.exists()) {
- if (trace) {
- log.trace("Found bucket file: '" + bucketFile + "'");
- }
- InputStream is = null;
- try {
- // It could happen that the output buffer might not have been
- // flushed, so just in case, flush it to be able to read it.
- fileSync.flush(bucketFile);
- is = new FileInputStream(bucketFile);
- bucket = (Bucket) objectFromInputStreamInReentrantMode(is);
- } catch (InterruptedException ie) {
- throw ie;
- } catch (Exception e) {
- log.errorReadingFromFile(bucketFile.getAbsoluteFile(), e);
- throw new CacheLoaderException("Error while reading from file", e);
- } finally {
- safeClose(is);
- }
- }
- if (bucket != null) {
- bucket.setBucketId(bucketFile.getName());
- }
- return bucket;
- }
+ protected Bucket loadBucket(File bucketFile) throws CacheLoaderException, InterruptedException {
+ Bucket bucket = null;
+ if (bucketFile.exists()) {
+ if (trace) {
+ log.trace("Found bucket file: '" + bucketFile + "'");
+ }
+ InputStream is = null;
+ try {
+ // It could happen that the output buffer might not have been
+ // flushed, so just in case, flush it to be able to read it.
+ fileSync.flush(bucketFile);
+ is = new FileInputStream(bucketFile);
+ bucket = (Bucket) objectFromInputStreamInReentrantMode(is);
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (Exception e) {
+ log.errorReadingFromFile(bucketFile.getAbsoluteFile(), e);
+ throw new CacheLoaderException("Error while reading from file", e);
+ } finally {
+ safeClose(is);
+ }
+ }
+ if (bucket != null) {
+ bucket.setBucketId(bucketFile.getName());
+ }
+ return bucket;
+ }
- @Override
- public void updateBucket(Bucket b) throws CacheLoaderException {
- File f = new File(root, b.getBucketIdAsString());
- if (f.exists()) {
- if (!purgeFile(f)) {
- log.problemsRemovingFile(f);
- }
- } else if (trace) {
- log.tracef("Successfully deleted file: '%s'", f.getName());
- }
+ @Override
+ public void updateBucket(Bucket b) throws CacheLoaderException {
+ File f = new File(root, b.getBucketIdAsString());
+ if (f.exists()) {
+ if (!purgeFile(f)) {
+ log.problemsRemovingFile(f);
+ }
+ } else if (trace) {
+ log.tracef("Successfully deleted file: '%s'", f.getName());
+ }
- if (!b.getEntries().isEmpty()) {
- try {
- byte[] bytes = marshaller.objectToByteBuffer(b);
- fileSync.write(bytes, f);
- } catch (IOException ex) {
- log.errorSavingBucket(b, ex);
- throw new CacheLoaderException(ex);
- } catch (InterruptedException ie) {
- if (trace) {
- log.trace("Interrupted while marshalling a bucket");
- }
- Thread.currentThread().interrupt(); // Restore interrupted status
- }
- }
- }
+ if (!b.getEntries().isEmpty()) {
+ try {
+ byte[] bytes = marshaller.objectToByteBuffer(b);
+ fileSync.write(bytes, f);
+ } catch (IOException ex) {
+ log.errorSavingBucket(b, ex);
+ throw new CacheLoaderException(ex);
+ } catch (InterruptedException ie) {
+ if (trace) {
+ log.trace("Interrupted while marshalling a bucket");
+ }
+ Thread.currentThread().interrupt(); // Restore interrupted status
+ }
+ }
+ }
- @Override
- public Class<? extends CacheLoaderConfig> getConfigurationClass() {
- return FileCacheStoreConfig.class;
- }
+ @Override
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return FileCacheStoreConfig.class;
+ }
- @Override
- public void start() throws CacheLoaderException {
- super.start();
- String location = config.getLocation();
- if (location == null || location.trim().length() == 0) {
- location = "Infinispan-FileCacheStore"; // use relative path!
- }
- location += File.separator + cache.getName();
- root = new File(location);
- if (!root.exists()) {
- if (!root.mkdirs()) {
- log.problemsCreatingDirectory(root);
- }
- }
- if (!root.exists()) {
- throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
- }
- streamBufferSize = config.getStreamBufferSize();
+ @Override
+ public void start() throws CacheLoaderException {
+ super.start();
+ String location = config.getLocation();
+ if (location == null || location.trim().length() == 0) {
+ location = "Infinispan-FileCacheStore"; // use relative path!
+ }
+ location += File.separator + cache.getName();
+ root = new File(location);
+ if (!root.exists()) {
+ if (!root.mkdirs()) {
+ log.problemsCreatingDirectory(root);
+ }
+ }
+ if (!root.exists()) {
+ throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
+ }
+ streamBufferSize = config.getStreamBufferSize();
- switch(config.getFsyncMode()) {
- case DEFAULT :
- fileSync = new BufferedFileSync();
- break;
- case PER_WRITE:
- fileSync = new PerWriteFileSync();
- break;
- case PERIODIC:
- fileSync = new PeriodicFileSync(config.getFsyncInterval());
- break;
- }
- }
+ switch(config.getFsyncMode()) {
+ case DEFAULT :
+ fileSync = new BufferedFileSync();
+ break;
+ case PER_WRITE:
+ fileSync = new PerWriteFileSync();
+ break;
+ case PERIODIC:
+ fileSync = new PeriodicFileSync(config.getFsyncInterval());
+ break;
+ }
+ }
- @Override
- public void stop() throws CacheLoaderException {
- super.stop();
- fileSync.stop();
- }
+ @Override
+ public void stop() throws CacheLoaderException {
+ super.stop();
+ fileSync.stop();
+ }
- public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
- return loadBucket(key.hashCode());
- }
+ public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
+ return loadBucket(key.hashCode());
+ }
- private boolean deleteFile(File f) {
- if (trace) {
- log.tracef("Really delete file %s", f);
- }
- return f.delete();
- }
+ private boolean deleteFile(File f) {
+ if (trace) {
+ log.tracef("Really delete file %s", f);
+ }
+ return f.delete();
+ }
- private boolean purgeFile(File f) {
- if (trace) {
- log.tracef("Really clear file %s", f);
- }
- try {
- fileSync.purge(f);
- return true;
- }
- catch (IOException e) {
- if (trace)
- log.trace("Error encountered while clearing file: " + f, e);
- return false;
- }
- }
+ private boolean purgeFile(File f) {
+ if (trace) {
+ log.tracef("Really clear file %s", f);
+ }
+ try {
+ fileSync.purge(f);
+ return true;
+ }
+ catch (IOException e) {
+ if (trace)
+ log.trace("Error encountered while clearing file: " + f, e);
+ return false;
+ }
+ }
- private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException, InterruptedException {
- int len = is.available();
- Object o = null;
- if (len != 0) {
- ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
- byte[] buf = new byte[Math.min(len, 1024)];
- int bytesRead;
- while ((bytesRead = is.read(buf, 0, buf.length)) != -1) {
- bytes.write(buf, 0, bytesRead);
- }
- is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
- ObjectInput unmarshaller = marshaller.startObjectInput(is, true);
- try {
- o = marshaller.objectFromObjectStream(unmarshaller);
- } finally {
- marshaller.finishObjectInput(unmarshaller);
- }
- }
- return o;
- }
+ private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException, InterruptedException {
+ int len = is.available();
+ Object o = null;
+ if (len != 0) {
+ ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
+ byte[] buf = new byte[Math.min(len, 1024)];
+ int bytesRead;
+ while ((bytesRead = is.read(buf, 0, buf.length)) != -1) {
+ bytes.write(buf, 0, bytesRead);
+ }
+ is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
+ ObjectInput unmarshaller = marshaller.startObjectInput(is, true);
+ try {
+ o = marshaller.objectFromObjectStream(unmarshaller);
+ } finally {
+ marshaller.finishObjectInput(unmarshaller);
+ }
+ }
+ return o;
+ }
- /**
- * Specifies how the changes written to a file will be synched
- * with the underlying file system.
- */
- private interface FileSync {
+ /**
+ * Specifies how the changes written to a file will be synched
+ * with the underlying file system.
+ */
+ private interface FileSync {
- /**
- * Writes the given bytes to the file.
- *
- * @param bytes byte array containing the bytes to write.
- * @param f File instance representing the location where to store the data.
- * @throws IOException if an I/O error occurs
- */
- void write(byte[] bytes, File f) throws IOException;
+ /**
+ * Writes the given bytes to the file.
+ *
+ * @param bytes byte array containing the bytes to write.
+ * @param f File instance representing the location where to store the data.
+ * @throws IOException if an I/O error occurs
+ */
+ void write(byte[] bytes, File f) throws IOException;
- /**
- * Force the file changes to be flushed to the underlying file system.
- * Client code calling this flush method should in advance check whether
- * the file exists and so this method assumes that check was already done.
- *
- * @param f File instance representing the location changes should be flushed to.
- * @throws IOException if an I/O error occurs
- */
- void flush(File f) throws IOException;
+ /**
+ * Force the file changes to be flushed to the underlying file system.
+ * Client code calling this flush method should in advance check whether
+ * the file exists and so this method assumes that check was already done.
+ *
+ * @param f File instance representing the location changes should be flushed to.
+ * @throws IOException if an I/O error occurs
+ */
+ void flush(File f) throws IOException;
- /**
- * Forces the file to be purged. Implementations are free to decide what
- * the best option should be here. For example, whether to delete the
- * file, whether to empty it...etc.
- *
- * @param f File instance that should be purged.
- * @throws IOException if an I/O error occurs
- */
- void purge(File f) throws IOException;
+ /**
+ * Forces the file to be purged. Implementations are free to decide what
+ * the best option should be here. For example, whether to delete the
+ * file, whether to empty it...etc.
+ *
+ * @param f File instance that should be purged.
+ * @throws IOException if an I/O error occurs
+ */
+ void purge(File f) throws IOException;
- /**
- * Stop the file synching mechanism. This offers implementors the
- * opportunity to do any cleanup when the cache stops.
- */
- void stop();
+ /**
+ * Stop the file synching mechanism. This offers implementors the
+ * opportunity to do any cleanup when the cache stops.
+ */
+ void stop();
- }
+ }
- private class BufferedFileSync implements FileSync {
- protected final ConcurrentMap<String, FileChannel> streams =
- new ConcurrentHashMap<String, FileChannel>();
+ private class BufferedFileSync implements FileSync {
+ protected final ConcurrentMap<String, FileChannel> streams =
+ new ConcurrentHashMap<String, FileChannel>();
- @Override
- public void write(byte[] bytes, File f) throws IOException {
+ @Override
+ public void write(byte[] bytes, File f) throws IOException {
+ FileChannel channel = getChannel(f);
+ channel.write(ByteBuffer.wrap(bytes));
+ }
+
+ private FileChannel getChannel(File f) throws IOException {
- String path = f.getPath();
- FileChannel channel = streams.get(path);
- if (channel == null) {
- channel = createChannel(f);
- streams.putIfAbsent(path, channel);
- } else if (!f.exists()) {
- f.createNewFile();
- FileChannel oldChannel = channel;
- channel = createChannel(f);
- streams.replace(path, oldChannel, channel);
- }
+ String path = f.getPath();
+ FileChannel channel = streams.get(path);
+ if (channel == null) {
+ channel = createChannel(f);
+ streams.putIfAbsent(path, channel);
+ } else if (!f.exists()) {
+ f.createNewFile();
+ FileChannel oldChannel = channel;
+ channel = createChannel(f);
+ streams.replace(path, oldChannel, channel);
+ }
- channel.write(ByteBuffer.wrap(bytes));
+ return channel;
- }
+ }
- private FileChannel createChannel(File f) throws FileNotFoundException {
- return new RandomAccessFile(f, "rw").getChannel();
- }
+ private FileChannel createChannel(File f) throws FileNotFoundException {
+ return new RandomAccessFile(f, "rw").getChannel();
+ }
- @Override
- public void flush(File f) throws IOException {
- FileChannel channel = streams.get(f.getPath());
- if (channel != null)
- channel.force(true);
+ @Override
+ public void flush(File f) throws IOException {
+ FileChannel channel = streams.get(f.getPath());
+ if (channel != null)
+ channel.force(true);
- }
+ }
- @Override
- public void purge(File f) throws IOException {
- // Avoid a delete per-se because it hampers any fsync-like functionality
- // cos any cached file channel write won't change the file's exists
- // status. So, clear the file rather than delete it.
+ @Override
+ public void purge(File f) throws IOException {
+ // Avoid a delete per-se because it hampers any fsync-like functionality
+ // cos any cached file channel write won't change the file's exists
+ // status. So, clear the file rather than delete it.
- FileChannel channel = streams.get(f.getPath());
+ FileChannel channel = getChannel(f);
- channel.truncate(0);
- // Apart from truncating, it's necessary to reset the position!
- channel.position(0);
- }
+ channel.truncate(0);
+ // Apart from truncating, it's necessary to reset the position!
+ channel.position(0);
+ }
- @Override
- public void stop() {
- for (FileChannel channel : streams.values())
- Util.close(channel);
+ @Override
+ public void stop() {
+ for (FileChannel channel : streams.values())
+ Util.close(channel);
- streams.clear();
- }
+ streams.clear();
+ }
- }
+ }
- private class PeriodicFileSync extends BufferedFileSync {
- private final ScheduledExecutorService executor =
- Executors.newSingleThreadScheduledExecutor();
- protected final ConcurrentMap<String, IOException> flushErrors =
- new ConcurrentHashMap<String, IOException>();
+ private class PeriodicFileSync extends BufferedFileSync {
+ private final ScheduledExecutorService executor =
+ Executors.newSingleThreadScheduledExecutor();
+ protected final ConcurrentMap<String, IOException> flushErrors =
+ new ConcurrentHashMap<String, IOException>();
- private PeriodicFileSync(long interval) {
- executor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- for (Map.Entry<String, FileChannel> entry : streams.entrySet()) {
- if (trace)
- log.tracef("Flushing channel in %s", entry.getKey());
- FileChannel channel = entry.getValue();
- try {
- channel.force(true);
- } catch (IOException e) {
- if (trace)
- log.tracef(e, "Error flushing output stream for %s", entry.getKey());
- flushErrors.putIfAbsent(entry.getKey(), e);
- // If an error is encountered, close it. Next time it's used,
- // the exception will be propagated back to the user.
- Util.close(channel);
- }
- }
- }
- }, interval, interval, TimeUnit.MILLISECONDS);
- }
+ private PeriodicFileSync(long interval) {
+ executor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ for (Map.Entry<String, FileChannel> entry : streams.entrySet()) {
+ if (trace)
+ log.tracef("Flushing channel in %s", entry.getKey());
+ FileChannel channel = entry.getValue();
+ try {
+ channel.force(true);
+ } catch (IOException e) {
+ if (trace)
+ log.tracef(e, "Error flushing output stream for %s", entry.getKey());
+ flushErrors.putIfAbsent(entry.getKey(), e);
+ // If an error is encountered, close it. Next time it's used,
+ // the exception will be propagated back to the user.
+ Util.close(channel);
+ }
+ }
+ }
+ }, interval, interval, TimeUnit.MILLISECONDS);
+ }
- @Override
- public void write(byte[] bytes, File f) throws IOException {
- String path = f.getPath();
- IOException error = flushErrors.get(path);
- if (error != null)
- throw new IOException(String.format(
- "Periodic flush of channel for %s failed", path), error);
+ @Override
+ public void write(byte[] bytes, File f) throws IOException {
+ String path = f.getPath();
+ IOException error = flushErrors.get(path);
+ if (error != null)
+ throw new IOException(String.format(
+ "Periodic flush of channel for %s failed", path), error);
- super.write(bytes, f);
- }
+ super.write(bytes, f);
+ }
- @Override
- public void stop() {
- executor.shutdown();
- super.stop();
- }
- }
+ @Override
+ public void stop() {
+ executor.shutdown();
+ super.stop();
+ }
+ }
- private class PerWriteFileSync implements FileSync {
- @Override
- public void write(byte[] bytes, File f) throws IOException {
- FileOutputStream fos = null;
- try {
- fos = new FileOutputStream(f);
- fos.write(bytes);
- fos.flush();
- } finally {
+ private class PerWriteFileSync implements FileSync {
+ @Override
+ public void write(byte[] bytes, File f) throws IOException {
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(f);
+ fos.write(bytes);
+ fos.flush();
+ } finally {
- if (fos != null)
- fos.close();
+ Util.close(fos);
- }
- }
+ }
+ }
- @Override
- public void flush(File f) throws IOException {
- // No-op since flush always happens upon write
- }
+ @Override
+ public void flush(File f) throws IOException {
+ // No-op since flush always happens upon write
+ }
- @Override
- public void purge(File f) throws IOException {
- f.delete();
- }
+ @Override
+ public void purge(File f) throws IOException {
+ f.delete();
+ }
- @Override
- public void stop() {
- // No-op
- }
- }
+ @Override
+ public void stop() {
+ // No-op
+ }
+ }
}
+