From b3e2cc5a18155128e90865534cad8e90d3b0c2e2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 11:35:56 +0800 Subject: [PATCH 01/15] Replace 'jsx' with 'jiffy' to optimize json encode/decode --- rebar.config | 4 ++-- src/emqx_json.erl | 39 +++++++++++++++++++++++---------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/rebar.config b/rebar.config index 98ce306f0..d2cb0cf24 100644 --- a/rebar.config +++ b/rebar.config @@ -1,8 +1,8 @@ {minimum_otp_vsn, "21.0"}. {deps, - [{jsx, "2.10.0"}, - {gproc, "0.8.0"}, + [{gproc, "0.8.0"}, + {jiffy, "1.0.1"}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 1424ede51..b54340963 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -40,22 +40,30 @@ , decode/2 ]}). --spec(encode(jsx:json_term()) -> jsx:json_text()). +-type(encode_options() :: jiffy:encode_options()). +-type(decode_options() :: jiffy:decode_options()). + +-type(json_text() :: iolist() | binary()). +-type(json_term() :: jiffy:jiffy_decode_result()). + +-export_type([json_text/0, json_term/0]). +-export_type([decode_options/0, encode_options/0]). + +-spec(encode(json_term()) -> json_text()). encode(Term) -> jsx:encode(Term). --spec(encode(jsx:json_term(), jsx_to_json:config()) - -> jsx:json_text()). +-spec(encode(json_term(), encode_options()) -> json_text()). encode(Term, Opts) -> jsx:encode(Term, Opts). --spec(safe_encode(jsx:json_term()) - -> {ok, jsx:json_text()} | {error, Reason :: term()}). +-spec(safe_encode(json_term()) + -> {ok, json_text()} | {error, Reason :: term()}). safe_encode(Term) -> safe_encode(Term, []). --spec(safe_encode(jsx:json_term(), jsx_to_json:config()) - -> {ok, jsx:json_text()} | {error, Reason :: term()}). +-spec(safe_encode(json_term(), encode_options()) + -> {ok, json_text()} | {error, Reason :: term()}). safe_encode(Term, Opts) -> try encode(Term, Opts) of Json -> {ok, Json} @@ -64,22 +72,21 @@ safe_encode(Term, Opts) -> {error, Reason} end. --spec(decode(jsx:json_text()) -> jsx:json_term()). +-spec(decode(json_text()) -> json_term()). decode(Json) -> - jsx:decode(Json). + case jsx:decode(Json) of {Term} -> Term; Other -> Other end. --spec(decode(jsx:json_text(), jsx_to_json:config()) - -> jsx:json_term()). +-spec(decode(json_text(), decode_options()) -> json_term()). decode(Json, Opts) -> - jsx:decode(Json, Opts). + case jsx:decode(Json, Opts) of {Term} -> Term; Other -> Other end. --spec(safe_decode(jsx:json_text()) - -> {ok, jsx:json_term()} | {error, Reason :: term()}). +-spec(safe_decode(json_text()) + -> {ok, json_term()} | {error, Reason :: term()}). safe_decode(Json) -> safe_decode(Json, []). --spec(safe_decode(jsx:json_text(), jsx_to_json:config()) - -> {ok, jsx:json_term()} | {error, Reason :: term()}). +-spec(safe_decode(json_text(), decode_options()) + -> {ok, json_term()} | {error, Reason :: term()}). safe_decode(Json, Opts) -> try decode(Json, Opts) of Term -> {ok, Term} From eea377eb8ff1343a46f701328482eb0ffd6d76ee Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 14:32:59 +0800 Subject: [PATCH 02/15] Replace 'jsx' with 'jiffy' and add more test cases --- src/emqx_alarm_handler.erl | 21 ++++++---- src/emqx_json.erl | 9 ++--- test/emqx_json_SUITE.erl | 80 +++++++++++++++++++++++++++++++------- 3 files changed, 83 insertions(+), 27 deletions(-) diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index e37c530a6..b57a04380 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -154,16 +154,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity, title = Title, summary = Summary, timestamp = Ts}}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {desc, [{severity, Severity}, - {title, iolist_to_binary(Title)}, - {summary, iolist_to_binary(Summary)}, - {timestamp, Ts}]}]); + Descr = #{severity => Severity, + title => iolist_to_binary(Title), + summary => iolist_to_binary(Summary), + timestamp => Ts + }, + emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId), + desc => Descr + }); + encode_alarm({AlarmId, undefined}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]); + emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId)}); encode_alarm({AlarmId, AlarmDesc}) -> - emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}, - {desc, maybe_to_binary(AlarmDesc)}]). + emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId), + desc => maybe_to_binary(AlarmDesc) + }). alarm_msg(Topic, Payload) -> Msg = emqx_message:make(?MODULE, Topic, Payload), diff --git a/src/emqx_json.erl b/src/emqx_json.erl index b54340963..27792c36c 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -51,11 +51,11 @@ -spec(encode(json_term()) -> json_text()). encode(Term) -> - jsx:encode(Term). + jiffy:encode(Term). -spec(encode(json_term(), encode_options()) -> json_text()). encode(Term, Opts) -> - jsx:encode(Term, Opts). + jiffy:encode(Term, Opts). -spec(safe_encode(json_term()) -> {ok, json_text()} | {error, Reason :: term()}). @@ -73,12 +73,11 @@ safe_encode(Term, Opts) -> end. -spec(decode(json_text()) -> json_term()). -decode(Json) -> - case jsx:decode(Json) of {Term} -> Term; Other -> Other end. +decode(Json) -> decode(Json, []). -spec(decode(json_text(), decode_options()) -> json_term()). decode(Json, Opts) -> - case jsx:decode(Json, Opts) of {Term} -> Term; Other -> Other end. + case jiffy:decode(Json, Opts) of {Term} -> Term; Other -> Other end. -spec(safe_decode(json_text()) -> {ok, json_term()} | {error, Reason :: term()}). diff --git a/test/emqx_json_SUITE.erl b/test/emqx_json_SUITE.erl index ffe900ef4..b08ca37c9 100644 --- a/test/emqx_json_SUITE.erl +++ b/test/emqx_json_SUITE.erl @@ -21,24 +21,76 @@ -include_lib("eunit/include/eunit.hrl"). --define(DEC_OPTS, [{labels, atom}, return_maps]). +-import(emqx_json, + [ encode/1 + , decode/1 + , decode/2 + ]). + +%%-------------------------------------------------------------------- +%% Erlang JSON Erlang +%% ------------------------------------------------------------------- +%% +%% null -> null -> null +%% true -> true -> true +%% false -> false -> false +%% "hi" -> [104, 105] -> [104, 105] +%% <<"hi">> -> "hi" -> <<"hi">> +%% hi -> "hi" -> <<"hi">> +%% 1 -> 1 -> 1 +%% 1.25 -> 1.25 -> 1.25 +%% [] -> [] -> [] +%% [true, 1.0] -> [true, 1.0] -> [true, 1.0] +%% {[]} -> {} -> {[]} +%% {[{foo, bar}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]} +%% {[{<<"foo">>, <<"bar">>}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]} +%% #{<<"foo">> => <<"bar">>} -> {"foo": "bar"} -> #{<<"foo">> => <<"bar">>} +%%-------------------------------------------------------------------- all() -> emqx_ct:all(?MODULE). t_decode_encode(_) -> - JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>, - JsonTerm = emqx_json:decode(JsonText), - JsonMaps = #{library => <<"jsx">>, awesome => true}, - ?assertEqual(JsonText, emqx_json:encode(JsonTerm, [{space, 1}])), - ?assertEqual(JsonMaps, emqx_json:decode(JsonText, ?DEC_OPTS)). + null = decode(encode(null)), + true = decode(encode(true)), + false = decode(encode(false)), + "hi" = decode(encode("hi")), + <<"hi">> = decode(encode(hi)), + 1 = decode(encode(1)), + 1.25 = decode(encode(1.25)), + [] = decode(encode([])), + [true, 1] = decode(encode([true, 1])), + [] = decode(encode({[]})), + [{<<"foo">>, <<"bar">>}] = decode(encode({[{foo, bar}]})), + [{<<"foo">>, <<"bar">>}] = decode(encode({[{<<"foo">>, <<"bar">>}]})), + #{<<"foo">> := <<"bar">>} = decode(encode(#{<<"foo">> => <<"bar">>}), [return_maps]), + JsonText = <<"{\"bool\":true,\"int\":10,\"foo\":\"bar\"}">>, + JsonMaps = #{<<"bool">> => true, + <<"int">> => 10, + <<"foo">> => <<"bar">> + }, + ?assertEqual(JsonText, encode({decode(JsonText)})), + ?assertEqual(JsonMaps, decode(JsonText, [return_maps])). t_safe_decode_encode(_) -> - JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>, - {ok, JsonTerm} = emqx_json:safe_decode(JsonText), - JsonMaps = #{library => <<"jsx">>, awesome => true}, - ?assertEqual({ok, JsonText}, emqx_json:safe_encode(JsonTerm, [{space, 1}])), - ?assertEqual({ok, JsonMaps}, emqx_json:safe_decode(JsonText, ?DEC_OPTS)), - BadJsonText = <<"{\"library\", \"awesome\": true}">>, - ?assertEqual({error, badarg}, emqx_json:safe_decode(BadJsonText)), - {error, badarg} = emqx_json:safe_encode({a, {b ,1}}). + safe_encode_decode(null), + safe_encode_decode(true), + safe_encode_decode(false), + "hi" = safe_encode_decode("hi"), + <<"hi">> = safe_encode_decode(hi), + 1 = safe_encode_decode(1), + 1.25 = safe_encode_decode(1.25), + [] = safe_encode_decode([]), + [true, 1] = safe_encode_decode([true, 1]), + [] = safe_encode_decode({[]}), + [{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{foo, bar}]}), + [{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{<<"foo">>, <<"bar">>}]}), + {ok, Json} = emqx_json:safe_encode(#{<<"foo">> => <<"bar">>}), + {ok, #{<<"foo">> := <<"bar">>}} = emqx_json:safe_decode(Json, [return_maps]). + +safe_encode_decode(Term) -> + {ok, Json} = emqx_json:safe_encode(Term), + case emqx_json:safe_decode(Json) of + {ok, {NTerm}} -> NTerm; + {ok, NTerm} -> NTerm + end. From cce6b462b42dc5cad7c6742ca8ef067d923fde0a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 17:13:54 +0800 Subject: [PATCH 03/15] Update erl_opts and add 'compressed' opt for edge --- rebar.config | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/rebar.config b/rebar.config index d2cb0cf24..3c7113214 100644 --- a/rebar.config +++ b/rebar.config @@ -1,24 +1,32 @@ {minimum_otp_vsn, "21.0"}. {deps, - [{gproc, "0.8.0"}, - {jiffy, "1.0.1"}, - {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, - {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} - ]}. + [{gproc, "0.8.0"}, + {jiffy, "1.0.1"}, + {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, + {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + ]}. -{edoc_opts, [{preprocess, true}]}. {erl_opts, [warn_unused_vars, warn_shadow_vars, warn_unused_import, warn_obsolete_guard, - debug_info]}. + no_debug_info, + compressed %% for edge + ]}. + +{overrides, [{override, [{erl_opts, [no_debug_info, compressed]}]}]}. + +{edoc_opts, [{preprocess, true}]}. + {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. + warnings_as_errors, deprecated_functions + ]}. + {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. @@ -28,11 +36,13 @@ {erl_first_files, ["src/emqx_logger.erl"]}. {profiles, - [{test, - [{deps, - [{bbmustache, "1.7.0"}, % hex - {emqtt, {git, "https://github.com/emqx/emqtt", {branch, "develop"}}}, - {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}} - ]} - ]} - ]}. + [{test, + [{deps, + [{bbmustache, "1.7.0"}, + {emqtt, {git, "https://github.com/emqx/emqtt", {branch, "develop"}}}, + {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}} + ]}, + {erl_opts, [debug_info]} + ]} + ]}. + From 35354a3451766bd709f5847571747853df7ae8a6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 18:10:35 +0800 Subject: [PATCH 04/15] Change the 'override' to 'add' --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 3c7113214..30d446e44 100644 --- a/rebar.config +++ b/rebar.config @@ -18,7 +18,7 @@ compressed %% for edge ]}. -{overrides, [{override, [{erl_opts, [no_debug_info, compressed]}]}]}. +{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}. {edoc_opts, [{preprocess, true}]}. From d8c3c86dcaa5253e01729bd7e1bf0b4d448501cc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 18:50:48 +0800 Subject: [PATCH 05/15] Upgrade esockd and ekka deps - Upgrade esockd to 5.6.1 - Upgrade ekka to 0.7.2 --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 30d446e44..e761ff39a 100644 --- a/rebar.config +++ b/rebar.config @@ -4,8 +4,8 @@ [{gproc, "0.8.0"}, {jiffy, "1.0.1"}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.2"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. From f278a0719053dee6d86b125aba9dcdccfc203649 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 19:31:36 +0800 Subject: [PATCH 06/15] Upgrade emqtt and emqx-ct-helper deps - Upgrade emqtt to v1.2.0 - Upgrade emqx-ct-helpers to v1.2.2 --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index e761ff39a..d86c58d2f 100644 --- a/rebar.config +++ b/rebar.config @@ -39,8 +39,8 @@ [{test, [{deps, [{bbmustache, "1.7.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {branch, "develop"}}}, - {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, + {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}} ]}, {erl_opts, [debug_info]} ]} From 367398e965606dacf08df8ad6de1b08d22820b15 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sun, 19 Jan 2020 19:41:13 +0800 Subject: [PATCH 07/15] Remove jsx from app.src --- src/emqx.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx.app.src b/src/emqx.app.src index 9c00ca6bb..ac5c50d7c 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -4,7 +4,7 @@ {vsn, "git"}, {modules, []}, {registered, []}, - {applications, [kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy, + {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy, sasl,os_mon]}, {mod, {emqx_app,[]}}, {env, []}, From 462e3974ad72e06c1cb08b88f4bbb17e185397d2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 19 Jan 2020 19:45:46 +0800 Subject: [PATCH 08/15] no_debug_info > debug_info --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index d86c58d2f..71bd53abc 100644 --- a/rebar.config +++ b/rebar.config @@ -14,11 +14,11 @@ warn_shadow_vars, warn_unused_import, warn_obsolete_guard, - no_debug_info, + debug_info, compressed %% for edge ]}. -{overrides, [{add, [{erl_opts, [no_debug_info, compressed]}]}]}. +{overrides, [{add, [{erl_opts, [compressed]}]}]}. {edoc_opts, [{preprocess, true}]}. From 7ab3da399dbe6acb367b606ec63283bf3c832c84 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 20 Jan 2020 09:41:46 +0800 Subject: [PATCH 09/15] Fix testcase (#3216) --- test/emqx_connection_SUITE.erl | 2 +- test/emqx_mod_presence_SUITE.erl | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 822194443..e0da75fed 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -93,7 +93,7 @@ t_info(_) -> {'$gen_call', From, info} -> gen_server:reply(From, emqx_connection:info(st())) after - 0 -> error("error") + 200 -> error("error") end end), #{sockinfo := SockInfo} = emqx_connection:info(CPid), diff --git a/test/emqx_mod_presence_SUITE.erl b/test/emqx_mod_presence_SUITE.erl index afa7a4312..49d5d206b 100644 --- a/test/emqx_mod_presence_SUITE.erl +++ b/test/emqx_mod_presence_SUITE.erl @@ -63,17 +63,17 @@ recv_and_check_presence(ClientId, Presence) -> binary:split(Topic, <<"/">>, [global])), case Presence of <<"connected">> -> - ?assertMatch(#{clientid := <<"clientid">>, - username := <<"username">>, - ipaddress := <<"127.0.0.1">>, - proto_name := <<"MQTT">>, - proto_ver := ?MQTT_PROTO_V4, - connack := ?RC_SUCCESS, - clean_start := true}, emqx_json:decode(Payload, [{labels, atom}, return_maps])); + ?assertMatch(#{<<"clientid">> := <<"clientid">>, + <<"username">> := <<"username">>, + <<"ipaddress">> := <<"127.0.0.1">>, + <<"proto_name">> := <<"MQTT">>, + <<"proto_ver">> := ?MQTT_PROTO_V4, + <<"connack">> := ?RC_SUCCESS, + <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])); <<"disconnected">> -> - ?assertMatch(#{clientid := <<"clientid">>, - username := <<"username">>, - reason := <<"normal">>}, emqx_json:decode(Payload, [{labels, atom}, return_maps])) + ?assertMatch(#{<<"clientid">> := <<"clientid">>, + <<"username">> := <<"username">>, + <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) end. %%-------------------------------------------------------------------- From d532b5f2e36be7b6711860cc814a7792abf1171d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Feb 2020 15:14:52 +0800 Subject: [PATCH 10/15] Remove the unused 'hibernate_after' config --- etc/emqx.conf | 5 ----- priv/emqx.schema | 6 ------ 2 files changed, 11 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index d5490d954..d41d604c3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -570,11 +570,6 @@ mqtt.strict_mode = false ## Value: duration zone.external.idle_timeout = 15s -## Hibernate after a duration of idle state. -## -## Value: duration -zone.external.hibernate_after = 60s - ## Publish limit for the external MQTT connections. ## ## Value: Number,Duration diff --git a/priv/emqx.schema b/priv/emqx.schema index a7f53ba1f..468dfc343 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -735,12 +735,6 @@ end}. {datatype, {duration, ms}} ]}. -%% @doc Hibernate after a duration of idle state. -{mapping, "zone.$name.hibernate_after", "emqx.zones", [ - {default, "60s"}, - {datatype, {duration, ms}} -]}. - {mapping, "zone.$name.allow_anonymous", "emqx.zones", [ {datatype, {enum, [true, false]}} ]}. From 3c459e8e74ee3589713fb0a934053518e88535cd Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Feb 2020 16:18:19 +0800 Subject: [PATCH 11/15] Compatible jsx encode/decode (#3230) --- rebar.config | 2 +- src/emqx_json.erl | 26 +++++++++++++++++++++++--- test/emqx_connection_SUITE.erl | 14 +++++++------- test/emqx_json_SUITE.erl | 6 ++++++ 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/rebar.config b/rebar.config index 71bd53abc..2827059f7 100644 --- a/rebar.config +++ b/rebar.config @@ -2,7 +2,7 @@ {deps, [{gproc, "0.8.0"}, - {jiffy, "1.0.1"}, + {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.2"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.2"}}}, diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 27792c36c..563909e2c 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -51,11 +51,11 @@ -spec(encode(json_term()) -> json_text()). encode(Term) -> - jiffy:encode(Term). + encode(Term, []). -spec(encode(json_term(), encode_options()) -> json_text()). encode(Term, Opts) -> - jiffy:encode(Term, Opts). + jiffy:encode(to_ejson(Term), Opts). -spec(safe_encode(json_term()) -> {ok, json_text()} | {error, Reason :: term()}). @@ -77,7 +77,7 @@ decode(Json) -> decode(Json, []). -spec(decode(json_text(), decode_options()) -> json_term()). decode(Json, Opts) -> - case jiffy:decode(Json, Opts) of {Term} -> Term; Other -> Other end. + from_ejson(jiffy:decode(Json, Opts)). -spec(safe_decode(json_text()) -> {ok, json_term()} | {error, Reason :: term()}). @@ -94,3 +94,23 @@ safe_decode(Json, Opts) -> {error, Reason} end. +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +-compile({inline, + [ to_ejson/1 + , from_ejson/1 + ]}). + +to_ejson([{_, _}|_] = L) -> + lists:foldl( + fun({Name, Value}, Acc) -> + Acc#{Name => to_ejson(Value)} + end, #{}, L); +to_ejson(T) -> T. + +from_ejson({L}) -> + [{Name, from_ejson(Value)} || {Name, Value} <- L]; +from_ejson(T) -> T. + diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index e0da75fed..8e1362b32 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -93,7 +93,7 @@ t_info(_) -> {'$gen_call', From, info} -> gen_server:reply(From, emqx_connection:info(st())) after - 200 -> error("error") + 100 -> error("error") end end), #{sockinfo := SockInfo} = emqx_connection:info(CPid), @@ -113,15 +113,15 @@ t_stats(_) -> {'$gen_call', From, stats} -> gen_server:reply(From, emqx_connection:stats(st())) after - 0 -> error("error") + 100 -> error("error") end end), Stats = emqx_connection:stats(CPid), ?assertMatch([{recv_oct,0}, - {recv_cnt,0}, - {send_oct,0}, - {send_cnt,0}, - {send_pend,0}| _] , Stats). + {recv_cnt,0}, + {send_oct,0}, + {send_cnt,0}, + {send_pend,0}| _] , Stats). t_process_msg(_) -> with_conn(fun(CPid) -> @@ -384,7 +384,7 @@ trap_exit(Pid, Reason) -> {'EXIT', Pid, Reason} -> ok; {'EXIT', Pid, Other} -> error({unexpect_exit, Other}) after - 0 -> error({expect_exit, Reason}) + 100 -> error({expect_exit, Reason}) end. make_frame(Packet) -> diff --git a/test/emqx_json_SUITE.erl b/test/emqx_json_SUITE.erl index b08ca37c9..d0adf0650 100644 --- a/test/emqx_json_SUITE.erl +++ b/test/emqx_json_SUITE.erl @@ -45,6 +45,10 @@ %% {[{foo, bar}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]} %% {[{<<"foo">>, <<"bar">>}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]} %% #{<<"foo">> => <<"bar">>} -> {"foo": "bar"} -> #{<<"foo">> => <<"bar">>} +%% +%% Extension: +%% [{<<"foo">>, <<"bar">>}] -> {"foo": "bar"} -> [{<<"foo">>, <<"bar">>}] +%% %%-------------------------------------------------------------------- all() -> emqx_ct:all(?MODULE). @@ -62,6 +66,7 @@ t_decode_encode(_) -> [] = decode(encode({[]})), [{<<"foo">>, <<"bar">>}] = decode(encode({[{foo, bar}]})), [{<<"foo">>, <<"bar">>}] = decode(encode({[{<<"foo">>, <<"bar">>}]})), + [{<<"foo">>, <<"bar">>}] = decode(encode([{<<"foo">>, <<"bar">>}])), #{<<"foo">> := <<"bar">>} = decode(encode(#{<<"foo">> => <<"bar">>}), [return_maps]), JsonText = <<"{\"bool\":true,\"int\":10,\"foo\":\"bar\"}">>, JsonMaps = #{<<"bool">> => true, @@ -84,6 +89,7 @@ t_safe_decode_encode(_) -> [] = safe_encode_decode({[]}), [{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{foo, bar}]}), [{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{<<"foo">>, <<"bar">>}]}), + [{<<"foo">>, <<"bar">>}] = safe_encode_decode([{<<"foo">>, <<"bar">>}]), {ok, Json} = emqx_json:safe_encode(#{<<"foo">> => <<"bar">>}), {ok, #{<<"foo">> := <<"bar">>}} = emqx_json:safe_decode(Json, [return_maps]). From 368b85027ee7d3f69f37c57e920025b0a63a1a66 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Tue, 4 Feb 2020 09:50:21 +0800 Subject: [PATCH 12/15] Fix issue#3228 --- src/emqx_metrics.erl | 64 +++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 06f13ed86..47c2e9a6a 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -90,6 +90,7 @@ {counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent {counter, 'packets.publish.received'}, % PUBLISH packets received {counter, 'packets.publish.sent'}, % PUBLISH packets sent + {counter, 'packets.publish.inuse'}, % PUBLISH packet_id inuse {counter, 'packets.publish.error'}, % PUBLISH failed for error {counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error {counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped @@ -457,37 +458,38 @@ reserved_idx('packets.connack.error') -> 14; reserved_idx('packets.connack.auth_error') -> 15; reserved_idx('packets.publish.received') -> 16; reserved_idx('packets.publish.sent') -> 17; -reserved_idx('packets.publish.error') -> 18; -reserved_idx('packets.publish.auth_error') -> 19; -reserved_idx('packets.puback.received') -> 20; -reserved_idx('packets.puback.sent') -> 21; -reserved_idx('packets.puback.inuse') -> 22; -reserved_idx('packets.puback.missed') -> 23; -reserved_idx('packets.pubrec.received') -> 24; -reserved_idx('packets.pubrec.sent') -> 25; -reserved_idx('packets.pubrec.inuse') -> 26; -reserved_idx('packets.pubrec.missed') -> 27; -reserved_idx('packets.pubrel.received') -> 28; -reserved_idx('packets.pubrel.sent') -> 29; -reserved_idx('packets.pubrel.missed') -> 30; -reserved_idx('packets.pubcomp.received') -> 31; -reserved_idx('packets.pubcomp.sent') -> 32; -reserved_idx('packets.pubcomp.inuse') -> 33; -reserved_idx('packets.pubcomp.missed') -> 34; -reserved_idx('packets.subscribe.received') -> 35; -reserved_idx('packets.subscribe.error') -> 36; -reserved_idx('packets.subscribe.auth_error') -> 37; -reserved_idx('packets.suback.sent') -> 38; -reserved_idx('packets.unsubscribe.received') -> 39; -reserved_idx('packets.unsubscribe.error') -> 40; -reserved_idx('packets.unsuback.sent') -> 41; -reserved_idx('packets.pingreq.received') -> 42; -reserved_idx('packets.pingresp.sent') -> 43; -reserved_idx('packets.disconnect.received') -> 44; -reserved_idx('packets.disconnect.sent') -> 45; -reserved_idx('packets.auth.received') -> 46; -reserved_idx('packets.auth.sent') -> 47; -reserved_idx('packets.publish.dropped') -> 48; +reserved_idx('packets.publish.inuse') -> 18; +reserved_idx('packets.publish.error') -> 19; +reserved_idx('packets.publish.auth_error') -> 20; +reserved_idx('packets.puback.received') -> 21; +reserved_idx('packets.puback.sent') -> 22; +reserved_idx('packets.puback.inuse') -> 23; +reserved_idx('packets.puback.missed') -> 24; +reserved_idx('packets.pubrec.received') -> 25; +reserved_idx('packets.pubrec.sent') -> 26; +reserved_idx('packets.pubrec.inuse') -> 27; +reserved_idx('packets.pubrec.missed') -> 28; +reserved_idx('packets.pubrel.received') -> 29; +reserved_idx('packets.pubrel.sent') -> 30; +reserved_idx('packets.pubrel.missed') -> 31; +reserved_idx('packets.pubcomp.received') -> 32; +reserved_idx('packets.pubcomp.sent') -> 33; +reserved_idx('packets.pubcomp.inuse') -> 34; +reserved_idx('packets.pubcomp.missed') -> 35; +reserved_idx('packets.subscribe.received') -> 36; +reserved_idx('packets.subscribe.error') -> 37; +reserved_idx('packets.subscribe.auth_error') -> 38; +reserved_idx('packets.suback.sent') -> 39; +reserved_idx('packets.unsubscribe.received') -> 40; +reserved_idx('packets.unsubscribe.error') -> 41; +reserved_idx('packets.unsuback.sent') -> 42; +reserved_idx('packets.pingreq.received') -> 43; +reserved_idx('packets.pingresp.sent') -> 44; +reserved_idx('packets.disconnect.received') -> 45; +reserved_idx('packets.disconnect.sent') -> 46; +reserved_idx('packets.auth.received') -> 47; +reserved_idx('packets.auth.sent') -> 48; +reserved_idx('packets.publish.dropped') -> 49; %% Reserved indices of message's metrics reserved_idx('messages.received') -> 100; reserved_idx('messages.sent') -> 101; From b9bdb030052a42f3e4c7ba335d7fa9f587759edc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 6 Feb 2020 11:23:34 +0800 Subject: [PATCH 13/15] Decode/Encode json from nested proplist --- src/emqx_json.erl | 4 ++++ test/emqx_json_SUITE.erl | 1 + 2 files changed, 5 insertions(+) diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 563909e2c..c81c9b06b 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -103,6 +103,8 @@ safe_decode(Json, Opts) -> , from_ejson/1 ]}). +to_ejson([[{_,_}]|_] = L) -> + [to_ejson(E) || E <- L]; to_ejson([{_, _}|_] = L) -> lists:foldl( fun({Name, Value}, Acc) -> @@ -110,6 +112,8 @@ to_ejson([{_, _}|_] = L) -> end, #{}, L); to_ejson(T) -> T. +from_ejson([{_}|_] = L) -> + [from_ejson(E) || E <- L]; from_ejson({L}) -> [{Name, from_ejson(Value)} || {Name, Value} <- L]; from_ejson(T) -> T. diff --git a/test/emqx_json_SUITE.erl b/test/emqx_json_SUITE.erl index d0adf0650..434180cb9 100644 --- a/test/emqx_json_SUITE.erl +++ b/test/emqx_json_SUITE.erl @@ -67,6 +67,7 @@ t_decode_encode(_) -> [{<<"foo">>, <<"bar">>}] = decode(encode({[{foo, bar}]})), [{<<"foo">>, <<"bar">>}] = decode(encode({[{<<"foo">>, <<"bar">>}]})), [{<<"foo">>, <<"bar">>}] = decode(encode([{<<"foo">>, <<"bar">>}])), + [[{<<"foo">>, <<"bar">>}]] = decode(encode([[{<<"foo">>, <<"bar">>}]])), #{<<"foo">> := <<"bar">>} = decode(encode(#{<<"foo">> => <<"bar">>}), [return_maps]), JsonText = <<"{\"bool\":true,\"int\":10,\"foo\":\"bar\"}">>, JsonMaps = #{<<"bool">> => true, From 839465fbd8d8f6c69beed7a63b1ae9c493e90696 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 7 Feb 2020 09:19:21 +0800 Subject: [PATCH 14/15] Enhance security --- src/emqx_hooks.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index d0233a86c..062559eb3 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -165,7 +165,7 @@ safe_execute(Fun, Args) -> Result -> Result catch _:Reason:Stacktrace -> - ?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]), + ?LOG(error, "Failed to execute ~p: ~p", [Fun, {Reason, Stacktrace}]), ok end. From f29a8b0d5ce6d267959203baeece3a59f5a252dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=A5=87=E6=80=AA?= Date: Fri, 7 Feb 2020 10:07:36 +0800 Subject: [PATCH 15/15] Mqtt protocol tests (#3237) Add more test cases for MQTT --- src/emqx_connection.erl | 8 +- src/emqx_packet.erl | 3 + test/mqtt_protocol_v5_SUITE.erl | 499 +++++++++++++++++++++++++++++++- 3 files changed, 496 insertions(+), 14 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 11683a6f2..a3d3e41a6 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -136,10 +136,10 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(stats_timer, #state{stats_timer = Stats_timer}) -> - Stats_timer; -info(limit_timer, #state{limit_timer = Limit_timer}) -> - Limit_timer; +info(stats_timer, #state{stats_timer = StatsTimer}) -> + StatsTimer; +info(limit_timer, #state{limit_timer = LimitTimer}) -> + LimitTimer; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 6c17c17cb..354229f99 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -345,6 +345,9 @@ check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) -> check_will_msg(#mqtt_packet_connect{will_retain = true}, _Opts = #{mqtt_retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; +check_will_msg(#mqtt_packet_connect{will_qos = WillQoS}, + _Opts = #{max_qos_allowed := MaxQoS}) when WillQoS > MaxQoS -> + {error, ?RC_QOS_NOT_SUPPORTED}; check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) -> try emqx_topic:validate(name, WillTopic) of true -> ok diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 01d4391f0..ddc20c040 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -48,6 +48,9 @@ end_per_suite(_Config) -> %% Helpers %%-------------------------------------------------------------------- +client_info(Key, Client) -> + maps:get(Key, maps:from_list(emqtt:info(Client)), undefined). + receive_messages(Count) -> receive_messages(Count, []). @@ -63,6 +66,14 @@ receive_messages(Count, Msgs) -> Msgs end. +receive_disconnect_reasoncode() -> + receive + {disconnected, ReasonCode, _} -> ReasonCode; + _Other -> receive_disconnect_reasoncode() + after 100 -> + error("no disconnect packet") + end. + clean_retained(Topic) -> {ok, Clean} = emqtt:start_link([{clean_start, true}]), {ok, _} = emqtt:connect(Clean), @@ -90,6 +101,106 @@ t_basic_test(_) -> %% Connection %%-------------------------------------------------------------------- +t_connect_clean_start(_) -> + {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]), + {ok, _} = emqtt:connect(Client1), + ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] + ok = emqtt:pause(Client1), + {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] + ok = emqtt:disconnect(Client2), + {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]), + {ok, _} = emqtt:connect(Client3), + ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] + ok = emqtt:disconnect(Client3). + +t_connect_will_message(_) -> + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} + ]), + {ok, _} = emqtt:connect(Client1), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)), + ?assertNotEqual(undefined, maps:find(will_msg, emqx_connection:info(sys:get_state(ClientPid)))), %% [MQTT-3.1.2-7] + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), + + ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1] + [Msg | _ ] = receive_messages(1), + %% [MQTT-3.1.2-8] + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, 0}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} + ]), + {ok, _} = emqtt:connect(Client3), + + {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client4), + {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), + ok = emqtt:disconnect(Client3), + ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10] + ok = emqtt:disconnect(Client4). + +t_connect_will_retain(_) -> + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, false} + ]), + {ok, _} = emqtt:connect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + + ok = emqtt:disconnect(Client1, 4), + [Msg1 | _ ] = receive_messages(1), + ?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14] + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, true} + ]), + {ok, _} = emqtt:connect(Client3), + + {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client4), + {ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + + ok = emqtt:disconnect(Client3, 4), + [Msg2 | _ ] = receive_messages(1), + ?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15] + ok = emqtt:disconnect(Client4), + clean_retained(Topic). + t_connect_idle_timeout(_) -> IdleTimeout = 2000, emqx_zone:set_env(external, idle_timeout, IdleTimeout), @@ -107,9 +218,9 @@ t_connect_limit_timeout(_) -> Topic = nth(1, ?TOPICS), emqx_zone:set_env(external, publish_limit, {2.0, 3}), - {ok, Client} = emqtt:start_link([{clientid, <<"t_connect_limit_timeout">>},{proto_ver, v5},{keepalive, 60}]), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), - [ClientPid] = emqx_cm:lookup_channels(<<"t_connect_limit_timeout">>), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), @@ -119,21 +230,21 @@ t_connect_limit_timeout(_) -> timer:sleep(200), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), - emqtt:disconnect(Client), + ok = emqtt:disconnect(Client), meck:unload(proplists). t_connect_emit_stats_timeout(_) -> IdleTimeout = 2000, emqx_zone:set_env(external, idle_timeout, IdleTimeout), - {ok, Client} = emqtt:start_link([{clientid, <<"t_connect_emit_stats_timeout">>},{proto_ver, v5},{keepalive, 60}]), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), - [ClientPid] = emqx_cm:lookup_channels(<<"t_connect_emit_stats_timeout">>), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), timer:sleep(IdleTimeout), ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))), - emqtt:disconnect(Client). + ok = emqtt:disconnect(Client). %% [MQTT-3.1.2-22] t_connect_keepalive_timeout(_) -> @@ -144,13 +255,380 @@ t_connect_keepalive_timeout(_) -> {ok, _} = emqtt:connect(Client), emqtt:pause(Client), receive - Msg -> - ReasonCode = 141, - ?assertMatch({disconnected, ReasonCode, _Channel}, Msg) + {disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode) after round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout") end. +%% [MQTT-3.1.2-23] +t_connect_session_expiry_interval(_) -> + Topic = nth(1, ?TOPICS), + Payload = "test message", + + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}} + ]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2), + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client3), + [Msg | _ ] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, 2}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client3). + +%% [MQTT-3.1.3-9] +t_connect_will_delay_interval(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_will_delay_interval">>}, + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_qos, 2}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_props, #{'Will-Delay-Interval' => 3}}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {keepalive, 2} + ]), + {ok, _} = emqtt:connect(Client2), + + timer:sleep(5000), + ?assertEqual(0, length(receive_messages(1))), + timer:sleep(7000), + ?assertEqual(1, length(receive_messages(1))), + + {ok, Client3} = emqtt:start_link([ + {clientid, <<"t_connect_will_delay_interval">>}, + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_qos, 2}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_props, #{'Will-Delay-Interval' => 7200}}, + {properties, #{'Session-Expiry-Interval' => 3}}, + {keepalive, 2} + ]), + {ok, _} = emqtt:connect(Client3), + + timer:sleep(5000), + ?assertEqual(0, length(receive_messages(1))), + timer:sleep(7000), + ?assertEqual(1, length(receive_messages(1))), + + ok = emqtt:disconnect(Client1), + process_flag(trap_exit, false). + +%% [MQTT-3.1.4-3] +t_connect_duplicate_clientid(_) -> + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client1), + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(142, receive_disconnect_reasoncode()). + +%%-------------------------------------------------------------------- +%% Connack +%%-------------------------------------------------------------------- + +t_connack_session_present(_) -> + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, true} + ]), + {ok, _} = emqtt:connect(Client1), + ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2] + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]] + ok = emqtt:disconnect(Client2). + +t_connack_max_qos_allowed(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + %% max_qos_allowed = 0 + emqx_zone:set_env(external, max_qos_allowed, 0), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack1} = emqtt:connect(Client1), + ?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9] + + {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10] + {ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10] + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), %% [MQTT-3.2.2-10] + + {ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1), + ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + + {ok, Client2} = emqtt:start_link([ + {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} + ]), + {error, Connack2} = emqtt:connect(Client2), + ?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12] + + %% max_qos_allowed = 1 + emqx_zone:set_env(external, max_qos_allowed, 1), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client3} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack3} = emqtt:connect(Client3), + ?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9] + + {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10] + {ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10] + {ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2), %% [MQTT-3.2.2-10] + + {ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2), + ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + + {ok, Client4} = emqtt:start_link([ + {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} + ]), + {error, Connack4} = emqtt:connect(Client4), + ?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12] + receive + {'EXIT', _, {shutdown,qos_not_supported}} -> ok + after 100 -> error("t_connack_max_qos_allowed") + end, + + %% max_qos_allowed = 2 + emqx_zone:set_env(external, max_qos_allowed, 2), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client5} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack5} = emqtt:connect(Client5), + ?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9] + ok = emqtt:disconnect(Client5), + + process_flag(trap_exit, false). + +t_connack_assigned_clienid(_) -> + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Publish +%%-------------------------------------------------------------------- + +t_publish_rap(_) -> + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12] + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]), + {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + [Msg2 | _] = receive_messages(1), + ?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13] + ok = emqtt:disconnect(Client2), + + clean_retained(Topic). + +t_publish_wildtopic(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?WILD_TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, <<"error topic">>), + ?assertEqual(144, receive_disconnect_reasoncode()), + + process_flag(trap_exit, false). + +t_publish_payload_format_indicator(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Payload-Format-Indicator' => 233}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6] + ok = emqtt:disconnect(Client1). + +t_publish_topic_alias(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8] + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), + ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12] + ok = emqtt:disconnect(Client2), + process_flag(trap_exit, false). + +t_publish_response_topic(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]), + ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] + + process_flag(trap_exit, false). + +t_publish_properties(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{ + 'Response-Topic' => Topic, %% [MQTT-3.3.2-15] + 'Correlation-Data' => <<"233">>, %% [MQTT-3.3.2-16] + 'User-Property' => [{<<"a">>, <<"2333">>}], %% [MQTT-3.3.2-18] + 'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20] + }, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16] + ok = emqtt:disconnect(Client1). + +t_publish_overlapping_subscriptions(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Subscription-Identifier' => 2333}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1), + {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0), + {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]), + + [Msg1 | _ ] = receive_messages(2), + ?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2] + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.4-3] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Subsctibe +%%-------------------------------------------------------------------- + +t_subscribe_no_local(_) -> + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), + + ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0), + ?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3] + ok = emqtt:disconnect(Client1). + +t_subscribe_actions(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Subscription-Identifier' => 2333}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2), + {ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1), + {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2), + [Msg1 | _ ] = receive_messages(1), + ?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8] + + {ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, {nth(2, ?TOPICS), qos2}] ), %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7] + ok = emqtt:disconnect(Client1). +%%-------------------------------------------------------------------- +%% Unsubsctibe Unsuback +%%-------------------------------------------------------------------- + +t_unscbsctibe(_) -> + Topic1 = nth(1, ?TOPICS), + Topic2 = nth(2, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), + {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4] + {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5] + + {ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]), + {ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Pingreq +%%-------------------------------------------------------------------- + +t_pingreq(_) -> + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1] + ok = emqtt:disconnect(Client1). + %%-------------------------------------------------------------------- %% Shared Subscriptions %%-------------------------------------------------------------------- @@ -193,4 +671,5 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> error("disconnected timeout") end, - ?assertEqual(1, counters:get(CRef, 1)). + ?assertEqual(1, counters:get(CRef, 1)), + process_flag(trap_exit, false).