Controlling concurrency

Controlling concurrency

ddimitrov

I am trying to use GPars dataflow as the base for a log-mining DSL. The log files are typically between 1 and 5 GB, and the processing includes some rough line-based filtering, then reconstituting the multiline records, and then fine-grained filtering by record.

Here is some example code:

new File(rootDir).canonicalFile.eachFileRecurse(FILES) { file ->
    if (!(file.absolutePath =~ processes)) return

    def logName = file.parentFile.parentFile.name
    new File(outDir, "${logName}.log").withPrintWriter { out ->
        def currentMessageBuffer    = new StringBuilder()

        // define operations (closures curried down to single argument)
        def beforeInjectionComplete = Filter.gate(~'.*Order injection complete.*')
        def selectedOrders          = Filter.gate(~'.*54951866.*', ~'.*AEH: processTxnGroup - DONE.*')
        def merge                   = Transform.mergeMultiline(currentMessageBuffer, [0, 2, -1])
        def format                  = Transform.formatLog(true)

        // build the pipeline
        def lines     = new SyncDataflowQueue()
        def filtered  = lines.filter beforeInjectionComplete filter selectedOrders
        def formatted = filtered | merge | format
        formatted.wheneverBound { messageCount++ }

        // pump strings into the lines queue and send a poison pill when done with the file
        Thread.start("Reader $logName") { Source.pumpFileLines(lines, file, skipLines) }

        // consume the data and write to output file
        while (true) {
            def line = formatted.val
            if (line in PoisonPill) break
            out.println line
        }
        if (currentMessageBuffer) out.println currentMessageBuffer // flush leftovers

    }
}

This is a simple case: processing files one by one and writing the output to a file. More complex cases may use information from one file to enrich another, etc.

As all my operations rely heavily on regular expressions, I was expecting that the dataflow machinery would take care of distributing the work across all CPUs, using promises to get the values back in order. When running this program in a profiler (YourKit, sampling), I can see that most of the threads are parked inside getVal() and very few threads do work. In fact it seems like most of the time is spent passing data around rather than calculating. As a confirmation, once the 'beforeInjectionComplete' filter starts passing messages down the pipeline, I can see that the reader thread is parked most of the time. Am I doing something wrong?

In addition, I could see that the group keeps spawning new threads, and sometimes terminates threads for no obvious reason. Again, each thread stays mostly parked. I tried with an explicit NonDaemonPGroup and DefaultPGroup(new ResizeablePool(4)), to no avail - threads kept popping up and dying all the time. Is it possible to run the whole pipeline in a single thread, with the scheduler picking the active task in a work-stealing manner?

A few API notes:

  • It would be nice if I could specify the fork count for the operators created by the filter() and chainWith() methods, or indicate in some way which steps are safe to parallelize and which have to run in sequence.
  • Because I need to pass a PoisonPill down the queue, the generics are useless (the lowest common super-type of PoisonPill and any other type is Object).
  • While it is possible to declare the channel's type to be your main payload type so that the pill stays hidden from the operators, it will still show up in the terminating consumer, which has to drain the channel to prevent a pile-up.

Finally, when I tried to use a non-synchronous queue, the process quickly consumed all memory and started thrashing in GC. Is there a way to limit the number of outstanding items in a channel?

Thanks,
Dimitar

Re: Controlling concurrency

Alan Thompson
Hi - I've not tried the dataflow stuff yet, but have had success with both GPars fork/join and the eachParallel() method. If you have more log files than cores, it might be easiest to just put the file names in a List and call List.eachParallel() with your processing closure (I do text processing on a couple of thousand files that way regularly). If you have just one or a few large files, perhaps split them into 10-20 chunks (with head/tail or a simple Groovy script), then use the same approach.
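
For example, something along these lines (a sketch only - processFile stands for your per-file filtering and writing logic):

import groovy.io.FileType
import groovyx.gpars.GParsPool

// processFile is a placeholder for the per-file filter/merge/format/write logic
def processFile = { File file -> /* ... */ }

// collect the files first, then process them in parallel, one file per task
def logFiles = []
new File(rootDir).canonicalFile.eachFileRecurse(FileType.FILES) { file ->
    if (file.absolutePath =~ processes) logFiles << file
}

GParsPool.withPool(Runtime.runtime.availableProcessors()) {
    logFiles.eachParallel { File file -> processFile(file) }
}
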
Good luck,
Alan Thompson

Re: Controlling concurrency

ddimitrov
Thank you Alan,

In fact, my objective is to learn how to use dataflows and find out which problems they are a good fit for. In this particular example, some of the filtering is non-trivial, so if I just split the file I would have to do a considerable amount of work to cover the seams (e.g. think about finding the maximum run length).

Curiously, when I coded the same algorithm inline, applying the same filters and transformers inside a simple file.eachLine {}, the difference was a dramatic 7-fold improvement. Looking at the process stats, the concurrent version has an enormous number of context switches, while the inline version has almost none (and its I/O throughput is significantly higher). This confirms my observation that the dataflow version spends most of its time passing data around.
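
For reference, the inline version is essentially this (a sketch - the closures are the same as in the original snippet, and I'm assuming merge returns null until a complete record is assembled):

// Sequential baseline: the same closures applied inline, no channels involved.
new File(outDir, "${logName}.log").withPrintWriter { out ->
    file.eachLine { String line ->
        if (!beforeInjectionComplete(line)) return
        if (!selectedOrders(line)) return
        def record = merge(line)             // assumed: null until a full record is assembled
        if (record != null) out.println(format(record))
    }
}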

It seems that either GPars dataflow does not support my use case, or I am using it wrong. I really like the programming model and the fact that once you express your pipeline, the concurrency can be figured out for you automatically, but a parallel slow-down factor of 7 is difficult to justify. At a minimum, I would expect there to be a way to force the dataflow graph to run on a single processing thread.

Re: Controlling concurrency

Vaclav
Administrator
In reply to this post by ddimitrov
Hi Dimitar,

here are a few ideas:

Is the CPU utilization close to 100% when running the parallel version of your code? The supplied code snippet indicates you chained the operators into a single pipeline, which is likely to suffer from poor work balancing. There may be a need to fork some of the "hot" operators.

Have you tried making your tasks more coarse-grained? Message passing certainly comes with overhead, which in some cases may cancel out the potential gains from parallelism. Perhaps you could consider merging the work of some connected operators into one?

The code snippet relies on the default dataflow thread pool, which is resizeable. I'd recommend trying a fixed-size thread pool (e.g. through new NonDaemonPGroup(4)) to avoid unnecessary thread creation.
Of course, a dataflow network can also be run with a single thread in the thread pool, provided you use asynchronous channels.
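
For illustration, a minimal sketch of an operator running on an explicit fixed-size (or single-threaded) group - the transformation is a placeholder and I'm assuming the pool-size constructor of NonDaemonPGroup:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

def group = new NonDaemonPGroup(4)       // fixed pool of 4 threads
// def group = new NonDaemonPGroup(1)    // or a single thread for the whole network

def lines   = new DataflowQueue()
def results = new DataflowQueue()

// the operator runs on the explicit group instead of the default resizeable pool
group.operator(inputs: [lines], outputs: [results]) { String line ->
    bindOutput line.toUpperCase()        // placeholder transformation
}

lines << 'order injection complete'
assert results.val == 'ORDER INJECTION COMPLETE'

group.shutdown()                         // release the pool threads when done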

More comments inline:

As all my operations rely heavily on regular expressions, I was expecting
that the dataflow would take care to distribute these among all CPU's using
promises to get the values back in line. When running this program in a
profiler (Yourkit, sampling), I can see that most of the threads are parking
inside getVal() and very few threads do work. In fact it seems like most of
the time is spent passing data around and not calculating.

This is surprising, since operators should not call getVal() at all. They wait for values in the channels non-blockingly through getValAsync(). It should only be the final consumer (main thread) in your code that waits for messages through getVal(), I suppose. 

[...]
 
It would be nice if I could specify for the fork number for the operators in
the filter() and chainWith() methods, or indicate in some way which steps
are safe to parallelize and which have to be run in sequence.

At the moment you have to resort to the operator factory methods in order to set maxForks, I'm afraid. Worth a JIRA issue?
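
E.g. something along these lines (a minimal sketch using the map-based factory; the regex is only illustrative):

import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.DataflowQueue

def lines    = new DataflowQueue()
def matching = new DataflowQueue()

// maxForks lets several copies of this (stateless!) operator run concurrently;
// note that with maxForks > 1 the output ordering is no longer guaranteed
operator(inputs: [lines], outputs: [matching], maxForks: 4) { String line ->
    if (line =~ /54951866/) bindOutput line
}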
 
Because I need to pass a PoisonPill down the queue, the generics are useless
(the lowest common super-type of PoisonPill and any other type is Object).
While it is possible to declare the type of the channel to be your main
payload and the pill will be hidden from operators, it will still show up in
the terminating consumer that has to drain the channel to prevent pile-up.

True.
 
Finally, when I tried to use non-synchronous queue, the process quickly
consumed all memory and started thrashing in GC. Is there a way to limit the
number of outstanding items in the channel?

Unmanaged channels in non-trivial dataflow networks have a tendency to either fill up quickly or remain empty most of the time, so some way to throttle messages is typically needed. Have you tried the KanbanFlow API? In exchange for some additional overhead you gain the ability to control the amount of work in progress.
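
From memory, building such a flow looks roughly like this (a sketch only - exact package names and signatures may differ in your GPars version):

import groovyx.gpars.dataflow.KanbanFlow
import static groovyx.gpars.dataflow.ProcessingNode.node

int counter = 0
// a producer puts a product onto the tray handed to it, a consumer takes it off;
// the limited number of trays circulating in a link caps the work in progress
def producer = node { tray -> tray << counter++ }
def consumer = node { tray -> println tray.take() }

def flow = new KanbanFlow()
flow.link(producer).to(consumer)
flow.start()
// ... let it run ...
flow.stop()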

Regards,

Vaclav


--
E-mail: [hidden email]
Blog: http://www.jroller.com/vaclav
Linkedin page: http://www.linkedin.com/in/vaclavpech

Re: Controlling concurrency

ddimitrov
Thank you Vaclav,
 
The hottest code is in the first two filters, which filter out 99% of the data. The filters have start and end regexes, and everything after the start and before the end is passed through. The start regex can trigger multiple times in the stream. The problem is that the shared state (whether the gate is open or closed) prevents us from forking the operators. I have tried merging the filters, but it doesn't yield a significant difference.
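
For context, the gate is conceptually something like this (a simplified, hypothetical sketch - the real Filter.gate in my DSL does a bit more):

import java.util.regex.Pattern

// Pass every line from a match of 'start' until a match of 'end'.
// The 'open' flag is shared mutable state, which is exactly what prevents
// forking the operator that is built on top of this closure.
def gate = { Pattern start, Pattern end = null ->
    boolean open = false
    return { String line ->
        if (!open && line ==~ start) open = true
        boolean pass = open
        if (open && end && line ==~ end) open = false
        pass
    }
}

// used as: lines.filter(gate(~'.*Order injection complete.*'))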

I have read your article about speculative evaluation, and it sounds like a good fit in theory, but I am not sure what the communication overhead would be, and we would need a simpler API facade for this kind of speculative pipelined filtering. This could unlock some parallelism even for stateful operators, as long as the subsequent calculations do not depend on the side effects.


The code snippet relies on the default dataflow thread pool, which is resizeable. I'd recommend trying a fixed-size thread pool (e.g. through new NonDaemonPGroup(4)) to avoid unnecessary thread creation.
Of course, a dataflow network can also be run with a single thread in the thread pool, provided you use asynchronous channels.

I did try NonDaemonPGroup(4), but strangely enough the executor kept creating and killing threads (the maximum number of simultaneously running actor threads never exceeded 7, even though I specified 4).


This is surprising, since operators should not call getVal() at all. They wait for values in the channels non-blockingly through getValAsync(). It should only be the final consumer (main thread) in your code that waits for messages through getVal(), I suppose. 

I was too quick to draw conclusions here - most of the threads were parked inside the getTask() method of ThreadPoolExecutor. 


It would be nice if I could specify for the fork number for the operators in the filter() and chainWith() methods, or indicate in some way which steps are safe to parallelize and which have to be run in sequence.

ATM, you have to resort to the operator factory methods in order to set maxForks, I'm afraid. Worth a JIRA issue?

Done: GPARS-241
 

Because I need to pass a PoisonPill down the queue, the generics are useless (the lowest common super-type of PoisonPill and any other type is Object).
While it is possible to declare the type of the channel to be your main payload and the pill will be hidden from operators, it will still show up in the terminating consumer that has to drain the channel to prevent pile-up.

True.

What is the recommendation in that case? Shall we declare the channel with the payload type and push in the poison pill by casting away the generics, violating the type system's assumptions? I have no problem with that, except that in the consumer loop I get an ugly warning for checking whether an object is a PoisonPill, even though it is returned from getVal(), whose declared generic return type is String. I tried to consume the output inside a wheneverBound closure instead, but then I am faced with two additional problems:

1) How do we know when the stream has finished? While we could introduce an EOF marker, we would also need an escaping mechanism to handle the cases where it appears as a value. This may turn out to be tedious and inefficient for data types with a small value domain.

2) How do we prevent a pile-up in the channel? Is there an idiom or a ready-made component for terminating the channel (i.e. piping to null)?

 
Finally, when I tried to use non-synchronous queue, the process quickly consumed all memory and started thrashing in GC. Is there a way to limit the number of outstanding items in the channel?

Unmanaged channels in non-trivial dataflow networks have a tendency to either fill up quickly or remain empty most of the time. Thus some way to throttle messages is typically needed. Have you tried the Kanban Flow API?
 
Thanks, I will look into KanbanFlow. Meanwhile, is there any plan to introduce size-limited sync queues, or unbounded queues with spillover to a JSR-107 cache or a flat file? If you think it fits the scope of the library, I can create the JIRAs ;-)

Re: Controlling concurrency

Vaclav
Administrator
Hi Dimitar,

 
The hottest code is in the first two filters, which filter out 99% of the data. The filters have start and end regexes, and everything after the start and before the end is passed through. The start regex can trigger multiple times in the stream. The problem is that the shared state (whether the gate is open or closed) prevents us from forking the operators. I have tried merging the filters, but it doesn't yield a significant difference.


This looks quite tricky. With my limited understanding of the problem I can envision a potential solution:
1. A first operator assigns an order number to each message, so that we can re-sequence the messages at any time, and then passes them all on.
2. A forked, heavily utilized operator detects the start/stop patterns in the messages and writes a memo (start_pattern_detected, end_pattern_detected, none_detected) into each message before passing it on.
3. Its output messages will not necessarily arrive in the original order, since the parallel forks of the 2nd operator may finish at random times.
4. A resequencer operator remembers out-of-order messages and forwards them only in the correct sequence (using the numbers assigned by the 1st operator). Since it can read the start/end pattern detection result, it can decide which messages to send on and which to throw away.
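
A very rough sketch of steps 1, 2 and 4 (illustrative only - the channels, the regex and the memo values are placeholders):

import static groovyx.gpars.dataflow.Dataflow.operator
import groovyx.gpars.dataflow.DataflowQueue

def raw      = new DataflowQueue()   // incoming lines
def numbered = new DataflowQueue()
def detected = new DataflowQueue()
def ordered  = new DataflowQueue()

// 1. assign sequence numbers (single fork, so the original order is preserved)
long seq = 0
operator(inputs: [raw], outputs: [numbered]) { String line ->
    bindOutput([index: seq++, line: line])
}

// 2. the heavy regex work, forked - results may arrive out of order
operator(inputs: [numbered], outputs: [detected], maxForks: 4) { msg ->
    msg.memo = (msg.line =~ /Order injection complete/) ? 'start_pattern_detected' : 'none_detected'
    bindOutput msg
}

// 4. resequencer: buffer out-of-order messages and emit them in index order
long next = 0
def pending = [:]
operator(inputs: [detected], outputs: [ordered]) { msg ->
    pending[msg.index] = msg
    while (pending.containsKey(next)) {
        bindOutput pending.remove(next)   // here you would also act on msg.memo
        next++
    }
}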


 
I have read your article about speculative evaluation, and it sounds like a good fit in theory, but I am not sure what the communication overhead would be, and we would need a simpler API facade for this kind of speculative pipelined filtering. This could unlock some parallelism even for stateful operators, as long as the subsequent calculations do not depend on the side effects.


The code snippet relies on the default dataflow thread pool, which is resizeable. I'd recommend trying a fixed-size thread pool (e.g. through new NonDaemonPGroup(4)) to avoid unnecessary thread creation.
Of course, a dataflow network can also be run with a single thread in the thread pool, provided you use asynchronous channels.

I did try NonDaemonPGroup(4), but strangely enough the executor kept creating and killing threads (the maximum number of simultaneously running actor threads never exceeded 7, even though I specified 4).


Did you pass the group to all the methods, such as chainWith(group, ...)?
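
For instance (assuming filter() has the same group-taking overload as chainWith()):

def group = new groovyx.gpars.group.NonDaemonPGroup(4)

// this lands on the default, resizeable dataflow pool:
def onDefaultPool = lines.filter(beforeInjectionComplete).filter(selectedOrders)

// this runs on the fixed-size group:
def onGroup = lines.filter(group, beforeInjectionComplete).
                    filter(group, selectedOrders).
                    chainWith(group, merge).
                    chainWith(group, format)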
 

Because I need to pass a PoisonPill down the queue, the generics are useless (the lowest common super-type of PoisonPill and any other type is Object).
While it is possible to declare the type of the channel to be your main payload and the pill will be hidden from operators, it will still show up in the terminating consumer that has to drain the channel to prevent pile-up.

True.

What is the recommendation in that case? Shall we declare the channel with the payload type and push in the poison pill by casting away the generics, violating the type system assumptions? I have no problems with that, except that in the consumer loop I get an ugly warning for checking whether an object is a PoisonPill, even if it is returned from getVal(), declaring generic return type of String. I tried to consume the output inside a wheneverBound closure, but then I am faced with two additional problems: 


Yes, with poison flowing down the channels we have to abandon generics.
 
1) how do we know when the stream has finished? While we could introduce an EOF marker, we would also require escaping mechanism to handle the cases when it appears as a value. This may turn tedious and inefficient for datatypes with small value domain.
 
2) how do we prevent a pileup in the channel? Is there an idiom or ready component for terminating the channel (i.e. pipe to null)? 



This is an interesting idea. You may forward the last channel into an empty operator and attach (join) the main thread to it, so that it blocks until the final operator reads the poison:

def terminator = operator([ch], []) {}   // an operator with no outputs simply drains the channel
terminator.join()                        // returns once the poison pill has stopped the operator

However, since wheneverBound listeners get invoked asynchronously, there's no guarantee that all messages have been processed through wheneverBound() before the terminator receives a poison.
What I'd consider, if you really want to keep generics in your code without any compiler warning, is to define a listener on the terminator instead of using the wheneverBound() handler.

final listener = new DataflowEventAdapter() {
    @Override
    Object messageArrived(final DataflowProcessor processor, final DataflowReadChannel<Object> channel, final int index, final Object message) {
        // handle the message the way you did in the wheneverBound() handler
        return message
    }
}
def terminator = operator(inputs: [ch], outputs: [], listeners: [listener]) {}
terminator.join()

 
 
Thanks, I will look into KanbanFlow. Meanwhile, is there any plan to introduce size-limited sync queues, or unbounded queues with spillover to a JSR-107 cache or a flat file? If you think it fits the scope of the library, I can create the JIRAs ;-)

We have no plans at the moment and sort of rely on the kanban-style flow management to cover most of the cases for limited-size channels. JIRAs would definitely be worthwhile, at least to initiate the discussion.

Cheers,

Vaclav