DefaultPGroup Resize

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

DefaultPGroup Resize

Prakash Viswanathan

Hello 


I noticed a problem with DefaultPGroup class of groovyx.gpars.group package.

 

I am using a class similar to the one below and try to update the pool size of the pooledGroup using resize() method which is inherited from PGroup class.

The change in pool size is working when increasing the pool size from 1 to 10 but the reverse is not working, how do I get past this? i.e. I am trying to change the number of consumers for a task in runtime.

 

import java.io.File;

import java.util.Observable;

 

import org.apache.commons.lang.StringUtils;

 

import groovyx.gpars.dataflow.DataFlowQueue;

import groovyx.gpars.dataflow.DataFlowVariable

import groovyx.gpars.group.DefaultPGroup

import static groovyx.gpars.agent.Agent.agent

 

class MyProcessManager implements Observer

{

          def activeTasks = agent(0L)

          def pooledGroup

          int threadCount

          DataFlowQueue<MyProcess> dfq

          MyProcessProducer producer

         

          boolean stop = false

          boolean pause = false

         

          public MyProcessManager(File inputDataFile, int threadCount)

          {

                   pooledGroup = new DefaultPGroup(1)

                   this.threadCount = threadCount

                   dfq = new DataFlowQueue<MyProcess>()

                   producer = new MyProcessProducer( inputDataFile, threadCount, dfq )

                  

                   DynamicMyProcessPropertyUtil dmpu = new DynamicMyProcessPropertyUtil()

                   dmpu.addObserver(this)

                  

                   this.addShutdownHook { if (dmpu) dmpu.deleteObservers() }

          }

         

          public int process()

          {

                   new Thread(producer).start()

                   new MyProcessConsumer().start()

                  

                   def doneFlag = new DataFlowVariable()

                   activeTasks.addListener {oldValue, newValue -> if (newValue==0) doneFlag.bind(true)}

                   if (activeTasks.val > 0) doneFlag.join()

                   pooledGroup.shutdown()

                  

                   return producer.getNumberOfRowsRead()

          }

         

          public void update(Observable obs, Object arg)

          {

                   if (obs instanceof DynamicMyProcessPropertyUtil)

                   {

                             Map map = (Map)arg

                             updateThreadCount map['POOL_SIZE'].toInteger()

                             pause = map['PAUSE'].asBoolean()

                             stop = map['STOP'].asBoolean()

                   }

          }

         

          private void updateThreadCount(int poolSize)

          {

                   if (poolSize > 0 && poolSize != threadCount)

                   {

                             println 'Change Request: ' + poolSize

                             println "Current Thread Count: " + pooledGroup.getPoolSize()

                              threadCount = poolSize

                              pooledGroup.resetDefaultSize()

                              pooledGroup.resize(poolSize)

                              println "Thread Count updated: " + pooledGroup.getPoolSize()

                   }

          }

         

          public class MyProcessConsumer extends Thread

          {

                   @Override

                   public void run()

                   {

                             while (!stop)

                             {

                                      MyProcess process = dfq.val

                                      if(!process){stop = true; break}

                                     

                                      activeTasks << {updateValue it+1}

                                      pooledGroup.task {

                                               

                                                process.run()

                                               

                                                if (pause)

                                                {

                                                          println 'Paused..'

                                                          while(pause)

                                                          {

                                                                   try

                                                                   {

                                                                             Thread.sleep(5000)

                                                                   }

                                                                   catch(InterruptedException e)

                                                                   {

                                                                             Thread.currentThread().interrupt()

                                                                   }

                                                          }

                                                          println 'Resumed..'

                                                }

                                               

                                                activeTasks << {updateValue it-1}

                                      }

                             }

                   }

          }

         

}

 

I am using GPARS 0.11, also tried 0.12 but still same problem.

 

Thanks

Prakash

 

Reply | Threaded
Open this post in threaded view
|

Re: DefaultPGroup Resize

Vaclav
Administrator
Hello Prakash,

DefaultPGroup uses the standard Java thread pool - java.util.concurrent.ThreadPoolExecutor. Its setCorePoolSize() method is used to resize the pool, however size reduction only happens when the extra threads become idle.
    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * less than zero
     * @see #getCorePoolSize
     */

Could this explain the problems you're having?

Cheers,

Vaclav


On Tue, Mar 22, 2011 at 8:46 PM, Prakash Viswanathan <[hidden email]> wrote:

Hello 


I noticed a problem with DefaultPGroup class of groovyx.gpars.group package.

 

I am using a class similar to the one below and try to update the pool size of the pooledGroup using resize() method which is inherited from PGroup class.

The change in pool size is working when increasing the pool size from 1 to 10 but the reverse is not working, how do I get past this? i.e. I am trying to change the number of consumers for a task in runtime.

 

import java.io.File;

import java.util.Observable;

 

import org.apache.commons.lang.StringUtils;

 

import groovyx.gpars.dataflow.DataFlowQueue;

import groovyx.gpars.dataflow.DataFlowVariable

import groovyx.gpars.group.DefaultPGroup

import static groovyx.gpars.agent.Agent.agent

 

class MyProcessManager implements Observer

{

          def activeTasks = agent(0L)

          def pooledGroup

          int threadCount

          DataFlowQueue<MyProcess> dfq

          MyProcessProducer producer

         

          boolean stop = false

          boolean pause = false

         

          public MyProcessManager(File inputDataFile, int threadCount)

          {

                   pooledGroup = new DefaultPGroup(1)

                   this.threadCount = threadCount

                   dfq = new DataFlowQueue<MyProcess>()

                   producer = new MyProcessProducer( inputDataFile, threadCount, dfq )

                  

                   DynamicMyProcessPropertyUtil dmpu = new DynamicMyProcessPropertyUtil()

                   dmpu.addObserver(this)

                  

                   this.addShutdownHook { if (dmpu) dmpu.deleteObservers() }

          }

         

          public int process()

          {

                   new Thread(producer).start()

                   new MyProcessConsumer().start()

                  

                   def doneFlag = new DataFlowVariable()

                   activeTasks.addListener {oldValue, newValue -> if (newValue==0) doneFlag.bind(true)}

                   if (activeTasks.val > 0) doneFlag.join()

                   pooledGroup.shutdown()

                  

                   return producer.getNumberOfRowsRead()

          }

         

          public void update(Observable obs, Object arg)

          {

                   if (obs instanceof DynamicMyProcessPropertyUtil)

                   {

                             Map map = (Map)arg

                             updateThreadCount map['POOL_SIZE'].toInteger()

                             pause = map['PAUSE'].asBoolean()

                             stop = map['STOP'].asBoolean()

                   }

          }

         

          private void updateThreadCount(int poolSize)

          {

                   if (poolSize > 0 && poolSize != threadCount)

                   {

                             println 'Change Request: ' + poolSize

                             println "Current Thread Count: " + pooledGroup.getPoolSize()

                              threadCount = poolSize

                              pooledGroup.resetDefaultSize()

                              pooledGroup.resize(poolSize)

                              println "Thread Count updated: " + pooledGroup.getPoolSize()

                   }

          }

         

          public class MyProcessConsumer extends Thread

          {

                   @Override

                   public void run()

                   {

                             while (!stop)

                             {

                                      MyProcess process = dfq.val

                                      if(!process){stop = true; break}

                                     

                                      activeTasks << {updateValue it+1}

                                      pooledGroup.task {

                                               

                                                process.run()

                                               

                                                if (pause)

                                                {

                                                          println 'Paused..'

                                                          while(pause)

                                                          {

                                                                   try

                                                                   {

                                                                             Thread.sleep(5000)

                                                                   }

                                                                   catch(InterruptedException e)

                                                                   {

                                                                             Thread.currentThread().interrupt()

                                                                   }

                                                          }

                                                          println 'Resumed..'

                                                }

                                               

                                                activeTasks << {updateValue it-1}

                                      }

                             }

                   }

          }

         

}

 

I am using GPARS 0.11, also tried 0.12 but still same problem.

 

Thanks

Prakash

 




--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech
Reply | Threaded
Open this post in threaded view
|

Re: DefaultPGroup Resize

Prakash Viswanathan
Thanks for the reply Vaclav, 

Looks like I have to create custom abstraction to handle this, I don't see the thread pool size coming down even after two minutes while new jobs are kept added by the producer(I tested by getting the pool size using a timertask after i resize the pool).  

Regards,
Prakash

On Wed, Mar 23, 2011 at 3:05 AM, Vaclav Pech <[hidden email]> wrote:
Hello Prakash,

DefaultPGroup uses the standard Java thread pool - java.util.concurrent.ThreadPoolExecutor. Its setCorePoolSize() method is used to resize the pool, however size reduction only happens when the extra threads become idle.
    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * less than zero
     * @see #getCorePoolSize
     */

Could this explain the problems you're having?

Cheers,

Vaclav



On Tue, Mar 22, 2011 at 8:46 PM, Prakash Viswanathan <[hidden email]> wrote:

Hello 


I noticed a problem with DefaultPGroup class of groovyx.gpars.group package.

 

I am using a class similar to the one below and try to update the pool size of the pooledGroup using resize() method which is inherited from PGroup class.

The change in pool size is working when increasing the pool size from 1 to 10 but the reverse is not working, how do I get past this? i.e. I am trying to change the number of consumers for a task in runtime.

 

import java.io.File;

import java.util.Observable;

 

import org.apache.commons.lang.StringUtils;

 

import groovyx.gpars.dataflow.DataFlowQueue;

import groovyx.gpars.dataflow.DataFlowVariable

import groovyx.gpars.group.DefaultPGroup

import static groovyx.gpars.agent.Agent.agent

 

class MyProcessManager implements Observer

{

          def activeTasks = agent(0L)

          def pooledGroup

          int threadCount

          DataFlowQueue<MyProcess> dfq

          MyProcessProducer producer

         

          boolean stop = false

          boolean pause = false

         

          public MyProcessManager(File inputDataFile, int threadCount)

          {

                   pooledGroup = new DefaultPGroup(1)

                   this.threadCount = threadCount

                   dfq = new DataFlowQueue<MyProcess>()

                   producer = new MyProcessProducer( inputDataFile, threadCount, dfq )

                  

                   DynamicMyProcessPropertyUtil dmpu = new DynamicMyProcessPropertyUtil()

                   dmpu.addObserver(this)

                  

                   this.addShutdownHook { if (dmpu) dmpu.deleteObservers() }

          }

         

          public int process()

          {

                   new Thread(producer).start()

                   new MyProcessConsumer().start()

                  

                   def doneFlag = new DataFlowVariable()

                   activeTasks.addListener {oldValue, newValue -> if (newValue==0) doneFlag.bind(true)}

                   if (activeTasks.val > 0) doneFlag.join()

                   pooledGroup.shutdown()

                  

                   return producer.getNumberOfRowsRead()

          }

         

          public void update(Observable obs, Object arg)

          {

                   if (obs instanceof DynamicMyProcessPropertyUtil)

                   {

                             Map map = (Map)arg

                             updateThreadCount map['POOL_SIZE'].toInteger()

                             pause = map['PAUSE'].asBoolean()

                             stop = map['STOP'].asBoolean()

                   }

          }

         

          private void updateThreadCount(int poolSize)

          {

                   if (poolSize > 0 && poolSize != threadCount)

                   {

                             println 'Change Request: ' + poolSize

                             println "Current Thread Count: " + pooledGroup.getPoolSize()

                              threadCount = poolSize

                              pooledGroup.resetDefaultSize()

                              pooledGroup.resize(poolSize)

                              println "Thread Count updated: " + pooledGroup.getPoolSize()

                   }

          }

         

          public class MyProcessConsumer extends Thread

          {

                   @Override

                   public void run()

                   {

                             while (!stop)

                             {

                                      MyProcess process = dfq.val

                                      if(!process){stop = true; break}

                                     

                                      activeTasks << {updateValue it+1}

                                      pooledGroup.task {

                                               

                                                process.run()

                                               

                                                if (pause)

                                                {

                                                          println 'Paused..'

                                                          while(pause)

                                                          {

                                                                   try

                                                                   {

                                                                             Thread.sleep(5000)

                                                                   }

                                                                   catch(InterruptedException e)

                                                                   {

                                                                             Thread.currentThread().interrupt()

                                                                   }

                                                          }

                                                          println 'Resumed..'

                                                }

                                               

                                                activeTasks << {updateValue it-1}

                                      }

                             }

                   }

          }

         

}

 

I am using GPARS 0.11, also tried 0.12 but still same problem.

 

Thanks

Prakash

 




--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech

Reply | Threaded
Open this post in threaded view
|

Re: DefaultPGroup Resize

Vaclav
Administrator
Prakash,

I believe the exceeding threads will only get destroyed after there are no tasks in the queue. If the producer keeps tasks coming the threads will never become idle and so will never be removed.
Unfortunately I don't know of a suitable implementation of a truly resizable thread pool. The Fork/Join pool in Java 7 seem even to disallow any explicit size changes.

Cheers,

Vaclav


On Thu, Mar 24, 2011 at 4:46 AM, Prakash Viswanathan <[hidden email]> wrote:
Thanks for the reply Vaclav, 

Looks like I have to create custom abstraction to handle this, I don't see the thread pool size coming down even after two minutes while new jobs are kept added by the producer(I tested by getting the pool size using a timertask after i resize the pool).  

Regards,
Prakash


On Wed, Mar 23, 2011 at 3:05 AM, Vaclav Pech <[hidden email]> wrote:
Hello Prakash,

DefaultPGroup uses the standard Java thread pool - java.util.concurrent.ThreadPoolExecutor. Its setCorePoolSize() method is used to resize the pool, however size reduction only happens when the extra threads become idle.
    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * less than zero
     * @see #getCorePoolSize
     */

Could this explain the problems you're having?

Cheers,

Vaclav



On Tue, Mar 22, 2011 at 8:46 PM, Prakash Viswanathan <[hidden email]> wrote:

Hello 


I noticed a problem with DefaultPGroup class of groovyx.gpars.group package.

 

I am using a class similar to the one below and try to update the pool size of the pooledGroup using resize() method which is inherited from PGroup class.

The change in pool size is working when increasing the pool size from 1 to 10 but the reverse is not working, how do I get past this? i.e. I am trying to change the number of consumers for a task in runtime.

 

import java.io.File;

import java.util.Observable;

 

import org.apache.commons.lang.StringUtils;

 

import groovyx.gpars.dataflow.DataFlowQueue;

import groovyx.gpars.dataflow.DataFlowVariable

import groovyx.gpars.group.DefaultPGroup

import static groovyx.gpars.agent.Agent.agent

 

class MyProcessManager implements Observer

{

          def activeTasks = agent(0L)

          def pooledGroup

          int threadCount

          DataFlowQueue<MyProcess> dfq

          MyProcessProducer producer

         

          boolean stop = false

          boolean pause = false

         

          public MyProcessManager(File inputDataFile, int threadCount)

          {

                   pooledGroup = new DefaultPGroup(1)

                   this.threadCount = threadCount

                   dfq = new DataFlowQueue<MyProcess>()

                   producer = new MyProcessProducer( inputDataFile, threadCount, dfq )

                  

                   DynamicMyProcessPropertyUtil dmpu = new DynamicMyProcessPropertyUtil()

                   dmpu.addObserver(this)

                  

                   this.addShutdownHook { if (dmpu) dmpu.deleteObservers() }

          }

         

          public int process()

          {

                   new Thread(producer).start()

                   new MyProcessConsumer().start()

                  

                   def doneFlag = new DataFlowVariable()

                   activeTasks.addListener {oldValue, newValue -> if (newValue==0) doneFlag.bind(true)}

                   if (activeTasks.val > 0) doneFlag.join()

                   pooledGroup.shutdown()

                  

                   return producer.getNumberOfRowsRead()

          }

         

          public void update(Observable obs, Object arg)

          {

                   if (obs instanceof DynamicMyProcessPropertyUtil)

                   {

                             Map map = (Map)arg

                             updateThreadCount map['POOL_SIZE'].toInteger()

                             pause = map['PAUSE'].asBoolean()

                             stop = map['STOP'].asBoolean()

                   }

          }

         

          private void updateThreadCount(int poolSize)

          {

                   if (poolSize > 0 && poolSize != threadCount)

                   {

                             println 'Change Request: ' + poolSize

                             println "Current Thread Count: " + pooledGroup.getPoolSize()

                              threadCount = poolSize

                              pooledGroup.resetDefaultSize()

                              pooledGroup.resize(poolSize)

                              println "Thread Count updated: " + pooledGroup.getPoolSize()

                   }

          }

         

          public class MyProcessConsumer extends Thread

          {

                   @Override

                   public void run()

                   {

                             while (!stop)

                             {

                                      MyProcess process = dfq.val

                                      if(!process){stop = true; break}

                                     

                                      activeTasks << {updateValue it+1}

                                      pooledGroup.task {

                                               

                                                process.run()

                                               

                                                if (pause)

                                                {

                                                          println 'Paused..'

                                                          while(pause)

                                                          {

                                                                   try

                                                                   {

                                                                             Thread.sleep(5000)

                                                                   }

                                                                   catch(InterruptedException e)

                                                                   {

                                                                             Thread.currentThread().interrupt()

                                                                   }

                                                          }

                                                          println 'Resumed..'

                                                }

                                               

                                                activeTasks << {updateValue it-1}

                                      }

                             }

                   }

          }

         

}

 

I am using GPARS 0.11, also tried 0.12 but still same problem.

 

Thanks

Prakash

 




--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech




--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech