调用SDK消费消息
<p class="shortdesc">消息发送成功后,需要启动消费者消费消息。本文主要以调用 TCP 协议的Java SDK 为例,说明如何调用SDK消费消息。</p>
<section><div class="tasklabel"><h2 class="doc-tairway">操作步骤</h2></div><ol class="ol steps"><li class="li step stepexpand">
<span class="ph cmd">根据以下说明设置相关参数,并运行示例代码消费消息。</span>
<div class="itemgroup info">
<pre class="pre codeblock"><code>public static Consumer initConsumer() throws FCException {
// 准备参数。
Properties properties = new Properties();
// 业务系统可以从配置文件中取出属性值,demo中写死。
properties.setProperty(FCConstant.NAME_SERVER_ADDRESS,"NAME_SERVER_ADDR-demo");// 必须配置,环境IP地址。
properties.setProperty(FCConstant.CONSUMER_ID, Constant.CID);// 必须配置,在控制台创建。
// 设置ak/sk,从控制台获取。
properties.setProperty(FCConstant.ACCESS_KEY, "ACCESSKEY-demo");
properties.setProperty(FCConstant.SECRET_KEY, "SECRETKEY-demo");
properties.setProperty(FCConstant.INSTANCE_NAME, UUID.randomUUID().toString());// INSTANCE_NAME属性建议设置为主机ip+随机串。
properties.setProperty("CONSUMER_THREAD_NUM", "10");
Consumer consumer = FCFactory.createConsumer(properties);
// 设置第一次消费消息的时间点,这里设置从最后的消息开始获取。
// 如果不设置该值,默认从队列的最开始消费消息。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
return consumer;
}
public static void subscribe(final Consumer consumer, FCMessageFilter messageFilter, String topic) throws FCException {
final SimpleDateFormat smf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss sss");
// 订阅 topic的消息(topic需要在控制台中创建)。
consumer.subscribe(topic, messageFilter, new FCMessageListener() {
// PUSH消费模式下,客户端包会启动后台线程不断从MQCP中拉取消息(准实时方式,毫秒级延时)。
// 业务系统收取到消息后,需要实现FCMessageListener,来处理消息。
@Override
public FCConsumeStatus pushMessage(List<FCMessage> messageList) {
// 监听到有消息抵达后,业务系统需要遍历messageList对象,来获取消息。
// messageList的默认大小为1,即消息是一条一条的推送到客户端的。
for (FCMessage msg : messageList) {
// 消息明细 FCMessage对象。
// 建议业务系统将从消息平台拉取消息和处理消息的逻辑解耦。
// 在Consumer监听器中只监听到消息,建议简单地将获取的消息解析存储,然后返回。
// FCConsumeStatus.CONSUME_OK
// 后端可以异步来处理接收到的消息。
log.info(consumer+"--------【{}】:{},\ntime :",new Object[]{new String(msg.getConent()),msg.getMsgId(),smf.format(new Date())});
}
return FCConsumeStatus.CONSUME_OK;
}
});
}
</code></pre>
</div>
</li><li class="li step stepexpand">
<span class="ph cmd">您可通过控制台查看消费者状态,若消费者在线,则说明消费者已成功启动。可参考查看<a class="xref" href="https://pinganyun.com/ssr/help/middleware/pamq/manual.common_operations.subscribe_manage.consumer_state" target="_blank">消费者状态</a>。</span>
</li></ol></section>
提交成功!非常感谢您的反馈,我们会继续努力做到更好!