From 12f6b8fab0056d5a3e45a4d307dac22e71776e62 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 31 Mar 2022 17:01:33 +0300 Subject: [PATCH 1/2] chore(emqx_modules): improve emqx_delayed_api coverage --- .../test/emqx_dashboard_api_test_helpers.erl | 4 + apps/emqx_modules/src/emqx_delayed_api.erl | 20 +-- .../test/emqx_delayed_api_SUITE.erl | 154 +++++++++++++----- 3 files changed, 130 insertions(+), 48 deletions(-) diff --git a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl index 3bfe9829c..88cf45610 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl @@ -18,6 +18,7 @@ -export([set_default_config/0, set_default_config/1, + request/2, request/3, request/4, uri/0, @@ -39,6 +40,9 @@ set_default_config(DefaultUsername) -> emqx_config:put([dashboard], Config), ok. +request(Method, Url) -> + request(Method, Url, []). + request(Method, Url, Body) -> request(<<"admin">>, Method, Url, Body). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 57028c462..4029646af 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -24,7 +24,7 @@ -import(hoconsc, [mk/2, ref/1, ref/2]). -define(MAX_PAYLOAD_LENGTH, 2048). --define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). +-define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>). -export([ status/2, @@ -49,8 +49,6 @@ -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(INVALID_NODE, 'INVALID_NODE'). -%% 1MB = 1024 x 1024 --define(MAX_PAYLOAD_SIZE, 1048576). api_spec() -> emqx_dashboard_swagger:spec(?MODULE). @@ -106,7 +104,7 @@ schema("/mqtt/delayed/messages/:node/:msgid") -> responses => #{ 200 => ref("message_without_payload"), 400 => emqx_dashboard_swagger:error_codes( - [?MESSAGE_ID_SCHEMA_ERROR], + [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE], <<"Bad MsgId format">> ), 404 => emqx_dashboard_swagger:error_codes( @@ -129,7 +127,7 @@ schema("/mqtt/delayed/messages/:node/:msgid") -> responses => #{ 204 => <<"Delete delayed message success">>, 400 => emqx_dashboard_swagger:error_codes( - [?MESSAGE_ID_SCHEMA_ERROR], + [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE], <<"Bad MsgId format">> ), 404 => emqx_dashboard_swagger:error_codes( @@ -175,7 +173,7 @@ fields("message_without_payload") -> ]; fields("message") -> PayloadDesc = io_lib:format( - "Payload, base64 encode. Payload will be ~p if length large than ~p", + "Payload, base64 encoded. Payload will be set to ~p if its length is larger than ~p", [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH] ), fields("message_without_payload") ++ @@ -193,7 +191,7 @@ delayed_messages(get, #{query_string := Qs}) -> {200, emqx_delayed:cluster_list(Qs)}. delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> - MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), + MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), with_maybe( [MaybeNode, MaybeId], @@ -201,14 +199,16 @@ delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> case emqx_delayed:get_delayed_message(Node, Id) of {ok, Message} -> Payload = maps:get(payload, Message), - case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of true -> - {200, Message}; + {200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; _ -> {200, Message#{payload => base64:encode(Payload)}} end; {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} + {404, generate_http_code_map(not_found, Id)}; + {badrpc, _} -> + {400, generate_http_code_map(invalid_node, Id)} end end ); diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index 98dd3fb25..f703dcf13 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -22,72 +22,75 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). --define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). +-define(BASE_CONF, #{<<"dealyed">> => <<"true">>, + <<"max_delayed_messages">> => <<"0">> + }). --import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]). +-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_conf), - ok = ekka:start(), - ok = mria:start(), - ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), - emqx_config:put([dealyed], #{enable => true, max_delayed_messages => 10}), - meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), - meck:expect( - emqx_config, - get_schema_mod, - fun - (delayed) -> emqx_conf_schema; - (Any) -> meck:passthrough(Any) - end - ), + ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), - ok = emqx_delayed:mnesia(boot), - emqx_mgmt_api_test_util:init_suite([emqx_modules]), + ok = emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_modules, emqx_dashboard], + fun set_special_configs/1 + ), emqx_delayed:enable(), Config. end_per_suite(Config) -> - ekka:stop(), - mria:stop(), - mria_mnesia:delete_schema(), - meck:unload(emqx_config), ok = emqx_delayed:disable(), - emqx_mgmt_api_test_util:end_suite([emqx_modules]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), Config. init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(), Config. +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(); +set_special_configs(_App) -> + ok. + %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ t_status(_Config) -> - Path = api_path(["mqtt", "delayed"]), - Auth = emqx_mgmt_api_test_util:auth_header_(), - {ok, R1} = request_api( + Path = uri(["mqtt", "delayed"]), + {ok, 200, R1} = request( put, Path, - "", - Auth, #{enable => false, max_delayed_messages => 10} ), ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)), - {ok, R2} = request_api( + {ok, 200, R2} = request( put, Path, - "", - Auth, #{enable => true, max_delayed_messages => 12} ), ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)), - {ok, ConfJson} = request_api(get, Path), + ?assertMatch( + {ok, 200, _}, + request( + put, + Path, + #{enable => true} + )), + + ?assertMatch( + {ok, 400, _}, + request( + put, + Path, + #{enable => true, max_delayed_messages => -5} + )), + + {ok, 200, ConfJson} = request(get, Path), ReturnConf = decode_json(ConfJson), ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf). @@ -131,25 +134,100 @@ t_messages(_) -> ), MsgId = maps:get(msgid, First), - {ok, LookupMsg} = request_api( + {ok, 200, LookupMsg} = request( get, - api_path(["mqtt", "delayed", "messages", node(), MsgId]) + uri(["mqtt", "delayed", "messages", node(), MsgId]) ), ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))), - {ok, _} = request_api( - delete, - api_path(["mqtt", "delayed", "messages", node(), MsgId]) + ?assertMatch( + {ok, 404, _}, + request( + get, + uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId]) + ) + ), + + ?assertMatch( + {ok, 404, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())]) + ) + ), + + ?assertMatch( + {ok, 204, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", node(), MsgId]) + ) ), _ = get_messages(4), ok = emqtt:disconnect(C1). +t_large_payload(_) -> + clear_all_record(), + + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + timer:sleep(500), + Topic = <<"$delayed/123/msgs">>, + emqtt:publish( + C1, + Topic, + iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]), + [{qos, 0}, {retain, true}] + ), + + timer:sleep(500), + + [#{msgid := MsgId}] = get_messages(1), + + {ok, 200, Msg} = request( + get, + uri(["mqtt", "delayed", "messages", node(), MsgId]) + ), + + ?assertMatch( + #{ + payload := <<"PAYLOAD_TOO_LARGE">>, + topic := <<"msgs">> + }, + decode_json(Msg) + ). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- + decode_json(Data) -> BinJson = emqx_json:decode(Data, [return_maps]), emqx_map_lib:unsafe_atom_key_map(BinJson). @@ -158,7 +236,7 @@ clear_all_record() -> ets:delete_all_objects(emqx_delayed). get_messages(Len) -> - {ok, MsgsJson} = request_api(get, api_path(["mqtt", "delayed", "messages"])), + {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])), #{data := Msgs} = decode_json(MsgsJson), MsgLen = erlang:length(Msgs), ?assert( From ce437ac5b22da3dfe19872780c689c62dc90b575 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 31 Mar 2022 20:29:46 +0300 Subject: [PATCH 2/2] chore(emqx_modules): improve emqx_delayed coverage --- apps/emqx_modules/src/emqx_delayed.erl | 15 +-- .../src/proto/emqx_delayed_proto_v1.erl | 8 +- apps/emqx_modules/test/emqx_delayed_SUITE.erl | 104 +++++++++++++++++- .../test/emqx_delayed_api_SUITE.erl | 44 ++++---- 4 files changed, 131 insertions(+), 40 deletions(-) diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index be7ca3ba3..8adf86e5d 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -275,17 +276,13 @@ init([Opts]) -> handle_call({set_max_delayed_messages, Max}, _From, State) -> {reply, ok, State#{max_delayed_messages => Max}}; handle_call( - {store, DelayedMsg = #delayed_message{key = Key}}, - _From, - State = #{max_delayed_messages := 0} + {store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := 0} ) -> ok = mria:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; handle_call( - {store, DelayedMsg = #delayed_message{key = Key}}, - _From, - State = #{max_delayed_messages := Max} + {store, DelayedMsg = #delayed_message{key = Key}}, _From, State = #{max_delayed_messages := Max} ) -> Size = mnesia:table_info(?TAB, size), case Size >= Max of @@ -303,11 +300,11 @@ handle_call(disable, _From, State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), {reply, ok, State}; handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", call => Req}), + ?tp(error, emqx_delayed_unexpected_call, #{call => Req}), {reply, ignored, State}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), + ?tp(error, emqx_delayed_unexpected_cast, #{cast => Msg}), {noreply, State}. %% Do Publish... @@ -320,7 +317,7 @@ handle_info(stats, State = #{stats_fun := StatsFun}) -> StatsFun(delayed_count()), {noreply, State#{stats_timer := StatsTimer}, hibernate}; handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", info => Info}), + ?tp(error, emqx_delayed_unexpected_info, #{info => Info}), {noreply, State}. terminate(_Reason, #{publish_timer := PublishTimer, stats_timer := StatsTimer}) -> diff --git a/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl b/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl index 4c4a55dc4..28eec64ee 100644 --- a/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl +++ b/apps/emqx_modules/src/proto/emqx_delayed_proto_v1.erl @@ -29,9 +29,9 @@ introduced_in() -> "5.0.0". -spec get_delayed_message(node(), binary()) -> emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc(). -get_delayed_message(Node, HexId) -> - rpc:call(Node, emqx_delayed, get_delayed_message, [HexId]). +get_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, get_delayed_message, [Id]). -spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc(). -delete_delayed_message(Node, HexId) -> - rpc:call(Node, emqx_delayed, delete_delayed_message, [HexId]). +delete_delayed_message(Node, Id) -> + rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]). diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 5bde45c4b..9e98807d9 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -43,6 +43,16 @@ init_per_suite(Config) -> end_per_suite(_) -> emqx_common_test_helpers:stop_apps([emqx_modules]). +init_per_testcase(t_load_case, Config) -> + Config; +init_per_testcase(_Case, Config) -> + {atomic, ok} = mria:clear_table(emqx_delayed), + ok = emqx_delayed:enable(), + Config. + +end_per_testcase(_Case, _Config) -> + ok = emqx_delayed:disable(). + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -57,7 +67,6 @@ t_load_case(_) -> ok. t_delayed_message(_) -> - ok = emqx_delayed:enable(), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), ?assertEqual( {stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, @@ -67,11 +76,94 @@ t_delayed_message(_) -> Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>), ?assertEqual({ok, Msg}, on_message_publish(Msg)), - [Key] = mnesia:dirty_all_keys(emqx_delayed), - [#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_delayed, Key}), + [#delayed_message{msg = #message{payload = Payload}}] = ets:tab2list(emqx_delayed), ?assertEqual(<<"delayed_m">>, Payload), - timer:sleep(5000), + ct:sleep(2000), EmptyKey = mnesia:dirty_all_keys(emqx_delayed), - ?assertEqual([], EmptyKey), - ok = emqx_delayed:disable(). + ?assertEqual([], EmptyKey). + +t_delayed_message_abs_time(_) -> + Ts0 = integer_to_binary(erlang:system_time(second) + 1), + DelayedMsg0 = emqx_message:make( + ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">> + ), + _ = on_message_publish(DelayedMsg0), + + ?assertMatch( + [#delayed_message{msg = #message{payload = <<"delayed_abs">>}}], + ets:tab2list(emqx_delayed) + ), + + ct:sleep(2000), + + ?assertMatch( + [], + ets:tab2list(emqx_delayed) + ), + + Ts1 = integer_to_binary(erlang:system_time(second) + 10000000), + DelayedMsg1 = emqx_message:make( + ?MODULE, 1, <<"$delayed/", Ts1/binary, "/publish">>, <<"delayed_abs">> + ), + + ?assertError( + invalid_delayed_timestamp, + on_message_publish(DelayedMsg1) + ). + +t_list(_) -> + Ts0 = integer_to_binary(erlang:system_time(second) + 1), + DelayedMsg0 = emqx_message:make( + ?MODULE, 1, <<"$delayed/", Ts0/binary, "/publish">>, <<"delayed_abs">> + ), + _ = on_message_publish(DelayedMsg0), + + ?assertMatch( + #{data := [#{topic := <<"publish">>}]}, + emqx_delayed:list(#{}) + ). + +t_max(_) -> + emqx_delayed:set_max_delayed_messages(1), + + DelayedMsg0 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t0">>, <<"delayed0">>), + DelayedMsg1 = emqx_message:make(?MODULE, 1, <<"$delayed/10/t1">>, <<"delayed1">>), + _ = on_message_publish(DelayedMsg0), + _ = on_message_publish(DelayedMsg1), + + ?assertMatch( + #{data := [#{topic := <<"t0">>}]}, + emqx_delayed:list(#{}) + ). + +t_cluster(_) -> + DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed">>), + Id = emqx_message:id(DelayedMsg), + _ = on_message_publish(DelayedMsg), + + ?assertMatch( + {ok, _}, + emqx_delayed_proto_v1:get_delayed_message(node(), Id) + ), + + ?assertEqual( + emqx_delayed:get_delayed_message(Id), + emqx_delayed_proto_v1:get_delayed_message(node(), Id) + ), + + ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id), + + ?assertMatch( + {error, _}, + emqx_delayed:get_delayed_message(Id) + ). + +t_unknown_messages(_) -> + OldPid = whereis(emqx_delayed), + OldPid ! unknown, + ok = gen_server:cast(OldPid, unknown), + ?assertEqual( + ignored, + gen_server:call(OldPid, unknown) + ). diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index f703dcf13..41c1e10b9 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -20,11 +20,11 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx.hrl"). --define(BASE_CONF, #{<<"dealyed">> => <<"true">>, - <<"max_delayed_messages">> => <<"0">> - }). +-define(BASE_CONF, #{ + <<"dealyed">> => <<"true">>, + <<"max_delayed_messages">> => <<"0">> +}). -import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]). @@ -75,20 +75,22 @@ t_status(_Config) -> ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)), ?assertMatch( - {ok, 200, _}, - request( - put, - Path, - #{enable => true} - )), + {ok, 200, _}, + request( + put, + Path, + #{enable => true} + ) + ), ?assertMatch( - {ok, 400, _}, - request( - put, - Path, - #{enable => true, max_delayed_messages => -5} - )), + {ok, 400, _}, + request( + put, + Path, + #{enable => true, max_delayed_messages => -5} + ) + ), {ok, 200, ConfJson} = request(get, Path), ReturnConf = decode_json(ConfJson), @@ -201,11 +203,11 @@ t_large_payload(_) -> timer:sleep(500), Topic = <<"$delayed/123/msgs">>, emqtt:publish( - C1, - Topic, - iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]), - [{qos, 0}, {retain, true}] - ), + C1, + Topic, + iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]), + [{qos, 0}, {retain, true}] + ), timer:sleep(500),