RocketMQ Basic Usage
Date: 2020-03-20  Source: OSCHINA
Official site: https://rocketmq.incubator.apache.org/docs/quick-start/
Package used: rocketmq-all-4.7.0-bin-release.zip
Start Name Server
    nohup sh bin/mqnamesrv &
    tail -f nohup.out

Start Broker
    nohup sh bin/mqbroker -n localhost:9876 &
    tail -f nohup.out
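Once both processes are started, a quick way to confirm they are accepting connections is to try opening a TCP connection to each port. The following is a minimal sketch, not part of RocketMQ: PortCheck and isListening are hypothetical names, 9876 is the name server port used in the commands above, and 10911 is assumed to be the broker port (it is the storeHost port that appears in the result output of this article).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of RocketMQ: checks whether a TCP port accepts connections. */
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, host unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // 9876: name server port from the commands above.
        // 10911: assumed broker port; adjust if your broker listens elsewhere.
        System.out.println("namesrv 9876 : " + isListening("localhost", 9876, 1000));
        System.out.println("broker 10911 : " + isListening("localhost", 10911, 1000));
    }
}
```

If either line prints false, the nohup.out log is the first place to look.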
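Note: if the broker fails to start, check whether the machine has enough memory. The JVM memory settings are in the file "runbroker.sh"; adjust the corresponding parameters in the following line to fit the memory available:
    JAVA_OPT="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"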
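To judge whether the machine has enough memory, it can help to see how much the JAVA_OPT line actually requests. The following is a minimal sketch that reads the -Xms, -Xmx and -Xmn values out of such a string, assuming the usual k/m/g JVM size suffixes. HeapOptions and its methods are hypothetical names used only for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reads -Xms/-Xmx/-Xmn sizes out of a JAVA_OPT string. */
class HeapOptions {

    // Matches flags such as -Xms4g, -Xmx512m or -Xmn2g.
    private static final Pattern SIZE = Pattern.compile("-(Xms|Xmx|Xmn)(\\d+)([kKmMgG]?)");

    /** Converts a numeric value plus a k/m/g suffix to bytes. */
    static long toBytes(long value, String unit) {
        switch (unit.toLowerCase()) {
            case "k": return value << 10;
            case "m": return value << 20;
            case "g": return value << 30;
            default:  return value; // no suffix means bytes
        }
    }

    /** Returns the size in bytes of the given flag ("Xms", "Xmx" or "Xmn"), or -1 if absent. */
    static long sizeOf(String javaOpt, String flag) {
        Matcher m = SIZE.matcher(javaOpt);
        while (m.find()) {
            if (m.group(1).equals(flag)) {
                return toBytes(Long.parseLong(m.group(2)), m.group(3));
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String javaOpt = "-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m";
        long xmx = sizeOf(javaOpt, "Xmx");
        System.out.println("Requested max heap: " + (xmx >> 20) + " MiB"); // prints 4096 MiB
    }
}
```

With the line shown above, the broker asks for a 4 GiB heap, so a machine with less free memory than that needs smaller -Xms, -Xmx and -Xmn values.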
Test sending and receiving
    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Shut down the services
    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv
Using RocketMQ in a Java project
pom.xml

    <properties>
        <rocketmq_ver>4.0.0-incubating</rocketmq_ver>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq_ver}</version>
        </dependency>
    </dependencies>

rocketmq.java

    package com.example.demo.rocketmq;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;

    /**
     * description:
     *
     * @author: he QQ: 905845006
     * @email: 905845006@qq.com
     * @date: 2020/3/20 4:13 PM
     */
    public class rocketmq {
        // Empty placeholder; the Producer and Consumer classes below hold the example code.
    }

    /**
     * Producer
     */
    class Producer {
        private static final String ADDR = "localhost:9876";

        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr(ADDR);
            try {
                producer.start();

                // Arguments: topic, tag, key, body
                Message msg = new Message("PushTopic", "push", "1", "Just for push1.".getBytes());
                SendResult result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

                msg = new Message("PushTopic", "push", "2", "Just for push2.".getBytes());
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

                // This message carries the tag "pull", so the Consumer below does not receive it.
                msg = new Message("PushTopic", "pull", "1", "Just for pull.".getBytes());
                result = producer.send(msg);
                System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.shutdown();
            }
        }
    }

    /**
     * Consumer
     */
    class Consumer {
        private static final String ADDR = "localhost:9876";

        public static void main(String[] args) {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
            consumer.setNamesrvAddr(ADDR);
            try {
                // Subscribe to messages in PushTopic whose tag is "push"
                consumer.subscribe("PushTopic", "push");
                /*
                 * Choose whether the consumer starts from the head or the tail of the
                 * queue on its first start. On later starts it resumes from the position
                 * where it last stopped consuming.
                 */
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(
                            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        for (MessageExt msg : msgs) {
                            System.out.println(new String(msg.getBody()) + ":" + msg.toString());
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                consumer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
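The Consumer subscribes to PushTopic with the tag expression "push", while the Producer sends two messages tagged "push" and one tagged "pull". The sketch below imitates that selection in plain Java so the effect can be seen without a running broker. It is a simplified stand-in, not RocketMQ's actual filtering code: TagFilterSketch and matches are hypothetical names, and the sketch assumes only the "*" wildcard and the "tagA || tagB" expression form.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for tag filtering; not RocketMQ's actual implementation. */
class TagFilterSketch {

    /** Returns true if the tag matches an expression such as "push", "push || pull" or "*". */
    static boolean matches(String subExpression, String tag) {
        if (subExpression == null || subExpression.trim().equals("*")) {
            return true; // "*" subscribes to every tag
        }
        Set<String> wanted = new HashSet<>();
        for (String t : subExpression.split("\\|\\|")) {
            wanted.add(t.trim());
        }
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        // Tags of the three messages sent by the Producer above, in order.
        String[] sentTags = {"push", "push", "pull"};
        for (String tag : sentTags) {
            System.out.println(tag + " -> delivered: " + matches("push", tag));
        }
    }
}
```

Under these assumptions the two "push" messages are delivered and the "pull" message is skipped; subscribing with "push || pull" or "*" would deliver all three.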
Result

    id:C0A801663174723279CF77AF3C6E0000 result:SEND_OK
    id:C0A801663174723279CF77AF3C7B0001 result:SEND_OK
    id:C0A801663174723279CF77AF3C7D0002 result:SEND_OK
    Just for push1.:MessageExt [queueId=2, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772974, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775615, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EDE8, commitLogOffset=191976, bodyCRC=1396413800, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=1, CONSUME_START_TIME=1490348782880, UNIQ_KEY=C0A801663174723279CF77AF3C6E0000, WAIT=true, TAGS=push}, body=15]]
    Just for push2.:MessageExt [queueId=3, storeSize=184, queueOffset=14, sysFla
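The msgId printed by the consumer for the first message, C0A87F8000002A9F000000000002EDE8, appears to encode the storeHost and commitLogOffset fields shown on the same line. The sketch below splits it under the assumption that the 32 hex characters are a 4-byte IPv4 address, a 4-byte port and an 8-byte commit log offset; that layout is an assumption that matches this sample line, and MsgIdDecoder is a hypothetical name, not a RocketMQ class.

```java
/** Sketch: splits a 32-hex-character broker-side message ID into address, port and offset. */
class MsgIdDecoder {

    /** Returns {ip, port, commitLogOffset} as strings; assumes an IPv4 store address. */
    static String[] decode(String msgId) {
        if (msgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        // First 8 hex characters: four address bytes.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(msgId.substring(i, i + 2), 16));
        }
        // Next 8 hex characters: port; last 16: commit log offset.
        int port = Integer.parseInt(msgId.substring(8, 16), 16);
        long offset = Long.parseLong(msgId.substring(16, 32), 16);
        return new String[] {ip.toString(), String.valueOf(port), String.valueOf(offset)};
    }

    public static void main(String[] args) {
        String[] parts = decode("C0A87F8000002A9F000000000002EDE8");
        // prints 192.168.127.128:10911 offset=191976
        System.out.println(parts[0] + ":" + parts[1] + " offset=" + parts[2]);
    }
}
```

The decoded values agree with storeHost=/192.168.127.128:10911 and commitLogOffset=191976 in the output. The id printed by the producer is a different, client-generated identifier; it shows up on the consumer side as the UNIQ_KEY property.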
