Uploaded image for project: 'Drools'
  1. Drools
  2. DROOLS-138

AccumulateNode throws NPE

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Unresolved
    • Icon: Minor Minor
    • None
    • 5.5.1.Final
    • None
    • None
    • Hide
          1. Average.java ###
            package drools5fusioneval;

      import java.io.IOException;
      import java.util.Random;
      import org.drools.KnowledgeBase;
      import org.drools.KnowledgeBaseConfiguration;
      import org.drools.KnowledgeBaseFactory;
      import org.drools.builder.KnowledgeBuilder;
      import org.drools.builder.KnowledgeBuilderFactory;
      import org.drools.builder.ResourceType;
      import org.drools.conf.EventProcessingOption;
      import org.drools.io.ResourceFactory;
      import org.drools.runtime.StatefulKnowledgeSession;
      import org.drools.runtime.rule.WorkingMemoryEntryPoint;

      /**
       * Channel implementation that writes every message it is handed to standard error.
       */
      class AvgDFEChannel implements org.drools.runtime.Channel {
          /**
           * Prints the given object to {@code System.err}, prefixed with a fixed label.
           *
           * @param o the message passed to this channel
           */
          @Override
          public void send(Object o) {
              final String line = "Recieved channel message: " + o;
              System.err.println(line);
          }
      }

      public class Average {
      public static void main(String[] args) throws InterruptedException, IOException {
      KnowledgeBaseConfiguration kbconfig = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
      kbconfig.setOption(EventProcessingOption.STREAM);

      KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(kbconfig);

      KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
      kbuilder.add(ResourceFactory.newClassPathResource("drools5fusioneval/average.drl"), ResourceType.DRL);
      if (kbuilder.hasErrors())

      { System.err.println(kbuilder.getErrors().toString()); }

      kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());

      final StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
      session.registerChannel("heartbeat", new AvgDFEChannel ());
      WorkingMemoryEntryPoint ep01 = session.getWorkingMemoryEntryPoint("ep01");

      new Thread() {
      public void run()

      { session.fireUntilHalt(); }

      }.start();

      Thread.sleep (5000); // give the engine time to get setup

      Server hiwaesdk = new Server ("hiwaesdk");
      session.insert(hiwaesdk);
      long LIMIT = 120;
      Random rnd = new Random (System.nanoTime());

      for (long i = LIMIT; i > 0; i--)

      { int j = rnd.nextInt (212); ep01.insert (new IntEvent (j)); //Thread.sleep (0x1); }

      System.out.println (hiwaesdk);
      }
      }

          1. average.drl ###
            package drools5fusioneval

      // Type metadata for IntEvent: declared with role "event" and an
      // expiration of 15s.
      declare IntEvent
      @role ( event )
      @expires(15s)
      end

      // Counter holder declared in DRL.
      //   opsec - incremented by "number rule", reset to 0 by "Heartbeat"
      //   total - incremented by "number rule"; no rule in this file resets it
      declare Statistics
      opsec : long
      total : long
      end
      // Global holding the Statistics instance; it is assigned by "Init engine".
      global Statistics stats;

      // Rule with an empty condition and salience 100. Its consequence creates a
      // Statistics instance with both counters set to 0, registers it as the
      // global "stats" through the working memory, and prints a message.
      rule "Init engine"
      salience 100
      when
      then
      stats = new Statistics ();
      stats.setTotal (0);
      stats.setOpsec (0);
      drools.getWorkingMemory ().setGlobal ("stats", stats);
      System.out.println ("Engine initialized");
      end

      // Matches each IntEvent from entry point ep01 together with the Server
      // whose hostname is "hiwaesdk". Copies the event's data into the server's
      // currentTemp and increments both counters on the "stats" global.
      rule "number rule"
      when
      $e : IntEvent () from entry-point ep01
      $s : Server (hostname == "hiwaesdk")
      then
      $s.currentTemp = $e.data;
      stats.setOpsec (stats.getOpsec () + 1);
      stats.setTotal (stats.getTotal () + 1);
      end

      // Accumulates the average of IntEvent.data over a length-10 sliding window
      // on entry point ep01, binds its intValue to $avg, and stores it in avgTemp
      // of the Server whose hostname is "hiwaesdk".
      // NOTE(review): the reported stack trace passes through SlidingLengthWindow
      // and AccumulateNode, which presumably correspond to this pattern - confirm.
      rule "average temp"
      when
      Number ($avg : intValue) from accumulate (
      IntEvent ($temp : data) over window:length(10) from entry-point ep01, average ($temp)
      )
      $s : Server (hostname == "hiwaesdk")
      then
      $s.avgTemp = $avg;
      end

      // Timer-driven rule with an empty condition, declared with
      // timer ( int: 5s 5s ). Its consequence sends a status string to the
      // "heartbeat" channel and then resets the opsec counter to 0.
      rule "Heartbeat"
      timer ( int: 5s 5s)
      when
      then
      // The message carries opsec divided by 5 (long division) followed by total.
      // The "+" operators around ", " were missing in the pasted text and are restored.
      channels["heartbeat"].send ("Heartbeat: " + stats.getOpsec () / 5 + ", " + stats.getTotal ());
      stats.setOpsec (0);
      end

      Show
      Average.java ### package drools5fusioneval; import java.io.IOException; import java.util.Random; import org.drools.KnowledgeBase; import org.drools.KnowledgeBaseConfiguration; import org.drools.KnowledgeBaseFactory; import org.drools.builder.KnowledgeBuilder; import org.drools.builder.KnowledgeBuilderFactory; import org.drools.builder.ResourceType; import org.drools.conf.EventProcessingOption; import org.drools.io.ResourceFactory; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.WorkingMemoryEntryPoint; class AvgDFEChannel implements org.drools.runtime.Channel { @Override public void send(Object o) { System.err.println ("Recieved channel message: "+ o); } } public class Average { public static void main(String[] args) throws InterruptedException, IOException { KnowledgeBaseConfiguration kbconfig = KnowledgeBaseFactory.newKnowledgeBaseConfiguration(); kbconfig.setOption(EventProcessingOption.STREAM); KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(kbconfig); KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); kbuilder.add(ResourceFactory.newClassPathResource("drools5fusioneval/average.drl"), ResourceType.DRL); if (kbuilder.hasErrors()) { System.err.println(kbuilder.getErrors().toString()); } kbase.addKnowledgePackages(kbuilder.getKnowledgePackages()); final StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession(); session.registerChannel("heartbeat", new AvgDFEChannel ()); WorkingMemoryEntryPoint ep01 = session.getWorkingMemoryEntryPoint("ep01"); new Thread() { public void run() { session.fireUntilHalt(); } }.start(); Thread.sleep (5000); // give the engine time to get setup Server hiwaesdk = new Server ("hiwaesdk"); session.insert(hiwaesdk); long LIMIT = 120; Random rnd = new Random (System.nanoTime()); for (long i = LIMIT; i > 0; i--) { int j = rnd.nextInt (212); ep01.insert (new IntEvent (j)); //Thread.sleep (0x1); } System.out.println (hiwaesdk); } } average.drl ### package 
drools5fusioneval declare IntEvent @role ( event ) @expires(15s) end declare Statistics opsec : long total : long end global Statistics stats; rule "Init engine" salience 100 when then stats = new Statistics (); stats.setTotal (0); stats.setOpsec (0); drools.getWorkingMemory ().setGlobal ("stats", stats); System.out.println ("Engine initialized"); end rule "number rule" when $e : IntEvent () from entry-point ep01 $s : Server (hostname == "hiwaesdk") then $s.currentTemp = $e.data; stats.setOpsec (stats.getOpsec () + 1); stats.setTotal (stats.getTotal () + 1); end rule "average temp" when Number ($avg : intValue) from accumulate ( IntEvent ($temp : data) over window:length(10) from entry-point ep01, average ($temp) ) $s : Server (hostname == "hiwaesdk") then $s.avgTemp = $avg; end rule "Heartbeat" timer ( int: 5s 5s) when then channels ["heartbeat"] .send ("Heartbeat: "+ stats.getOpsec () / 5 ", " stats.getTotal ()); stats.setOpsec (0); end

      AccumulateNode throws an NPE when events are inserted too quickly via an entry-point. The setup is a stateful knowledge session into which events are inserted, with a rule running an accumulate over the input events. I have a for loop generating the events and inserting them as rapidly as possible. In the rules I use an accumulator to calculate the average of the values contained within the events. The behavior I'm observing is that if I insert ~120 events without any waiting, I receive an NPE. If I Thread.sleep for even just 1 ms between insertions, the test goes off without a hitch.

      The NPE is as follows:
      Exception in thread "main" java.lang.NullPointerException
      at org.drools.reteoo.AccumulateNode.getFirstMatch(AccumulateNode.java:1050)
      at org.drools.reteoo.AccumulateNode.modifyLeftTuple(AccumulateNode.java:345)
      at org.drools.reteoo.SingleLeftTupleSinkAdapter.propagateModifyChildLeftTuple(SingleLeftTupleSinkAdapter.java:259)
      at org.drools.reteoo.AccumulateNode.evaluateResultConstraints(AccumulateNode.java:676)
      at org.drools.reteoo.ReteooWorkingMemory$EvaluateResultConstraints.execute(ReteooWorkingMemory.java:590)
      at org.drools.common.PropagationContextImpl.evaluateActionQueue(PropagationContextImpl.java:350)
      at org.drools.rule.SlidingLengthWindow.assertFact(SlidingLengthWindow.java:119)
      at org.drools.rule.BehaviorManager.assertFact(BehaviorManager.java:94)
      at org.drools.reteoo.WindowNode.assertObject(WindowNode.java:167)
      at org.drools.reteoo.CompositeObjectSinkAdapter.doPropagateAssertObject(CompositeObjectSinkAdapter.java:497)
      at org.drools.reteoo.CompositeObjectSinkAdapter.propagateAssertObject(CompositeObjectSinkAdapter.java:382)
      at org.drools.reteoo.ObjectTypeNode.assertObject(ObjectTypeNode.java:235)
      at org.drools.reteoo.EntryPointNode.assertObject(EntryPointNode.java:240)
      at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:350)
      at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:311)
      at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:127)
      at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:55)
      at drools5fusioneval.Average.main(Average.java:66)

            mproctor@redhat.com Mark Proctor
            jpbarto_jira Jason Barto (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated: