Actors and DataFlowBroadcasts

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Actors and DataFlowBroadcasts

menacher
This post has NOT been accepted by the mailing list yet.
Hi, First off, I am a gpars noob so please bear with me if it is a dumb question. My problem is about how to combine actors with dataflow broadcasts correctly. I have 1 actor which writes to a broadcast stream and a bunch of actors who have subscribed to this broadcast like the following.
public class Subscriber extends DefaultActor
{
	private Publisher publisher;
	private DataFlowReadChannel inChannel;
	private CountDownLatch latch;
	
	public Subscriber(Publisher publisher,CountDownLatch latch)
	{
		this.publisher= publisher;
		inChannel = publisher.createReadChannel();
		this.latch = latch;
	}
	
	@Override
	protected void act()
	{
	loop(new Runnable() {

            // Run will be executed with first message sent to actor
            @Override
            public void run() {
            	while(true){
	try
	{
		Integer number = inChannel.getVal(); // Get data from the queue
		latch.countDown();
		System.out.println(number);
		if(number > 2){
			break;
		}
	}
	catch (InterruptedException e)
	{
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
             }// end of while true
            	stop();}});// end of loop method
}

And in the publisher I just create a broadcast stream and submit data to it.

Now my questions are
1) Is this the best way an actor can recieve a message from a stream? Can I put the while(true) somewhere else?
2) How do I "unsubscribe" cleanly? I can make the while(someCondition) and set that to false, however will that prevent the message from being delivered to the DataFlowReadChannel? Since no one is now picking it up, will it block other readers?
3) Is there a good example out there for using streams to communicate between Actors? I have seen examples using data flow variables.

Thanks and Regards,
Abraham Menacherry.
P.S: IMHO GPars is an excellent library which brings concurrency to the masses(i.e. me...)