package test.infinispan.custom; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoOptions; import com.mongodb.ServerAddress; import com.mongodb.WriteConcern; import com.mongodb.WriteResult; import org.infinispan.Cache; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.loaders.AbstractCacheStore; import org.infinispan.loaders.CacheLoaderConfig; import org.infinispan.loaders.CacheLoaderException; import org.infinispan.loaders.CacheLoaderMetadata; import org.infinispan.marshall.StreamingMarshaller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.net.UnknownHostException; import java.util.LinkedHashSet; import java.util.Set; /** * MongoDB based cache store. * * @author Andrei Pushkin */ @CacheLoaderMetadata(configurationClass = InfinispanMongoStoreConfig.class) public class InfinispanMongoStore extends AbstractCacheStore { private Logger log = LoggerFactory.getLogger(getClass()); InfinispanMongoStoreConfig config; DBCollection coll; @Override public void init(CacheLoaderConfig config, Cache cache, StreamingMarshaller m) throws CacheLoaderException { super.init(config, cache, m); this.config = (InfinispanMongoStoreConfig) config; } @Override public void start() throws CacheLoaderException { log.trace("Starting " + getClass().getSimpleName()); super.start(); Mongo m; try { MongoOptions moptions = new MongoOptions(); moptions.autoConnectRetry = true; moptions.connectionsPerHost = 100; moptions.threadsAllowedToBlockForConnectionMultiplier = 10; // m = new Mongo( "192.168.0.3", 27017 ); ServerAddress srvAddr = new ServerAddress(config.getHost(), config.getPort()); m = new Mongo(srvAddr, moptions); // setting WriteConcern to true enables fsync, however performance degradation is very big: 5-10 times! // It makes sense to enable it only on particular updates (1.4ms vs 12ms fsynced per 1KB update) m.setWriteConcern(WriteConcern.SAFE); } catch (UnknownHostException e) { throw new RuntimeException(e); } coll = m.getDB(config.getDbName()).getCollection(config.getDbCollection()); coll.ensureIndex("expiry"); } public void store(InternalCacheEntry entry) throws CacheLoaderException { log.trace("store: " + entry); BasicDBObject doc = new BasicDBObject(); try { doc.put("value", marshaller.objectToByteBuffer(entry)); } catch (IOException e) { throw new RuntimeException(e); } catch (InterruptedException e) { throw new RuntimeException(e); } if (entry.canExpire()) { long expiry = entry.getExpiryTime(); if (entry.getMaxIdle() > 0) { // Coding getExpiryTime() for transient entries has the risk of being a moving target // which could lead to unexpected results, hence, InternalCacheEntry calls are required expiry = entry.getMaxIdle() + System.currentTimeMillis(); } doc.put("expiry", expiry); } doc.put("_id", entry.getKey()); WriteResult result = coll.save(doc); result.getLastError().throwOnError(); } public InternalCacheEntry load(Object key) throws CacheLoaderException { DBObject read = coll.findOne(new BasicDBObject("_id", key)); if (read == null) { return null; } byte[] bytes = (byte[]) read.get("value"); try { InternalCacheEntry entry = (InternalCacheEntry) marshaller.objectFromByteBuffer(bytes); if (entry != null && entry.isExpired(System.currentTimeMillis())) { return null; } return entry; } catch (IOException e) { throw new RuntimeException(e); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } public Set loadAll() throws CacheLoaderException { Set set = new LinkedHashSet(); DBCursor cursor = coll.find(); while (cursor.hasNext()) { DBObject raw = cursor.next(); byte[] bytes = (byte[]) raw.get("value"); try { set.add((InternalCacheEntry) marshaller.objectFromByteBuffer(bytes)); } catch (IOException e) { throw new RuntimeException(e); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } cursor.close(); return set; } @Override public Set load(int numEntries) throws CacheLoaderException { Set set = new LinkedHashSet(); DBCursor cursor = coll.find(); int i = 0; while (cursor.hasNext() && i++ < numEntries) { DBObject raw = cursor.next(); byte[] bytes = (byte[]) raw.get("value"); try { set.add((InternalCacheEntry) marshaller.objectFromByteBuffer(bytes)); } catch (IOException e) { throw new RuntimeException(e); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } cursor.close(); return set; } @Override protected void purgeInternal() throws CacheLoaderException { DBObject query = new BasicDBObject("expiry", new BasicDBObject("$lt", System.currentTimeMillis())); WriteResult result = coll.remove(query); log.trace("purgeInternal removed " + result.getN() + " expired records"); result.getLastError().throwOnError(); } @Override public Set loadAllKeys(Set keysToExclude) throws CacheLoaderException { DBObject query = new BasicDBObject("_id", new BasicDBObject("$nin", keysToExclude)); DBCursor cursor = coll.find(query); Set keySet = new LinkedHashSet(); while (cursor.hasNext()) { keySet.add(cursor.next()); } log.trace("loadAllKeys returned " + keySet.size() + " excluded " + keysToExclude.size()); return keySet; } @Override public void clear() throws CacheLoaderException { coll.drop(); } @Override public boolean remove(Object key) throws CacheLoaderException { DBObject query = new BasicDBObject("_id", key); WriteResult result = coll.remove(query); log.trace("removed " + result.getN() + " expired records"); result.getLastError().throwOnError(); return false; } public void fromStream(ObjectInput inputStream) throws CacheLoaderException { new UnsupportedOperationException("fromStream").printStackTrace(); } public void toStream(ObjectOutput outputStream) throws CacheLoaderException { new UnsupportedOperationException("toStream").printStackTrace(); } public Class getConfigurationClass() { return InfinispanMongoStoreConfig.class; } }