#情景引入
小白:起床起床起床起床。。。??炱鸫瞺
我:怎么了又,大驚小怪,嚇到我了。
小白:我有事有事想找你,十萬(wàn)火急呢~~
我:你能有什么事?反正我不信。。那你說(shuō)說(shuō)看~~
小白:就是我有兩個(gè)小表弟,叫大白和二白,他們現(xiàn)在每天睡覺(jué)之前都要分別和我聊天,讓我給他們講故事,如果不講他們就不睡覺(jué)。但是,如果一個(gè)個(gè)的跟他們輪流來(lái)說(shuō)的話,我就需要每天說(shuō)兩遍,而且我還要找準(zhǔn)他們的時(shí)間點(diǎn),這個(gè)有時(shí)候我有事情都無(wú)法實(shí)現(xiàn)這個(gè)問(wèn)題,他們就會(huì)很生氣。。。
我:這不是挺好的嘛,小孩子就是愛(ài)聽(tīng)故事的呀。。。
小白:我也愿意講,但是時(shí)間這個(gè)不是很好控制,有沒(méi)有類(lèi)似,比如我可以之前就描述好了,然后定點(diǎn)給他們兩個(gè)一起發(fā)消息,而可以拋開(kāi)時(shí)間和其他因素的影響呢?
我:這個(gè)嘛,很簡(jiǎn)單呀,你可以讓他們關(guān)注你的一個(gè)公眾號(hào),這樣你再定時(shí)的推送給他們故事不就可以了嘛。?;蛘?,你可以拉他們進(jìn)你的一個(gè)群這樣,就方便了呀~
小白:這樣是可以,但是如果以后還有小表妹要聽(tīng)我講,我就要如此反復(fù)的做。。感謝好麻煩好麻煩。。。
我:emmm,我理解你的意思,你就想實(shí)現(xiàn)一種很多人都能夠進(jìn)行類(lèi)似一種消息推送的方式嘛。。。
小白:對(duì)的對(duì)的。。就是這樣一種,,,我記得我們?cè)诩夹g(shù)方面好像也有一種類(lèi)似的技術(shù),這個(gè)叫做什么去了呢?
我:這就是消息中間件,一種生產(chǎn)者和消費(fèi)者的關(guān)系。
小白:我也想學(xué)我也想學(xué),,你快給我講講,給我講講唄。。
我:真拿你沒(méi)辦法,好吧。。。下面我就給你講一下這方面的知識(shí)。
#情景分析
其實(shí),小白的這個(gè)問(wèn)題,是一種比較普遍的問(wèn)題。既然我們作為技術(shù)人員,當(dāng)然我們就要從技術(shù)成分去分析如何解決了。這里面其實(shí)就是包含著一種消息中間件的技術(shù)。它也是最近技術(shù)層面用得非常非常多的,這也是非常值得我們進(jìn)行學(xué)習(xí)。。這在如今的秒殺系統(tǒng),推薦系統(tǒng)等等,都有廣泛的應(yīng)用。。所以,這章我就主要來(lái)跟大家說(shuō)說(shuō)這方面的知識(shí)。
#基本概念的引導(dǎo)
本模塊主要講解關(guān)于消息中間件的相關(guān)基礎(chǔ)知識(shí),也是方便我們后面的學(xué)習(xí)。
###什么是中間件?
非操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用,不能直接給用戶帶來(lái)價(jià)值的軟件,我們就可以稱(chēng)為中間件(比如Dubbo,Tomcat,Jetty,Jboss都是屬于的)。
###什么是消息中間件?
百度百科解釋?zhuān)合⒅虚g件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。通過(guò)提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。
關(guān)鍵點(diǎn):關(guān)注于數(shù)據(jù)的發(fā)送和接受,利用高效可靠的異步消息機(jī)制傳遞機(jī)制集成分布式系統(tǒng)。
先簡(jiǎn)單的用下面這個(gè)圖說(shuō)明:
提供者:實(shí)現(xiàn)JMS的消息服務(wù)中間件服務(wù)器。
客戶端:發(fā)送或接受消息的應(yīng)用。
生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端。
消費(fèi)者/訂閱者:接受并處理消息的客戶端。
消息:應(yīng)用程序之間傳遞的數(shù)據(jù)。
消息模式:在客戶端之間傳遞消息的模式,JMS主要是隊(duì)列模式和主體模式。
隊(duì)列模式特點(diǎn):
(1)客戶端包括生產(chǎn)者和消費(fèi)者。
(2)隊(duì)列中的一個(gè)消息只能被一個(gè)消費(fèi)者使用。
(3)消費(fèi)者可以隨時(shí)取消息。
主體模式特點(diǎn):
(1)客戶端包括發(fā)布者和訂閱者。
(2)主題中的消息可以被所有訂閱者消費(fèi)。
(3)消費(fèi)者不能消費(fèi)訂閱之前發(fā)送的消息。
###什么是AMQP?
AMQP,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開(kāi)發(fā)語(yǔ)言等條件的限制。
簡(jiǎn)單點(diǎn)說(shuō):就是對(duì)于消息中間件所接受的消息傳輸層的協(xié)議(不懂傳輸層,那么就需要多看看計(jì)算機(jī)網(wǎng)絡(luò)相關(guān)知識(shí)了,OSI的層次劃分),只有這樣才能保證客戶端和消息中間件能夠進(jìn)行交互(換位思考:HTTP和HTTPS甚至說(shuō)是TCP/IP與UDP協(xié)議都要的道理)。
emmm,比較一下JMS和AMQP的不同吧。。
JMS是定義與Java,而AMQP是一種傳輸層協(xié)議。
JMS是屬于Java的API,而AMQP是跨語(yǔ)言的。
JMS消息類(lèi)型只有兩種(主題和隊(duì)列,后續(xù)會(huì)說(shuō)),而AMQP是有五種。
JMS主要就是針對(duì)Java的開(kāi)發(fā)的Client,而AMQP是面向消息,隊(duì)列,路由。
###什么是ActiveMQ呢?
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開(kāi)源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
簡(jiǎn)單點(diǎn)說(shuō):不就是為了實(shí)現(xiàn)我上述所想要的需求嘛。然后它就是一種實(shí)現(xiàn)的方式。就比如,Tomcat是什么?不就是為了實(shí)現(xiàn)一種client與服務(wù)器之間的交互的一種產(chǎn)品嘛。。所以,不需要死記概念,自己理解就好。
#ActiveMQ的安裝
##環(huán)境:Windows
步驟:
(1)登錄到ActiveMQ的官網(wǎng),下載安裝包。http://activemq.apache.org/activemq-5154-release.html
(2)下載Zip文件
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
(2)然后解壓下載的文件
(3)同樣進(jìn)入相對(duì)應(yīng)的目錄,運(yùn)行
./activemq start
(4)然后再訪問(wèn)相同的地址就可以看到啦。(具體看windows安裝步驟)
#ActiveMQ的使用(基于Maven)
首先要再回頭看看JMS中的一些關(guān)鍵接口。
<!--添加activemq的依賴(lài)-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
###情形一:隊(duì)列模型的消息
3. 編寫(xiě)生產(chǎn)者代碼(使用隊(duì)列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的創(chuàng)建類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("我發(fā)送message:" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消費(fèi)者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
查看是否進(jìn)行了消費(fèi)
其實(shí),這就是解釋了,我之前說(shuō)的,隊(duì)列模式的消息,是只會(huì)被一個(gè)消費(fèi)者所使用的,而不會(huì)被共享,這也就是和主題模型的差別哦~~~哈哈
###情形二:主題模型的消息
前面的步驟都一樣,只是生產(chǎn)者和消費(fèi)者的代碼有點(diǎn)區(qū)別:
編寫(xiě)生產(chǎn)者(這個(gè)和隊(duì)列模型其實(shí)很像,稍微修改就可以)
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:48 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicProducer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的主題名稱(chēng)
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createTopic(TOPIC_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1; i <= 100; i++) {
TextMessage message = session.createTextMessage("當(dāng)前message是(主題模型):" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
package com.hnu.scw.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:50 2018/7/14 0014
* @ Description:${description}
* @ Modified By:
* @Version: $version$
*/
public class MessageTopicConsumer {
//定義ActivMQ的連接地址
private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String TOPIC_NAME = "MyTopicMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createTopic(TOPIC_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hnu.scw</groupId>
<artifactId>activemq</artifactId>
<version>1.0-SNAPSHOT</version>
<name>activemq</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--添加activemq的依賴(lài)-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!--spring整合activemq所需要的依賴(lài)-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的連接工廠-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms為我們提供的連接池 獲取一個(gè)連接工廠-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 點(diǎn)對(duì)點(diǎn)的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- jms模板 用于進(jìn)行消息發(fā)送-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
</beans>
package com.hnu.scw.spring;
/**
* @ Author :scw
* @ Date :Created in 下午 12:19 2018/7/14 0014
* @ Description:生產(chǎn)者的接口
* @ Modified By:
* @Version: $version$
*/
public interface ProduceService {
void sendMessage(String msg);
}
package com.hnu.scw.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 下午 2:21 2018/7/15 0015
* @ Description:生產(chǎn)者的實(shí)現(xiàn)類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class ProduceServiceImpl implements ProduceService {
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination destination;
/**
* 發(fā)送消息
* @param msg
*/
@Override
public void sendMessage(final String msg) {
jmsTemplate.send(destination , new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(msg);
return textMessage;
}
});
System.out.println("現(xiàn)在發(fā)送的消息為: " + msg);
}
}
<!--注入我們的生產(chǎn)者-->
<bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 2:27 2018/7/15 0015
* @ Description:生產(chǎn)者的測(cè)試
* @ Modified By:
* @Version: $version$
*/
public class ProducerTest {
public static void main(String[] args){
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml");
ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class);
//進(jìn)行發(fā)送消息
for (int i = 0; i < 100 ; i++) {
bean.sendMessage("test" + i);
}
//當(dāng)消息發(fā)送完后,關(guān)閉容器
classPathXmlApplicationContext.close();
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd ">
<context:annotation-config />
<!--Activemq的連接工廠-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<!--spring jms為我們提供的連接池 獲取一個(gè)連接工廠-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- 消息目的地 點(diǎn)對(duì)點(diǎn)的模式-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="SpringActiveMQMsg"/>
</bean>
<!-- 配置消息監(jiān)聽(tīng)器-->
<bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/>
<!--配置消息容器-->
<bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--配置連接工廠-->
<property name="connectionFactory" ref="connectionFactory"/>
<!--配置監(jiān)聽(tīng)的隊(duì)列-->
<property name="destination" ref="queueDestination"/>
<!--配置消息監(jiān)聽(tīng)器-->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
package com.hnu.scw.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @ Author :scw
* @ Date :Created in 下午 3:06 2018/7/15 0015
* @ Description:消息的監(jiān)聽(tīng)者,用于處理消息
* @ Modified By:
* @Version: $version$
*/
public class ComsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @ Author :scw
* @ Date :Created in 下午 3:13 2018/7/15 0015
* @ Description:消費(fèi)者的測(cè)試
* @ Modified By:
* @Version: $version$
*/
public class ConsumerTest {
public static void main(String[] args){
//啟動(dòng)消費(fèi)者
ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml");
}
}
<!-- 消息目的地 (主題模式)-->
<!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!–配置隊(duì)列模型的消息名稱(chēng)–>
<constructor-arg value="SpringActiveMQMsgTopic"/>
</bean>-->
將上面的代碼替換之前的就可以了。。。
總結(jié):總的來(lái)說(shuō),基于Spring來(lái)使用消息隊(duì)列還是非常方便的,這比我們正常進(jìn)行JMS規(guī)范操作要簡(jiǎn)單很多,畢竟很多對(duì)象都是通過(guò)Spring的IOC進(jìn)行容器管理了,所以,值得推薦使用哦~~~
#ActiveMQ的集群
###為什么要進(jìn)行集群呢?
原因一:實(shí)現(xiàn)高可用:以排除單點(diǎn)故障所引起的服務(wù)終端。
原因二:實(shí)現(xiàn)負(fù)載均衡:以提升效率為更多的客戶進(jìn)行服務(wù)。
###集群的方式有哪些?
方式一:客戶端集群:多個(gè)客戶端消費(fèi)同一個(gè)隊(duì)列。
方式二:Broker clusters:多個(gè)Broker之間同步消息。(實(shí)現(xiàn)負(fù)載均衡)
<networkConnectors>
<networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
</networkConnectors>
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面這個(gè)標(biāo)簽的內(nèi)容 , 作為B和C的共享文件,目錄就是自己之前創(chuàng)建的一個(gè)文件(可以回看上面的整個(gè)結(jié)構(gòu))-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml內(nèi)容,修改服務(wù)器的服務(wù)端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
<networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
<!--并修改下面這個(gè)標(biāo)簽的內(nèi)容 , 作為B和C的共享文件,目錄就是自己之前創(chuàng)建的一個(gè)文件(可以回看上面的整個(gè)結(jié)構(gòu))-->
<persistenceAdapter>
<kahaDB directory="D:\Download\MQJiQun\shareDB"/>
</persistenceAdapter>
(2)修改jetty.xml中的內(nèi)容
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
步驟:
(1)創(chuàng)建Maven項(xiàng)目
(2)導(dǎo)入依賴(lài)
<!--添加activemq的依賴(lài)-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
(3)編寫(xiě)生產(chǎn)者代碼
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:06 2018/7/14 0014
* @ Description:用于消息的創(chuàng)建類(lèi)
* @ Modified By:
* @Version: $version$
*/
public class MessageProducer {
//通過(guò)集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動(dòng)態(tài)轉(zhuǎn)移,當(dāng)某個(gè)服務(wù)器宕機(jī),
// 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建一個(gè)生產(chǎn)者
javax.jms.MessageProducer producer = session.createProducer(destination);
//創(chuàng)建模擬100個(gè)消息
for (int i = 1 ; i <= 100 ; i++){
TextMessage message = session.createTextMessage("當(dāng)前message是:" + i);
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
}
//關(guān)閉連接
connection.close();
}
}
(4)編寫(xiě)消費(fèi)者代碼
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @ Author :scw
* @ Date :Created in 上午 11:30 2018/7/14 0014
* @ Description:消息消費(fèi)者
* @ Modified By:
* @Version: $version$
*/
public class MessageConsumer {
//通過(guò)集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動(dòng)態(tài)轉(zhuǎn)移,當(dāng)某個(gè)服務(wù)器宕機(jī),
// 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
//定義發(fā)送消息的隊(duì)列名稱(chēng)
private static final String QUEUE_NAME = "MyMessage";
public static void main(String[] args) throws JMSException {
//創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
//打開(kāi)連接
connection.start();
//創(chuàng)建會(huì)話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建隊(duì)列目標(biāo)
Destination destination = session.createQueue(QUEUE_NAME);
//創(chuàng)建消費(fèi)者
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建消費(fèi)的監(jiān)聽(tīng)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("獲取消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
(5)進(jìn)行查看各自的服務(wù)器的消息隊(duì)列的情況。
#其他的消息中間件
其實(shí),類(lèi)似ActiveMQ這樣的消息中間件,用得比較多的還有就是RabbitMQ和Kafka。它們?nèi)吒髯杂懈髯缘膬?yōu)勢(shì)。大家可以百度進(jìn)行了解,我就不進(jìn)行多說(shuō)了。后面我會(huì)同樣把這兩種消息中間件的使用進(jìn)行詳細(xì)的講解,歡迎大家的關(guān)注哦~總的來(lái)說(shuō),只有適合的場(chǎng)景對(duì)應(yīng)的消息中間件才能發(fā)揮最大的作用,沒(méi)有一種是只有好處而沒(méi)有壞處的~
#總結(jié)
聯(lián)系客服