- RocketMQ实战与原理解析
- 杨开元
- 446字
- 2023-07-26 11:54:55
2.3 发送/接收消息示例
可以用自己熟悉的开发工具创建一个Java项目,加入RocketMQ Client包的依赖,用代码清单2-1的内容发送消息,这个示例代码是以Sync方式发送消息的。
代码清单2-1 Producer示例程序
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a Producer group name. DefaultMQProducer Producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.100.131:9876");
//Launch the instance. Producer.start(); for (int i = 0; i < 100; i++) { //Create a Message instance, specifying Topic, tag and Message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send Message to deliver Message to one of brokers. SendResult sendResult = Producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the Producer instance is not longer in use. Producer.shutdown(); } }
主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送。接下来看看如何接收消息,也就是使用DefaultMQPushConsumer类实现的消费者程序,如代码清单2-2所示。
代码清单2-2 Consumer示例程序
/* * Instantiate with specified Consumer group name. */ DefaultMQPushConsumer Consumer = new DefaultMQPushConsumer("please rename to unique group name"); /* * Specify name server addresses. Consumer.setNamesrvAddr("192.168.249.47:9876"); /* * Specify where to start in case the specified Consumer group is a brand new one. */ Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //Consumer.setMessageModel(MessageModel.BROADCASTING); /* * Subscribe one more more Topics to consume. */ Consumer.subscribe("TopicTest”, "*"); /* * Register callback to execute on arrival of Messages fetched from brokers. */ Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the Consumer instance. */ Consumer.start();
Consumer或Producer都必须设置GroupName、NameServer地址以及端口号。然后指明要操作的Topic名称,最后进入发送和接收逻辑。