Merge pull request #3238 from emqx/master
Auto-pull-request-by-2020-02-07
This commit is contained in:
commit
fe94acffec
|
@ -570,11 +570,6 @@ mqtt.strict_mode = false
|
|||
## Value: duration
|
||||
zone.external.idle_timeout = 15s
|
||||
|
||||
## Hibernate after a duration of idle state.
|
||||
##
|
||||
## Value: duration
|
||||
zone.external.hibernate_after = 60s
|
||||
|
||||
## Publish limit for the external MQTT connections.
|
||||
##
|
||||
## Value: Number,Duration
|
||||
|
|
|
@ -735,12 +735,6 @@ end}.
|
|||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
%% @doc Hibernate after a duration of idle state.
|
||||
{mapping, "zone.$name.hibernate_after", "emqx.zones", [
|
||||
{default, "60s"},
|
||||
{datatype, {duration, ms}}
|
||||
]}.
|
||||
|
||||
{mapping, "zone.$name.allow_anonymous", "emqx.zones", [
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
|
32
rebar.config
32
rebar.config
|
@ -1,24 +1,32 @@
|
|||
{minimum_otp_vsn, "21.0"}.
|
||||
|
||||
{deps,
|
||||
[{jsx, "2.10.0"},
|
||||
{gproc, "0.8.0"},
|
||||
[{gproc, "0.8.0"},
|
||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.2"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.2"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
|
||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||
]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
{erl_opts, [warn_unused_vars,
|
||||
warn_shadow_vars,
|
||||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
debug_info]}.
|
||||
debug_info,
|
||||
compressed %% for edge
|
||||
]}.
|
||||
|
||||
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
|
||||
{xref_checks, [undefined_function_calls, undefined_functions,
|
||||
locals_not_used, deprecated_function_calls,
|
||||
warnings_as_errors, deprecated_functions]}.
|
||||
warnings_as_errors, deprecated_functions
|
||||
]}.
|
||||
|
||||
{cover_enabled, true}.
|
||||
{cover_opts, [verbose]}.
|
||||
{cover_export_enabled, true}.
|
||||
|
@ -30,9 +38,11 @@
|
|||
{profiles,
|
||||
[{test,
|
||||
[{deps,
|
||||
[{bbmustache, "1.7.0"}, % hex
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {branch, "develop"}}},
|
||||
{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "develop"}}}
|
||||
]}
|
||||
[{bbmustache, "1.7.0"},
|
||||
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
|
||||
{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}}
|
||||
]},
|
||||
{erl_opts, [debug_info]}
|
||||
]}
|
||||
]}.
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
{vsn, "git"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy,
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,
|
||||
sasl,os_mon]},
|
||||
{mod, {emqx_app,[]}},
|
||||
{env, []},
|
||||
|
|
|
@ -154,16 +154,21 @@ encode_alarm({AlarmId, #alarm{severity = Severity,
|
|||
title = Title,
|
||||
summary = Summary,
|
||||
timestamp = Ts}}) ->
|
||||
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
|
||||
{desc, [{severity, Severity},
|
||||
{title, iolist_to_binary(Title)},
|
||||
{summary, iolist_to_binary(Summary)},
|
||||
{timestamp, Ts}]}]);
|
||||
Descr = #{severity => Severity,
|
||||
title => iolist_to_binary(Title),
|
||||
summary => iolist_to_binary(Summary),
|
||||
timestamp => Ts
|
||||
},
|
||||
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
|
||||
desc => Descr
|
||||
});
|
||||
|
||||
encode_alarm({AlarmId, undefined}) ->
|
||||
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)}]);
|
||||
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId)});
|
||||
encode_alarm({AlarmId, AlarmDesc}) ->
|
||||
emqx_json:safe_encode([{id, maybe_to_binary(AlarmId)},
|
||||
{desc, maybe_to_binary(AlarmDesc)}]).
|
||||
emqx_json:safe_encode(#{id => maybe_to_binary(AlarmId),
|
||||
desc => maybe_to_binary(AlarmDesc)
|
||||
}).
|
||||
|
||||
alarm_msg(Topic, Payload) ->
|
||||
Msg = emqx_message:make(?MODULE, Topic, Payload),
|
||||
|
|
|
@ -136,10 +136,10 @@ info(sockstate, #state{sockstate = SockSt}) ->
|
|||
SockSt;
|
||||
info(active_n, #state{active_n = ActiveN}) ->
|
||||
ActiveN;
|
||||
info(stats_timer, #state{stats_timer = Stats_timer}) ->
|
||||
Stats_timer;
|
||||
info(limit_timer, #state{limit_timer = Limit_timer}) ->
|
||||
Limit_timer;
|
||||
info(stats_timer, #state{stats_timer = StatsTimer}) ->
|
||||
StatsTimer;
|
||||
info(limit_timer, #state{limit_timer = LimitTimer}) ->
|
||||
LimitTimer;
|
||||
info(limiter, #state{limiter = Limiter}) ->
|
||||
maybe_apply(fun emqx_limiter:info/1, Limiter).
|
||||
|
||||
|
|
|
@ -165,7 +165,7 @@ safe_execute(Fun, Args) ->
|
|||
Result -> Result
|
||||
catch
|
||||
_:Reason:Stacktrace ->
|
||||
?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]),
|
||||
?LOG(error, "Failed to execute ~p: ~p", [Fun, {Reason, Stacktrace}]),
|
||||
ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -40,22 +40,30 @@
|
|||
, decode/2
|
||||
]}).
|
||||
|
||||
-spec(encode(jsx:json_term()) -> jsx:json_text()).
|
||||
-type(encode_options() :: jiffy:encode_options()).
|
||||
-type(decode_options() :: jiffy:decode_options()).
|
||||
|
||||
-type(json_text() :: iolist() | binary()).
|
||||
-type(json_term() :: jiffy:jiffy_decode_result()).
|
||||
|
||||
-export_type([json_text/0, json_term/0]).
|
||||
-export_type([decode_options/0, encode_options/0]).
|
||||
|
||||
-spec(encode(json_term()) -> json_text()).
|
||||
encode(Term) ->
|
||||
jsx:encode(Term).
|
||||
encode(Term, []).
|
||||
|
||||
-spec(encode(jsx:json_term(), jsx_to_json:config())
|
||||
-> jsx:json_text()).
|
||||
-spec(encode(json_term(), encode_options()) -> json_text()).
|
||||
encode(Term, Opts) ->
|
||||
jsx:encode(Term, Opts).
|
||||
jiffy:encode(to_ejson(Term), Opts).
|
||||
|
||||
-spec(safe_encode(jsx:json_term())
|
||||
-> {ok, jsx:json_text()} | {error, Reason :: term()}).
|
||||
-spec(safe_encode(json_term())
|
||||
-> {ok, json_text()} | {error, Reason :: term()}).
|
||||
safe_encode(Term) ->
|
||||
safe_encode(Term, []).
|
||||
|
||||
-spec(safe_encode(jsx:json_term(), jsx_to_json:config())
|
||||
-> {ok, jsx:json_text()} | {error, Reason :: term()}).
|
||||
-spec(safe_encode(json_term(), encode_options())
|
||||
-> {ok, json_text()} | {error, Reason :: term()}).
|
||||
safe_encode(Term, Opts) ->
|
||||
try encode(Term, Opts) of
|
||||
Json -> {ok, Json}
|
||||
|
@ -64,22 +72,20 @@ safe_encode(Term, Opts) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec(decode(jsx:json_text()) -> jsx:json_term()).
|
||||
decode(Json) ->
|
||||
jsx:decode(Json).
|
||||
-spec(decode(json_text()) -> json_term()).
|
||||
decode(Json) -> decode(Json, []).
|
||||
|
||||
-spec(decode(jsx:json_text(), jsx_to_json:config())
|
||||
-> jsx:json_term()).
|
||||
-spec(decode(json_text(), decode_options()) -> json_term()).
|
||||
decode(Json, Opts) ->
|
||||
jsx:decode(Json, Opts).
|
||||
from_ejson(jiffy:decode(Json, Opts)).
|
||||
|
||||
-spec(safe_decode(jsx:json_text())
|
||||
-> {ok, jsx:json_term()} | {error, Reason :: term()}).
|
||||
-spec(safe_decode(json_text())
|
||||
-> {ok, json_term()} | {error, Reason :: term()}).
|
||||
safe_decode(Json) ->
|
||||
safe_decode(Json, []).
|
||||
|
||||
-spec(safe_decode(jsx:json_text(), jsx_to_json:config())
|
||||
-> {ok, jsx:json_term()} | {error, Reason :: term()}).
|
||||
-spec(safe_decode(json_text(), decode_options())
|
||||
-> {ok, json_term()} | {error, Reason :: term()}).
|
||||
safe_decode(Json, Opts) ->
|
||||
try decode(Json, Opts) of
|
||||
Term -> {ok, Term}
|
||||
|
@ -88,3 +94,27 @@ safe_decode(Json, Opts) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-compile({inline,
|
||||
[ to_ejson/1
|
||||
, from_ejson/1
|
||||
]}).
|
||||
|
||||
to_ejson([[{_,_}]|_] = L) ->
|
||||
[to_ejson(E) || E <- L];
|
||||
to_ejson([{_, _}|_] = L) ->
|
||||
lists:foldl(
|
||||
fun({Name, Value}, Acc) ->
|
||||
Acc#{Name => to_ejson(Value)}
|
||||
end, #{}, L);
|
||||
to_ejson(T) -> T.
|
||||
|
||||
from_ejson([{_}|_] = L) ->
|
||||
[from_ejson(E) || E <- L];
|
||||
from_ejson({L}) ->
|
||||
[{Name, from_ejson(Value)} || {Name, Value} <- L];
|
||||
from_ejson(T) -> T.
|
||||
|
||||
|
|
|
@ -90,6 +90,7 @@
|
|||
{counter, 'packets.connack.auth_error'}, % CONNACK auth_error sent
|
||||
{counter, 'packets.publish.received'}, % PUBLISH packets received
|
||||
{counter, 'packets.publish.sent'}, % PUBLISH packets sent
|
||||
{counter, 'packets.publish.inuse'}, % PUBLISH packet_id inuse
|
||||
{counter, 'packets.publish.error'}, % PUBLISH failed for error
|
||||
{counter, 'packets.publish.auth_error'}, % PUBLISH failed for auth error
|
||||
{counter, 'packets.publish.dropped'}, % PUBLISH(QoS2) packets dropped
|
||||
|
@ -457,37 +458,38 @@ reserved_idx('packets.connack.error') -> 14;
|
|||
reserved_idx('packets.connack.auth_error') -> 15;
|
||||
reserved_idx('packets.publish.received') -> 16;
|
||||
reserved_idx('packets.publish.sent') -> 17;
|
||||
reserved_idx('packets.publish.error') -> 18;
|
||||
reserved_idx('packets.publish.auth_error') -> 19;
|
||||
reserved_idx('packets.puback.received') -> 20;
|
||||
reserved_idx('packets.puback.sent') -> 21;
|
||||
reserved_idx('packets.puback.inuse') -> 22;
|
||||
reserved_idx('packets.puback.missed') -> 23;
|
||||
reserved_idx('packets.pubrec.received') -> 24;
|
||||
reserved_idx('packets.pubrec.sent') -> 25;
|
||||
reserved_idx('packets.pubrec.inuse') -> 26;
|
||||
reserved_idx('packets.pubrec.missed') -> 27;
|
||||
reserved_idx('packets.pubrel.received') -> 28;
|
||||
reserved_idx('packets.pubrel.sent') -> 29;
|
||||
reserved_idx('packets.pubrel.missed') -> 30;
|
||||
reserved_idx('packets.pubcomp.received') -> 31;
|
||||
reserved_idx('packets.pubcomp.sent') -> 32;
|
||||
reserved_idx('packets.pubcomp.inuse') -> 33;
|
||||
reserved_idx('packets.pubcomp.missed') -> 34;
|
||||
reserved_idx('packets.subscribe.received') -> 35;
|
||||
reserved_idx('packets.subscribe.error') -> 36;
|
||||
reserved_idx('packets.subscribe.auth_error') -> 37;
|
||||
reserved_idx('packets.suback.sent') -> 38;
|
||||
reserved_idx('packets.unsubscribe.received') -> 39;
|
||||
reserved_idx('packets.unsubscribe.error') -> 40;
|
||||
reserved_idx('packets.unsuback.sent') -> 41;
|
||||
reserved_idx('packets.pingreq.received') -> 42;
|
||||
reserved_idx('packets.pingresp.sent') -> 43;
|
||||
reserved_idx('packets.disconnect.received') -> 44;
|
||||
reserved_idx('packets.disconnect.sent') -> 45;
|
||||
reserved_idx('packets.auth.received') -> 46;
|
||||
reserved_idx('packets.auth.sent') -> 47;
|
||||
reserved_idx('packets.publish.dropped') -> 48;
|
||||
reserved_idx('packets.publish.inuse') -> 18;
|
||||
reserved_idx('packets.publish.error') -> 19;
|
||||
reserved_idx('packets.publish.auth_error') -> 20;
|
||||
reserved_idx('packets.puback.received') -> 21;
|
||||
reserved_idx('packets.puback.sent') -> 22;
|
||||
reserved_idx('packets.puback.inuse') -> 23;
|
||||
reserved_idx('packets.puback.missed') -> 24;
|
||||
reserved_idx('packets.pubrec.received') -> 25;
|
||||
reserved_idx('packets.pubrec.sent') -> 26;
|
||||
reserved_idx('packets.pubrec.inuse') -> 27;
|
||||
reserved_idx('packets.pubrec.missed') -> 28;
|
||||
reserved_idx('packets.pubrel.received') -> 29;
|
||||
reserved_idx('packets.pubrel.sent') -> 30;
|
||||
reserved_idx('packets.pubrel.missed') -> 31;
|
||||
reserved_idx('packets.pubcomp.received') -> 32;
|
||||
reserved_idx('packets.pubcomp.sent') -> 33;
|
||||
reserved_idx('packets.pubcomp.inuse') -> 34;
|
||||
reserved_idx('packets.pubcomp.missed') -> 35;
|
||||
reserved_idx('packets.subscribe.received') -> 36;
|
||||
reserved_idx('packets.subscribe.error') -> 37;
|
||||
reserved_idx('packets.subscribe.auth_error') -> 38;
|
||||
reserved_idx('packets.suback.sent') -> 39;
|
||||
reserved_idx('packets.unsubscribe.received') -> 40;
|
||||
reserved_idx('packets.unsubscribe.error') -> 41;
|
||||
reserved_idx('packets.unsuback.sent') -> 42;
|
||||
reserved_idx('packets.pingreq.received') -> 43;
|
||||
reserved_idx('packets.pingresp.sent') -> 44;
|
||||
reserved_idx('packets.disconnect.received') -> 45;
|
||||
reserved_idx('packets.disconnect.sent') -> 46;
|
||||
reserved_idx('packets.auth.received') -> 47;
|
||||
reserved_idx('packets.auth.sent') -> 48;
|
||||
reserved_idx('packets.publish.dropped') -> 49;
|
||||
%% Reserved indices of message's metrics
|
||||
reserved_idx('messages.received') -> 100;
|
||||
reserved_idx('messages.sent') -> 101;
|
||||
|
|
|
@ -345,6 +345,9 @@ check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) ->
|
|||
check_will_msg(#mqtt_packet_connect{will_retain = true},
|
||||
_Opts = #{mqtt_retain_available := false}) ->
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||
check_will_msg(#mqtt_packet_connect{will_qos = WillQoS},
|
||||
_Opts = #{max_qos_allowed := MaxQoS}) when WillQoS > MaxQoS ->
|
||||
{error, ?RC_QOS_NOT_SUPPORTED};
|
||||
check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) ->
|
||||
try emqx_topic:validate(name, WillTopic) of
|
||||
true -> ok
|
||||
|
|
|
@ -93,7 +93,7 @@ t_info(_) ->
|
|||
{'$gen_call', From, info} ->
|
||||
gen_server:reply(From, emqx_connection:info(st()))
|
||||
after
|
||||
0 -> error("error")
|
||||
100 -> error("error")
|
||||
end
|
||||
end),
|
||||
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
|
||||
|
@ -113,7 +113,7 @@ t_stats(_) ->
|
|||
{'$gen_call', From, stats} ->
|
||||
gen_server:reply(From, emqx_connection:stats(st()))
|
||||
after
|
||||
0 -> error("error")
|
||||
100 -> error("error")
|
||||
end
|
||||
end),
|
||||
Stats = emqx_connection:stats(CPid),
|
||||
|
@ -384,7 +384,7 @@ trap_exit(Pid, Reason) ->
|
|||
{'EXIT', Pid, Reason} -> ok;
|
||||
{'EXIT', Pid, Other} -> error({unexpect_exit, Other})
|
||||
after
|
||||
0 -> error({expect_exit, Reason})
|
||||
100 -> error({expect_exit, Reason})
|
||||
end.
|
||||
|
||||
make_frame(Packet) ->
|
||||
|
|
|
@ -21,24 +21,83 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(DEC_OPTS, [{labels, atom}, return_maps]).
|
||||
-import(emqx_json,
|
||||
[ encode/1
|
||||
, decode/1
|
||||
, decode/2
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Erlang JSON Erlang
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% null -> null -> null
|
||||
%% true -> true -> true
|
||||
%% false -> false -> false
|
||||
%% "hi" -> [104, 105] -> [104, 105]
|
||||
%% <<"hi">> -> "hi" -> <<"hi">>
|
||||
%% hi -> "hi" -> <<"hi">>
|
||||
%% 1 -> 1 -> 1
|
||||
%% 1.25 -> 1.25 -> 1.25
|
||||
%% [] -> [] -> []
|
||||
%% [true, 1.0] -> [true, 1.0] -> [true, 1.0]
|
||||
%% {[]} -> {} -> {[]}
|
||||
%% {[{foo, bar}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]}
|
||||
%% {[{<<"foo">>, <<"bar">>}]} -> {"foo": "bar"} -> {[{<<"foo">>, <<"bar">>}]}
|
||||
%% #{<<"foo">> => <<"bar">>} -> {"foo": "bar"} -> #{<<"foo">> => <<"bar">>}
|
||||
%%
|
||||
%% Extension:
|
||||
%% [{<<"foo">>, <<"bar">>}] -> {"foo": "bar"} -> [{<<"foo">>, <<"bar">>}]
|
||||
%%
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_decode_encode(_) ->
|
||||
JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>,
|
||||
JsonTerm = emqx_json:decode(JsonText),
|
||||
JsonMaps = #{library => <<"jsx">>, awesome => true},
|
||||
?assertEqual(JsonText, emqx_json:encode(JsonTerm, [{space, 1}])),
|
||||
?assertEqual(JsonMaps, emqx_json:decode(JsonText, ?DEC_OPTS)).
|
||||
null = decode(encode(null)),
|
||||
true = decode(encode(true)),
|
||||
false = decode(encode(false)),
|
||||
"hi" = decode(encode("hi")),
|
||||
<<"hi">> = decode(encode(hi)),
|
||||
1 = decode(encode(1)),
|
||||
1.25 = decode(encode(1.25)),
|
||||
[] = decode(encode([])),
|
||||
[true, 1] = decode(encode([true, 1])),
|
||||
[] = decode(encode({[]})),
|
||||
[{<<"foo">>, <<"bar">>}] = decode(encode({[{foo, bar}]})),
|
||||
[{<<"foo">>, <<"bar">>}] = decode(encode({[{<<"foo">>, <<"bar">>}]})),
|
||||
[{<<"foo">>, <<"bar">>}] = decode(encode([{<<"foo">>, <<"bar">>}])),
|
||||
[[{<<"foo">>, <<"bar">>}]] = decode(encode([[{<<"foo">>, <<"bar">>}]])),
|
||||
#{<<"foo">> := <<"bar">>} = decode(encode(#{<<"foo">> => <<"bar">>}), [return_maps]),
|
||||
JsonText = <<"{\"bool\":true,\"int\":10,\"foo\":\"bar\"}">>,
|
||||
JsonMaps = #{<<"bool">> => true,
|
||||
<<"int">> => 10,
|
||||
<<"foo">> => <<"bar">>
|
||||
},
|
||||
?assertEqual(JsonText, encode({decode(JsonText)})),
|
||||
?assertEqual(JsonMaps, decode(JsonText, [return_maps])).
|
||||
|
||||
t_safe_decode_encode(_) ->
|
||||
JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>,
|
||||
{ok, JsonTerm} = emqx_json:safe_decode(JsonText),
|
||||
JsonMaps = #{library => <<"jsx">>, awesome => true},
|
||||
?assertEqual({ok, JsonText}, emqx_json:safe_encode(JsonTerm, [{space, 1}])),
|
||||
?assertEqual({ok, JsonMaps}, emqx_json:safe_decode(JsonText, ?DEC_OPTS)),
|
||||
BadJsonText = <<"{\"library\", \"awesome\": true}">>,
|
||||
?assertEqual({error, badarg}, emqx_json:safe_decode(BadJsonText)),
|
||||
{error, badarg} = emqx_json:safe_encode({a, {b ,1}}).
|
||||
safe_encode_decode(null),
|
||||
safe_encode_decode(true),
|
||||
safe_encode_decode(false),
|
||||
"hi" = safe_encode_decode("hi"),
|
||||
<<"hi">> = safe_encode_decode(hi),
|
||||
1 = safe_encode_decode(1),
|
||||
1.25 = safe_encode_decode(1.25),
|
||||
[] = safe_encode_decode([]),
|
||||
[true, 1] = safe_encode_decode([true, 1]),
|
||||
[] = safe_encode_decode({[]}),
|
||||
[{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{foo, bar}]}),
|
||||
[{<<"foo">>, <<"bar">>}] = safe_encode_decode({[{<<"foo">>, <<"bar">>}]}),
|
||||
[{<<"foo">>, <<"bar">>}] = safe_encode_decode([{<<"foo">>, <<"bar">>}]),
|
||||
{ok, Json} = emqx_json:safe_encode(#{<<"foo">> => <<"bar">>}),
|
||||
{ok, #{<<"foo">> := <<"bar">>}} = emqx_json:safe_decode(Json, [return_maps]).
|
||||
|
||||
safe_encode_decode(Term) ->
|
||||
{ok, Json} = emqx_json:safe_encode(Term),
|
||||
case emqx_json:safe_decode(Json) of
|
||||
{ok, {NTerm}} -> NTerm;
|
||||
{ok, NTerm} -> NTerm
|
||||
end.
|
||||
|
||||
|
|
|
@ -63,17 +63,17 @@ recv_and_check_presence(ClientId, Presence) ->
|
|||
binary:split(Topic, <<"/">>, [global])),
|
||||
case Presence of
|
||||
<<"connected">> ->
|
||||
?assertMatch(#{clientid := <<"clientid">>,
|
||||
username := <<"username">>,
|
||||
ipaddress := <<"127.0.0.1">>,
|
||||
proto_name := <<"MQTT">>,
|
||||
proto_ver := ?MQTT_PROTO_V4,
|
||||
connack := ?RC_SUCCESS,
|
||||
clean_start := true}, emqx_json:decode(Payload, [{labels, atom}, return_maps]));
|
||||
?assertMatch(#{<<"clientid">> := <<"clientid">>,
|
||||
<<"username">> := <<"username">>,
|
||||
<<"ipaddress">> := <<"127.0.0.1">>,
|
||||
<<"proto_name">> := <<"MQTT">>,
|
||||
<<"proto_ver">> := ?MQTT_PROTO_V4,
|
||||
<<"connack">> := ?RC_SUCCESS,
|
||||
<<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps]));
|
||||
<<"disconnected">> ->
|
||||
?assertMatch(#{clientid := <<"clientid">>,
|
||||
username := <<"username">>,
|
||||
reason := <<"normal">>}, emqx_json:decode(Payload, [{labels, atom}, return_maps]))
|
||||
?assertMatch(#{<<"clientid">> := <<"clientid">>,
|
||||
<<"username">> := <<"username">>,
|
||||
<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -48,6 +48,9 @@ end_per_suite(_Config) ->
|
|||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
client_info(Key, Client) ->
|
||||
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
|
||||
|
||||
receive_messages(Count) ->
|
||||
receive_messages(Count, []).
|
||||
|
||||
|
@ -63,6 +66,14 @@ receive_messages(Count, Msgs) ->
|
|||
Msgs
|
||||
end.
|
||||
|
||||
receive_disconnect_reasoncode() ->
|
||||
receive
|
||||
{disconnected, ReasonCode, _} -> ReasonCode;
|
||||
_Other -> receive_disconnect_reasoncode()
|
||||
after 100 ->
|
||||
error("no disconnect packet")
|
||||
end.
|
||||
|
||||
clean_retained(Topic) ->
|
||||
{ok, Clean} = emqtt:start_link([{clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Clean),
|
||||
|
@ -90,6 +101,106 @@ t_basic_test(_) ->
|
|||
%% Connection
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_connect_clean_start(_) ->
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
|
||||
ok = emqtt:pause(Client1),
|
||||
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
|
||||
ok = emqtt:disconnect(Client2),
|
||||
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
|
||||
ok = emqtt:disconnect(Client3).
|
||||
|
||||
t_connect_will_message(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Payload = "will message",
|
||||
|
||||
{ok, Client1} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
|
||||
?assertNotEqual(undefined, maps:find(will_msg, emqx_connection:info(sys:get_state(ClientPid)))), %% [MQTT-3.1.2-7]
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
|
||||
|
||||
ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1]
|
||||
[Msg | _ ] = receive_messages(1),
|
||||
%% [MQTT-3.1.2-8]
|
||||
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
|
||||
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
|
||||
?assertEqual({ok, 0}, maps:find(qos, Msg)),
|
||||
ok = emqtt:disconnect(Client2),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
|
||||
{ok, Client4} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client4),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2),
|
||||
ok = emqtt:disconnect(Client3),
|
||||
?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10]
|
||||
ok = emqtt:disconnect(Client4).
|
||||
|
||||
t_connect_will_retain(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Payload = "will message",
|
||||
|
||||
{ok, Client1} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_retain, false}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
|
||||
|
||||
ok = emqtt:disconnect(Client1, 4),
|
||||
[Msg1 | _ ] = receive_messages(1),
|
||||
?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14]
|
||||
ok = emqtt:disconnect(Client2),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_retain, true}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
|
||||
{ok, Client4} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client4),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
|
||||
|
||||
ok = emqtt:disconnect(Client3, 4),
|
||||
[Msg2 | _ ] = receive_messages(1),
|
||||
?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15]
|
||||
ok = emqtt:disconnect(Client4),
|
||||
clean_retained(Topic).
|
||||
|
||||
t_connect_idle_timeout(_) ->
|
||||
IdleTimeout = 2000,
|
||||
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
|
||||
|
@ -107,9 +218,9 @@ t_connect_limit_timeout(_) ->
|
|||
Topic = nth(1, ?TOPICS),
|
||||
emqx_zone:set_env(external, publish_limit, {2.0, 3}),
|
||||
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"t_connect_limit_timeout">>},{proto_ver, v5},{keepalive, 60}]),
|
||||
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[ClientPid] = emqx_cm:lookup_channels(<<"t_connect_limit_timeout">>),
|
||||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
|
||||
|
||||
?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))),
|
||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
||||
|
@ -119,21 +230,21 @@ t_connect_limit_timeout(_) ->
|
|||
timer:sleep(200),
|
||||
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
|
||||
|
||||
emqtt:disconnect(Client),
|
||||
ok = emqtt:disconnect(Client),
|
||||
meck:unload(proplists).
|
||||
|
||||
t_connect_emit_stats_timeout(_) ->
|
||||
IdleTimeout = 2000,
|
||||
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
|
||||
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"t_connect_emit_stats_timeout">>},{proto_ver, v5},{keepalive, 60}]),
|
||||
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
[ClientPid] = emqx_cm:lookup_channels(<<"t_connect_emit_stats_timeout">>),
|
||||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
|
||||
|
||||
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
|
||||
timer:sleep(IdleTimeout),
|
||||
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
|
||||
emqtt:disconnect(Client).
|
||||
ok = emqtt:disconnect(Client).
|
||||
|
||||
%% [MQTT-3.1.2-22]
|
||||
t_connect_keepalive_timeout(_) ->
|
||||
|
@ -144,13 +255,380 @@ t_connect_keepalive_timeout(_) ->
|
|||
{ok, _} = emqtt:connect(Client),
|
||||
emqtt:pause(Client),
|
||||
receive
|
||||
Msg ->
|
||||
ReasonCode = 141,
|
||||
?assertMatch({disconnected, ReasonCode, _Channel}, Msg)
|
||||
{disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode)
|
||||
after round(timer:seconds(Keepalive) * 2 * 1.5 ) ->
|
||||
error("keepalive timeout")
|
||||
end.
|
||||
|
||||
%% [MQTT-3.1.2-23]
|
||||
t_connect_session_expiry_interval(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Payload = "test message",
|
||||
|
||||
{ok, Client1} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_session_expiry_interval">>},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 7200}}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
ok = emqtt:disconnect(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
|
||||
ok = emqtt:disconnect(Client2),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_session_expiry_interval">>},
|
||||
{proto_ver, v5},
|
||||
{clean_start, false}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
[Msg | _ ] = receive_messages(1),
|
||||
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
|
||||
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
|
||||
?assertEqual({ok, 2}, maps:find(qos, Msg)),
|
||||
ok = emqtt:disconnect(Client3).
|
||||
|
||||
%% [MQTT-3.1.3-9]
|
||||
t_connect_will_delay_interval(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Payload = "will message",
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_will_delay_interval">>},
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_qos, 2},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_props, #{'Will-Delay-Interval' => 3}},
|
||||
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||
{keepalive, 2}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
|
||||
timer:sleep(5000),
|
||||
?assertEqual(0, length(receive_messages(1))),
|
||||
timer:sleep(7000),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_will_delay_interval">>},
|
||||
{proto_ver, v5},
|
||||
{clean_start, true},
|
||||
{will_flag, true},
|
||||
{will_qos, 2},
|
||||
{will_topic, Topic},
|
||||
{will_payload, Payload},
|
||||
{will_props, #{'Will-Delay-Interval' => 7200}},
|
||||
{properties, #{'Session-Expiry-Interval' => 3}},
|
||||
{keepalive, 2}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client3),
|
||||
|
||||
timer:sleep(5000),
|
||||
?assertEqual(0, length(receive_messages(1))),
|
||||
timer:sleep(7000),
|
||||
?assertEqual(1, length(receive_messages(1))),
|
||||
|
||||
ok = emqtt:disconnect(Client1),
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
%% [MQTT-3.1.4-3]
|
||||
t_connect_duplicate_clientid(_) ->
|
||||
{ok, Client1} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_duplicate_clientid">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_duplicate_clientid">>},
|
||||
{proto_ver, v5}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
?assertEqual(142, receive_disconnect_reasoncode()).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Connack
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_connack_session_present(_) ->
|
||||
{ok, Client1} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_duplicate_clientid">>},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||
{clean_start, true}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2]
|
||||
ok = emqtt:disconnect(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{clientid, <<"t_connect_duplicate_clientid">>},
|
||||
{proto_ver, v5},
|
||||
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||
{clean_start, false}
|
||||
]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]]
|
||||
ok = emqtt:disconnect(Client2).
|
||||
|
||||
t_connack_max_qos_allowed(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
||||
%% max_qos_allowed = 0
|
||||
emqx_zone:set_env(external, max_qos_allowed, 0),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack1} = emqtt:connect(Client1),
|
||||
?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9]
|
||||
|
||||
{ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), %% [MQTT-3.2.2-10]
|
||||
|
||||
{ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
|
||||
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, <<"Unsupported Qos">>},
|
||||
{will_qos, 2}
|
||||
]),
|
||||
{error, Connack2} = emqtt:connect(Client2),
|
||||
?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12]
|
||||
|
||||
%% max_qos_allowed = 1
|
||||
emqx_zone:set_env(external, max_qos_allowed, 1),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
|
||||
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack3} = emqtt:connect(Client3),
|
||||
?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9]
|
||||
|
||||
{ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10]
|
||||
{ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2), %% [MQTT-3.2.2-10]
|
||||
|
||||
{ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2),
|
||||
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
|
||||
|
||||
{ok, Client4} = emqtt:start_link([
|
||||
{proto_ver, v5},
|
||||
{will_flag, true},
|
||||
{will_topic, Topic},
|
||||
{will_payload, <<"Unsupported Qos">>},
|
||||
{will_qos, 2}
|
||||
]),
|
||||
{error, Connack4} = emqtt:connect(Client4),
|
||||
?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12]
|
||||
receive
|
||||
{'EXIT', _, {shutdown,qos_not_supported}} -> ok
|
||||
after 100 -> error("t_connack_max_qos_allowed")
|
||||
end,
|
||||
|
||||
%% max_qos_allowed = 2
|
||||
emqx_zone:set_env(external, max_qos_allowed, 2),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
|
||||
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
|
||||
|
||||
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, Connack5} = emqtt:connect(Client5),
|
||||
?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9]
|
||||
ok = emqtt:disconnect(Client5),
|
||||
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_connack_assigned_clienid(_) ->
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Publish
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_publish_rap(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
|
||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12]
|
||||
ok = emqtt:disconnect(Client1),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]),
|
||||
{ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]),
|
||||
[Msg2 | _] = receive_messages(1),
|
||||
?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13]
|
||||
ok = emqtt:disconnect(Client2),
|
||||
|
||||
clean_retained(Topic).
|
||||
|
||||
t_publish_wildtopic(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?WILD_TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, <<"error topic">>),
|
||||
?assertEqual(144, receive_disconnect_reasoncode()),
|
||||
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_payload_format_indicator(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Properties = #{'Payload-Format-Indicator' => 233},
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
t_publish_topic_alias(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8]
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
|
||||
ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||
ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12]
|
||||
ok = emqtt:disconnect(Client2),
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_response_topic(_) ->
|
||||
process_flag(trap_exit, true),
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]),
|
||||
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
|
||||
|
||||
process_flag(trap_exit, false).
|
||||
|
||||
t_publish_properties(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Properties = #{
|
||||
'Response-Topic' => Topic, %% [MQTT-3.3.2-15]
|
||||
'Correlation-Data' => <<"233">>, %% [MQTT-3.3.2-16]
|
||||
'User-Property' => [{<<"a">>, <<"2333">>}], %% [MQTT-3.3.2-18]
|
||||
'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20]
|
||||
},
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||
ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
t_publish_overlapping_subscriptions(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Properties = #{'Subscription-Identifier' => 2333},
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1),
|
||||
{ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0),
|
||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]),
|
||||
|
||||
[Msg1 | _ ] = receive_messages(2),
|
||||
?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2]
|
||||
?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.4-3]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Subsctibe
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_subscribe_no_local(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
|
||||
|
||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
|
||||
|
||||
ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0),
|
||||
?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
t_subscribe_actions(_) ->
|
||||
Topic = nth(1, ?TOPICS),
|
||||
Properties = #{'Subscription-Identifier' => 2333},
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2),
|
||||
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1),
|
||||
{ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2),
|
||||
[Msg1 | _ ] = receive_messages(1),
|
||||
?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8]
|
||||
|
||||
{ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, {nth(2, ?TOPICS), qos2}] ), %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
%%--------------------------------------------------------------------
|
||||
%% Unsubsctibe Unsuback
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_unscbsctibe(_) ->
|
||||
Topic1 = nth(1, ?TOPICS),
|
||||
Topic2 = nth(2, ?TOPICS),
|
||||
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2),
|
||||
{ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4]
|
||||
{ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5]
|
||||
|
||||
{ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]),
|
||||
{ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Pingreq
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_pingreq(_) ->
|
||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1]
|
||||
ok = emqtt:disconnect(Client1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Shared Subscriptions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -193,4 +671,5 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
|
|||
error("disconnected timeout")
|
||||
end,
|
||||
|
||||
?assertEqual(1, counters:get(CRef, 1)).
|
||||
?assertEqual(1, counters:get(CRef, 1)),
|
||||
process_flag(trap_exit, false).
|
||||
|
|
Loading…
Reference in New Issue