ActiveMQ 设置远程监控JMX
1.需求:每次都要远程登录 127.0.0.1:8161 去查看 Queues、Topics、Subscribers 等数据信息,比较麻烦;因此想通过 JMX 监听 ActiveMQ 来获取这些信息,并发布在自己的平台上,更加方便。
2.修改配置 /activemq/conf/activemq.xml(具体配置路径以自己实际安装的为准),找到 <broker> 标签,修改如下:
3. 找到 <managementContext> 标签,修改如下:
4.修改/bin/env文件 添加行:
ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS -Djava.rmi.server.hostname=1.1.1.1(对外暴露ip) -Dcom.sun.management.jmxremote.port=11099 -Dcom.sun.management.jmxremote.rmi.port=11099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false "
5.重启 ActiveMQ
6.监听JMX的代码如下:
package com.example;

import com.alibaba.fastjson.JSON;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.jmx.TopicViewMBean;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/**
 * Connects to an ActiveMQ broker over JMX/RMI and prints statistics for its
 * queues, topics and topic subscribers to the console.
 *
 * <p>The JMX endpoint and the broker object name are hard-coded below and must
 * match the broker's configuration (JMX port, JMX domain and broker name).
 */
public class Test3 {

    /** JMX service URL of the broker's RMI connector. */
    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://127.0.0.1:11099/jmxrmi";

    /**
     * Object name of the broker MBean. The key must be spelled {@code brokerName}
     * with a lowercase "b"; per the original author, an uppercase "B" fails.
     */
    private static final String BROKER_OBJECT_NAME = "myDomain:brokerName=localhost,type=Broker";

    /** Separator line printed before each queue/topic entry. */
    private static final String SEPARATOR = "******************************";

    /**
     * Entry point: opens the JMX connection, dumps all statistics, then closes
     * the connection.
     *
     * @param args unused
     * @throws Exception if the connection cannot be established or a remote
     *     MBean call fails
     */
    public static void main(String[] args) throws Exception {
        JMXServiceURL serviceUrl = new JMXServiceURL(JMX_URL);
        // JMXConnectorFactory.connect() returns an already-connected connector, so no
        // extra connect() call is needed; try-with-resources guarantees it is closed.
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl, null)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();

            // One broker proxy is enough; every section below reads from the same MBean.
            BrokerViewMBean broker =
                    proxy(conn, new ObjectName(BROKER_OBJECT_NAME), BrokerViewMBean.class);

            printQueues(conn, broker);
            printTopics(conn, broker);
            // Inactive (offline) durable topic subscribers.
            printSubscriptions(conn, broker.getInactiveDurableTopicSubscribers(), "离线持久话题订阅者");
            // Active durable topic subscribers.
            printSubscriptions(conn, broker.getDurableTopicSubscribers(), "活跃持久话题订阅者");
            // Active non-durable topic subscribers.
            printSubscriptions(conn, broker.getTopicSubscribers(), "活跃的非持久主题订阅者");
        }
    }

    /** Creates a notification-aware MBean proxy of the given interface type. */
    private static <T> T proxy(MBeanServerConnection conn, ObjectName name, Class<T> type) {
        return MBeanServerInvocationHandler.newProxyInstance(conn, name, type, true);
    }

    /** Prints the pending messages (as JSON, on stderr) and counters of every queue. */
    private static void printQueues(MBeanServerConnection conn, BrokerViewMBean broker)
            throws OpenDataException {
        for (ObjectName queueName : broker.getQueues()) {
            QueueViewMBean queue = proxy(conn, queueName, QueueViewMBean.class);

            // Browse without a selector: the String overload expects a JMS selector
            // expression, not the queue name.
            TabularData messages = queue.browseAsTable();
            System.err.println(JSON.toJSONString(messages));

            System.out.println(SEPARATOR);
            System.out.println("队列的名称:" + queue.getName());
            System.out.println("队列中剩余的消息数:" + queue.getQueueSize());
            System.out.println("消费者数:" + queue.getConsumerCount());
            System.out.println("消息入队的数量:" + queue.getEnqueueCount());
            System.out.println("消息出队的数量:" + queue.getDequeueCount());
        }
    }

    /** Prints the name and counters of every topic. */
    private static void printTopics(MBeanServerConnection conn, BrokerViewMBean broker) {
        for (ObjectName topicName : broker.getTopics()) {
            TopicViewMBean topic = proxy(conn, topicName, TopicViewMBean.class);

            System.out.println(SEPARATOR);
            System.out.println("Topic队列的名称:" + topic.getName());
            System.out.println("Topic消费者数量:" + topic.getConsumerCount());
            System.out.println("Topic消息入队的数量:" + topic.getEnqueueCount());
            System.out.println("Topic消息出队的数量:" + topic.getDequeueCount());
        }
    }

    /**
     * Prints the details of each subscription MBean in {@code subscriptions}.
     *
     * @param conn          open MBean server connection used to build the proxies
     * @param subscriptions object names of the subscription MBeans to print
     * @param title         heading printed, followed by the separator, before each entry
     */
    private static void printSubscriptions(
            MBeanServerConnection conn, ObjectName[] subscriptions, String title) {
        for (ObjectName subscriptionName : subscriptions) {
            SubscriptionViewMBean subscription =
                    proxy(conn, subscriptionName, SubscriptionViewMBean.class);

            System.out.println(title + SEPARATOR);
            System.out.println("客户编号 :" + subscription.getClientId());
            System.out.println("订阅名称 :" + subscription.getSubscriptionName());
            System.out.println("连接 ID :" + subscription.getConnectionId());
            System.out.println("目的地 :" + subscription.getDestinationName());
            System.out.println("选择器:" + subscription.getSelector());
            System.out.println("待处理队列大小 :" + subscription.getPendingQueueSize());
            System.out.println("调度队列大小:" + subscription.getDispatchedQueueSize());
            System.out.println("调度柜台 :" + subscription.getDispatchedCounter());
            System.out.println("排队计数器:" + subscription.getEnqueueCounter());
            System.out.println("出队计数器 :" + subscription.getDequeueCounter());
        }
    }
}
7.监听结果如下: