others linux服务器运维 django3 监控 k8s golang 数据库 大数据 前端 devops 理论基础 java oracle 运维日志

activemq 教程 (基于python客户端)

访问量:1467 创建时间:2022-01-12

官方文档地址:https://activemq.apache.org/using-activemq

不同语言客户端:https://activemq.apache.org/cross-language-clients.html

安装

环境准备,centos7,jdk1.8

[root@oracle57 ~]# cd /export/
[root@oracle57 export]# ll
total 306136
-rw-r--r-- 1 root root  54282929 Jan 12 07:58 apache-activemq-5.14.4-bin.tar.gz
-rw-r--r-- 1 root root  65041245 Jan 12 07:59 apache-activemq-5.16.3-bin.tar.gz
-rw-r--r-- 1 root root 194151339 Dec 30  2019 jdk-8u231-linux-x64.tar.gz

jdk环境配置

[root@oracle57 export]# tar xf jdk-8u231-linux-x64.tar.gz
[root@oracle57 jdk1.8.0_231]# vim /etc/profile
export JAVA_HOME=/export/jdk1.8.0_231
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
[root@oracle57 jdk1.8.0_231]# source /etc/profile
[root@oracle57 jdk1.8.0_231]# java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

activemq 安装与启动

[root@oracle57 jdk1.8.0_231]# cd /export/
[root@oracle57 export]# tar xf apache-activemq-5.16.3-bin.tar.gz 
[root@oracle57 export]# ln -s  apache-activemq-5.16.3  activemq
###启动
[root@oracle57 activemq]# ./bin/activemq start
INFO: Loading '/export/apache-activemq-5.16.3//bin/env'
INFO: Using java '/export/jdk1.8.0_231/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/export/apache-activemq-5.16.3//data/activemq.pid' (pid '21046')
[root@oracle57 activemq]# ./bin/activemq status
INFO: Loading '/export/apache-activemq-5.16.3/bin/env'
INFO: Using java '/export/jdk1.8.0_231/bin/java'
ActiveMQ is running (pid '21046')
###查看amq的默认监听端口
netstat -nl|grep 61616

配置activemq 的WebConsole网页管理控制台端口与账号

#修改host的值,配置控制台监听地址
[root@oracle57 activemq]# vim conf/jetty.xml
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8161"/>

#根据自己的需求设置账号密码,格式  用户名:密码,rolename
[root@oracle57 activemq]# vim conf/jetty-realm.properties 
admin: admin, admin
user: user, user

#重启服务
[root@oracle57 activemq]# ./bin/activemq restart

访问地址http://192.168.0.57:8161/ 默认账号密码admin,admin, 登陆后Topics里面有一个默认的topic -- ActiveMQ.Advisory.MasterBroker.

Home是当前的欢迎页,Queues是点到点形式,Topics是发布订阅模式,Subscribers消息订阅监控查询,Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接,Network当前网络的链接状态,Scheduled计划任务,Send可以测试发送消息。

在web上测试:Navigate to “Queues”;Add a queue name and click create;Send test message by klicking on “Send to

介绍

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

点对点与发布订阅最初是由JMS定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)

JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。

ActiveMQ的Broker相当于一个ActiveMQ服务器实例. 一个运行的amq实例。

消息形式
点对点(queue)消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
多对多publish/subscribe(topic)支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息,在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

支持多种传输协议:The AUTO Transport,The VM Transport,The AMQP Transport,The MQTT Transport,The TCP Transport,The NIO Transport,The SSL Transport,The NIO SSL Transport,The Peer Transport等

python测试

安装客户端包stomp.py,python3支持,python2不支持

[root@oracle57 ~]# pip3 install stomp.py

发送queue消息

[root@oracle57 ~]# vim test.py
# -*- coding: utf-8 -*-
import stomp
import sys

#stomp.Connection() = stomp.Connection11().  
#stomp.Connection()默认是connect.StompConnection11(协议1.1),还可以可以选择1.0、1.2。 
#conn = stomp.Connection() 参数为空,默认连接本机的61613端口,不同协议端口不同
conn = stomp.Connection([('127.0.0.1', 61613)])
conn.connect('admin', 'password', wait=True)
conn.send(body=' 1212'.join(sys.argv[1:]), destination='test')
conn.disconnect()

[root@oracle57 ~]# python3 test.py 

发送与接收消息

###测试queue将topic改为queue.
[root@oracle57 ~]# vim test1.py 
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'password', wait=True)
conn.subscribe(destination='/topic/test', id=1, ack='auto')
conn.send(body=' '.join(sys.argv[1:]), destination='/topic/test')
time.sleep(2)
conn.disconnect()

[root@oracle57 ~]# python3 test1.py hello
received a message "hello"

参考文档:https://www.cnblogs.com/biehongli/p/11522793.html

登陆评论: 使用GITHUB登陆