Vote count:
0
I have time consuming tasks that I would like to split up across multiple workers using RabbitMQ.
However, when a new worker / consumer attaches to the queue, the new worker doesn't appear to receive any messages that have been previously backlogged in the queue. Instead, the new worker only appears to receive new message that arrive after the worker has subscribed to the queue.
Maybe I'm doing something wrong, but how do I get RabbitMQ to re-distribute the backlogged messages to new workers, as new workers subscribe to the queue?
Here is some test code written in Scala:
RabbitMQSendTest code blasts 100 messages to RabbitMQ. RabbitMQRecvTest simulates time consuming tasks. RabbitMQ only seems to dispatch messages to clients which have subscribed prior to the messages arriving. However, I would like the existing messages in the queue to be re-distributed to new workers as workers subscribe to the queue.
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer
object RabbitMQSendTest {
def main (args:Array[String]):Unit = {
val r = new RabbitMQSendTest ()
r.mainSendTest
}
}
class RabbitMQSendTest {
val QUEUE_NAME = "hello"
def mainSendTest () = {
try {
val factory = new ConnectionFactory ()
factory.setHost("127.0.0.1")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
channel.basicQos(5)
for (i <- (0 to 100)) {
val message = "Hello World " + i
channel.basicPublish("", QUEUE_NAME,null, message.getBytes())
println (" [x] Sent " + message)
Thread.sleep(100)
}
channel.close()
connection.close()
} catch
{
case ex:Exception => ex.printStackTrace()
}
}
}
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer
object RabbitMQRecvTest {
def main (args:Array[String]):Unit = {
val r = new RabbitMQTest ()
r.recvTest
}
}
class RabbitMQTest {
val QUEUE_NAME = "hello"
def recvTest () = {
val factory = new ConnectionFactory ()
factory.setHost("127.0.0.1")
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
println ("[*] Waiting for messages")
val consumer = new QueueingConsumer (channel)
val autoAck = false
channel.basicConsume(QUEUE_NAME, autoAck, consumer)
while (true) {
val delivery = consumer.nextDelivery()
val message = new String (delivery.getBody())
println ("Received: " + message)
doWork (message)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
def doWork (message:String) = {
println ("Doing Work on : " + message)
Thread.sleep(10000)
}
}
Aucun commentaire:
Enregistrer un commentaire