应用场景:异步处理、应用解耦、流量消峰

简介:Apache出品,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现

JMS消息模型:

  1) P2P 点对点模型(Queue队列模型)

    特点:每个消息只有一个消费者(消息一旦被消费,消息就不再存在消息队列中)

          生产者和消费者之间在时间上没有依赖性,不需要同时处于运行状态

       消费者在成功消费之后,需要向队列应答成功

  2) Publish/Subscribe(Pub/Sub) 发布/订阅模式(Topic主题模型)

    特点:每个消息可以有多个消费者

          发布者和订阅者有时间依赖性(先订阅主题,再发送消息)

          订阅者必须保持运行状态,才能接收到消息

JMS编程API:

  1)ConnectionFactory 

    创建Connection对象的工厂,针对两种模型分别有QueueConnection和TopicConnectionFactory

  2)Destination

    对消费者来说是消息来源,某个队列(Queue)或某个主题(Topic),对生产者来说是某个队列(Queue)或某个主题(Topic)  

  3)Connection

    表示客户端与JMS系统之间建立的链接(对TCP/IP socket的包装) ,Connection可以产生一个或多个Session

  4) Session

    是对消息进行操作的接口,可以创建生产者、消费者、消息等。也提供了事务功能,如果需要可以将发送/接收消息的动作放到一个事务中

  5)Producter

    生产者:由Session创建,并用于将消息发送到Destination对象中。分两种类型 QueueSender和TopicPublisher,可调用生产者的send或publish方法发送消息

  6)Consumer

    消费者:由Session创建,用于接收Destination中的消息。分两种类型 QueueReceiver和TopicSubscriber,分别通过createReceiver(Queue)或createSubScriber(Topic)来创建。 也可以用createDurableSubscriber来创建持久化的订阅者。

  7)MessageListener

    消息监听器:如果注册了消息监听器,一旦消息到达,自动调用onMessage方法。EJB中的MDB(Message Driven Bean)就是一种MessageLinstener

  

 原生JMS-点对点

Producer:

public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");

//2.创建连接
Connection connection = factory.createConnection();

//3.打开连接
connection.start();

//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue = session.createQueue("queue01");

//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);

//7.发送消息
TextMessage test_message = session.createTextMessage("test message");
producer.send(test_message);

//8.释放资源
session.close();
connection.close();
}

Consumer:
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.112.129:61616");

//2.创建连接
Connection connection = factory.createConnection();

//3.打开连接
connection.start();

//4.创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Queue queue01 = session.createQueue("queue01");

//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(queue01);

  consumer.setMessageListener(message -> {
  if(message instanceof TextMessage){
  TextMessage txt = (TextMessage)message;
  try {
   System.out.println("接收的消息(2): " + txt.getText());
  } catch (JMSException e) {
   e.printStackTrace();
   }
   }
  });
}

 原生JMS-发布订阅

Producer:

  //5.创建目标地址(Queue:点对点消息;Topic:发布/订阅消息)
Topic topic = session.createTopic("topic01");

//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);

Consumer:

  //5.指定目标地址(Queue:点对点消息;Topic:发布/订阅消息)
  Topic topic = session.createTopic("topic01");

//6.创建消息消费者
MessageConsumer consumer = session.createConsumer(topic01);

springboot整合ActiveMQ
导入坐标
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Producer:
配置类application.yml
server:
port: 9001
spring:
application:
name: activemq-producer
# springboot 与 activemq的整合配置
# 连接工厂
activemq:
broker-url: tcp://192.168.112.129:61616
user: admin
password: admin
# 指定发送模式
jms:
pub-sub-domain: false
测试类 SpringbootProducer 
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringbootProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

@Test
public void ptpSender(){
jmsMessagingTemplate.convertAndSend("springboot_queue", "springboot test -- queue");
}

}
Consumer:
依赖同Producer,配置同Producer
编写Listener
@Component
public class MsgListener {
@JmsListener(destination = "springboot_queue")
public void receiveMsg(Message message) throws JMSException {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("接收的消息: " + txtMsg.getText());
}
}
}




原文地址:http://www.cnblogs.com/Fei-Gao/p/16827933.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性