import org.infinispan.Cache; import org.infinispan.config.ConfigurationException; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.io.ExposedByteArrayOutputStream; import org.infinispan.loaders.CacheLoaderConfig; import org.infinispan.loaders.CacheLoaderException; import org.infinispan.loaders.CacheLoaderMetadata; import org.infinispan.loaders.bucket.Bucket; import org.infinispan.loaders.bucket.BucketBasedCacheStore; import org.infinispan.loaders.file.FileCacheStore; import org.infinispan.loaders.file.FileCacheStoreConfig; import org.infinispan.marshall.StreamingMarshaller; import org.infinispan.util.Util; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @CacheLoaderMetadata(configurationClass = FileCacheStoreConfig.class) public class CombinedBucketFileCacheStore extends BucketBasedCacheStore { static final Log log = LogFactory.getLog(FileCacheStore.class); private static final boolean trace = log.isTraceEnabled(); private int streamBufferSize; private boolean purgeWithGlobalLock = false; // TODO should get a configuration entry private int maxConcurrentOpenFiles = 500; // TODO add configuration option private int fileNameMask = 0xfffffc00; // TODO should get a configuration entry private long minPurgeSleep = 90000; // TODO should get a configuration entry private long updatePurgeGap = 2500; // TODO should get a configuration entry private long maxPurgePause = 600000; // TODO should get a configuration entry private boolean checkIfPurgeable = true; // TODO should get a configuration entry /** * Flag indicating whether a purge is currenty running (avoid concurrent purges). */ private AtomicBoolean purging = new AtomicBoolean(false); private long lastPurge; long lastPurgeCheck; long lastUpdate; volatile boolean purgeableBuckets = true; FileCacheStoreConfig config; File root; FileSync fileSync; @Override protected Integer getLockFromKey(Object key) { return Integer.valueOf(key.hashCode() & fileNameMask); } /** * @return root directory where all files for this {@link org.infinispan.loaders.CacheStore CacheStore} are written. */ public File getRoot() { return root; } @Override public void init(CacheLoaderConfig config, Cache cache, StreamingMarshaller m) throws CacheLoaderException { super.init(config, cache, m); this.config = (FileCacheStoreConfig) config; } @Override protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException { try { File[] listFiles; if (root != null && (listFiles = root.listFiles()) != null) { for (File bucketFile : listFiles) { Bucket bucket = loadBucket(bucketFile); if (handler.handle(bucket)) break; } } } catch (InterruptedException ie) { if (log.isDebugEnabled()) { log.debug("Interrupted, so stop looping over buckets."); } Thread.currentThread().interrupt(); } } @Override protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException { try { int numFiles = objectInput.readInt(); byte[] buffer = new byte[streamBufferSize]; int bytesRead, totalBytesRead = 0; for (int i = 0; i < numFiles; i++) { String fName = (String) objectInput.readObject(); int numBytes = objectInput.readInt(); FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() + File.separator + fName); BufferedOutputStream bos = new BufferedOutputStream(fos, streamBufferSize); try { while (numBytes > totalBytesRead) { if (numBytes - totalBytesRead > streamBufferSize) { bytesRead = objectInput.read(buffer, 0, streamBufferSize); } else { bytesRead = objectInput.read(buffer, 0, numBytes - totalBytesRead); } if (bytesRead == -1) { break; } totalBytesRead += bytesRead; bos.write(buffer, 0, bytesRead); } bos.flush(); fos.flush(); totalBytesRead = 0; } finally { safeClose(bos); safeClose(fos); } } } catch (IOException e) { throw new CacheLoaderException("I/O error", e); } catch (ClassNotFoundException e) { throw new CacheLoaderException("Unexpected exception", e); } } @Override protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException { try { File[] files = root.listFiles(); if (files == null) throw new CacheLoaderException("Root not directory or IO error occurred"); objectOutput.writeInt(files.length); byte[] buffer = new byte[streamBufferSize]; for (File file : files) { int bytesRead, totalBytesRead = 0; BufferedInputStream bis = null; FileInputStream fileInStream = null; try { if (trace) { log.tracef("Opening file in %s", file); } fileInStream = new FileInputStream(file); int sz = fileInStream.available(); bis = new BufferedInputStream(fileInStream); objectOutput.writeObject(file.getName()); objectOutput.writeInt(sz); while (sz > totalBytesRead) { bytesRead = bis.read(buffer, 0, streamBufferSize); if (bytesRead == -1) { break; } totalBytesRead += bytesRead; objectOutput.write(buffer, 0, bytesRead); } } finally { Util.close(bis); Util.close(fileInStream); } } } catch (IOException e) { throw new CacheLoaderException("I/O exception while generating stream", e); } } @Override protected void clearLockSafe() throws CacheLoaderException { File[] toDelete = root.listFiles(); if (toDelete == null) { return; } for (File f : toDelete) { if (!deleteFile(f)) { log.problemsRemovingFile(f); } } } @Override protected boolean supportsMultiThreadedPurge() { return true; } /** * */ @Override protected void purgeInternal() throws CacheLoaderException { // if cache store contains no expireable cache entries, we do not need to run purge if (!purgeableBuckets) return; // prevent parallel purge operations if (purging.compareAndSet(false, true)) { try { long currentTime = System.currentTimeMillis(); if (lastPurge + minPurgeSleep > currentTime) return; // do not run purge, if last update is less than "updatePurgeGap" ms ago and // last purge run is not more than "maxPurgePause" ms ago if (lastUpdate + updatePurgeGap > currentTime && lastPurgeCheck + maxPurgePause > currentTime) return; lastPurgeCheck = lastPurge = System.currentTimeMillis(); if (purgeWithGlobalLock) { if (multiThreadedPurge) // Underlying implementation fails to do so multiThreadedPurge = false; purgeGlobalLock(); } else purgeNonGlobalLock(); } finally { purging.set(false); } } } /** * Purge implementation using per cache key locks - thus it does not block the whole application during a purge. */ private void purgeNonGlobalLock() throws CacheLoaderException { try { File[] files = getRoot().listFiles(); if (files == null) throw new CacheLoaderException("Root not directory or IO error occurred"); for (final File bucketFile : files) { if (lastPurgeCheck > bucketFile.lastModified()) continue; if (multiThreadedPurge) { purgerService.execute(new Runnable() { @Override public void run() { try { maybePurgeBucketLock(bucketFile); } catch (InterruptedException ie) { if (log.isDebugEnabled()) log.debug("Interrupted, so finish work."); } catch (CacheLoaderException e) { log.problemsPurgingFile(bucketFile, e); } } }); } else maybePurgeBucketLock(bucketFile); } } catch (InterruptedException ie) { if (log.isDebugEnabled()) { log.debug("Interrupted, so stop loading and finish with purging."); } Thread.currentThread().interrupt(); } } /** * Purge implementation using a global lock. */ protected void purgeGlobalLock() throws CacheLoaderException { if (trace) { log.trace("purgeInternal()"); } if (acquireGlobalLock(false)) { try { File[] files = root.listFiles(); if (files == null) throw new CacheLoaderException("Root not directory or IO error occurred"); for (final File bucketFile : files) { if (lastPurgeCheck > bucketFile.lastModified()) continue; if (false && multiThreadedPurge) { // TODO must not use multithreaded purge with GLOBAL lock purgerService.execute(new Runnable() { @Override public void run() { try { maybePurgeBucket(bucketFile); } catch (InterruptedException ie) { if (log.isDebugEnabled()) { log.debug("Interrupted, so finish work."); } } catch (CacheLoaderException e) { log.problemsPurgingFile(bucketFile, e); } } }); } else maybePurgeBucket(bucketFile); } } catch (InterruptedException ie) { if (log.isDebugEnabled()) { log.debug("Interrupted, so stop loading and finish with purging."); } Thread.currentThread().interrupt(); } finally { releaseGlobalLock(false); if (trace) { log.trace("Exit purgeInternal()"); } } } else { log.unableToAcquireLockToPurgeStore(); } } void maybePurgeBucket(File bucketFile) throws CacheLoaderException, InterruptedException { Bucket bucket; if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) { updateBucket(bucket); } } void maybePurgeBucketLock(File bucketFile) throws CacheLoaderException, InterruptedException { Bucket bucket = loadBucket(bucketFile); if (bucket == null) return; boolean hasExpired = false; for (InternalCacheEntry entry : bucket.getEntries().values()) if (entry.isExpired()) { hasExpired = true; break; } if (!hasExpired) return; Integer key = Integer.valueOf(bucketFile.getName()); try { lockForWriting(key); maybePurgeBucket(bucketFile); } finally { unlock(key); } } @Override protected Bucket loadBucket(Integer hash) throws CacheLoaderException { try { return loadBucket(new File(root, String.valueOf(hash))); } catch (InterruptedException ie) { if (log.isDebugEnabled()) { log.debug("Interrupted, so stop loading bucket and return null."); } Thread.currentThread().interrupt(); return null; } } protected Bucket loadBucket(File bucketFile) throws CacheLoaderException, InterruptedException { Bucket bucket = null; if (bucketFile.exists()) { if (trace) { log.trace("Found bucket file: '" + bucketFile + "'"); } InputStream is = null; try { // It could happen that the output buffer might not have been // flushed, so just in case, flush it to be able to read it. fileSync.flush(bucketFile); if (bucketFile.length()==0) return null; is = new FileInputStream(bucketFile); bucket = (Bucket) objectFromInputStreamInReentrantMode(is); } catch (InterruptedException ie) { throw ie; } catch (Exception e) { log.errorReadingFromFile(bucketFile.getAbsoluteFile(), e); throw new CacheLoaderException("Error while reading from file", e); } finally { safeClose(is); } } if (bucket != null) bucket.setBucketId(bucketFile.getName()); return bucket; } @Override public void updateBucket(Bucket b) throws CacheLoaderException { purgeableBuckets |= bucketHasExpireable(b); File f = new File(root, b.getBucketIdAsString()); boolean empty = b.isEmpty(); if (f.exists()) { // delete the file, if it is empty (reduces time to purge the file cache) if (empty ? !deleteFile(f) : !purgeFile(f)) { log.problemsRemovingFile(f); } else if (trace) { log.tracef("Successfully deleted file: '%s'", f.getName()); } } if (!empty) { try { byte[] bytes = marshaller.objectToByteBuffer(b); if (bytes==null || bytes.length==0) { System.out.println("EMPTY: "+f.getName()); } fileSync.write(bytes, f); lastUpdate = System.currentTimeMillis(); } catch (IOException ex) { log.errorSavingBucket(b, ex); throw new CacheLoaderException(ex); } catch (InterruptedException ie) { if (trace) { log.trace("Interrupted while marshalling a bucket"); } Thread.currentThread().interrupt(); // Restore interrupted status } } } @Override public Class getConfigurationClass() { return FileCacheStoreConfig.class; } @Override public void start() throws CacheLoaderException { super.start(); String location = config.getLocation(); if (location == null || location.trim().length() == 0) { location = "Infinispan-FileCacheStore"; // use relative path! } location += File.separator + cache.getName(); root = new File(location); if (!root.exists()) { if (!root.mkdirs()) { log.problemsCreatingDirectory(root); } } if (!root.exists()) { throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!"); } streamBufferSize = config.getStreamBufferSize(); switch(config.getFsyncMode()) { case DEFAULT : fileSync = new BufferedFileSync(); break; case PER_WRITE: fileSync = new PerWriteFileSync(); break; case PERIODIC: fileSync = new PeriodicFileSync(config.getFsyncInterval()); break; } // check if current cache store contains any purgeable bucket if (checkIfPurgeable) { final boolean hasPurgeableBuckets[] = new boolean[1]; loopOverBuckets(new BucketHandler() { @Override public boolean handle(Bucket bucket) throws CacheLoaderException { hasPurgeableBuckets[0] |= bucketHasExpireable(bucket); return hasPurgeableBuckets[0]; } }); purgeableBuckets = hasPurgeableBuckets[0]; } lastPurgeCheck = System.currentTimeMillis(); } boolean bucketHasExpireable(Bucket bucket) { boolean hasExpireable = false; for (InternalCacheEntry entry : bucket.getEntries().values()) { hasExpireable |= entry.canExpire(); if (hasExpireable) break; } return hasExpireable; } @Override public void stop() throws CacheLoaderException { super.stop(); fileSync.stop(); } public Bucket loadBucketContainingKey(String key) throws CacheLoaderException { return loadBucket(key.hashCode()); } private boolean deleteFile(File f) { if (trace) { log.tracef("Really delete file %s", f); } return fileSync.delete(f); } private boolean purgeFile(File f) { if (trace) { log.tracef("Really purge file %s", f); } try { fileSync.purge(f); return true; } catch (IOException e) { if (trace) log.trace("Error encountered while clearing file: " + f, e); return false; } } private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException, InterruptedException { int len = is.available(); Object o = null; if (len != 0) { ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len); byte[] buf = new byte[Math.min(len, 1024)]; int bytesRead; while ((bytesRead = is.read(buf, 0, buf.length)) != -1) { bytes.write(buf, 0, bytesRead); } is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size()); ObjectInput unmarshaller = marshaller.startObjectInput(is, false); try { o = marshaller.objectFromObjectStream(unmarshaller); } finally { marshaller.finishObjectInput(unmarshaller); } } return o; } /** * Specifies how the changes written to a file will be synched * with the underlying file system. */ private interface FileSync { /** * Writes the given bytes to the file. * * @param bytes byte array containing the bytes to write. * @param f File instance representing the location where to store the data. * @throws IOException if an I/O error occurs */ void write(byte[] bytes, File f) throws IOException; /** * Force the file changes to be flushed to the underlying file system. * Client code calling this flush method should in advance check whether * the file exists and so this method assumes that check was already done. * * @param f File instance representing the location changes should be flushed to. * @throws IOException if an I/O error occurs */ void flush(File f) throws IOException; /** * Forces the file to be purged. Implementations are free to decide what * the best option should be here. For example, whether to delete the * file, whether to empty it...etc. * * @param f File instance that should be purged. * @throws IOException if an I/O error occurs */ void purge(File f) throws IOException; /** * Stop the file synching mechanism. This offers implementors the * opportunity to do any cleanup when the cache stops. */ void stop(); boolean delete(File f); } /** * This implementation differs from the original implementation. * It limits the number of concurrent open channels to 500 and replaces the * ConcurrentHashMap with a synchronized LinkedHashMap. */ private class BufferedFileSync implements FileSync { protected final LinkedHashMap streams = new LinkedHashMap() { @Override protected boolean removeEldestEntry(Map.Entry eldest) { if (size()<=maxConcurrentOpenFiles) return false; try { eldest.getValue().close(); } catch (IOException e) { e.printStackTrace(); } return true; } }; @Override public void write(byte[] bytes, File f) throws IOException { if (bytes==null || bytes.length==0) return; String path = f.getPath(); FileChannel channel; synchronized (streams) { channel = streams.get(path); if (channel == null) channel = createChannel(f); else if (!f.exists()) { f.createNewFile(); FileChannel oldChannel = channel; Util.close(oldChannel); channel = createChannel(f); } channel.write(ByteBuffer.wrap(bytes)); } } /** * Must be called in synchronized (streams) context. */ private FileChannel createChannel(File f) throws FileNotFoundException { FileChannel channel = new RandomAccessFile(f, "rw").getChannel(); streams.put(f.getPath(), channel); return channel; } @Override public void flush(File f) throws IOException { FileChannel channel; synchronized (streams) { channel = streams.get(f.getPath()); } if (channel != null) channel.force(false); } @Override public boolean delete(File f) { String path = f.getPath(); synchronized (streams) { FileChannel channel = streams.get(path); if (channel!=null) { streams.remove(path); Util.close(channel); } return f.delete(); } } @Override public void purge(File f) throws IOException { // Avoid a delete per-se because it hampers any fsync-like functionality // cos any cached file channel write won't change the file's exists // status. So, clear the file rather than delete it. FileChannel channel; synchronized (streams) { channel = streams.get(f.getPath()); } if (channel == null) channel = createChannel(f); channel.truncate(0); // Apart from truncating, it's necessary to reset the position! channel.position(0); } @Override public void stop() { Collection channels; synchronized (streams) { channels = streams.values(); streams.clear(); } for (FileChannel channel : channels) { try { channel.force(true); } catch (IOException e) { log.errorFlushingToFileChannel(channel, e); } Util.close(channel); } } } // TODO maybe re-add the old implementation using a new FsyncMode private class BufferedFileSyncORIGINAL implements FileSync { protected final ConcurrentMap streams = new ConcurrentHashMap(); @Override public void write(byte[] bytes, File f) throws IOException { if (bytes==null || bytes.length==0) return; String path = f.getPath(); FileChannel channel = streams.get(path); if (channel == null) { channel = createChannel(f); FileChannel existingChannel = streams.putIfAbsent(path, channel); if (existingChannel != null) { Util.close(channel); channel = existingChannel; } } else if (!f.exists()) { f.createNewFile(); FileChannel oldChannel = channel; channel = createChannel(f); boolean replaced = streams.replace(path, oldChannel, channel); if (replaced) { Util.close(oldChannel); } else { Util.close(channel); channel = streams.get(path); } } channel.write(ByteBuffer.wrap(bytes)); } private FileChannel createChannel(File f) throws FileNotFoundException { return new RandomAccessFile(f, "rw").getChannel(); } @Override public void flush(File f) throws IOException { FileChannel channel = streams.get(f.getPath()); if (channel != null) channel.force(false); } @Override public boolean delete(File f) { String path = f.getPath(); FileChannel channel = streams.get(path); if (channel!=null) { streams.remove(path); Util.close(channel); } return f.delete(); } @Override public void purge(File f) throws IOException { // Avoid a delete per-se because it hampers any fsync-like functionality // cos any cached file channel write won't change the file's exists // status. So, clear the file rather than delete it. FileChannel channel = streams.get(f.getPath()); if (channel == null) { channel = createChannel(f); String path = f.getPath(); FileChannel existingChannel = streams.putIfAbsent(path, channel); if (existingChannel != null) { Util.close(channel); channel = existingChannel; } } channel.truncate(0); // Apart from truncating, it's necessary to reset the position! channel.position(0); } @Override public void stop() { for (FileChannel channel : streams.values()) { try { channel.force(true); } catch (IOException e) { log.errorFlushingToFileChannel(channel, e); } Util.close(channel); } streams.clear(); } } private class PeriodicFileSync extends BufferedFileSync { private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); protected final ConcurrentMap flushErrors = new ConcurrentHashMap(); private PeriodicFileSync(long interval) { executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { for (Map.Entry entry : streams.entrySet()) { if (trace) log.tracef("Flushing channel in %s", entry.getKey()); FileChannel channel = entry.getValue(); try { channel.force(true); } catch (IOException e) { if (trace) log.tracef(e, "Error flushing output stream for %s", entry.getKey()); flushErrors.putIfAbsent(entry.getKey(), e); // If an error is encountered, close it. Next time it's used, // the exception will be propagated back to the user. Util.close(channel); } } } }, interval, interval, TimeUnit.MILLISECONDS); } @Override public void write(byte[] bytes, File f) throws IOException { String path = f.getPath(); IOException error = flushErrors.get(path); if (error != null) throw new IOException(String.format( "Periodic flush of channel for %s failed", path), error); super.write(bytes, f); } @Override public void stop() { executor.shutdown(); super.stop(); } } private class PerWriteFileSync implements FileSync { @Override public void write(byte[] bytes, File f) throws IOException { if (bytes==null || bytes.length==0) return; FileOutputStream fos = null; try { fos = new FileOutputStream(f); fos.write(bytes); fos.flush(); } finally { if (fos != null) fos.close(); } } @Override public void flush(File f) throws IOException { // No-op since flush always happens upon write } @Override public boolean delete(File f) { return f.delete(); } @Override public void purge(File f) throws IOException { f.delete(); } @Override public void stop() { // No-op } } }