-
StackOverflow 文档
-
java-ee 教程
-
Java 消息传递服务(JMS)
-
使用 ActiveMQ 库进行消息传递(activemq jms 提供程序特定实现)
设置 ActiveMQ
- 从 activemq.apache.org 下载 ActiveMQ 发行版并在某处解压缩
- 你可以使用脚本 bin / activemq 立即启动服务器,在 localhost 上运行不安全
- 当它运行时,你可以在 http:// localhost:8161 / admin / 上访问本地服务器的控制台
- 通过修改 conf / activemq.xml 来配置它
- 正如标题所示,以下示例用户 activemq jms 提供程序特定的实现,因此需要将 activemq-all.jar 添加到类路径中。
通过独立客户端发送消息
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessageSender {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageProducer producer = session.createProducer(queue);
Message msg = session.createTextMessage("hello queue"); // text message
producer.send(msg);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close(); // free all resources
} catch (JMSException e) { /* Ignore */ }
}
}
}
}
轮询消息
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessagePoller {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageConsumer consumer = session.createConsumer(queue);
con.start(); // start the connection
while (true) { // run forever
Message msg = consumer.receive(); // blocking!
if (!(msg instanceof TextMessage))
throw new RuntimeException("Expected a TextMessage");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message content
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
con.close();
} catch (JMSException e) {/* Ignore */ }
}
}
}
使用 MessageListener
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsClientMessageListener {
public static void main(String[] args) {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQ-specific
Connection con = null;
try {
con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); // non-transacted session
Queue queue = session.createQueue("test.queue"); // only specifies queue name
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if (!(msg instanceof TextMessage))
throw new RuntimeException("no text message");
TextMessage tm = (TextMessage) msg;
System.out.println(tm.getText()); // print message
} catch (JMSException e) {
System.err.println("Error reading message");
}
}
});
con.start(); // start the connection
Thread.sleep(60 * 1000); // receive messages for 60s
} catch (JMSException e1) {
e1.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
con.close(); // free all resources
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}