今天看视频,里面讲了一个经典的例子,是工作中很常用的,特此将这种模式记录下来.这个例子使用了ActiveMQ的选择器,也使用了

之前学的自定义线程池.队列的使用,而且很好的利用多线程并发的处理了任务,提高了吞吐量.

首先看生产端:

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;

/**
*非常经典的例子,将任务分发给消费者,消费者使用多线程来处理,很好的提高了系统的吞吐量
*
* */
public class Producter {
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageProducer messageProducer;

public Producter() {
try {
//初始化连接
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue(“activemq.properties”, “url”)
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建生产者
messageProducer = this.session.createProducer(null);

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void send() {
try {
//创建队列
Destination destination = session.createQueue(“first”);
//循环发送100个消息,
for (int i = 0; i < 100; i++) {
MapMessage message = session.createMapMessage();
message.setInt(“id”, i);
message.setString(“name”, “张” + i);
message.setString(“age”, “” + i);
//根据id不同,把消息分为2中,一种奇数,一种偶数,也就是说消费端会使用选择器,有选择的处理消息
String receiver = i % 2 == 0 ? “A” : “B”;
//注意使用setStringProperty()方法,选择器过滤是根据这个方法过滤的
message.setStringProperty(“receiver”, receiver);
//发送消息(非持久化,优先级为2,即普通消息,存活时间为1分钟
this.messageProducer.send(destination,message,DeliveryMode.NON_PERSISTENT,2,1000*60L);
System.out.println(“Message send “+ i);
}
this.closeConnection(connection);
} catch (JMSException e) {
e.printStackTrace();
}
}

//关闭连接
public void closeConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
Producter producter=new Producter();
producter.send();
}
}
然后创建2个消费端:

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerA {
public static final String SELECTOR = “receiver=’A'”; //特别注意此处的写法,必须加单引号,类似于sql语句
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageConsumer messageConsumer;
//目的地
Destination destination;

public ConsumerA() {
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue(“activemq.properties”, “url”)
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.destination = session.createQueue(“first”);
messageConsumer = this.session.createConsumer(destination, SELECTOR);
System.out.println(“ConsumerA …start”);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void receiver() {
try {
//利用消息监听器来实现接收消息,而不是while(true)
this.messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
//此处很经典,很好的利用了多线程和队列的技术
class Listener implements MessageListener{
//创建一个有界阻塞队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
//创建一个自定义的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), //corePoolSize
20,//maximumPoolSize
120L, //keepAliveTime
TimeUnit.SECONDS, //单位
queue //使用的队列
);
@Override
public void onMessage(Message message) {

if (message instanceof MapMessage) {
//利用线程池开启多个线程去执行任务.相当于并行执行
MapMessage ret=(MapMessage) message;
//直接把任务交给多线程去处理
executor.execute(new MessageTask(ret));
}
if (message instanceof TextMessage) {
//处理流程
//….
}
}
}

public static void main(String[] args) {
ConsumerA consumerA=new ConsumerA();
consumerA.receiver();
}
}
消费者B

package com.jvm.activemq.bhz.mq;

import com.jvm.util.PropertiesUtil;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerB {
public static final String SELECTOR = “receiver=’B'”;//特别注意此处的写法
//1.连接工厂
private ConnectionFactory connectionFactory;
//2.连接对象
private Connection connection;
//3.Session对象
private Session session;
//4.生产者对象
private MessageConsumer messageConsumer;
//目的地
Destination destination;

public ConsumerB() {
try {
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
PropertiesUtil.getValue(“activemq.properties”, “url”)
);
this.connection = connectionFactory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.destination = session.createQueue(“first”);
messageConsumer = this.session.createConsumer(destination, SELECTOR);
System.out.println(“ConsumerB …start”);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}

public void receiver() {
try {
this.messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
class Listener implements MessageListener{
//创建一个队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
//创建一个自定义的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), //corePoolSize
20,
120L,
TimeUnit.SECONDS,
queue
);
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
//利用线程池开启多个线程去执行任务.相当于并行执行
MapMessage ret=(MapMessage) message;
//直接把任务交给多线程去处理
executor.execute(new MessageTask(ret));
}
}
}

public static void main(String[] args) {
ConsumerB consumerA=new ConsumerB();
consumerA.receiver();
}
}
处理的任务:(使用Thread.sleep(500)来模拟处理的时间)

package com.jvm.activemq.bhz.mq;

import javax.jms.JMSException;
import javax.jms.MapMessage;
//消息任务,实现了Runable接口
public class MessageTask implements Runnable {

MapMessage mapMessage;
public MessageTask(MapMessage ret) {
this.mapMessage = ret;
}

@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(“当前线程:”+Thread.currentThread().getName()+”处理任务”+this.mapMessage.getString(“id”));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
程序的运行结果:消费者A接收到的是偶数(由于版面的原因,省略了部分结果),消费者B接收的是奇数

ConsumerA …start
当前线程:pool-1-thread-2处理任务2
当前线程:pool-1-thread-1处理任务0
当前线程:pool-1-thread-3处理任务4
当前线程:pool-1-thread-8处理任务14
当前线程:pool-1-thread-5处理任务8
当前线程:pool-1-thread-4处理任务6
当前线程:pool-1-thread-7处理任务12
当前线程:pool-1-thread-9处理任务16
当前线程:pool-1-thread-6处理任务10
当前线程:pool-1-thread-11处理任务20
消费者B:

ConsumerB …start
当前线程:pool-1-thread-1处理任务1
当前线程:pool-1-thread-2处理任务3
当前线程:pool-1-thread-6处理任务11
当前线程:pool-1-thread-5处理任务9
当前线程:pool-1-thread-3处理任务5
当前线程:pool-1-thread-4处理任务7
当前线程:pool-1-thread-11处理任务21

生产者:

Message send 0
Message send 1
Message send 2
Message send 3
Message send 4
Message send 5
Message send 6
Message send 7
Message send 8
Message send 9
结果: 因为每个任务处理的时间是500毫秒,而100个就是50秒,2个消费者单线程是25秒,而利用多线程技术大约只需要2秒就可以处理完.很好的提高了吞吐量.
————————————————
版权声明:本文为CSDN博主「mango奇」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_33804730/article/details/79846120

原文地址:http://www.cnblogs.com/telwanggs/p/16889236.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性