发布订阅

Davidoff Shen2022年5月11日大约 4 分钟

发布订阅

Topic 是对发布订阅模式的一种抽象。当我们的业务是消息驱动时,首选 message broker 来传递消息,开发者直接使用 MQ 或者 JMS 的开发接口来发布、订阅消息,使得系统的架构被限定在某种具体的技术环境下,concrete的目标就是为了通过对模式的抽象隔离系统架构、研发对具体技术的依赖。

传统模式下,如果设计、开发者有足够的经验,也会把技术依赖通过封装隔离出来,不过...emm, how interesting...

静态订阅示例

package org.coodex.concrete.demo.api.excepted;

import org.coodex.concrete.api.ConcreteService;
import org.coodex.util.Parameter;

@ConcreteService
public interface MessageTriggerService {

    // 触发一条消息
    void trigger(@Parameter("msg") String msg);
}
ppackage org.coodex.concrete.demo.impl;

import org.coodex.concrete.demo.api.excepted.MessageTriggerService;
import org.coodex.concrete.message.Queue;
import org.coodex.concrete.message.Topic;

import javax.inject.Inject;
import javax.inject.Named;

@Named
public class MessageTriggerServiceImpl implements MessageTriggerService {

    // 定义一个主题,主题由主题类型,消息类型,队列三部分共同确定
    @Inject
    @Queue("demo")
    private Topic<String> topic;

    @Override
    public void trigger(String msg) {
        topic.publish(msg);// 使用这个主题发布一个消息
    }
}

定义一个消费者

package org.coodex.concrete.demo.impl;

import lombok.extern.slf4j.Slf4j;
import org.coodex.concrete.message.MessageConsumer;
import org.coodex.concrete.message.Observer;

import javax.inject.Named;

@Named
@MessageConsumer(queue = "demo")
@Slf4j
public class DemoMessageListener implements Observer<String> {
    @Override
    public void update(String message) throws Throwable {
        log.info("message received: {}", message);
    }
}

跑起来看看,swagger 里点一下触发一个消息

2022-05-09 15:45:36.370  INFO 63078 --- [opic.executor-1] o.c.c.demo.impl.DemoMessageListener      : message received: D7sluiBaKi2

有什么用?这个案例实在太简单了,好像起不到任何作用。我们假设消息消费者和生产者在两个不同的虚拟机里,下面看看怎么做到

queue.demo.yml

destination: rabbitmq:amqp://demo:demo@localhost:5672/demo
# serializer: json #指定使用json序列化消息内容

注意一下,使用 rabbitmq 作为订阅发布的实现技术时,destination 的格式为rabbitmq:amqp(s)://usernmame:password@host:port/virtualHost,username,password,host,port,virtualhost 也可以通过 Config 指定,如上例中的 username 和 password

我们这次使用 rabbitmq 来发布订阅消息

<!-- 使用 rabbitmq 的消息传递者插件 -->
<dependency>
    <groupId>org.coodex.concrete</groupId>
    <artifactId>concrete-courier-rabbitmq</artifactId>
</dependency>

再来一下

topic_amqp

我们看到,没有做任何代码的修改,我们很方便的把 local 方式切换到了 rabbitmq。

除了 rabbitmq 以外,concrete 还提供了 activemq 基于 jms 的插件。

特别的,concrete 还提供的发布者的聚合能力,同一个消息可以发布到多个队列上,destination=aggregated(queue1, queue2)即可。

提示

concrete 面向系统架构方面考虑比较多,截止到目前为止,我们还没有体验到太多。023 开始,concrete 注重系统的模块化,通过紧耦合(java client)、松耦合(发布订阅)的方式来让系统的架构可以更加灵活,配套一个基于配置中心的 Configuration 实现,统一维护模块的 destination,可以灵活的设计、调整整个系统的架构,这样也能有效地降低系统架构设计的成本和经验门槛

动态订阅示例

package org.coodex.concrete.demo.boot;

import org.coodex.concrete.message.Queue;
import org.coodex.concrete.message.Subscription;
import org.coodex.concrete.message.Topic;
import org.coodex.concrete.message.Topics;
import org.coodex.util.GenericTypeHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

import javax.inject.Inject;

@SpringBootApplication
public class TopicExample {

    public static class TopicDemo {
        @Inject
        @Queue("队列1")
        private Topic<String> topic1;
        @Inject
        @Queue("队列2")
        private Topic<String> topic2;

        public Topic<String> getTopic1() {
            return topic1;
        }

        // 等同于topic1
        public Topic<String> getTopic1UseApi() {
            return Topics.get(new GenericTypeHelper.GenericType<Topic<String>>() {
            }.getType(), "队列1");
        }

        public Topic<String> getTopic2() {
            return topic2;
        }
    }

    @Bean
    public TopicDemo getTopicDemo() {
        return new TopicDemo();
    }

    private static void topicDemo(Topic<String> topic, String message) {
        // 0 订阅的情况
        topic.publish(message + ".1");
        // 订阅1
        Subscription subscription1 = topic.subscribe(t -> {
            System.out.println("subscribe1: " + t);
        });
        topic.publish(message + ".2");
        // 这时候应该会显示subscribe1收到消息
        // 订阅2
        Subscription subscription2 = topic.subscribe(t -> {
            System.out.println("subscribe2: " + t);
        });
        topic.publish(message + ".3");
        // 这时候应该显示订阅1和2都收到了消息
        subscription1.cancel();
        topic.publish(message + ".4");
        // 订阅1已经取消了订阅,所以这时候应该显示订阅2收到了消息
        subscription2.cancel();
        topic.publish(message + ".5");
        // 这时候所有订阅都取消了,所以不显示消息
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(TopicExample.class);
        TopicDemo demo = context.getBean(TopicDemo.class);
        topicDemo(demo.getTopic1(), "注入: 队列1");
        topicDemo(demo.getTopic1UseApi(), "原始API: 队列1");
        topicDemo(demo.getTopic2(), "注入: 队列2");
    }
}
subscribe1: 注入: 队列1.2
subscribe2: 注入: 队列1.3
subscribe1: 注入: 队列1.3
subscribe2: 注入: 队列1.4
subscribe1: 原始API: 队列1.2
subscribe2: 原始API: 队列1.3
subscribe1: 原始API: 队列1.3
subscribe2: 原始API: 队列1.4
subscribe1: 注入: 队列2.2
subscribe2: 注入: 队列2.3
subscribe1: 注入: 队列2.3
subscribe2: 注入: 队列2.4

扩展

  • Couries: 我们看到 amqp 的 Courier 和 activemq 的 Courier 都是通过扩展达到的,开发者也可以开发自己的 Couries
  • TopicPrototype,开发者可以自行扩展自己的主题原型,在后面我们将看到一个 TokenBasedTopic,它就是使用原型扩展出来的
上次编辑于:
贡献者: Davidoff Shen
Loading...