1. 首先引入相關的 lib 包,重點需引用 activemq-client-5.8.0.jar、activemq-core-5.7.0.jar、activemq-pool-5.8.0.jar、activemq-protobuf-1.1.jar 等包,其他包
自行配置。
2. 一些公共工具類的代碼:
JMSProducer.java
- package com.ffcs.icity.jms;
-
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.ExceptionListener;
- import javax.jms.JMSException;
- import javax.jms.MapMessage;
- import javax.jms.Message;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.pool.PooledConnectionFactory;
-
- /**
- * JMS消息生產(chǎn)者
- * @author linwei
- *
- */
- public class JMSProducer implements ExceptionListener{
-
- //設(shè)置連接的最大連接數(shù)
- public final static int DEFAULT_MAX_CONNECTIONS=5;
- private int maxConnections = DEFAULT_MAX_CONNECTIONS;
- //設(shè)置每個連接中使用的最大活動會話數(shù)
- private int maximumActiveSessionPerConnection = DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION;
- public final static int DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION=300;
- //線程池數(shù)量
- private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;
- public final static int DEFAULT_THREAD_POOL_SIZE=50;
- //強制使用同步返回數(shù)據(jù)的格式
- private boolean useAsyncSendForJMS = DEFAULT_USE_ASYNC_SEND_FOR_JMS;
- public final static boolean DEFAULT_USE_ASYNC_SEND_FOR_JMS=true;
- //是否持久化消息
- private boolean isPersistent = DEFAULT_IS_PERSISTENT;
- public final static boolean DEFAULT_IS_PERSISTENT=true;
-
- //連接地址
- private String brokerUrl;
-
- private String userName;
-
- private String password;
-
- private ExecutorService threadPool;
-
- private PooledConnectionFactory connectionFactory;
-
- public JMSProducer(String brokerUrl, String userName, String password) {
- this(brokerUrl, userName, password, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION, DEFAULT_THREAD_POOL_SIZE, DEFAULT_USE_ASYNC_SEND_FOR_JMS, DEFAULT_IS_PERSISTENT);
- }
-
- public JMSProducer(String brokerUrl, String userName, String password, int maxConnections, int maximumActiveSessionPerConnection, int threadPoolSize,boolean useAsyncSendForJMS, boolean isPersistent) {
- this.useAsyncSendForJMS = useAsyncSendForJMS;
- this.isPersistent = isPersistent;
- this.brokerUrl = brokerUrl;
- this.userName = userName;
- this.password = password;
- this.maxConnections = maxConnections;
- this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
- this.threadPoolSize = threadPoolSize;
- init();
- }
-
- private void init() {
- //設(shè)置JAVA線程池
- this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);
- //ActiveMQ的連接工廠
- ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);
- actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS);
- //Active中的連接池工廠
- this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory);
- this.connectionFactory.setCreateConnectionOnStartup(true);
- this.connectionFactory.setMaxConnections(this.maxConnections);
- this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection);
- }
-
-
- /**
- * 執(zhí)行發(fā)送消息的具體方法
- * @param queue
- * @param map
- */
- public void send(final String queue, final Map<String, Object> map) {
- //直接使用線程池來執(zhí)行具體的調(diào)用
- this.threadPool.execute(new Runnable(){
- @Override
- public void run() {
- try {
- sendMsg(queue,map);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- /**
- * 真正的執(zhí)行消息發(fā)送
- * @param queue
- * @param map
- * @throws Exception
- */
- private void sendMsg(String queue, Map<String, Object> map) throws Exception {
-
- Connection connection = null;
- Session session = null;
- try {
- //從連接池工廠中獲取一個連接
- connection = this.connectionFactory.createConnection();
- /*createSession(boolean transacted,int acknowledgeMode)
- transacted - indicates whether the session is transacted acknowledgeMode - indicates whether the consumer or the client
- will acknowledge any messages it receives; ignored if the session is transacted.
- Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE.
- */
- //false 參數(shù)表示 為非事務型消息,后面的參數(shù)表示消息的確認類型
- session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- //Destination is superinterface of Queue
- //PTP消息方式
- Destination destination = session.createQueue(queue);
- //Creates a MessageProducer to send messages to the specified destination
- MessageProducer producer = session.createProducer(destination);
- //set delevery mode
- producer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- //map convert to javax message
- Message message = getMessage(session, map);
- producer.send(message);
- } finally {
- closeSession(session);
- closeConnection(connection);
- }
- }
-
- private Message getMessage(Session session, Map<String, Object> map) throws JMSException {
- MapMessage message = session.createMapMessage();
- if (map != null && !map.isEmpty()) {
- Set<String> keys = map.keySet();
- for (String key : keys) {
- message.setObject(key, map.get(key));
- }
- }
- return message;
- }
-
- private void closeSession(Session session) {
- try {
- if (session != null) {
- session.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void closeConnection(Connection connection) {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onException(JMSException e) {
- e.printStackTrace();
- }
-
- }
JMSConsumer.java
MessageHandler.java
- package com.ffcs.icity.jms;
-
- import javax.jms.Message;
-
-
- /**
- * 提供消息操作的回調(diào)接口
- * @author linwei
- *
- */
- public interface MessageHandler {
-
-
- /**
- * 消息回調(diào)提供的調(diào)用方法
- * @param message
- */
- public void handle(Message message);
- }
MultiThreadMessageListener.java
- package com.ffcs.icity.jms;
-
- import java.util.concurrent.ExecutorService;
-
- import javax.jms.Message;
- import javax.jms.MessageListener;
-
-
- /**
- * 消息消費者中使用的多線程消息監(jiān)聽服務
- * @author linwei
- *
- */
- public class MultiThreadMessageListener implements MessageListener {
-
- //默認線程池數(shù)量
- public final static int DEFAULT_HANDLE_THREAD_POOL=10;
- //最大的處理線程數(shù).
- private int maxHandleThreads;
- //提供消息回調(diào)調(diào)用接口
- private MessageHandler messageHandler;
-
- private ExecutorService handleThreadPool;
-
-
- public MultiThreadMessageListener(MessageHandler messageHandler){
- this(DEFAULT_HANDLE_THREAD_POOL, messageHandler);
- }
-
- public MultiThreadMessageListener(int maxHandleThreads,MessageHandler messageHandler){
- this.maxHandleThreads=maxHandleThreads;
- this.messageHandler=messageHandler;
- //支持阻塞的固定大小的線程池(自行手動創(chuàng)建的)
- this.handleThreadPool = new FixedAndBlockedThreadPoolExecutor(this.maxHandleThreads);
- }
-
-
- /**
- * 監(jiān)聽程序中自動調(diào)用的方法
- */
- @Override
- public void onMessage(final Message message) {
- //使用支持阻塞的固定大小的線程池來執(zhí)行操作
- this.handleThreadPool.execute(new Runnable() {
- public void run() {
- try {
- MultiThreadMessageListener.this.messageHandler.handle(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- }
FixedAndBlockedThreadPoolExecutor.java
- package com.ffcs.icity.jms;
-
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
-
-
/**
 * Fixed-size thread pool whose {@link #execute(Runnable)} can block the caller.
 *
 * <p>After a task is submitted, if the pool already has its maximum number of
 * threads, the submitting thread waits until some task finishes
 * ({@link #afterExecute(Runnable, Throwable)} signals the waiters).
 *
 * @author linwei
 */
public class FixedAndBlockedThreadPoolExecutor extends ThreadPoolExecutor {

    /** Guards {@link #condition}; held by submitters while they submit and wait. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled each time a task finishes. */
    private final Condition condition = this.lock.newCondition();

    /**
     * Creates a pool with {@code size} core and maximum threads and an unbounded queue.
     *
     * @param size number of threads in the pool
     */
    public FixedAndBlockedThreadPoolExecutor(int size) {
        super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Submits the task and, if the pool has reached its maximum size, suspends the
     * calling thread until a task completes.
     *
     * <p>If the calling thread is interrupted while waiting, the method returns with
     * the thread's interrupt flag set; the task stays submitted.
     *
     * @param command the task to run
     * @throws java.util.concurrent.RejectedExecutionException if the pool rejects the task
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        this.lock.lock();
        try {
            // Submission is inside the try so that a rejection (or null command)
            // cannot leave the lock held forever.
            super.execute(command);
            // Submit-then-wait under one lock: a worker cannot signal before we
            // await, because afterExecute needs this same lock.
            if (getPoolSize() == getMaximumPoolSize()) {
                this.condition.await();
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Wakes one submitter that is waiting in {@link #execute(Runnable)}.
     *
     * @param r the task that finished
     * @param t the exception that ended the task, or {@code null}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.lock.lock();
        try {
            this.condition.signal();
        } finally {
            this.lock.unlock();
        }
    }

}
3. 調用例子說明:
生產者調用代碼,JMSProducerTest.java
- package com.ffcs.icity.test;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import com.ffcs.icity.jms.JMSProducer;
-
- public class JMSProducerTest {
-
-
- public static void main(String[] args) {
-
- locationTest();
- System.out.println("over.");
- }
-
- private static void locationTest() {
- //** JMSProducer 可以設(shè)置成全局的靜態(tài)變量,只需實例化一次即可使用,禁止循環(huán)重復實例化JMSProducer(因為其內(nèi)部存在一個線程池)
-
- //支持openwire協(xié)議的默認連接為 tcp://localhost:61616,支持 stomp協(xié)議的默認連接為tcp://localhost:61613。
- //tcp和nio的區(qū)別
- //nio://localhost:61617 以及 tcp://localhost:61616均可在 activemq.xml配置文件中進行配置
- JMSProducer producer = new JMSProducer("nio://localhost:61617", "system", "manager");
- Map<String, Object> map = new HashMap<String, Object>();
- map.put("id", "1");
- map.put("name", "sss1113333");
- map.put("password", "password");
- producer.send("test", map);
- }
-
- }
消費者調用代碼,JMSConsumerTest.java
- package com.ffcs.icity.test;
-
- import javax.jms.MapMessage;
- import javax.jms.Message;
-
- import com.ffcs.icity.jms.JMSConsumer;
- import com.ffcs.icity.jms.MessageHandler;
- import com.ffcs.icity.jms.MultiThreadMessageListener;
-
- public class JMSConsumerTest {
-
-
- public static void main(String[] args) throws Exception {
-
- //** JMSConsumer 可以設(shè)置成全局的靜態(tài)變量,只需實例化一次即可使用,禁止循環(huán)重復實例化JMSConsumer(因為其內(nèi)部存在一個線程池)
-
- JMSConsumer consumer = new JMSConsumer();
- consumer.setBrokerUrl("tcp://localhost:61616");
- consumer.setQueue("test");
- consumer.setUserName("system");
- consumer.setPassword("manager");
- consumer.setQueuePrefetch(500);
- consumer.setMessageListener(new MultiThreadMessageListener(50,new MessageHandler() {
- public void handle(Message message) {
- try {
- System.out.println("name is " + ((MapMessage)message).getString("name"));
- Thread.sleep(5000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }));
- consumer.start();
-
- // Thread.sleep(5000);
- // consumer.shutdown();
-
- }
-
-
- }
本站僅提供存儲服務,所有內容均由用戶發布,如發現有害或侵權內容,請
點擊舉報。