EMQ(emqttd) 2.x 安装和使用(物联网传输控制协议的Broker)
支持下国产开源。
MQTT物联网传输控制协议:《MQTT-3.1.1-CN.pdf》
下载:emqttd-centos64-v2.0-rc.2-20161019.zip
安装:
$ unzip emqttd-centos64-v2.0-rc.2-20161019.zip -d /data/
$ mv /data/emqttd /data/emqttd-centos64-v2.0-rc.2-20161019
$ ln -s /data/emqttd-centos64-v2.0-rc.2-20161019 /data/emqttd
系统优化配置:
# ulimit -n 1048576
$ sudo sysctl -w fs.file-max=2097152
$ sudo sysctl -w fs.nr_open=2097152
$ sudo sysctl -w net.core.somaxconn=65535
$ sudo sysctl -p
修改配置文件(node.cookie必须每台都要一样,node.name的@后面必须是ip地址或者fqdn方式的主机名):
$ cd /data/emqttd
$ vi etc/emq.conf
## Node name
node.name = emqttd@192.168.60.58
## Cookie for distributed node
node.cookie = emq_dist_cookie_533d99ckd9ji475
## Erlang Process Limit
node.process_limit = 2000000
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1000000
## Size of acceptor pool
mqtt.listener.tcp.acceptors = 64
## Maximum number of concurrent clients
mqtt.listener.tcp.max_clients = 1000000
## Rate Limit. Format is ‘burst,rate’, Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
## TCP Socket Options
mqtt.listener.tcp.backlog = 262144
## Distributed node port range
node.dist_listen_min = 6000
node.dist_listen_max = 6999
## 如果需要启用防火墙,则上面两行去掉注释,注意下面的防火墙端口设置,要打开该段端口。
## Expired after 1 day:
## w – week
## d – day
## h – hour
## m – minute
## s – second
mqtt.session.expired_after = 2w
# 上面为持久会话到期时间,从客户端断开算起,超时后客户端没有收到的消息会丢弃(不想丢失消息的话,该值就要设置的很大)。
## Console log. Enum: off, file, console, both
log.console = both
## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
log.console.level = info
## Console log file
log.console.file = log/console.log
## Error log file
log.error.file = log/error.log
## Enable the crash log. Enum: on, off
log.crash = on
log.crash.file = log/crash.log
修改下启动脚本(在stop处增加一行):
$ vi bin/emqttd
…
stop)
# Wait for the node to completely stop…
PID=”$(relx_get_pid)”
if ! relx_nodetool “stop”; then
exit 1
fi
while $(kill -s 0 “$PID” 2>/dev/null);
do
sleep 1
done
killall epmd
;;
启动:
$ cd emqttd
直接进入控制台模式:
$ ./bin/emqttd console
后台运行模式:
$ ./bin/emqttd start
$ ./bin/emqttd_ctl status
$ ./bin/emqttd stop
开启防火墙:
端口:1883:MQTT协议tcp端口,8883:MQTT(SSL) tcp端口,8083:MTQQ(websocket)、HTTP API端口,18083:dashboard管理控制WEB端口,4369:集群处理epmd端口,6000-6999由上面配置文件定义的epmd需要的端口范围。
sudo firewall-cmd –zone=public –add-port=1883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=1883/tcp
sudo firewall-cmd –zone=public –add-port=8883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=8883/tcp
sudo firewall-cmd –zone=public –add-port=8083/tcp
sudo firewall-cmd –permanent –zone=public –add-port=8083/tcp
sudo firewall-cmd –zone=public –add-port=18083/tcp
sudo firewall-cmd –permanent –zone=public –add-port=18083/tcp
sudo firewall-cmd –zone=public –add-port=4369/tcp
sudo firewall-cmd –permanent –zone=public –add-port=4369/tcp
sudo firewall-cmd –zone=public –add-port=6000-6999/tcp
sudo firewall-cmd –permanent –zone=public –add-port=6000-6999/tcp
sudo firewall-cmd –zone=public –list-all
sudo firewall-cmd –permanent –zone=public –list-all
查看各台的启动状态:
http://192.168.60.55:8083/status
http://192.168.60.55:18083/ 用户名/密码: admin / public
$ vi data/configs/vm.xxx.args
$ vi data/configs/app.xxx.conf
LOG位置:
$ vi log/
把节点加入集群:
在各个节点上执行(重复执行也没关系,其中192.168.60.55这台会提示错误:cannot_join_with_self,这个没关系,自己不用加入自己):
$ ./bin/emqttd_ctl cluster join emqttd@192.168.60.55
$ ./bin/emqttd_ctl cluster status
Cluster status: [{running_nodes,[’emqttd@192.168.60.58′,
’emqttd@192.168.60.56′,
’emqttd@192.168.60.57′,
’emqttd@192.168.60.55′]}]
把节点退出集群:
本机退出集群:
$ ./bin/emqttd_ctl cluster leave
把某节点退出集群:
$ ./bin/emqttd_ctl cluster remove emqttd@192.168.60.56
测试一下:
下载安装MTQQ协议的客户端:https://mosquitto.org/download/
$ sudo rpm -ivh libmosquitto1-1.4.10-1.1.x86_64.rpm
$ sudo rpm -ivh mosquitto-clients-1.4.10-1.1.x86_64.rpm
订阅:
$ mosquitto_sub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -c -i 1111
发布:另开一个ssh或者另外机器上执行:
$ mosquitto_pub -h 192.168.60.57 -p 1883 -t test_topic_1 -q 1 -m “hello mqtt 7”
参数:
-h: 连接到哪台broker。
-p: 连接端口。
-t: topic名字,topic类似文件系统的组织方式,以”/”分隔符来分层,订阅者订阅时可以使用通配符”+”和”#”,发布者不能使用通配符。
-q: qos级别,默认为0,共三个值:0:至多一次,不保证到达订阅者;1:至少一次,保证到达订阅者,但不保证不重复;2:正好一次,保证到达,又保证不重复;非钱类的应用使用qos1就可以了。
-c: 如果订阅者退出,在broker保留所有订阅的消息,一旦重新连接上,则把所有消息发给订阅者,就是持久性订阅(clean_sess=false)。
-i: 设置client id,在即时通信时可以设置为用户id或者用户名等具有唯一性的字段。
-r: 发布方使用的参数,保留消息(每个topic只保留最后一个这种消息,之前的会覆盖 ),即使该消息被一个订阅者读取了,还会一直保留在broker,如果有新的订阅者订阅该topic,则马上会收到该消息,类似qq里面的公告性消息,这类信息的删除,发送一个payload为空的null消息即可:$ mosquitto_pub -h 192.168.60.56 -p 1883 -t test_topic_1 -q 1 -r -n -u user -P 123456。
-m: 该topic的一条消息内容。
加入MariaDB的认证:
修改配置:
$ vi ./etc/emq.conf
## Allow Anonymous authentication
mqtt.allow_anonymous = false
## Default ACL File
mqtt.acl_file = etc/acl.conf
$ vi ./data/loaded_plugins
emq_dashboard.
emq_auth_mysql.
修改plugins配置:
$ vi etc/plugins/emq_auth_mysql.conf
## Mysql Server
auth.mysql.server = 192.168.60.60:3306
## Mysql Pool Size
auth.mysql.pool = 8
## Mysql Username
auth.mysql.username = root
## Mysql Password
auth.mysql.password = 123456
## Mysql Database
auth.mysql.database = mqtt
## Variables: %u = username, %c = clientid
## Authentication Query: select password only
auth.mysql.auth_query = select password from mqtt_user where username = ‘%u’ limit 1
## Password hash: plain, md5, sha, sha256, pbkdf2
auth.mysql.password_hash = plain
## %% Superuser Query
auth.mysql.super_query = select is_superuser from mqtt_user where username = ‘%u’ limit 1
## ACL Query Command
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = ‘%a’ or username = ‘%u’ or username = ‘$all’ or clientid = ‘%c’
## ACL nomatch
auth.mysql.acl_nomatch = deny
往MariaDB插入初始化数据库和表(引擎需要改成InnDB):
认证的用户表也可以共用其他系统的用户表,由上面的emq_auth_mysql.conf来配置:auth.mysql.auth_query、auth.mysql.super_query、auth.mysql.acl_query:
建立mqtt数据库:
$ mysql -uroot -p123456 -hxxxx
MariaDB [test]> create database mqtt;
用户表:
MariaDB [test]> CREATE TABLE `mqtt_user` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`username` varchar(100) DEFAULT NULL,
`password` varchar(100) DEFAULT NULL,
`salt` varchar(20) DEFAULT NULL,
`is_superuser` tinyint(1) DEFAULT 0,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_username` (`username`)
) DEFAULT CHARSET=utf8;
用户表插入测试数据:
MariaDB [mqtt]> INSERT INTO mqtt_user (id, username, password, salt, is_superuser, created)
VALUES
(1,’superuser’,’123456′,’123456′,True,’2016-10-26 10:00:00′),
(2,’user’,’123456′,’123456′,False,’2016-10-26 10:01:00′);
acl表:
MariaDB [mqtt]> CREATE TABLE `mqtt_acl` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`allow` int(1) DEFAULT NULL COMMENT ‘0: deny, 1: allow’,
`ipaddr` varchar(60) DEFAULT NULL COMMENT ‘IpAddress’,
`username` varchar(100) DEFAULT NULL COMMENT ‘Username’,
`clientid` varchar(100) DEFAULT NULL COMMENT ‘ClientId’,
`access` int(2) NOT NULL COMMENT ‘1: subscribe, 2: publish, 3: pubsub’,
`topic` varchar(100) NOT NULL DEFAULT ” COMMENT ‘Topic Filter’,
PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;
acl表插入默认数据(注意带$SYS为broker保留的特殊topic用来统计使用,username为$all表示所有在user表中的用户):
MariaDB [mqtt]> INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
VALUES
(1,1,NULL,’$all’,NULL,2,’#’),
(2,1,NULL,’$all’,NULL,1,’#’),
(3,1,NULL,’$all’,NULL,3,’#’),
(4,1,’127.0.0.1′,NULL,NULL,2,’$SYS/#’),
(5,1,’127.0.0.1′,NULL,NULL,2,’#’),
(6,1,NULL,’dashboard’,NULL,1,’$SYS/#’);
测试,重启每台服务器后执行:
$ mosquitto_sub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -c -i 1112 -u user -P 123456
$ mosquitto_pub -h 192.168.60.55 -p 1883 -t test_topic_1 -q 1 -m “hello mqtt 1” -u user -P 123456
重新初始化emq:
$ bin/emqttd stop
$ rm -rf data/mnesia/*
$ rm -rf data/configs/*
$ rm -rf log/*
$ bin/emqttd start
$ bin/emqttd_ctl cluster join emqttd@192.168.60.55
使用Haproxy来实现负载分担:
因为emq的topic和消息在集群的各台服务器上一致,所以数据不能以增加机器的方式扩容,只能增加每台的内存,和客户端的连接则可以以增加机器方式扩容。
$ vi /etc/haproxy/haproxy.cfg
################## EMQ ######################
listen emq_emqttd *:1883
mode tcp
balance source
log global
timeout connect 25s
timeout client 0
timeout server 0
option tcpka
option tcplog
option tcp-check
server emq1 192.168.60.55:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
server emq2 192.168.60.56:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
server emq3 192.168.60.57:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
server emq4 192.168.60.58:1883 check inter 25000 port 1883 rise 2 fall 2 weight 25
开启haproxy上的防火墙:
sudo firewall-cmd –zone=public –add-port=1883/tcp
sudo firewall-cmd –permanent –zone=public –add-port=1883/tcp
sudo firewall-cmd –zone=public –list-all
sudo firewall-cmd –permanent –zone=public –list-all
使用:
broker操作:
显示broker状态:
$ ./bin/emqttd_ctl status
显示broker版本、启用时间等:
$ ./bin/emqttd_ctl broker
显示broker的pubsub进程状态、内存、队列长度、规约数等:
$ ./bin/emqttd_ctl broker pubsub
显示broker的统计信息:客户端连接数、会话数、主题数、订阅数、路由数等:
$ ./bin/emqttd_ctl broker stats
显示broker的测量信息:底层流量、MQTT报文数、消息数等:
$ ./bin/emqttd_ctl broker metrics
Cluster操作:
$ ./bin/emqttd_ctl cluster
cluster join <Node> # Join the cluster
cluster leave # Leave the cluster
cluster remove <Node> # Remove the node from cluster
cluster status # Cluster status
Client操作:
$ ./bin/emqttd_ctl clients
clients list # List all clients
clients show <ClientId> # Show a client
clients kick <ClientId> # Kick out a client
Sessions操作:
$ ./bin/emqttd_ctl sessions
sessions list # List all sessions
sessions list persistent # List all persistent sessions ,桥接Bridge的时候才会用到(即clean_sess=false的类型)
sessions list transient # List all transient sessions
sessions show <ClientId> # Show a session
Routes操作:
$ ./bin/emqttd_ctl routes
routes list # List all routes
routes show <Topic> # Show a route
Topics操作:
$ ./bin/emqttd_ctl topics
topics list # List all topics
topics show <Topic> # Show a topic
Subscription订阅者操作:
$ ./bin/emqttd_ctl subscriptions
subscriptions list # List all subscriptions
subscriptions show <ClientId> # Show subscriptions of a client
subscriptions add <ClientId> <Topic> <QoS> # Add a static subscription manually
subscriptions del <ClientId> # Delete static subscriptions manually
subscriptions del <ClientId> <Topic> # Delete a static subscription manually
Plugins插件操作:
$ ./bin/emqttd_ctl plugins
plugins list # Show loaded plugins
plugins load <Plugin> # Load plugin
plugins unload <Plugin> # Unload plugin
$ ./bin/emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.0, description=Authentication with ClientId/Password, active=false)
Plugin(emq_auth_http, version=2.0, description=Authentication/ACL with HTTP API, active=false)
Plugin(emq_auth_ldap, version=2.0, description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.0, description=Authentication/ACL with MongoDB, active=false)
Plugin(emq_auth_mysql, version=2.0, description=Authentication/ACL with MySQL, active=false)
Plugin(emq_auth_pgsql, version=2.0, description=Authentication/ACL with PostgreSQL, active=false)
Plugin(emq_auth_redis, version=2.0, description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.0, description=Authentication with Username/Password, active=false)
Plugin(emq_coap, version=0.2, description=CoAP Gateway, active=false)
Plugin(emq_dashboard, version=2.0, description=Dashboard, active=true)
Plugin(emq_mod_rewrite, version=2.0, description=Rewrite Module, active=false)
Plugin(emq_plugin_template, version=2.0, description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.0, description=Recon Plugin, active=false)
Plugin(emq_reloader, version=2.0, description=Reloader Plugin, active=false)
Plugin(emq_sn, version=0.2, description=MQTT-SN Gateway, active=false)
Plugin(emq_stomp, version=2.0, description=Stomp Protocol Plugin, active=false)
Bridges桥接操作:
$ ./bin/emqttd_ctl bridges
bridges list # List bridges
bridges options # Bridge options
bridges start <Node> <Topic> # Start a bridge
bridges start <Node> <Topic> <Options> # Start a bridge with options
bridges stop <Node> <Topic> # Stop a bridge
vm虚机(erlang虚机)性能查看:
$ ./bin/emqttd_ctl vm all
cpu/load1 : 0.05
cpu/load5 : 0.05
cpu/load15 : 0.07
memory/total : 166404232
memory/processes : 32564328
memory/processes_used : 32563952
memory/system : 133839904
memory/atom : 959633
memory/atom_used : 954350
memory/binary : 46088
memory/code : 28250535
memory/ets : 5741416
process/limit : 2097152
process/count : 288
io/max_fds : 1000000
io/active_fds : 1
ports/count : 25
ports/limit : 1048576
端口使用情况查看:
$ ./bin/emqttd_ctl listeners
mnesia数据库信息查看:
$ bin/emqttd_ctl mnesia
管理dashboard用户:
$ ./bin/emqttd_ctl admins
admins add <Username> <Password> <Tags> # Add dashboard user
admins passwd <Username> <Password> # Reset dashboard user password
admins del <Username> # Delete dashboard user
追踪 ,EMQ消息服务器支持追踪来自某个客户端(Client)的全部报文,或者发布到某个主题(Topic)的全部消息。
追踪客户端(Client):
./bin/emqttd_ctl trace client “clientid(mosqsub/23058-vm6)” “trace_clientid.log”
追踪主题(Topic):
./bin/emqttd_ctl trace topic “topic1” “trace_topic.log”
查询追踪:
./bin/emqttd_ctl trace list
停止追踪:
./bin/emqttd_ctl trace client “clientid(mosqsub/23058-vm6)” off
./bin/emqttd_ctl trace topic “topic1” off
钩子(Hook)扩展,EMQ消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook):
钩子 说明
client.connected 客户端上线
client.subscribe 客户端订阅主题前
client.unsubscribe 客户端取消订阅主题
session.subscribed 客户端订阅主题后
session.unsubscribed 客户端取消订阅主题后
message.publish MQTT消息发布
message.delivered MQTT消息送达
message.acked MQTT消息回执
client.disconnected 客户端连接断开
钩子使用例子:
-module(emqttd_plugin_template).
-export([load/1, unload/0]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
load(Env) ->
emqttd:hook(‘message.publish’, fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook(‘message.delivered’, fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook(‘message.acked’, fun ?MODULE:on_message_acked/4, [Env]).
on_message_publish(Message, _Env) ->
io:format(“publish ~s~n”, [emqttd_message:format(Message)]),
{ok, Message}.
on_message_delivered(ClientId, _Username, Message, _Env) ->
io:format(“delivered to client ~s: ~s~n”, [ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, _Username, Message, _Env) ->
io:format(“client ~s acked: ~s~n”, [ClientId, emqttd_message:format(Message)]),
{ok, Message}.
unload() ->
emqttd:unhook(‘message.publish’, fun ?MODULE:on_message_publish/2),
emqttd:unhook(‘message.acked’, fun ?MODULE:on_message_acked/4),
emqttd:unhook(‘message.delivered’, fun ?MODULE:on_message_delivered/4).
MQTT协议的客户端库:
做IM即时通信协议时的考虑:
MQTT作为发布、订阅系统,作为消息推送是很好的。
如果作为IM即时通信,则可以考虑把每个用户id做成一个topic,每个用户订阅名为自己id的topic(比如”USER/1111″),发送方如果需要发布消息给某一个用户,则发布该用户id的topic消息,对方自然就会收到。另外每个用户还要订阅几个系统类的topic(比如”ADMIN/broadcast”,”ADMIN/1111″),以便后台系统发布各类消息。至于群组消息,每加入一个群,则增加订阅一个名为群id的topic(比如”GROUP/1111″)。