From 12f6b8fab0056d5a3e45a4d307dac22e71776e62 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 31 Mar 2022 17:01:33 +0300 Subject: [PATCH] chore(emqx_modules): improve emqx_delayed_api coverage --- .../test/emqx_dashboard_api_test_helpers.erl | 4 + apps/emqx_modules/src/emqx_delayed_api.erl | 20 +-- .../test/emqx_delayed_api_SUITE.erl | 154 +++++++++++++----- 3 files changed, 130 insertions(+), 48 deletions(-) diff --git a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl index 3bfe9829c..88cf45610 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl @@ -18,6 +18,7 @@ -export([set_default_config/0, set_default_config/1, + request/2, request/3, request/4, uri/0, @@ -39,6 +40,9 @@ set_default_config(DefaultUsername) -> emqx_config:put([dashboard], Config), ok. +request(Method, Url) -> + request(Method, Url, []). + request(Method, Url, Body) -> request(<<"admin">>, Method, Url, Body). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 57028c462..4029646af 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -24,7 +24,7 @@ -import(hoconsc, [mk/2, ref/1, ref/2]). -define(MAX_PAYLOAD_LENGTH, 2048). --define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). +-define(PAYLOAD_TOO_LARGE, <<"PAYLOAD_TOO_LARGE">>). -export([ status/2, @@ -49,8 +49,6 @@ -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_SCHEMA_ERROR, 'MESSAGE_ID_SCHEMA_ERROR'). -define(INVALID_NODE, 'INVALID_NODE'). -%% 1MB = 1024 x 1024 --define(MAX_PAYLOAD_SIZE, 1048576). api_spec() -> emqx_dashboard_swagger:spec(?MODULE). @@ -106,7 +104,7 @@ schema("/mqtt/delayed/messages/:node/:msgid") -> responses => #{ 200 => ref("message_without_payload"), 400 => emqx_dashboard_swagger:error_codes( - [?MESSAGE_ID_SCHEMA_ERROR], + [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE], <<"Bad MsgId format">> ), 404 => emqx_dashboard_swagger:error_codes( @@ -129,7 +127,7 @@ schema("/mqtt/delayed/messages/:node/:msgid") -> responses => #{ 204 => <<"Delete delayed message success">>, 400 => emqx_dashboard_swagger:error_codes( - [?MESSAGE_ID_SCHEMA_ERROR], + [?MESSAGE_ID_SCHEMA_ERROR, ?INVALID_NODE], <<"Bad MsgId format">> ), 404 => emqx_dashboard_swagger:error_codes( @@ -175,7 +173,7 @@ fields("message_without_payload") -> ]; fields("message") -> PayloadDesc = io_lib:format( - "Payload, base64 encode. Payload will be ~p if length large than ~p", + "Payload, base64 encoded. Payload will be set to ~p if its length is larger than ~p", [?PAYLOAD_TOO_LARGE, ?MAX_PAYLOAD_LENGTH] ), fields("message_without_payload") ++ @@ -193,7 +191,7 @@ delayed_messages(get, #{query_string := Qs}) -> {200, emqx_delayed:cluster_list(Qs)}. delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> - MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_atom/1), + MaybeNode = make_maybe(NodeBin, invalid_node, fun erlang:binary_to_existing_atom/1), MaybeId = make_maybe(HexId, id_schema_error, fun emqx_guid:from_hexstr/1), with_maybe( [MaybeNode, MaybeId], @@ -201,14 +199,16 @@ delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) -> case emqx_delayed:get_delayed_message(Node, Id) of {ok, Message} -> Payload = maps:get(payload, Message), - case erlang:byte_size(Payload) > ?MAX_PAYLOAD_SIZE of + case erlang:byte_size(Payload) > ?MAX_PAYLOAD_LENGTH of true -> - {200, Message}; + {200, Message#{payload => ?PAYLOAD_TOO_LARGE}}; _ -> {200, Message#{payload => base64:encode(Payload)}} end; {error, not_found} -> - {404, generate_http_code_map(not_found, Id)} + {404, generate_http_code_map(not_found, Id)}; + {badrpc, _} -> + {400, generate_http_code_map(invalid_node, Id)} end end ); diff --git a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl index 98dd3fb25..f703dcf13 100644 --- a/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_api_SUITE.erl @@ -22,72 +22,75 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). --define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). +-define(BASE_CONF, #{<<"dealyed">> => <<"true">>, + <<"max_delayed_messages">> => <<"0">> + }). --import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]). +-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_conf), - ok = ekka:start(), - ok = mria:start(), - ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), - emqx_config:put([dealyed], #{enable => true, max_delayed_messages => 10}), - meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), - meck:expect( - emqx_config, - get_schema_mod, - fun - (delayed) -> emqx_conf_schema; - (Any) -> meck:passthrough(Any) - end - ), + ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), - ok = emqx_delayed:mnesia(boot), - emqx_mgmt_api_test_util:init_suite([emqx_modules]), + ok = emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_modules, emqx_dashboard], + fun set_special_configs/1 + ), emqx_delayed:enable(), Config. end_per_suite(Config) -> - ekka:stop(), - mria:stop(), - mria_mnesia:delete_schema(), - meck:unload(emqx_config), ok = emqx_delayed:disable(), - emqx_mgmt_api_test_util:end_suite([emqx_modules]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]), Config. init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(), Config. +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(); +set_special_configs(_App) -> + ok. + %%------------------------------------------------------------------------------ %% Test Cases %%------------------------------------------------------------------------------ t_status(_Config) -> - Path = api_path(["mqtt", "delayed"]), - Auth = emqx_mgmt_api_test_util:auth_header_(), - {ok, R1} = request_api( + Path = uri(["mqtt", "delayed"]), + {ok, 200, R1} = request( put, Path, - "", - Auth, #{enable => false, max_delayed_messages => 10} ), ?assertMatch(#{enable := false, max_delayed_messages := 10}, decode_json(R1)), - {ok, R2} = request_api( + {ok, 200, R2} = request( put, Path, - "", - Auth, #{enable => true, max_delayed_messages => 12} ), ?assertMatch(#{enable := true, max_delayed_messages := 12}, decode_json(R2)), - {ok, ConfJson} = request_api(get, Path), + ?assertMatch( + {ok, 200, _}, + request( + put, + Path, + #{enable => true} + )), + + ?assertMatch( + {ok, 400, _}, + request( + put, + Path, + #{enable => true, max_delayed_messages => -5} + )), + + {ok, 200, ConfJson} = request(get, Path), ReturnConf = decode_json(ConfJson), ?assertMatch(#{enable := true, max_delayed_messages := 12}, ReturnConf). @@ -131,25 +134,100 @@ t_messages(_) -> ), MsgId = maps:get(msgid, First), - {ok, LookupMsg} = request_api( + {ok, 200, LookupMsg} = request( get, - api_path(["mqtt", "delayed", "messages", node(), MsgId]) + uri(["mqtt", "delayed", "messages", node(), MsgId]) ), ?assertEqual(MsgId, maps:get(msgid, decode_json(LookupMsg))), - {ok, _} = request_api( - delete, - api_path(["mqtt", "delayed", "messages", node(), MsgId]) + ?assertMatch( + {ok, 404, _}, + request( + get, + uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", node(), "invalid_msg_id"]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", atom_to_list('unknownnode@127.0.0.1'), MsgId]) + ) + ), + + ?assertMatch( + {ok, 400, _}, + request( + get, + uri(["mqtt", "delayed", "messages", "some_unknown_atom", MsgId]) + ) + ), + + ?assertMatch( + {ok, 404, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", node(), emqx_guid:to_hexstr(emqx_guid:gen())]) + ) + ), + + ?assertMatch( + {ok, 204, _}, + request( + delete, + uri(["mqtt", "delayed", "messages", node(), MsgId]) + ) ), _ = get_messages(4), ok = emqtt:disconnect(C1). +t_large_payload(_) -> + clear_all_record(), + + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + timer:sleep(500), + Topic = <<"$delayed/123/msgs">>, + emqtt:publish( + C1, + Topic, + iolist_to_binary([<<"x">> || _ <- lists:seq(1, 5000)]), + [{qos, 0}, {retain, true}] + ), + + timer:sleep(500), + + [#{msgid := MsgId}] = get_messages(1), + + {ok, 200, Msg} = request( + get, + uri(["mqtt", "delayed", "messages", node(), MsgId]) + ), + + ?assertMatch( + #{ + payload := <<"PAYLOAD_TOO_LARGE">>, + topic := <<"msgs">> + }, + decode_json(Msg) + ). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- + decode_json(Data) -> BinJson = emqx_json:decode(Data, [return_maps]), emqx_map_lib:unsafe_atom_key_map(BinJson). @@ -158,7 +236,7 @@ clear_all_record() -> ets:delete_all_objects(emqx_delayed). get_messages(Len) -> - {ok, MsgsJson} = request_api(get, api_path(["mqtt", "delayed", "messages"])), + {ok, 200, MsgsJson} = request(get, uri(["mqtt", "delayed", "messages"])), #{data := Msgs} = decode_json(MsgsJson), MsgLen = erlang:length(Msgs), ?assert(