Merge remote-tracking branch 'origin/dev/v4.3.0' into umbrella-fix-build

This commit is contained in:
Zaiming Shi 2020-12-11 08:55:57 +01:00
commit 7bdd093ef5
93 changed files with 684 additions and 447 deletions

2
.gitignore vendored
View File

@ -35,4 +35,4 @@ Mnesia.*/
_checkouts
rebar.config.rendered
/rebar3
rebar.lock
rebar.lock

1
.tool-versions Normal file
View File

@ -0,0 +1 @@
erlang 22.3.4.13

View File

@ -1,4 +1,4 @@
REBAR_VERSION = 3.14.3-emqx-1
REBAR_VERSION = 3.14.3-emqx-2
REBAR = ./rebar3
PROFILE ?= emqx

View File

@ -119,7 +119,7 @@ translate_env() ->
#{host := Host0,
port := Port,
path := Path} = uri_string:parse(list_to_binary(URL)),
{ok, Host} = inet:parse_address(binary_to_list(Host0)),
Host = get_addr(binary_to_list(Host0)),
[{Name, {Host, Port, binary_to_list(Path)}} | Acc]
end
end, [], [acl_req, auth_req, super_req]),
@ -145,3 +145,16 @@ same_host_and_port([{_, {Host, Port, _}}, URL = {_, {Host, Port, _}} | Rest]) ->
same_host_and_port([URL | Rest]);
same_host_and_port(_) ->
false.
get_addr(Hostname) ->
case inet:parse_address(Hostname) of
{ok, {_,_,_,_} = Addr} -> Addr;
{ok, {_,_,_,_,_,_,_,_} = Addr} -> Addr;
{error, einval} ->
case inet:getaddr(Hostname, inet) of
{error, _} ->
{ok, Addr} = inet:getaddr(Hostname, inet6),
Addr;
{ok, Addr} -> Addr
end
end.

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,26 @@
version: '3'
services:
erlang:
image: erlang:22.1
volumes:
- ../:/emqx_auth_ldap
networks:
- emqx_bridge
depends_on:
- ldap_server
tty: true
ldap_server:
build: ./emqx-ldap
image: emqx-ldap:1.0
restart: always
ports:
- 389:389
- 636:636
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,26 @@
FROM buildpack-deps:stretch
ENV VERSION=2.4.50
RUN apt-get update && apt-get install -y groff groff-base
RUN wget ftp://ftp.openldap.org/pub/OpenLDAP/openldap-release/openldap-${VERSION}.tgz \
&& gunzip -c openldap-${VERSION}.tgz | tar xvfB - \
&& cd openldap-${VERSION} \
&& ./configure && make depend && make && make install \
&& cd .. && rm -rf openldap-${VERSION}
COPY ./slapd.conf /usr/local/etc/openldap/slapd.conf
COPY ./emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
COPY ./emqx.schema /usr/local/etc/openldap/schema/emqx.schema
COPY ./*.pem /usr/local/etc/openldap/
RUN mkdir -p /usr/local/etc/openldap/data \
&& slapadd -l /usr/local/etc/openldap/schema/emqx.io.ldif -f /usr/local/etc/openldap/slapd.conf
WORKDIR /usr/local/etc/openldap
EXPOSE 389 636
ENTRYPOINT ["/usr/local/libexec/slapd", "-h", "ldap:/// ldaps:///", "-d", "3", "-f", "/usr/local/etc/openldap/slapd.conf"]
CMD []

View File

@ -0,0 +1,16 @@
include /usr/local/etc/openldap/schema/core.schema
include /usr/local/etc/openldap/schema/cosine.schema
include /usr/local/etc/openldap/schema/inetorgperson.schema
include /usr/local/etc/openldap/schema/ppolicy.schema
include /usr/local/etc/openldap/schema/emqx.schema
TLSCACertificateFile /usr/local/etc/openldap/cacert.pem
TLSCertificateFile /usr/local/etc/openldap/cert.pem
TLSCertificateKeyFile /usr/local/etc/openldap/key.pem
database bdb
suffix "dc=emqx,dc=io"
rootdn "cn=root,dc=emqx,dc=io"
rootpw {SSHA}eoF7NhNrejVYYyGHqnt+MdKNBh4r1w3W
directory /usr/local/etc/openldap/data

View File

@ -63,10 +63,8 @@ check(ClientInfo = #{ clientid := Clientid
emqx_metrics:inc(?AUTH_METRICS(ignore)),
ok;
List ->
case [ Hash || <<Salt:4/binary, Hash/binary>> <- lists:sort(fun emqx_auth_mnesia_cli:comparing/2, List),
Hash =:= hash(NPassword, Salt, HashType)
] of
[] ->
case match_password(NPassword, HashType, List) of
false ->
?LOG(error, "[Mnesia] Auth from mnesia failed: ~p", [ClientInfo]),
emqx_metrics:inc(?AUTH_METRICS(failure)),
{stop, AuthResult#{anonymous => false, auth_result => password_error}};
@ -78,7 +76,34 @@ check(ClientInfo = #{ clientid := Clientid
description() -> "Authentication with Mnesia".
match_password(Password, HashType, HashList) ->
lists:any(
fun(Secret) ->
case is_salt_hash(Secret, HashType) of
true ->
<<Salt:4/binary, Hash/binary>> = Secret,
Hash =:= hash(Password, Salt, HashType);
_ ->
Secret =:= hash(Password, HashType)
end
end, HashList).
hash(undefined, HashType) ->
hash(<<>>, HashType);
hash(Password, HashType) ->
emqx_passwd:hash(HashType, Password).
hash(undefined, SaltBin, HashType) ->
hash(<<>>, SaltBin, HashType);
hash(Password, SaltBin, HashType) ->
emqx_passwd:hash(HashType, <<SaltBin/binary, Password/binary>>).
is_salt_hash(_, plain) ->
true;
is_salt_hash(Secret, HashType) ->
not (byte_size(Secret) == len(HashType)).
len(md5) -> 32;
len(sha) -> 40;
len(sha256) -> 64;
len(sha512) -> 128.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,30 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_pgsql
networks:
- emqx_bridge
depends_on:
- pgsql_server
tty: true
pgsql_server:
build:
context: ./pgsql
args:
BUILD_FROM: postgres:${PGSQL_TAG}
image: emqx-pgsql
restart: always
environment:
POSTGRES_PASSWORD: public
POSTGRES_USER: root
POSTGRES_DB: mqtt
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,8 @@
ARG BUILD_FROM=postgres:11
FROM ${BUILD_FROM}
COPY pg.conf /etc/postgresql/postgresql.conf
COPY server-cert.pem /etc/postgresql/server-cert.pem
COPY server-key.pem /etc/postgresql/server-key.pem
RUN chown -R postgres:postgres /etc/postgresql \
&& chmod 600 /etc/postgresql/*.pem
CMD ["-c", "config_file=/etc/postgresql/postgresql.conf"]

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -0,0 +1,39 @@
version: '2.4'
# network configuration is limited in version 3
# https://github.com/docker/compose/issues/4958
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- app_net
depends_on:
- redis_cluster
tty: true
redis_cluster:
image: redis:6.0.9
container_name: redis-cluster
volumes:
- ../test/emqx_auth_redis_SUITE_data/certs:/tls
- ./redis/:/data/conf
command: bash -c "/bin/bash /data/conf/redis.sh -t && while true; do echo 1; sleep 1; done"
networks:
app_net:
# Assign a public address. Erlang container cannot find cluster nodes by network-scoped alias (redis_cluster).
ipv4_address: 172.16.239.10
ipv6_address: 2001:3200:3200::20
networks:
app_net:
driver: bridge
enable_ipv6: true
ipam:
driver: default
config:
- subnet: 172.16.239.0/24
gateway: 172.16.239.1
- subnet: 2001:3200:3200::/64
gateway: 2001:3200:3200::1

View File

@ -0,0 +1,38 @@
version: '2.4'
# network configuration is limited in version 3
# https://github.com/docker/compose/issues/4958
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- app_net
depends_on:
- redis_cluster
tty: true
redis_cluster:
image: redis:${REDIS_TAG}
container_name: redis-cluster
volumes:
- ./redis/:/data/conf
command: bash -c "/bin/bash /data/conf/redis.sh && while true; do echo 1; sleep 1; done"
networks:
app_net:
# Assign a public address. Erlang container cannot find cluster nodes by network-scoped alias (redis_cluster).
ipv4_address: 172.16.239.10
ipv6_address: 2001:3200:3200::20
networks:
app_net:
driver: bridge
enable_ipv6: true
ipam:
driver: default
config:
- subnet: 172.16.239.0/24
gateway: 172.16.239.1
- subnet: 2001:3200:3200::/64
gateway: 2001:3200:3200::1

View File

@ -0,0 +1,31 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- emqx_bridge
depends_on:
- redis_server
tty: true
redis_server:
image: redis:6.0.9
volumes:
- ../test/emqx_auth_redis_SUITE_data/certs:/tls
command:
- redis-server
- "--bind 0.0.0.0 ::"
- --tls-port 6380
- --tls-cert-file /tls/redis.crt
- --tls-key-file /tls/redis.key
- --tls-ca-cert-file /tls/ca.crt
restart: always
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,25 @@
version: '3'
services:
erlang:
image: erlang:22.3
volumes:
- ../:/emqx_auth_redis
networks:
- emqx_bridge
depends_on:
- redis_server
tty: true
redis_server:
image: redis:${REDIS_TAG}
command:
- redis-server
- "--bind 0.0.0.0 ::"
restart: always
networks:
- emqx_bridge
networks:
emqx_bridge:
driver: bridge

View File

@ -0,0 +1,3 @@
cluster-enabled yes
cluster-node-timeout 10000
bind 0.0.0.0 ::

View File

@ -0,0 +1,71 @@
#!/bin/bash
tls=false;
while getopts t OPT
do
case $OPT in
t) tls=true
;;
\?) exit
;;
esac
done
rm -f \
/data/conf/r7000i.log \
/data/conf/r7001i.log \
/data/conf/r7002i.log \
/data/conf/nodes.7000.conf \
/data/conf/nodes.7001.conf \
/data/conf/nodes.7002.conf ;
if $tls ; then
redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf --daemonize yes \
--tls-port 8000 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf --daemonize yes \
--tls-port 8001 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --daemonize yes \
--tls-port 8002 \
--tls-cert-file /tls/redis.crt \
--tls-key-file /tls/redis.key \
--tls-ca-cert-file /tls/ca.crt
else
redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf --daemonize yes ;
redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf --daemonize yes ;
redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --daemonize yes ;
fi
REDIS_LOAD_FLG=true;
while $REDIS_LOAD_FLG;
do
sleep 1;
redis-cli -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null;
if [ -s /data/conf/r7000i.log ]; then
:
else
continue;
fi
redis-cli -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null;
if [ -s /data/conf/r7001i.log ]; then
:
else
continue;
fi
redis-cli -p 7002 info 1> /data/conf/r7002i.log 2> /dev/null;
if [ -s /data/conf/r7002i.log ]; then
:
else
continue;
fi
yes "yes" | redis-cli --cluster create 172.16.239.10:7000 172.16.239.10:7001 172.16.239.10:7002;
REDIS_LOAD_FLG=false;
done
exit 0;

View File

@ -24,3 +24,5 @@ erlang.mk
rebar.lock
/.idea/
.DS_Store
/.ci/redis/nodes.*.conf
/.ci/redis/*.log

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -69,21 +69,18 @@ set_special_configs(_App) ->
ok.
init_redis_rows() ->
{ok, Connection} = ?POOL(?APP),
%% Users
[eredis:q(Connection, ["HMSET", Key|FiledValue]) || {Key, FiledValue} <- ?INIT_AUTH],
[q(["HMSET", Key|FiledValue]) || {Key, FiledValue} <- ?INIT_AUTH],
%% ACLs
emqx_modules:load_module(emqx_mod_acl_internal, false),
Result = [eredis:q(Connection, ["HSET", Key, Filed, Value]) || {Key, Filed, Value} <- ?INIT_ACL],
Result = [q(["HSET", Key, Filed, Value]) || {Key, Filed, Value} <- ?INIT_ACL],
ct:pal("redis init result: ~p~n", [Result]).
deinit_redis_rows() ->
{ok, Connection} = ?POOL(?APP),
AuthKeys = [Key || {Key, _Filed, _Value} <- ?INIT_AUTH],
AclKeys = [Key || {Key, _Value} <- ?INIT_ACL],
eredis:q(Connection, ["DEL" | AuthKeys]),
eredis:q(Connection, ["DEL" | AclKeys]).
q(["DEL" | AuthKeys]),
q(["DEL" | AclKeys]).
%%--------------------------------------------------------------------
%% Cases
@ -121,9 +118,8 @@ t_check_auth(_) ->
{error, _} = emqx_access_control:authenticate(Bcrypt#{password => <<"password">>}).
t_check_auth_hget(_) ->
{ok, Connection} = ?POOL(?APP),
eredis:q(Connection, ["HSET", "mqtt_user:hset", "password", "hset"]),
eredis:q(Connection, ["HSET", "mqtt_user:hset", "is_superuser", "1"]),
q(["HSET", "mqtt_user:hset", "password", "hset"]),
q(["HSET", "mqtt_user:hset", "is_superuser", "1"]),
reload([{password_hash, plain}, {auth_cmd, "HGET mqtt_user:%u password"}]),
Hset = #{clientid => <<"hset">>, username => <<"hset">>, zone => external},
{ok, #{is_superuser := true}} = emqx_access_control:authenticate(Hset#{password => <<"hset">>}).
@ -164,6 +160,16 @@ t_acl_super(_) ->
end,
emqtt:disconnect(C).
t_check_cluster_connection(_) ->
?assertMatch({error, _Reason}, reload([{server, [{type,cluster},
{pool_size,8},
{auto_reconnect,1},
{database,0},
{password,[]},
{sentinel,[]},
{servers,[{"wrong",6379},{"wrong",6380},{"wrong",6381}]}]}])).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
@ -172,3 +178,13 @@ reload(Config) when is_list(Config) ->
application:stop(?APP),
[application:set_env(?APP, K, V) || {K, V} <- Config],
application:start(?APP).
q(Cmd) ->
{ok, Server} = application:get_env(?APP, server),
case proplists:get_value(type, Server) of
single ->
{ok, Connection} = ?POOL(?APP),
eredis:q(Connection, Cmd);
cluster ->
eredis_cluster:q(emqx_auth_redis, Cmd)
end.

View File

@ -1,10 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<"*.">>, []}
]
}.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
webpackJsonp([20],{"4odX":function(t,e){},LbE0:function(t,e,s){"use strict";Object.defineProperty(e,"__esModule",{value:!0});var a={name:"help-view",components:{},data:function(){return{lang:window.localStorage.getItem("language")||"en"}},computed:{learnEnterprise:function(){return"zh"===this.lang?"https://www.emqx.io/cn/products/enterprise":"https://www.emqx.io/products/enterprise"},freeTrial:function(){return"zh"===this.lang?"https://www.emqx.io/cn/downloads#enterprise":"https://www.emqx.io/downloads#enterprise"},docsLink:function(){return"zh"===this.lang?"https://docs.emqx.io/broker/v4/cn":"https://docs.emqx.io/broker/v4/en"},faqLink:function(){return"zh"===this.lang?"https://docs.emqx.io/broker/latest/cn/faq/faq.html":"https://docs.emqx.io/tutorial/v4/en/faq/faq.html"}},methods:{}},n={render:function(){var t=this,e=t.$createElement,s=t._self._c||e;return s("div",{staticClass:"help-view"},[s("div",{staticClass:"page-title"},[t._v(t._s(t.$t("leftbar.help")))]),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.quickStart")))]),t._v(" "),s("p",[t._v(t._s(t.$t("help.emqxDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:"https://github.com/emqx/emqx"}},[t._v("Github")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.emqxEnterprise")))]),t._v(" "),s("p",{domProps:{innerHTML:t._s(t.$t("help.enterpriseDesc"))}}),t._v(" "),s("a",{attrs:{target:"_blank",href:t.learnEnterprise}},[t._v("\n "+t._s(t.$t("oper.learnMore"))+"\n ")]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.freeTrial}},[t._v("\n "+t._s(t.$t("help.freeTrial"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.useDocs")))]),t._v(" "),s("p",[t._v(t._s(t.$t("help.docsDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.docsLink}},[t._v("\n "+t._s(t.$t("help.forwardView"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v("FAQ")]),t._v(" "),s("p",[t._v(t._s(t.$t("help.faqDesc")))]),t._v(" "),s("a",{attrs:{target:"_blank",href:t.faqLink}},[t._v("\n "+t._s(t.$t("help.forwardFaq"))+"\n ")])]),t._v(" "),s("el-divider"),t._v(" "),s("div",{staticClass:"help-item"},[s("h3",[t._v(t._s(t.$t("help.followUs")))]),t._v(" "),t._m(0),t._v(" "),t._m(1),t._v(" "),t._m(2),t._v(" "),t._m(3),t._v(" "),t._m(4),t._v(" "),t._m(5)])],1)},staticRenderFns:[function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://github.com/emqx/emqx"}},[e("i",{staticClass:"iconfont icon-git"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://twitter.com/emqtt"}},[e("i",{staticClass:"iconfont icon-tuite"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://emqx.slack.com/"}},[e("i",{staticClass:"iconfont icon-slack"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://stackoverflow.com/questions/tagged/emq"}},[e("i",{staticClass:"iconfont icon-stack-overflow"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://groups.google.com/forum/#!forum/emqtt"}},[e("i",{staticClass:"iconfont icon-icons-google_groups"})])},function(){var t=this.$createElement,e=this._self._c||t;return e("a",{staticClass:"follow-link",attrs:{target:"_blank",href:"https://www.youtube.com/channel/UCDU9GWFk8NTGiTvPx_2XskA"}},[e("i",{staticClass:"iconfont icon-youtube"})])}]};var i=s("VU/8")(a,n,!1,function(t){s("4odX")},null,null);e.default=i.exports}});

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1 +0,0 @@
!function(e){var a=window.webpackJsonp;window.webpackJsonp=function(c,t,f){for(var o,d,b,i=0,u=[];i<c.length;i++)d=c[i],n[d]&&u.push(n[d][0]),n[d]=0;for(o in t)Object.prototype.hasOwnProperty.call(t,o)&&(e[o]=t[o]);for(a&&a(c,t,f);u.length;)u.shift()();if(f)for(i=0;i<f.length;i++)b=r(r.s=f[i]);return b};var c={},n={27:0};function r(a){if(c[a])return c[a].exports;var n=c[a]={i:a,l:!1,exports:{}};return e[a].call(n.exports,n,n.exports,r),n.l=!0,n.exports}r.e=function(e){var a=n[e];if(0===a)return new Promise(function(e){e()});if(a)return a[2];var c=new Promise(function(c,r){a=n[e]=[c,r]});a[2]=c;var t=document.getElementsByTagName("head")[0],f=document.createElement("script");f.type="text/javascript",f.charset="utf-8",f.async=!0,f.timeout=12e4,r.nc&&f.setAttribute("nonce",r.nc),f.src=r.p+"static/js/"+e+"."+{0:"7a09d1383e1319441399",1:"fcd6fde8b053e80bc68f",2:"71ffb214c95162432f13",3:"25b49772270df4b9915d",4:"93d4473fcf7768693652",5:"8935139a413f40d70253",6:"ef8e6aa7a51fa7564f71",7:"92a348a80764134ff2a9",8:"e86f6131cc8a9138368d",9:"473ceac05f7dfe3f3e92",10:"188c5e479f887d471dde",11:"3861aeb3036b8f41a6e8",12:"43feccc8f1584bdba5c2",13:"026a13a2a59abd354bd5",14:"0342a1a3d29f1adca947",15:"7d11711536eb5b2ca561",16:"6bfd6f3eb9216e73149c",17:"1d56280c16e6e2b81cff",18:"a0c394cb4b55bee2fa82",19:"060521bb4ba4f7a81ac0",20:"308aa0fdf6653ef3299f",21:"306758a2a6ef73532290",22:"d968dc6f54a690adde18",23:"7837b8f015b7d486b74f",24:"ccbcfda924431cb282e2",25:"8b2dccd8a7e1f91a5040",26:"9cd922cc7e5d035cbcc7"}[e]+".js";var o=setTimeout(d,12e4);function d(){f.onerror=f.onload=null,clearTimeout(o);var a=n[e];0!==a&&(a&&a[1](new Error("Loading chunk "+e+" failed.")),n[e]=void 0)}return f.onerror=f.onload=d,t.appendChild(f),c},r.m=e,r.c=c,r.d=function(e,a,c){r.o(e,a)||Object.defineProperty(e,a,{configurable:!1,enumerable:!0,get:c})},r.n=function(e){var a=e&&e.__esModule?function(){return e.default}:function(){return e};return r.d(a,"a",a),a},r.o=function(e,a){return Object.prototype.hasOwnProperty.call(e,a)},r.p="/",r.oe=function(e){throw console.error(e),e}}([]);

View File

@ -1,24 +0,0 @@
%%-*- mode: erlang -*-
%% .app.src.script
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "^[v]?[0-9]\.[0-9]\.([0-9]|(rc|beta|alpha)\.[0-9])", [{capture, none}]) of
nomatch ->
re:replace(Tag, "/", "-", [{return ,list}]);
_ ->
%% if it is a version number prefixed by 'v' or 'e', then remove it
re:replace(Tag, "[v]", "", [{return ,list}])
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -1,24 +0,0 @@
%%-*- mode: erlang -*-
%% .app.src.script
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "^[v]?[0-9]\.[0-9]\.([0-9]|(rc|beta|alpha)\.[0-9])", [{capture, none}]) of
nomatch ->
re:replace(Tag, "/", "-", [{return ,list}]);
_ ->
%% if it is a version number prefixed by 'v' or 'e', then remove it
re:replace(Tag, "[v]", "", [{return ,list}])
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -25,3 +25,4 @@
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{extra_src_dirs, [{"lwm2m_xml", [{recursive,true}]}]}.

View File

@ -1,8 +0,0 @@
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -78,10 +78,6 @@
, bitxor/2
, bitsl/2
, bitsr/2
, bitsize/1
, subbits/2
, subbits/3
, subbits/6
]).
%% Data Type Convertion
@ -237,7 +233,7 @@ payload() ->
payload(Path) ->
fun(#{payload := Payload}) when erlang:is_map(Payload) ->
map_get(Path, Payload);
emqx_rule_maps:nested_get(map_path(Path), Payload);
(_) -> undefined
end.
@ -405,74 +401,6 @@ bitsl(X, I) when is_integer(X), is_integer(I) ->
bitsr(X, I) when is_integer(X), is_integer(I) ->
X bsr I.
bitsize(Bits) when is_bitstring(Bits) ->
bit_size(Bits).
subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) ->
subbits(Bits, 1, Len).
subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>).
subbits(Bits, Start, Len, Type, Signedness, Endianness) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
get_subbits(Bits, Start, Len, Type, Signedness, Endianness).
get_subbits(Bits, Start, Len, Type, Signedness, Endianness) ->
Begin = Start - 1,
case Bits of
<<_:Begin, Rem/bits>> when Rem =/= <<>> ->
Sz = bit_size(Rem),
do_get_subbits(Rem, Sz, Len, Type, Signedness, Endianness);
_ -> undefined
end.
-define(match_bits(Bits0, Pattern, ElesePattern),
case Bits0 of
Pattern ->
SubBits;
ElesePattern ->
SubBits
end).
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/integer-unsigned-big-unit:1, _/bits>>,
<<SubBits:Sz/integer-unsigned-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/float-unsigned-big-unit:1, _/bits>>,
<<SubBits:Sz/float-unsigned-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/bits-unsigned-big-unit:1, _/bits>>,
<<SubBits:Sz/bits-unsigned-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/integer-signed-big-unit:1, _/bits>>,
<<SubBits:Sz/integer-signed-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/float-signed-big-unit:1, _/bits>>,
<<SubBits:Sz/float-signed-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"big">>) ->
?match_bits(Bits, <<SubBits:Len/bits-signed-big-unit:1, _/bits>>,
<<SubBits:Sz/bits-signed-big-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"unsigned">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/integer-unsigned-little-unit:1, _/bits>>,
<<SubBits:Sz/integer-unsigned-little-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"unsigned">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/float-unsigned-little-unit:1, _/bits>>,
<<SubBits:Sz/float-unsigned-little-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"unsigned">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/bits-unsigned-little-unit:1, _/bits>>,
<<SubBits:Sz/bits-unsigned-little-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"integer">>, <<"signed">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/integer-signed-little-unit:1, _/bits>>,
<<SubBits:Sz/integer-signed-little-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"float">>, <<"signed">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/float-signed-little-unit:1, _/bits>>,
<<SubBits:Sz/float-signed-little-unit:1>>);
do_get_subbits(Bits, Sz, Len, <<"bits">>, <<"signed">>, <<"little">>) ->
?match_bits(Bits, <<SubBits:Len/bits-signed-little-unit:1, _/bits>>,
<<SubBits:Sz/bits-signed-little-unit:1>>).
%%------------------------------------------------------------------------------
%% Data Type Convertion Funcs
%%------------------------------------------------------------------------------
@ -682,10 +610,52 @@ map_get(Key, Map) ->
map_get(Key, Map, undefined).
map_get(Key, Map, Default) ->
emqx_rule_maps:nested_get(map_path(Key), Map, Default).
case maps:find(Key, Map) of
{ok, Val} -> Val;
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_rule_utils:bin(Key),
case maps:find(BinKey, Map) of
{ok, Val} -> Val;
error -> Default
end;
error when is_binary(Key) ->
try %% the map may have an equivalent atom-form key
AtomKey = list_to_existing_atom(binary_to_list(Key)),
case maps:find(AtomKey, Map) of
{ok, Val} -> Val;
error -> Default
end
catch error:badarg ->
Default
end;
error ->
Default
end.
map_put(Key, Val, Map) ->
emqx_rule_maps:nested_put(map_path(Key), Val, Map).
case maps:find(Key, Map) of
{ok, _} -> maps:put(Key, Val, Map);
error when is_atom(Key) ->
%% the map may have an equivalent binary-form key
BinKey = emqx_rule_utils:bin(Key),
case maps:find(BinKey, Map) of
{ok, _} -> maps:put(BinKey, Val, Map);
error -> maps:put(Key, Val, Map)
end;
error when is_binary(Key) ->
try %% the map may have an equivalent atom-form key
AtomKey = list_to_existing_atom(binary_to_list(Key)),
case maps:find(AtomKey, Map) of
{ok, _} -> maps:put(AtomKey, Val, Map);
error -> maps:put(Key, Val, Map)
end
catch error:badarg ->
maps:put(Key, Val, Map)
end;
error ->
maps:put(Key, Val, Map)
end.
mget(Key, Map) ->
mget(Key, Map, undefined).

View File

@ -489,75 +489,22 @@ t_contains(_) ->
t_map_get(_) ->
?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])),
?assertEqual(1, apply_func(map_get, [<<"a.b">>, #{a => #{b => 1}}])),
?assertEqual(undefined, apply_func(map_get, [<<"a.c">>, #{a => #{b => 1}}])).
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
t_map_put(_) ->
?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])),
?assertEqual(#{<<"a">> => #{<<"b">> => 1}}, apply_func(map_put, [<<"a.b">>, 1, #{}])),
?assertEqual(#{a => #{b => 1, <<"c">> => 1}}, apply_func(map_put, [<<"a.c">>, 1, #{a => #{b => 1}}])).
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
t_mget(_) ->
t_mget(_) ->
?assertEqual(1, apply_func(map_get, [<<"a">>, #{a => 1}])),
?assertEqual(1, apply_func(map_get, [<<"a">>, #{<<"a">> => 1}])),
?assertEqual(undefined, apply_func(map_get, [<<"a">>, #{}])).
t_mput(_) ->
t_mput(_) ->
?assertEqual(#{<<"a">> => 1}, apply_func(map_put, [<<"a">>, 1, #{}])),
?assertEqual(#{<<"a">> => 2}, apply_func(map_put, [<<"a">>, 2, #{<<"a">> => 1}])),
?assertEqual(#{a => 2}, apply_func(map_put, [<<"a">>, 2, #{a => 1}])).
t_bitsize(_) ->
?assertEqual(8, apply_func(bitsize, [<<"a">>])),
?assertEqual(4, apply_func(bitsize, [<<15:4>>])).
t_subbits(_) ->
?assertEqual(1, apply_func(subbits, [<<255:8>>, 1])),
?assertEqual(3, apply_func(subbits, [<<255:8>>, 2])),
?assertEqual(7, apply_func(subbits, [<<255:8>>, 3])),
?assertEqual(15, apply_func(subbits, [<<255:8>>, 4])),
?assertEqual(31, apply_func(subbits, [<<255:8>>, 5])),
?assertEqual(63, apply_func(subbits, [<<255:8>>, 6])),
?assertEqual(127, apply_func(subbits, [<<255:8>>, 7])),
?assertEqual(255, apply_func(subbits, [<<255:8>>, 8])).
t_subbits2(_) ->
?assertEqual(1, apply_func(subbits, [<<255:8>>, 1, 1])),
?assertEqual(3, apply_func(subbits, [<<255:8>>, 1, 2])),
?assertEqual(7, apply_func(subbits, [<<255:8>>, 1, 3])),
?assertEqual(15, apply_func(subbits, [<<255:8>>, 1, 4])),
?assertEqual(31, apply_func(subbits, [<<255:8>>, 1, 5])),
?assertEqual(63, apply_func(subbits, [<<255:8>>, 1, 6])),
?assertEqual(127, apply_func(subbits, [<<255:8>>, 1, 7])),
?assertEqual(255, apply_func(subbits, [<<255:8>>, 1, 8])).
t_subbits2_1(_) ->
?assertEqual(1, apply_func(subbits, [<<255:8>>, 2, 1])),
?assertEqual(3, apply_func(subbits, [<<255:8>>, 2, 2])),
?assertEqual(7, apply_func(subbits, [<<255:8>>, 2, 3])),
?assertEqual(15, apply_func(subbits, [<<255:8>>, 2, 4])),
?assertEqual(31, apply_func(subbits, [<<255:8>>, 2, 5])),
?assertEqual(63, apply_func(subbits, [<<255:8>>, 2, 6])),
?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 7])),
?assertEqual(127, apply_func(subbits, [<<255:8>>, 2, 8])).
t_subbits2_integer(_) ->
?assertEqual(456, apply_func(subbits, [<<456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])),
?assertEqual(-456, apply_func(subbits, [<<-456:32/integer>>, 1, 32, <<"integer">>, <<"signed">>, <<"big">>])).
t_subbits2_float(_) ->
R = apply_func(subbits, [<<5.3:64/float>>, 1, 64, <<"float">>, <<"unsigned">>, <<"big">>]),
RL = (5.3 - R),
ct:pal(";;;;~p", [R]),
?assert( (RL >= 0 andalso RL < 0.0001) orelse (RL =< 0 andalso RL > -0.0001)),
R2 = apply_func(subbits, [<<-5.3:64/float>>, 1, 64, <<"float">>, <<"signed">>, <<"big">>]),
RL2 = (5.3 + R2),
ct:pal(";;;;~p", [R2]),
?assert( (RL2 >= 0 andalso RL2 < 0.0001) orelse (RL2 =< 0 andalso RL2 > -0.0001)).
%%------------------------------------------------------------------------------
%% Test cases for Hash funcs

View File

@ -137,4 +137,4 @@ receive_msg() ->
ct:print("==========+~p~n", [Msg]),
receive_msg()
after 200 -> ok
end.
end.

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -1139,6 +1139,13 @@ listener.tcp.external.send_timeout_close = on
## Value: on | off
## listener.tcp.external.tune_buffer = off
## The socket is set to a busy state when the amount of data queued internally
## by the ERTS socket implementation reaches this limit.
##
## Value: on | off
## Defaults to 1MB
## listener.tcp.external.high_watermark = 1MB
## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
## sent immediately if the option is enabled.
##
@ -1317,6 +1324,11 @@ listener.ssl.external.access.1 = allow all
## Value: Duration
listener.ssl.external.handshake_timeout = 15s
## Maximum number of non-self-issued intermediate certificates that can follow the peer certificate in a valid certification path.
##
## Value: Number
#listener.ssl.external.depth = 10
## Path to the file containing the user's private PEM-encoded key.
##
## See: http://erlang.org/doc/man/ssl.html

View File

@ -32,7 +32,6 @@ endif
.PHONY: $(PROFILES:%=relup-%)
$(PROFILES:%=relup-%): $(REBAR)
ifneq ($(OS),Windows_NT)
@ln -snf _build/$(@:relup-%=%)/lib
@if [ ! -z $$(ls | grep -E "$(@:relup-%=%)-$(SYSTEM)-(.*)-$$(uname -m).zip" | head -1 ) ]; then \
mkdir -p tmp/relup_packages/$(@:relup-%=%); \
cp $(@:relup-%=%)-$(SYSTEM)-*-$$(uname -m).zip tmp/relup_packages/$(@:relup-%=%); \
@ -42,9 +41,6 @@ endif
.PHONY: $(PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar)
$(PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar): $(REBAR)
ifneq ($(OS),Windows_NT)
@ln -snf _build/$(subst -tar,,$(@))/lib
endif
ifneq ($(shell echo $(@) |grep edge),)
export EMQX_DESC="EMQ X Edge"
else

View File

@ -1244,6 +1244,11 @@ end}.
hidden
]}.
{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [
{datatype, bytesize},
{default, "1MB"}
]}.
{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
@ -1336,6 +1341,11 @@ end}.
hidden
]}.
{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [
{datatype, bytesize},
{default, "1MB"}
]}.
{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
@ -1368,6 +1378,11 @@ end}.
{datatype, {duration, ms}}
]}.
{mapping, "listener.ssl.$name.depth", "emqx.listeners", [
{default, 10},
{datatype, integer}
]}.
{mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [
{datatype, string}
]}.
@ -1839,6 +1854,7 @@ end}.
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
end,
@ -1878,6 +1894,7 @@ end}.
{ciphers, Ciphers},
{user_lookup_fun, UserLookupFun},
{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
{depth, cuttlefish:conf_get(Prefix ++ ".depth", Conf, undefined)},
{dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
{certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},

View File

@ -12,7 +12,10 @@
warn_obsolete_guard,compressed]}.
{overrides,[{add,[{erl_opts,[compressed,deterministic,
{parse_transform,mod_vsn}]}]}]}.
{parse_transform,mod_vsn}]}]}
,{add,[{extra_src_dirs, [{"etc", [{recursive,true}]}]}]}
]}.
{extra_src_dirs, [{"etc", [{recursive,true}]}]}.
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
deprecated_function_calls,warnings_as_errors, deprecated_functions]}.

View File

@ -1,48 +1,9 @@
%% -*-: erlang -*-
{DefaultLen, DefaultSize} =
case WordSize = erlang:system_info(wordsize) of
8 -> % arch_64
{10000, cuttlefish_bytesize:parse("64MB")};
4 -> % arch_32
{1000, cuttlefish_bytesize:parse("32MB")}
end,
{"4.2.3",
[
{"4.2.2", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
]},
{"4.2.1", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
]},
{"4.2.0", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []},
{apply, {application, set_env,
[emqx, force_shutdown_policy,
#{message_queue_len => DefaultLen,
max_heap_size => DefaultSize div WordSize}]}}
]}
],
[
{"4.2.2", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
]},
{"4.2.1", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
]},
{"4.2.0", [
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []},
{load_module, emqx_json, brutal_purge, soft_purge, []}
]}
]
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -359,6 +359,8 @@ normalize_message(partition, #{occurred := Node}) ->
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) ->
list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId]));
normalize_message(_Name, _UnknownDetails) ->
<<"Unknown alarm">>.

View File

@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) ->
Zone;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(username, #channel{clientinfo = #{username := Username}}) ->
Username;
info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
SockType;
info(peername, #channel{conninfo = #{peername := Peername}}) ->
Peername;
info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
Sockname;
info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
ProtoName;
info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
ProtoVer;
info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
ConnectedAt;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->

View File

@ -80,8 +80,8 @@
limit_timer :: maybe(reference()),
%% Parse State
parse_state :: emqx_frame:parse_state(),
%% Serialize function
serialize :: emqx_frame:serialize_fun(),
%% Serialize options
serialize :: emqx_frame:serialize_opts(),
%% Channel State
channel :: emqx_channel:channel(),
%% GC State
@ -103,11 +103,24 @@
-define(ENABLED(X), (X =/= undefined)).
-define(ALARM_TCP_CONGEST(Channel),
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s",
[emqx_channel:info(clientid, Channel),
emqx_channel:info(username, Channel)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername,
clientid, username, proto_name, proto_ver, connected_at
]).
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [ init/4
, init_state/3
, run_loop/2
, system_terminate/4
, system_code_change/4
]}).
-spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
@ -203,7 +216,7 @@ init_state(Transport, Socket, Options) ->
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_fun(),
Serialize = emqx_frame:serialize_opts(),
Channel = emqx_channel:init(ConnInfo, Options),
GcState = emqx_zone:init_gc_state(Zone),
StatsTimer = emqx_zone:stats_timer(Zone),
@ -338,7 +351,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),
Serialize = emqx_frame:serialize_fun(ConnPkt),
Serialize = emqx_frame:serialize_opts(ConnPkt),
NState = State#state{serialize = Serialize,
idle_timer = undefined
},
@ -430,6 +443,7 @@ handle_msg(Msg, State) ->
-spec terminate(any(), state()) -> no_return().
terminate(Reason, State = #state{channel = Channel}) ->
?LOG(debug, "Terminated due to ~p", [Reason]),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)),
emqx_channel:terminate(Reason, Channel),
_ = close_socket(State),
exit(Reason).
@ -580,7 +594,7 @@ handle_outgoing(Packet, State) ->
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
case Serialize(Packet) of
case emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
[emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
@ -596,11 +610,12 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
%% Send data
-spec(send(iodata(), state()) -> ok).
send(IoData, #state{transport = Transport, socket = Socket}) ->
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
inc_counter(outgoing_bytes, Oct),
case Transport:async_send(Socket, IoData) of
maybe_warn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, [nosuspend]) of
ok -> ok;
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
@ -608,6 +623,48 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
ok
end.
maybe_warn_congestion(Socket, Transport, Channel) ->
IsCongestAlarmSet = is_congestion_alarm_set(),
case is_congested(Socket, Transport) of
true when not IsCongestAlarmSet ->
ok = set_congestion_alarm(),
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
tcp_congestion_alarm_details(Socket, Transport, Channel));
false when IsCongestAlarmSet ->
ok = clear_congestion_alarm(),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
_ -> ok
end.
is_congested(Socket, Transport) ->
case Transport:getstat(Socket, [send_pend]) of
{ok, [{send_pend, N}]} when N > 0 -> true;
_ -> false
end.
is_congestion_alarm_set() ->
case erlang:get(conn_congested) of
true -> true;
_ -> false
end.
set_congestion_alarm() ->
erlang:put(conn_congested, true), ok.
clear_congestion_alarm() ->
erlang:put(conn_congested, false), ok.
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
{ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
{ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
SockInfo = maps:from_list(Stat ++ Opts),
ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
maps:merge(ConnInfo, SockInfo).
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
{IPStr, Port} = emqx_channel:info(Key, Channel),
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
conn_info(Key, Channel) ->
{Key, emqx_channel:info(Key, Channel)}.
%%--------------------------------------------------------------------
%% Handle Info
@ -623,7 +680,7 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
end;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]),
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State) ->

View File

@ -27,6 +27,9 @@
, parse/2
, serialize_fun/0
, serialize_fun/1
, serialize_opts/0
, serialize_opts/1
, serialize_pkt/2
, serialize/1
, serialize/2
]).
@ -34,7 +37,7 @@
-export_type([ options/0
, parse_state/0
, parse_result/0
, serialize_fun/0
, serialize_opts/0
]).
-type(options() :: #{strict_mode => boolean(),
@ -42,14 +45,19 @@
version => emqx_types:version()
}).
-type(parse_state() :: {none, options()} | cont_fun()).
-type(parse_state() :: {none, options()} | {cont_state(), options()}).
-type(parse_result() :: {more, cont_fun()}
-type(parse_result() :: {more, parse_state()}
| {ok, emqx_types:packet(), binary(), parse_state()}).
-type(cont_fun() :: fun((binary()) -> parse_result())).
-type(cont_state() :: {Stage :: len | body,
State :: #{hdr := #mqtt_packet_header{},
len := {pos_integer(), non_neg_integer()} | non_neg_integer(),
rest => binary()
}
}).
-type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())).
-type(serialize_opts() :: options()).
-define(none(Options), {none, Options}).
@ -87,7 +95,7 @@ parse(Bin) ->
-spec(parse(binary(), parse_state()) -> parse_result()).
parse(<<>>, {none, Options}) ->
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
{more, {none, Options}};
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
{none, Options = #{strict_mode := StrictMode}}) ->
%% Validate header if strict mode.
@ -102,11 +110,19 @@ parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
end,
parse_remaining_len(Rest, Header1, Options);
parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
Cont(Bin).
parse(Bin, {{len, #{hdr := Header,
len := {Multiplier, Length}}
}, Options}) when is_binary(Bin) ->
parse_remaining_len(Bin, Header, Multiplier, Length, Options);
parse(Bin, {{body, #{hdr := Header,
len := Length,
rest := Rest}
}, Options}) when is_binary(Bin) ->
parse_frame(<<Rest/binary, Bin/binary>>, Header, Length, Options).
parse_remaining_len(<<>>, Header, Options) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
{more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};
parse_remaining_len(Rest, Header, Options) ->
parse_remaining_len(Rest, Header, 1, 0, Options).
@ -114,7 +130,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
when Length > MaxSize ->
error(frame_too_large);
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
%% Match DISCONNECT without payload
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
@ -150,9 +166,7 @@ parse_frame(Bin, Header, Length, Options) ->
{ok, packet(Header, Variable), Rest, ?none(Options)}
end;
TooShortBin ->
{more, fun(BinMore) ->
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
end}
{more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}}
end.
-compile({inline, [packet/1, packet/2, packet/3]}).
@ -443,6 +457,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) ->
end
end.
serialize_opts() ->
?DEFAULT_OPTIONS.
serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) ->
MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE),
#{version => ProtoVer, max_size => MaxSize}.
serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) ->
IoData = serialize(Packet, Ver),
case is_too_large(IoData, MaxSize) of
true -> <<>>;
false -> IoData
end.
-spec(serialize(emqx_types:packet()) -> iodata()).
serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4).
@ -746,4 +774,3 @@ fixqos(?PUBREL, 0) -> 1;
fixqos(?SUBSCRIBE, 0) -> 1;
fixqos(?UNSUBSCRIBE, 0) -> 1;
fixqos(_Type, QoS) -> QoS.

View File

@ -35,7 +35,7 @@
-type(checker() :: #{ name := name()
, capacity := non_neg_integer()
, interval := non_neg_integer()
, consumer := function() | esockd_rate_limit:bucket()
, consumer := esockd_rate_limit:bucket() | emqx_zone:zone()
}).
-type(name() :: conn_bytes_in
@ -53,6 +53,8 @@
-type(limiter() :: #limiter{}).
-dialyzer({nowarn_function, [consume/3]}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@ -84,7 +86,7 @@ do_init_checker(Zone, {Name, {Capacity, Interval}}) ->
_ ->
esockd_limiter:create({Zone, Name}, Capacity, Interval)
end,
Ck#{consumer => fun(I) -> esockd_limiter:consume({Zone, Name}, I) end};
Ck#{consumer => Zone};
_ ->
Ck#{consumer => esockd_rate_limit:new(Capacity / Interval, Capacity)}
end.
@ -126,7 +128,7 @@ consume(Pubs, Bytes, #{name := Name, consumer := Cons}) ->
_ ->
case is_overall_limiter(Name) of
true ->
{_, Intv} = Cons(Tokens),
{_, Intv} = esockd_limiter:consume({Cons, Name}, Tokens),
{Intv, Cons};
_ ->
esockd_rate_limit:check(Tokens, Cons)

View File

@ -70,8 +70,8 @@
limit_timer :: maybe(reference()),
%% Parse State
parse_state :: emqx_frame:parse_state(),
%% Serialize Fun
serialize :: emqx_frame:serialize_fun(),
%% Serialize options
serialize :: emqx_frame:serialize_opts(),
%% Channel
channel :: emqx_channel:channel(),
%% GC State
@ -231,7 +231,7 @@ websocket_init([Req, Opts]) ->
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_fun(),
Serialize = emqx_frame:serialize_opts(),
Channel = emqx_channel:init(ConnInfo, Opts),
GcState = emqx_zone:init_gc_state(Zone),
StatsTimer = emqx_zone:stats_timer(Zone),
@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) ->
handle_info(Msg, State);
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
Serialize = emqx_frame:serialize_fun(ConnPkt),
Serialize = emqx_frame:serialize_opts(ConnPkt),
NState = State#state{serialize = Serialize},
handle_incoming(Packet, cancel_idle_timer(NState));
@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT
serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) ->
case Serialize(Packet) of
case emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
[emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'),

View File

@ -85,10 +85,11 @@ done
cleanup_app(){
local app="$1"
pushd "apps/$app"
rm -f Makefile rebar.config.script
rm -rf ".github" ".ci"
rm -rf src/*.app.src.script
rm -rf src/*.appup.src
rm -f Makefile rebar.config.script LICENSE src/*.app.src.script src/*.appup.src
rm -rf ".github"
# restore rebar.config and app.src
git checkout rebar.config
git checkout src/*.app.src
popd
}

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -24,8 +24,8 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").

View File

@ -21,7 +21,7 @@
-import(lists, [nth/2]).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(CM, emqx_cm).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
@ -52,6 +52,9 @@ init_per_suite(Config) ->
ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end),
ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end),
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
Config.
end_per_suite(_Config) ->
@ -62,6 +65,7 @@ end_per_suite(_Config) ->
ok = meck:unload(emqx_pd),
ok = meck:unload(emqx_metrics),
ok = meck:unload(emqx_hooks),
ok = meck:unload(emqx_alarm),
ok.
init_per_testcase(_TestCase, Config) ->
@ -77,6 +81,7 @@ init_per_testcase(_TestCase, Config) ->
{ok, [{K, 0} || K <- Options]}
end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
Config.

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -25,7 +25,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
%%--------------------------------------------------------------------
%% Setups

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -25,7 +25,7 @@
, replvar/2
]).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -18,7 +18,7 @@
-export([start_link/4, stop/1]).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos().
-type topic() :: emqx_topic:topic().

View File

@ -17,7 +17,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -18,7 +18,7 @@
-export([start_link/3, stop/1, send/6]).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
start_link(ResponseTopic, QoS, Options0) ->
Parent = self(),

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(R, emqx_router).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(SYSMON, emqx_sys_mon).

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(TRIE, emqx_trie).

View File

@ -80,7 +80,7 @@ t_get_port_info(_Config) ->
{ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]),
emqx_vm:get_port_info(),
ok = gen_tcp:close(Sock),
[Port | _] = erlang:ports().
[_Port | _] = erlang:ports().
t_transform_port(_Config) ->
[Port | _] = erlang:ports(),

View File

@ -16,8 +16,8 @@
-module(emqx_ws_connection_SUITE).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).

View File

@ -19,8 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(lists, [nth/2]).
@ -348,16 +348,19 @@ t_connect_will_delay_interval(_) ->
{will_topic, Topic},
{will_payload, Payload},
{will_props, #{'Will-Delay-Interval' => 3}},
{properties, #{'Session-Expiry-Interval' => 7200}},
{keepalive, 2}
{properties, #{'Session-Expiry-Interval' => 7200}}
]),
{ok, _} = emqtt:connect(Client2),
timer:sleep(5000),
%% terminate the client without sending the DISCONNECT
emqtt:stop(Client2),
%% should not get the will msg in 2.5s
timer:sleep(1500),
?assertEqual(0, length(receive_messages(1))),
timer:sleep(7000),
%% should get the will msg in 4.5s
timer:sleep(1000),
?assertEqual(1, length(receive_messages(1))),
%% try again, but let the session expire quickly
{ok, Client3} = emqtt:start_link([
{clientid, <<"t_connect_will_delay_interval">>},
{proto_ver, v5},
@ -367,14 +370,16 @@ t_connect_will_delay_interval(_) ->
{will_topic, Topic},
{will_payload, Payload},
{will_props, #{'Will-Delay-Interval' => 7200}},
{properties, #{'Session-Expiry-Interval' => 3}},
{keepalive, 2}
{properties, #{'Session-Expiry-Interval' => 3}}
]),
{ok, _} = emqtt:connect(Client3),
timer:sleep(5000),
%% terminate the client without sending the DISCONNECT
emqtt:stop(Client3),
%% should not get the will msg in 2.5s
timer:sleep(1500),
?assertEqual(0, length(receive_messages(1))),
timer:sleep(7000),
%% should get the will msg in 4.5s
timer:sleep(1000),
?assertEqual(1, length(receive_messages(1))),
ok = emqtt:disconnect(Client1),

View File

@ -16,7 +16,7 @@
-module(prop_emqx_frame).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("proper/include/proper.hrl").
%%--------------------------------------------------------------------

View File

@ -16,7 +16,7 @@
-module(prop_emqx_reason_codes).
-include("emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("proper/include/proper.hrl").
%%--------------------------------------------------------------------