From 1ec3d9bb67bcdb604a1ec8f241efee48e71cc7b3 Mon Sep 17 00:00:00 2001 From: twn Date: Thu, 18 Oct 2012 15:58:13 +0200 Subject: [PATCH] ISPN-2033: Fixes jdbc connection leak when the purger does not find any buckets to purge. --- .../loaders/jdbc/binary/JdbcBinaryCacheStore.java | 254 +++++++++++---------- 1 file changed, 134 insertions(+), 120 deletions(-) diff --git a/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java b/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java index d908e3f..1e00f19 100644 --- a/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java +++ b/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java @@ -338,147 +338,161 @@ public void purgeInternal() throws CacheLoaderException { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; - Set expiredBuckets = new HashSet(); + final Set expiredBuckets = new HashSet(); final int batchSize = 100; try { - String sql = tableManipulation.getSelectExpiredRowsSql(); - conn = connectionFactory.getConnection(); - ps = conn.prepareStatement(sql); - ps.setLong(1, System.currentTimeMillis()); - rs = ps.executeQuery(); - while (rs.next()) { - Integer key = rs.getInt(2); - if (immediateLockForWriting(key)) { - if (log.isTraceEnabled()) { - log.tracef("Adding bucket keyed %s for purging.", key); - } - Bucket bucket = null; - try { - InputStream binaryStream = rs.getBinaryStream(1); - bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream); - } catch (Exception ex) { - // If something goes wrong during unmarshalling, unlock the key before rethrowing - unlock(key); - throw ex; - } - bucket.setBucketId(key); - expiredBuckets.add(bucket); - } else { - if (log.isTraceEnabled()) { - log.tracef("Could not acquire write lock for %s, this won't be purged even though it has expired elements", key); + try { + final String sql = tableManipulation.getSelectExpiredRowsSql(); + conn = connectionFactory.getConnection(); + ps = conn.prepareStatement(sql); + ps.setLong(1, System.currentTimeMillis()); + rs = ps.executeQuery(); + while (rs.next()) { + final Integer key = rs.getInt(2); + if (immediateLockForWriting(key)) { + if (log.isTraceEnabled()) { + log.tracef("Adding bucket keyed %s for purging.", key); + } + Bucket bucket = null; + try { + final InputStream binaryStream = rs.getBinaryStream(1); + bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream); + } catch (final Exception ex) { + // If something goes wrong during unmarshalling, unlock the + // key before rethrowing + unlock(key); + throw ex; + } + bucket.setBucketId(key); + expiredBuckets.add(bucket); + } else { + if (log.isTraceEnabled()) { + log.tracef( + "Could not acquire write lock for %s, this won't be purged even though it has expired elements", + key); + } } } + } catch (final SQLException ex) { + // if something happens make sure buckets locks are being release + releaseLocks(expiredBuckets); + log.failedClearingJdbcCacheStore(ex); + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); + } catch (final Exception e) { + // if something happens make sure buckets locks are being release + releaseLocks(expiredBuckets); + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", e); + } finally { + JdbcUtil.safeClose(ps); + JdbcUtil.safeClose(rs); } - } catch (Exception ex) { - //if something happens make sure buckets locks are being release - releaseLocks(expiredBuckets); - connectionFactory.releaseConnection(conn); - log.failedClearingJdbcCacheStore(ex); - throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); - } finally { - JdbcUtil.safeClose(ps); - JdbcUtil.safeClose(rs); - } - if (log.isTraceEnabled()) { - log.tracef("Found following buckets: %s which are about to be expired", expiredBuckets); - } + if (log.isTraceEnabled()) { + log.tracef("Found following buckets: %s which are about to be expired", expiredBuckets); + } - if (expiredBuckets.isEmpty()) { - return; - } - Set emptyBuckets = new HashSet(); - //now update all the buckets in batch - try { - String sql = tableManipulation.getUpdateRowSql(); - ps = conn.prepareStatement(sql); - int updateCount = 0; - Iterator it = expiredBuckets.iterator(); - while (it.hasNext()) { - Bucket bucket = it.next(); - bucket.removeExpiredEntries(); - if (!bucket.isEmpty()) { - ByteBuffer byteBuffer = JdbcUtil.marshall(getMarshaller(), bucket); - ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength()); - ps.setLong(2, bucket.timestampOfFirstEntryToExpire()); - ps.setString(3, bucket.getBucketIdAsString()); - ps.addBatch(); - updateCount++; - if (updateCount % batchSize == 0) { - ps.executeBatch(); - if (log.isTraceEnabled()) { - log.tracef("Flushing batch, update count is: %d", updateCount); + if (expiredBuckets.isEmpty()) { + return; + } + final Set emptyBuckets = new HashSet(); + // now update all the buckets in batch + try { + final String sql = tableManipulation.getUpdateRowSql(); + ps = conn.prepareStatement(sql); + int updateCount = 0; + final Iterator it = expiredBuckets.iterator(); + while (it.hasNext()) { + final Bucket bucket = it.next(); + bucket.removeExpiredEntries(); + if (!bucket.isEmpty()) { + final ByteBuffer byteBuffer = JdbcUtil.marshall(getMarshaller(), bucket); + ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength()); + ps.setLong(2, bucket.timestampOfFirstEntryToExpire()); + ps.setString(3, bucket.getBucketIdAsString()); + ps.addBatch(); + updateCount++; + if (updateCount % batchSize == 0) { + ps.executeBatch(); + if (log.isTraceEnabled()) { + log.tracef("Flushing batch, update count is: %d", updateCount); + } } + } else { + it.remove(); + emptyBuckets.add(bucket); } - } else { - it.remove(); - emptyBuckets.add(bucket); } - } - //flush the batch - if (updateCount % batchSize != 0) { + // flush the batch + if (updateCount % batchSize != 0) { + if (log.isTraceEnabled()) { + log.tracef("Flushing batch, update count is: %d", updateCount); + } + ps.executeBatch(); + } if (log.isTraceEnabled()) { - log.tracef("Flushing batch, update count is: %d", updateCount); + log.tracef("Updated %d buckets.", updateCount); } - ps.executeBatch(); - } - if (log.isTraceEnabled()) { - log.tracef("Updated %d buckets.", updateCount); + } catch (final SQLException ex) { + // if something happens make sure buckets locks are being release + releaseLocks(emptyBuckets); + log.failedClearingJdbcCacheStore(ex); + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); + } catch (final InterruptedException ie) { + if (log.isTraceEnabled()) { + log.trace("Interrupted while marshalling to purge expired entries"); + } + Thread.currentThread().interrupt(); + } catch (final Exception e) { + // if something happens make sure buckets locks are being release + releaseLocks(emptyBuckets); + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", e); + } finally { + // release locks for the updated buckets.This won't include empty + // buckets, as these were migrated to emptyBuckets + releaseLocks(expiredBuckets); + JdbcUtil.safeClose(ps); } - } catch (SQLException ex) { - //if something happens make sure buckets locks are being release - releaseLocks(emptyBuckets); - connectionFactory.releaseConnection(conn); - log.failedClearingJdbcCacheStore(ex); - throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); - } catch (InterruptedException ie) { + if (log.isTraceEnabled()) { - log.trace("Interrupted while marshalling to purge expired entries"); + log.tracef("About to remove empty buckets %s", emptyBuckets); } - Thread.currentThread().interrupt(); - } finally { - //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets - releaseLocks(expiredBuckets); - JdbcUtil.safeClose(ps); - } - - - if (log.isTraceEnabled()) { - log.tracef("About to remove empty buckets %s", emptyBuckets); - } - if (emptyBuckets.isEmpty()) { - return; - } - //then remove the empty buckets - try { - String sql = tableManipulation.getDeleteRowSql(); - ps = conn.prepareStatement(sql); - int deletionCount = 0; - for (Bucket bucket : emptyBuckets) { - ps.setString(1, bucket.getBucketIdAsString()); - ps.addBatch(); - deletionCount++; - if (deletionCount % batchSize == 0) { - if (log.isTraceEnabled()) { - log.tracef("Flushing deletion batch, total deletion count so far is %d", deletionCount); + if (emptyBuckets.isEmpty()) { + return; + } + // then remove the empty buckets + try { + final String sql = tableManipulation.getDeleteRowSql(); + ps = conn.prepareStatement(sql); + int deletionCount = 0; + for (final Bucket bucket : emptyBuckets) { + ps.setString(1, bucket.getBucketIdAsString()); + ps.addBatch(); + deletionCount++; + if (deletionCount % batchSize == 0) { + if (log.isTraceEnabled()) { + log.tracef("Flushing deletion batch, total deletion count so far is %d", deletionCount); + } + ps.executeBatch(); } - ps.executeBatch(); } - } - if (deletionCount % batchSize != 0) { - int[] batchResult = ps.executeBatch(); - if (log.isTraceEnabled()) { - log.tracef("Flushed the batch and received following results: %s", Arrays.toString(batchResult)); + if (deletionCount % batchSize != 0) { + final int[] batchResult = ps.executeBatch(); + if (log.isTraceEnabled()) { + log.tracef("Flushed the batch and received following results: %s", Arrays.toString(batchResult)); + } } + } catch (final SQLException ex) { + // if something happens make sure buckets locks are being release + log.failedClearingJdbcCacheStore(ex); + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); + } catch (final Exception e) { + throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", e); + } finally { + releaseLocks(emptyBuckets); + JdbcUtil.safeClose(ps); } - } catch (SQLException ex) { - //if something happens make sure buckets locks are being release - log.failedClearingJdbcCacheStore(ex); - throw new CacheLoaderException("Failed clearing JdbcBinaryCacheStore", ex); } finally { - releaseLocks(emptyBuckets); - JdbcUtil.safeClose(ps); connectionFactory.releaseConnection(conn); } } -- 1.7.11.msysgit.1