Does eachParallel() always consume all elements from iterator

Does eachParallel() always consume all elements from iterator

Martin Jones-2
Hi,

I have a file containing a large number of records which I want to
process in parallel.  Something along these lines would work fine:

List myRecords = myFile.getText().split(/record_separator/)
Parallelizer.doParallel() {
     def results = myRecords.collectParallel{
          processRecord(it)
     }
}

The problem is that this approach requires all the records, and all the
return values, to be loaded into memory at the same time, which takes
too much memory because the records and results are very big. What I
would like to do instead is wrap the input file in a custom object with
an iterator() method that reads records from the file one by one, and
then call collectParallel() on this object. This code works (GenBankFile
is a wrapper class that takes care of splitting up the input file and
keeping track of progress):

class RecordSource{
    private GenBankFile myFile = new GenBankFile(new File("/home/martin/gbfiles/mits.gb"))
    private currentRecord
    Iterator iterator(){
        return [
            hasNext: {
                currentRecord = myFile.getNextRecord()
                return (currentRecord != null)
            },
            next: {
                println "done " + myFile.getPercentDone() + "%"
                return currentRecord
            }
        ] as Iterator
    }
}

def mySource = new RecordSource()
Parallelizer.doParallel() {
     def results = mySource.collectParallel{
           processRecord(it)
     }
}

But it does not quite do what I want - it seems that collectParallel()
first consumes all the elements from the iterator, and then carries
out the closure.  Is this always the behaviour of collectParallel?
And is there any way to modify the behaviour to do what I want?
Possibly the best approach is to use Actors, where one load-balancing
actor is responsible for reading the records sequentially from the
file.

Re: Does eachParallel() always consume all elements from iterator

Vaclav
Hi Martin,

You're correct: the ParallelArray-based parallel collection processing algorithms assume all the data is in memory, so that a tree-like structure can be built and then processed concurrently. The same happens with iterators: they get drained into a collection first.
A possible solution would be either to split your data yourself into reasonably big chunks and call collectParallel() on each chunk iteratively, or, as you suggest, to use different abstractions, such as actors or dataflow concurrency.
To minimize the potential overhead, I would probably favor the former approach with iterative collectParallel().
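
For example, something along these lines (a rough, untested sketch that reuses your getNextRecord() and getPercentDone() methods; chunkSize is just a number you would tune to your memory budget):

def myFile = new GenBankFile(new File("/home/martin/gbfiles/mits.gb"))
def chunkSize = 1000   // how many records to hold in memory at once
def results = []

Parallelizer.doParallel() {
    while (true) {
        // read the next chunk sequentially on the calling thread
        def chunk = []
        def record
        while (chunk.size() < chunkSize && (record = myFile.getNextRecord()) != null) {
            chunk << record
        }
        if (chunk.isEmpty()) break

        // process just this chunk in parallel
        results.addAll(chunk.collectParallel { processRecord(it) })

        // report progress once per chunk, outside the parallel closure
        println "done " + myFile.getPercentDone() + "%"
    }
}

If the results are also too big to keep around, you could write out each chunk's results inside the loop instead of accumulating them in the list.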

Do you think this would work?

Vaclav




--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech

Re: Does eachParallel() always consume all elements from iterator

Martin Jones-6
Hi Vaclav,

I agree, I think iterative collectParallel() would work nicely. It
should be fewer lines of code that way, and it could all be driven from
a single thread. As a bonus, it would probably be fine to update the
progress counter after every chunk, so that part of the code need not
sit inside the collectParallel() call.
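
Roughly, at the end of each pass of the chunk loop (just sketching the placement; chunk here stands for whatever batch of records was read on that pass):

results.addAll(chunk.collectParallel { processRecord(it) })
println "done " + myFile.getPercentDone() + "%"   // once per chunk, outside the parallel call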

I'm just disappointed that I don't get to use my cool custom iterator code :-)

Martin


