Some advice on AbstractForkJoinWorker please


apnakon
This post has NOT been accepted by the mailing list yet.
Hi Guys,

I've used the Actor system in GPars before with great success, but on this occasion I'm trying to solve a problem with a different approach (which is a little foreign to me but seems worth exploring) and would greatly appreciate any input.


The Problem
--------------
Take a list of thousands of IP addresses and ping each one.  Look at each result and add it to a result set.  The idea is to split this work across a number of parallel tasks so that the whole job completes much faster than it would sequentially.


The Options
-------------
I could use Actors, which is still very much an option since I have experience with them.
However, I was wondering if I could use an AbstractForkJoinWorker solution instead.


Current Thinking
------------------

I have written the code that takes a raw Cisco input file, extracts the IP addresses, and breaks them up into Lists of IP addresses for processing.  What I was trying to do next was use the GPars fork/join support to fire off an individual thread to handle each list.  Each thread would process its IPs and return a List of IPs with 'OK' or 'BAD' appended to each entry.  I would then add this sub-list to a master list and use that for further processing.

However, I haven't yet managed to get the parallelism to work.  I'm sure I've made mistakes here, so I've pasted the code I have written below and would appreciate any help or guidance in getting the parallel processing working correctly.

Many Thanks,

Adrian.

------

First, the calling code:


package ip

import control.ControlParams
import groovyx.gpars.forkjoin.AbstractForkJoinWorker

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

/**
 *
 * User: ANakon <br>
 * Date: 19/02/13 <br>
 * Time: 13:28 <br>
 * <p>
 *
 * @author
 */
public class RunParallelTasks extends AbstractForkJoinWorker<List<String>>
{

  public RunParallelTasks(ControlParams params,
                       IPContainer fileIPContents)
  {

    List<String> finalResults = new ArrayList<String>()

    //
    // We have loaded up the input data file.  Let's start testing the IP addresses we have in
    //  fileIPContents.
    //

    int counter = 1;

    withPool(10) { pool ->  //feel free to experiment with the number of fork/join threads in the pool

      //
      // Let's loop through the available lists and pass them through to runForkJoin
      //

      runForkJoin(fileIPContents.getIpAddressLists()) { item ->

        // Each item here is a list of IPs
        for (it in item)
        {
          println "starting : " + counter;
          counter++

          forkOffChild(new ParallelProcessIPs(it));

        }


      }

      finalResults.addAll(getChildrenResults() as List<String>);

    } as List<String>


  //  return finalResults;
  }

  @Override
  protected List<String> computeTask()
  {
  }


}


Next, the called code:

package ip

import groovyx.gpars.forkjoin.AbstractForkJoinWorker

/**
 *
 * User: ANakon <br>
 * Date: 19/02/13 <br>
 * Time: 11:09 <br>
 * <p>
 *
 * @author  A. Nakon.
 */
public final class ParallelProcessIPs extends AbstractForkJoinWorker<List<String>>
{
  private List<String> ipList;

  def ParallelProcessIPs(final List<String> ipList)
  {
    this.ipList = ipList
  }

  @Override
  protected List<String> computeTask()
  {
    long count = 0;

    List<String> resultList = new ArrayList<>();

    String postPend;

    ipList.each {

      //
      // What we need to do now is do the ping of the IP Address and return the result.
      //

      String command = "ping " + it;

      String result = command.execute().text;

      println result;

      if (result.contains('Reply'))
      {
        postPend = " ok"
      }
      else
      {
        postPend = " bad"
      }

      resultList.add(it + postPend)

//      if (it.isDirectory())
//      {
//        println "Forking a thread for $it"
//        forkOffChild(new ParallelProcessIPs(it))           //fork a child task
//      } else
//      {
//        count++
//      }
    }
    return resultList// + ((childrenResults)?.sum() ?: 0)  //use results of children tasks to calculate and store own result
  }
}
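
The ping check in computeTask() above leans on two platform details: a bare "ping <ip>" only stops by itself on Windows (on Linux and macOS it runs until interrupted, so reading .text would block), and 'Reply' is Windows wording. A minimal sketch of a more portable check follows, assuming one echo request per address and trusting the process exit code. pingCommand and isReachable are hypothetical helper names, not part of GPars or of the code above, and exit-code behaviour differs between ping implementations, so it should be verified on the target machine.

```groovy
// Hypothetical helper: builds a one-shot ping command for the current OS.
// Assumption: '-n 1' (Windows) and '-c 1' (Unix-like) each send a single echo request.
List<String> pingCommand(String ip, String osName = System.getProperty('os.name')) {
    boolean windows = osName.toLowerCase().startsWith('windows')
    windows ? ['ping', '-n', '1', ip] : ['ping', '-c', '1', ip]
}

// Hypothetical helper: true when the ping process exits with status 0.
// Assumption: exit status 0 means a reply was received; some ping versions
// may also return 0 for "destination unreachable" answers.
boolean isReachable(String ip) {
    Process proc = pingCommand(ip).execute()
    proc.consumeProcessOutput()   // drain stdout/stderr so the process cannot block on a full buffer
    proc.waitFor() == 0
}

println pingCommand('10.0.0.1')
```

Inside computeTask() the suffix could then be chosen with something like isReachable(it) ? " ok" : " bad".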

//withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
//  println "Number of files: ${runForkJoin(new ParallelProcessIPs(new File("..")))}"
//}
//
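
For comparison, here is a minimal sketch of how the fork/join idea described under "Current Thinking" could be wired up: a root worker forks one child per sub-list inside computeTask() and merges the children's results, and the root is handed to runForkJoin inside withPool. PingListWorker, PingRootWorker and the probe closure are hypothetical names introduced for illustration; probe is an assumed stand-in for the real ping call so the sketch runs without a network. The GPars version in @Grab is also an assumption.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.forkjoin.AbstractForkJoinWorker

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

// Hypothetical leaf worker: checks one sub-list of addresses.
final class PingListWorker extends AbstractForkJoinWorker<List<String>> {
    private final List<String> ips
    private final Closure<Boolean> probe   // assumed stand-in for the real ping

    PingListWorker(List<String> ips, Closure<Boolean> probe) {
        this.ips = ips
        this.probe = probe
    }

    @Override
    protected List<String> computeTask() {
        // Append ' ok' or ' bad' to each address, as in the original code
        ips.collect { String ip -> ip + (probe(ip) ? ' ok' : ' bad') }
    }
}

// Hypothetical root worker: forks one child per sub-list, then merges the results.
final class PingRootWorker extends AbstractForkJoinWorker<List<String>> {
    private final List<List<String>> ipLists
    private final Closure<Boolean> probe

    PingRootWorker(List<List<String>> ipLists, Closure<Boolean> probe) {
        this.ipLists = ipLists
        this.probe = probe
    }

    @Override
    protected List<String> computeTask() {
        ipLists.each { List<String> ips -> forkOffChild(new PingListWorker(ips, probe)) }
        List<String> merged = []
        // getChildrenResults() joins the forked children before returning their results
        for (List<String> part : getChildrenResults()) {
            merged.addAll(part)
        }
        merged
    }
}

// Fake probe for the sketch: treats addresses ending in .13 as unreachable
def fakeProbe = { String ip -> !ip.endsWith('.13') }
def ipLists = [['10.0.0.1', '10.0.0.13'], ['10.0.0.2']]

List<String> finalResults = []
withPool(10) {
    finalResults = runForkJoin(new PingRootWorker(ipLists, fakeProbe))
}
println finalResults
```

The main difference from the posted code is that forking and result collection happen inside computeTask() of a worker that is itself run by the pool, rather than in a constructor.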


================================

Re: Some advice on AbstractForkJoinWorker please

apnakon
This post has NOT been accepted by the mailing list yet.
Hi There,

I think I have it sorted by using:

    fileIPContents.getIpAddressLists().eachParallel { finalResults.addAll(new ParallelProcessIPs(it) as String[]) }


If this isn't generally the right approach then please let me know.

Thanks,

Adrian.
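
One point to watch with the eachParallel line above: finalResults is a plain ArrayList, which is not safe to modify from several threads at once, and the cast of a worker to String[] does not obviously invoke computeTask(). A minimal sketch of an alternative using collectParallel is shown below, so that each sub-list yields its own result list and nothing shared is mutated. fakeProbe is a hypothetical stand-in for the real ping call, and the GPars version in @Grab is an assumption.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')
import static groovyx.gpars.GParsPool.withPool

// Fake probe for the sketch: treats addresses ending in .13 as unreachable
def fakeProbe = { String ip -> !ip.endsWith('.13') }
def ipLists = [['10.0.0.1', '10.0.0.13'], ['10.0.0.2']]

List<String> finalResults = []
withPool(10) {
    // Each sub-list is processed on a pool thread and returns its own result list
    def perList = ipLists.collectParallel { List<String> ips ->
        ips.collect { String ip -> ip + (fakeProbe(ip) ? ' ok' : ' bad') }
    }
    // Merge the per-list results on the calling thread
    finalResults = perList.flatten() as List<String>
}
println finalResults
```

This keeps the "one task per sub-list" shape of the eachParallel version without needing an AbstractForkJoinWorker subclass at all.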