Recursive map/reduce?

strayph
I was wondering whether a recursive (nested) map/reduce like this is possible with GPars, and whether there are any examples of doing it.

Maybe something like:

println withPool(threadCount){
    files.collect { root + it }.parallel.map {
        def data = process(it)
        withPool(subThreadCount){
            data.parallel.map {
                recalculate(it)
            }.reduce { a,b -> smoosh(a,b) }
        }.reduce { a, b ->
            big_merge(a,b)
        }
    }
}

Is this wrong?

Could the inner block skip 'withPool' altogether, and would it then just use any free threads from the containing block's pool?

Are there either simpler or faster ways of doing this?

Thanks,

Strayph

Re: Recursive map/reduce?

Vaclav
Administrator
The code is correct and, in fact, this is exactly how it is intended to be used.
If you want to reuse the outer pool in the inner calculation, you can do it like this:

println withPool(threadCount){pool ->
   files.collect { root + it }.parallel.map {
       def data = process(it)
       withExistingPool(pool){
           data.parallel.map {
               recalculate(it)
           }.reduce { a,b -> smoosh(a,b) }
       }.reduce { a, b ->
           big_merge(a,b)
       }
   }
}

However, I have to warn you. One of the reasons the inner calculation does not automatically reuse the outer pool is that you would risk running out of threads and halting your calculation: each outer task blocks its thread while waiting for its inner calculation to finish, so if the pool is not resizeable or not big enough, at some point there may be no thread left in the pool to run the inner calculation.
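
To make the risk concrete, here is a minimal sketch of that kind of pool starvation using plain java.util.concurrent rather than GPars (the pool size and numbers are made up; the point is that outer tasks blocking on inner tasks submitted to the same fixed pool can leave no thread free):

import java.util.concurrent.Callable
import java.util.concurrent.Executors

def pool = Executors.newFixedThreadPool(2)
def outer = (1..2).collect { n ->
    pool.submit({
        def inner = pool.submit({ n * 10 } as Callable)  // queued, but no free thread left
        inner.get()                                      // blocks this pool thread while waiting
    } as Callable)
}
// outer*.get() would now wait forever -- the pool has starved itself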

Cheers,

Vaclav

Re: Recursive map/reduce?

strayph
Hm...I totally understand.

What I wonder about is the fact that I am creating a new pool each time in the inner block.

Perhaps there should be a way to preallocate a pool and then use some number of its threads for the outer level and the rest for the inner?

println withPool(threadCount, threadCount / 2){pool ->
   files.collect { root + it }.parallel.map {
       def data = process(it)
       withExistingPool(pool){
           data.parallel.map {
               recalculate(it)
           }.reduce { a,b -> smoosh(a,b) }
       }.reduce { a, b ->
           big_merge(a,b)
       }
   }
}

This way the inner block just uses the "remainder" thread count from the outer pool.

You could also possibly subdivide it further as desired.

Strayph

Re: Recursive map/reduce?

Vaclav
Administrator
You can't really get more granular than a single thread pool, I'm afraid. What you can do, though, is create a separate thread pool up-front and reuse it for the inner calculations. Just bear in mind that this pool will be serving all the inner calculations, and since those may run in parallel, they will all be demanding threads at the same time:

def innerPool = Executors.newFixedThreadPool(innerThreadCount)

println withPool(outerThreadCount){
  files.collect { root + it }.parallel.map {
      def data = process(it)
      withExistingPool(innerPool){
          data.parallel.map {
              recalculate(it)
          }.reduce { a,b -> smoosh(a,b) }
      }.reduce { a, b ->
          big_merge(a,b)
      }
  }
}
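
One more note on the pool construction, written from memory so take it as a sketch rather than gospel: if the inner map/reduce goes through the GParsPool flavour (the one that gives you .parallel with map/reduce), then withExistingPool expects a fork/join pool rather than a plain ExecutorService. In that case the up-front pool would look more like this (assuming the jsr166y ForkJoinPool that GPars bundles; newer setups would use java.util.concurrent.ForkJoinPool):

import jsr166y.ForkJoinPool   // java.util.concurrent.ForkJoinPool in newer versions

// create the shared inner pool once, up-front, and release it when all the work is done
def innerPool = new ForkJoinPool(innerThreadCount)
try {
    // ... the outer withPool(outerThreadCount) { ... } block from above goes here,
    // with withExistingPool(innerPool) { ... } wrapping the inner map/reduce
} finally {
    innerPool.shutdown()
}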
