I've constructed a simple example with two operators (each with maxForks: 2) that receive an int via a DataFlowBroadcast read channel. Both do some simple math and send the result to a DataFlowQueue, from which the value is printed inside an endless while loop.
The producer pumps a lot of numbers into the broadcast channel. At some point after 5000 insertions the program stops with an OutOfMemoryError: Java heap space. Can someone explain? Here is the code:

import groovyx.gpars.dataflow.DataFlowBroadcast
import groovyx.gpars.dataflow.DataFlowQueue
import static groovyx.gpars.dataflow.DataFlow.operator
import static groovyx.gpars.dataflow.DataFlow.task

def b = new DataFlowQueue()
def a0 = new DataFlowBroadcast()

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) { println "${i++}: $b.val" }
}

for (i in 1..1000000) { a0 << i }

So maybe I can answer this myself: it looks like insertion does not block on any upper limit, and since consumption is slower than production the queues grow until memory runs out. Can I control the incoming rate by setting an upper limit, or can I check the number of elements currently in the channel before adding a new one?

klaus.
Hello Klaus,
I'm glad to see you've started your GPars experiments so quickly :) The DataflowQueue class (notice the capitalization change in GPars 0.12) has a length() method; DataflowBroadcast, however, does not, since its nature (multiple anonymous subscriptions, a linked data structure) would make such a method very inefficient.
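To illustrate, a minimal sketch of the throttling that length() enables (MAX is an arbitrary threshold of my choosing, not a GPars setting):

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

def q = new DataflowQueue()
int MAX = 50

// A consumer draining the queue.
task {
    while (true) println q.val
}

// The producer backs off whenever more than MAX values sit unread in the queue.
10000.times { i ->
    while (q.length() > MAX) Thread.yield()
    q << i
}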
I managed to come up with two solutions usable right away, plus one solution requiring further effort on our side.

1. Replacing DataflowBroadcast with two DataflowQueues and a splitter:

import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task
import static groovyx.gpars.dataflow.Dataflow.splitter
import groovyx.gpars.dataflow.DataflowQueue

def b = new DataflowQueue()
def a0 = new DataflowQueue()
def a1 = new DataflowQueue()
def a2 = new DataflowQueue()

MAX = 50

splitter(a0, [a1, a2])

operator([inputs: [a1], outputs: [b], maxForks: 4]) { val ->
    while (b.length() > MAX) Thread.yield()
    bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a2], outputs: [b], maxForks: 4]) { val ->
    while (b.length() > MAX) Thread.yield()
    bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) { println "${i++}: $b.val" }
}

for (i in 1..10000000) {
    a0 << i
    while ([a0.length(), a1.length(), a2.length()].max() > MAX) Thread.yield()
}

I don't like the fact that we're accessing the b channel directly from within the operator's body, but I couldn't find a better way.
2. Using confirmation messages going backwards to approve sending the next batch of values:

import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task

def b = new DataflowQueue()
def c = new DataflowQueue()
def a0 = new DataflowBroadcast()

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    if (val == -1) bindOutput val
    else bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { val ->
    if (val == -1) bindOutput val
    else bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) {
        def value = b.val
        if (value == -1) c << value
        else println "${i++}: $value"
    }
}

for (i in 1..10000) {
    a0 << -1
    100.times { a0 << i }
    c.val
    c.val
}

I like this one much better.

3. The best solution, in my opinion, would be to bridge the last remaining gap between GPars dataflow processes and proper CSP - enabling synchronous communication. When communication is synchronous, the parties cannot drift too far apart, since producers have to wait for consumers to read each value. This is already on our long-term agenda - http://jira.codehaus.org/browse/GPARS-121
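To give an idea of what that would look like, here is a sketch against a hypothetical synchronous channel. I'm using the name SyncDataflowQueue, which is what later GPars releases ended up calling it if I'm not mistaken; treat the class name and its availability as assumptions:

import groovyx.gpars.dataflow.SyncDataflowQueue
import static groovyx.gpars.dataflow.Dataflow.task

// Assumption: a synchronous channel as envisaged by GPARS-121. Each write
// blocks until a reader takes the value, so flow control is built in.
def channel = new SyncDataflowQueue()

task {
    while (true) println "Got: ${channel.val}"
}

for (i in 1..1000000) {
    channel << i    // rendezvous: blocks here until the consumer has read the value
}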
I hope this wrap-up will help you overcome the difficulties.

Regards,

Vaclav
Hi Vaclav,
yes, I got infected right away. I looked at your proposals and got them running with Groovy 1.8. Still, I have a few questions/comments.

On solution 1: Does this solution require checking the length of b inside the operators at all? I wonder, because the producer checks lengths already and makes sure that none of the pipelines grows too large.

On solution 2: A slight drawback is that all operators along the way need to know about this special token, which introduces extra checking on the input. Hard to grasp is the double c.val statement at the end, which (I guess) effectively blocks the loop until the acknowledgement is received from both operators. Is that correct?

Anyway, thanks a lot for your proposals. I will definitely continue working with GPars.

- klaus.
Hi Klaus,
Yes, you are correct. If the producer checks the length of channel b together with the other three channels, there should be no need to do the checks inside the operators. The code looks much better without the checks :)
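For completeness, solution 1 with that change applied might look as follows (a sketch along those lines, untested) - the operators drop their while loops and the producer simply includes b in its length check:

import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task
import static groovyx.gpars.dataflow.Dataflow.splitter
import groovyx.gpars.dataflow.DataflowQueue

def b = new DataflowQueue()
def a0 = new DataflowQueue()
def a1 = new DataflowQueue()
def a2 = new DataflowQueue()

MAX = 50

splitter(a0, [a1, a2])

// The operators no longer need to watch b themselves...
operator([inputs: [a1], outputs: [b], maxForks: 4]) { val ->
    bindOutput "$val[*]: ${val * val}"
}

operator([inputs: [a2], outputs: [b], maxForks: 4]) { val ->
    bindOutput "$val[+]: ${val + val}"
}

task {
    int i = 1
    while (true) { println "${i++}: $b.val" }
}

// ...because the producer now includes b in its back-pressure check.
for (i in 1..10000000) {
    a0 << i
    while ([a0.length(), a1.length(), a2.length(), b.length()].max() > MAX) Thread.yield()
}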
On solution 2: I agree, this is not very pretty.

> Hard to grasp is the double c.val statement at the end

Yes, we need to wait for the special token from both operators before we submit the next batch of messages. Perhaps we could come up with a more efficient acknowledgement protocol, maybe even reusing the ordinary worker messages and adding some sort of tokens or ids to their normal work load.
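To sketch that idea: each value carries a last flag, the operators pass the flag through with their results, and the printing task sends one acknowledgement per operator per batch. The map-shaped messages and the names ack and last are illustrative inventions of mine; also note that with maxForks > 1 a flagged message can overtake an in-flight neighbour, so the bound is only approximate:

import groovyx.gpars.dataflow.DataflowBroadcast
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.task

def b = new DataflowQueue()
def ack = new DataflowQueue()
def a0 = new DataflowBroadcast()

// The operators transform the payload and pass the 'last' flag through
// untouched - no special-case token to recognize on the input.
operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { msg ->
    bindOutput([text: "${msg.value}[*]: ${msg.value * msg.value}", last: msg.last])
}

operator([inputs: [a0.createReadChannel()], outputs: [b], maxForks: 2]) { msg ->
    bindOutput([text: "${msg.value}[+]: ${msg.value + msg.value}", last: msg.last])
}

task {
    int i = 1
    while (true) {
        def msg = b.val
        println "${i++}: ${msg.text}"
        if (msg.last) ack << true          // one ack per operator per batch
    }
}

for (i in 1..10000) {
    100.times { a0 << [value: i, last: false] }
    a0 << [value: i, last: true]
    2.times { ack.val }                    // wait until both operators finished the batch
}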
Regards,

Vaclav