Managing upper limit of Queues?


Managing upper limit of Queues?

klausb
I've constructed a simple example with two operators (each with maxForks: 2) that receive an int via a DataFlowBroadcast read channel. They both do some simple math and send the result to a DataFlowQueue, from which the values are printed inside an endless while loop.

The producer pumps a lot of numbers into the broadcast channel. At some point after 5000 insertions the program stops with an OutOfMemoryError (Java heap space). Can someone explain?

Here is the code:
import groovyx.gpars.dataflow.DataFlowBroadcast
import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.operator
import static groovyx.gpars.dataflow.DataFlow.task

def b = new DataFlowQueue()
def a0 = new DataFlowBroadcast()

operator([inputs:[a0.createReadChannel()], outputs:[b], maxForks:2]) { val ->
  bindOutput "$val[*]: ${val*val}"
}

operator([inputs:[a0.createReadChannel()], outputs:[b], maxForks:2]) { val ->
  bindOutput "$val[+]: ${val+val}"
}

task {
  int i = 1
  while (true) {
    println "${i++}: $b.val"
  }
}

for (i in 1..1000000) {
  a0 << i
}

So maybe I can answer this myself.

It looks like insertion does not block at any upper limit, so the queue keeps growing and eventually causes the error, since consumption is slower than production.

Can I control the incoming rate by setting an upper limit, or can I check the number of elements currently in the channel before adding a new one?

klaus.

Re: Managing upper limit of Queues?

Vaclav
Administrator
Hello Klaus,

I'm glad to see you've started your gpars experiments so quickly :)
The DataflowQueue class (notice the capitalization change in GPars 0.12) has a length() method, however, DataflowBroadcast does not, since its nature (multiple anonymous subscriptions, linked data structure) would make such a method very inefficient.
I managed to come up with two solutions you can use right away, plus one that requires further effort on our side.

1. Replacing DataflowBroadcast with two DataflowQueues and a Splitter:

import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.splitter

def b = new DataflowQueue()
def a0 = new DataflowQueue()
def a1 = new DataflowQueue()
def a2 = new DataflowQueue()

MAX=50

splitter(a0, [a1, a2])
operator([inputs: [a1], outputs: [b], maxForks: 4]) { val ->
    while(b.length()>MAX) Thread.yield()
    bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a2], outputs: [b], maxForks: 4]) { val ->
    while(b.length()>MAX) Thread.yield()
    bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) {
        println "${i++}: $b.val"
    }
}

for (i in 1..10000000) {
    a0 << i
    while ([a0.length(), a1.length(), a2.length()].max()>MAX) Thread.yield()
}

I don't like the fact that we're accessing the b channel directly from within the operator's body, but I couldn't find a better way.
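For comparison only (plain JDK, not the GPars API): the yield-based throttling above is essentially a hand-rolled bounded buffer. A capacity-bounded java.util.concurrent.LinkedBlockingQueue gives the same back-pressure for free, since put() blocks while the queue is full, so the producer can never run more than the capacity ahead of the consumer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedBufferDemo {

    // Squares 1..n through a capacity-bounded queue. put() blocks whenever
    // the consumer lags more than `capacity` elements behind, so memory
    // stays bounded no matter how large n grows.
    static List<Integer> squares(int n, int capacity) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> results = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int val = queue.take();          // blocks until a value arrives
                    if (val == -1) return;           // poison pill: end of stream
                    results.add(val * val);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= n; i++) {
            queue.put(i);                            // blocks while the queue is full
        }
        queue.put(-1);                               // tell the consumer to stop
        consumer.join();                             // join() makes `results` safely visible
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(squares(5, 2));           // [1, 4, 9, 16, 25]
    }
}
```

The capacity argument plays the role of MAX above; the busy-wait on Thread.yield() disappears because the blocking is built into the queue.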

2. Using confirmation messages going backwards to approve sending the next batch of values.

import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task

def b = new DataflowQueue()
def c = new DataflowQueue()
def a0 = new DataflowBroadcast()

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    if (val == -1) bindOutput val
    else bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    if (val == -1) bindOutput val
    else bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) {
        def value = b.val
        if (value == -1) c << value
        else println "${i++}: $value "
    }
}

for (i in 1..10000) {
    a0 << -1
    100.times {
        a0 << i
    }
    c.val
    c.val
}

I like this one much better.
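Underneath, this is credit-based flow control: each -1 token is a credit the producer spends per batch, and the consumer returns it through channel c. As an illustration only (plain JDK, not the GPars API), the same handshake can be sketched with a counting semaphore:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class CreditFlowDemo {
    static final int BATCH = 100;

    // Sends `batches` batches of BATCH numbers. The producer must hold a
    // credit for each batch in flight; the consumer returns a credit every
    // time it has drained a full batch.
    static int run(int batches) throws InterruptedException {
        LinkedBlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
        Semaphore credits = new Semaphore(2);        // at most two batches in flight
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int val = channel.take();
                    if (val == -1) return;           // end-of-stream marker
                    if (consumed.incrementAndGet() % BATCH == 0) {
                        credits.release();           // batch drained: give a credit back
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int b = 0; b < batches; b++) {
            credits.acquire();                       // wait for a free credit
            for (int i = 1; i <= BATCH; i++) {
                channel.put(b * BATCH + i);
            }
        }
        channel.put(-1);
        consumer.join();
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed " + run(50));   // consumed 5000
    }
}
```

The two initial permits mirror the two outstanding acknowledgements the original code tolerates before blocking on c.val twice.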

3. The best solution, in my opinion, would be to bridge the last remaining gap between GPars dataflow processes and proper CSP - enable synchronous communication. When communication is synchronous, parties cannot depart too far from one another, since producers have to wait for consumers to read each value. This is already on our long-term agenda - http://jira.codehaus.org/browse/GPARS-121
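For a preview of what such synchronous communication buys you, here is a plain-JDK sketch (not GPars) using java.util.concurrent.SynchronousQueue, a zero-capacity channel where every put() blocks until a matching take():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

public class RendezvousDemo {

    // Relays 1..n over a zero-capacity channel: each put() waits for a
    // matching take(), so the producer can never run ahead of the consumer.
    static List<Integer> relay(int n) throws InterruptedException {
        SynchronousQueue<Integer> channel = new SynchronousQueue<>();
        List<Integer> received = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    received.add(channel.take());    // completes the rendezvous
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= n; i++) {
            channel.put(i);                          // blocks until the consumer takes it
        }
        consumer.join();                             // join() makes `received` safely visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(relay(5));                // [1, 2, 3, 4, 5]
    }
}
```

With no buffering at all, producer and consumer proceed strictly in lockstep, which is exactly the CSP-style guarantee described above.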

I hope this wrap-up helps you overcome the difficulties.

Regards,

Vaclav


On Fri, May 20, 2011 at 4:49 PM, klausb <[hidden email]> wrote:





--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech

Re: Managing upper limit of Queues?

klausb
Hi Vaclav,

Yes, I got infected right away.

I looked at your proposals and got them running with Groovy 1.8.
Still I have a few questions/comments:

To solution 1:
Does this solution require checking the length of b inside the operator at all? I wonder, because the producer already checks the lengths and makes sure that none of the pipelines grows too large.

To solution 2:
A slight drawback is that all operators along the way need to know about this special token, which introduces extra checking on the input.
Harder to grasp is the double c.val statement at the end, which (I guess) effectively blocks the loop until the acknowledgements are received. Is that correct?

Anyway, thanks a lot for your proposals. I will definitely continue working with GPars.

- klaus.

Re: Managing upper limit of Queues?

Vaclav
Administrator
Hi Klaus,


To solution 1:
Does this solution require checking the length of b inside the operator at all? I wonder, because the producer already checks the lengths and makes sure that none of the pipelines grows too large.


Yes, you are correct. If the producer checks the length of channel b together with the other three channels, there should be no need to do the checks inside the operators. The code looks much better without the checks :) 
 
To solution 2:
A slight drawback is that all operators along the way need to know about this special token, which introduces extra checking on the input.

I agree, this is not very pretty.
 
Harder to grasp is the double c.val statement at the end, which (I guess) effectively blocks the loop until the acknowledgements are received. Is that correct?


Yes, we need to wait for the special token from both operators before we submit the next batch of messages. Perhaps we could come up with a more efficient acknowledgement protocol, maybe even reusing the worker messages and adding some sort of tokens or ids to their ordinary workload.

Regards,

Vaclav

