RabbitMQ 之延时队列
< 返回列表 | 时间: 2018-10-15 | 来源: OSCHINA
package com.sky.study.delayQueue;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sky.study.ConnectionUtil;
/**
 * Delay-queue producer.
 *
 * <p>Publishes one message to {@code delay_queue}, a queue configured with a
 * message TTL and a dead-letter exchange. The dead-letter exchange is bound to
 * {@code message_ttl_queue}, so the message is intended to show up there once
 * its TTL elapses, which is what produces the delay.
 *
 * @author 86940
 */
public class DelayQueue {
    /** Queue bound to the dead-letter exchange; consumers read from it. */
    private static final String QUEUE_NAME = "message_ttl_queue";

    /**
     * Declares the delay topology, publishes a single persistent message and
     * closes the connection.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            Map<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("x-expires", 30000); // queue expiry time (ms)
            arguments.put("x-message-ttl", 12000); // TTL of messages on the queue (ms)
            // Expired messages are re-routed through this exchange / routing key.
            arguments.put("x-dead-letter-exchange", "exchange-direct");
            arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
            channel.queueDeclare("delay_queue", true, false, false, arguments);

            // Declare the target queue and the dead-letter exchange.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.exchangeDeclare("exchange-direct", "direct");
            // Bind the target queue to the exchange with the dead-letter routing key.
            channel.queueBind(QUEUE_NAME, "exchange-direct", "message_ttl_routingKey");

            String message = "hello world!" + System.currentTimeMillis();

            // Delivery mode: non-persistent (1) or persistent (2).
            // Alternative to the queue-level TTL: set a per-message TTL with
            // builder.expiration("30000").deliveryMode(2).build().
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            AMQP.BasicProperties properties = builder.deliveryMode(2).build();

            // Publish via the default exchange with routing key "delay_queue".
            // Encode explicitly as UTF-8 instead of the platform default charset.
            channel.basicPublish("", "delay_queue", properties,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());

            channel.close();
        } finally {
            // Always release the connection, even when a broker call above failed.
            // Skip the close if the connection is already shut down so the
            // original failure is not masked by a secondary exception.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者代码
package com.sky.study.delayQueue;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.sky.study.ConnectionUtil;
/**
 * Consumer for the delay-queue example.
 *
 * <p>Declares the same topology as the producer (the TTL-bounded
 * {@code delay_queue}, the {@code exchange-direct} exchange and the bound
 * {@code message_ttl_queue}) and then prints every message received from
 * {@code message_ttl_queue}.
 */
public class Consumer {
    /** Queue bound to the dead-letter exchange; this is the queue consumed. */
    private static final String QUEUE_NAME = "message_ttl_queue";

    /**
     * Declares the topology and consumes messages in an endless loop.
     *
     * @param args unused
     * @throws Exception if a broker operation fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-expires", 30000); // queue expiry time (ms)
        arguments.put("x-message-ttl", 12000); // TTL of messages on the queue (ms)
        arguments.put("x-dead-letter-exchange", "exchange-direct"); // exchange for expired messages
        arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey"); // routing key for expired messages
        channel.queueDeclare("delay_queue", true, false, false, arguments);

        // Declare the target queue and the dead-letter exchange.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.exchangeDeclare("exchange-direct", "direct");
        // Bind the target queue to the exchange with the dead-letter routing key.
        channel.queueBind(QUEUE_NAME, "exchange-direct", "message_ttl_routingKey");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Consume from the target queue; the second argument enables auto-ack.
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            // nextDelivery() blocks until a message is available.
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Decode explicitly as UTF-8 instead of the platform default charset.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
        }
    }
}
热门排行