test(schema registry): add test asserting the behavior of empty message roundtrip

Relates to https://emqx.atlassian.net/browse/EMQX-10866
This commit is contained in:
Thales Macedo Garitezi 2024-06-27 16:21:23 -03:00
parent 3ff9440a01
commit 79f15b1daa
2 changed files with 103 additions and 23 deletions

View File

@ -49,12 +49,22 @@ sparkplug_tests() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema), Apps = emqx_cth_suite:start(
emqx_mgmt_api_test_util:init_suite(?APPS), [
Config. emqx,
emqx_conf,
emqx_rule_engine,
emqx_schema_registry,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok. ok.
init_per_group(avro, Config) -> init_per_group(avro, Config) ->

View File

@ -7,6 +7,7 @@
-compile(export_all). -compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]). -import(emqx_mgmt_api_test_util, [uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
@ -14,34 +15,51 @@
-include("emqx_schema_registry.hrl"). -include("emqx_schema_registry.hrl").
-define(APPS, [emqx_conf, emqx_schema_registry]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
OnlyOnceTCs = only_once_testcases(),
[ [
{group, avro}, {group, avro},
{group, protobuf}, {group, protobuf},
{group, json} {group, json}
| OnlyOnceTCs
]. ].
groups() -> groups() ->
OnlyOnceTCs = only_once_testcases(),
AllTCs = emqx_common_test_helpers:all(?MODULE), AllTCs = emqx_common_test_helpers:all(?MODULE),
PerTypeTCs = AllTCs -- OnlyOnceTCs,
[ [
{avro, AllTCs}, {avro, PerTypeTCs},
{protobuf, AllTCs}, {protobuf, PerTypeTCs},
{json, AllTCs} {json, PerTypeTCs}
].
only_once_testcases() ->
[
t_empty_sparkplug
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema), Apps = emqx_cth_suite:start(
emqx_mgmt_api_test_util:init_suite(?APPS), [
Config. emqx,
emqx_conf,
emqx_rule_engine,
emqx_schema_registry,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok. ok.
init_per_group(avro, Config) -> init_per_group(avro, Config) ->
@ -112,6 +130,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
clear_schemas(), clear_schemas(),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -119,21 +138,20 @@ end_per_testcase(_TestCase, _Config) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
request(get) -> request(get) ->
do_request(get, _Parts = [], _Body = []); do_request(get, uri(["schema_registry"]), _Body = []);
request({get, Name}) -> request({get, Name}) ->
do_request(get, _Parts = [Name], _Body = []); do_request(get, uri(["schema_registry", Name]), _Body = []);
request({delete, Name}) -> request({delete, Name}) ->
do_request(delete, _Parts = [Name], _Body = []); do_request(delete, uri(["schema_registry", Name]), _Body = []);
request({put, Name, Params}) -> request({put, Name, Params}) ->
do_request(put, _Parts = [Name], Params); do_request(put, uri(["schema_registry", Name]), Params);
request({post, Params}) -> request({post, Params}) ->
do_request(post, _Parts = [], Params). do_request(post, uri(["schema_registry"]), Params).
do_request(Method, PathParts, Body) -> do_request(Method, Path, Body) ->
Header = emqx_common_test_http:default_auth_header(), Header = emqx_common_test_http:default_auth_header(),
URI = uri(["schema_registry" | PathParts]),
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
Res0 = emqx_mgmt_api_test_util:request_api(Method, URI, [], Header, Body, Opts), Res0 = emqx_mgmt_api_test_util:request_api(Method, Path, [], Header, Body, Opts),
case Res0 of case Res0 of
{ok, Code, <<>>} -> {ok, Code, <<>>} ->
{ok, Code, <<>>}; {ok, Code, <<>>};
@ -163,6 +181,28 @@ clear_schemas() ->
emqx_schema_registry:list_schemas() emqx_schema_registry:list_schemas()
). ).
dryrun_rule(SQL, Context) ->
Params = #{
context => Context,
sql => SQL
},
Path = emqx_mgmt_api_test_util:api_path(["rule_test"]),
Res = do_request(post, Path, Params),
ct:pal("dryrun rule result:\n ~p", [Res]),
case Res of
{ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} ->
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
simplify_result(Res);
_ ->
simplify_result(Res)
end.
simplify_result(Res) ->
case Res of
{ok, Status, Body} ->
{Status, Body}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -329,3 +369,33 @@ t_crud(Config) ->
), ),
ok. ok.
t_empty_sparkplug(_Config) ->
?check_trace(
begin
SQL = <<
"select sparkplug_decode(sparkplug_encode(json_decode(payload))) as decoded"
" from \"t\" "
>>,
Context = #{
<<"clientid">> => <<"c_emqx">>,
<<"event_type">> => <<"message_publish">>,
<<"payload">> => <<"{}">>,
<<"qos">> => 1,
<<"topic">> => <<"t">>,
<<"username">> => <<"u_emqx">>
},
%% N.B.: It's not an error for a `metrics' field to "appear out of nowhere"
%% after a roundtrip. Since Sparkplug B schema uses proto 2, and the `metrics'
%% field is `repeated', this field has no presence tracking, hence it's
%% impossible to distinguish between an absent field and a field with its
%% default value (an empty array, in this case).
%% https://protobuf.dev/programming-guides/field_presence/#presence-in-proto2-apis
?assertMatch(
{200, #{<<"decoded">> := #{<<"metrics">> := []}}}, dryrun_rule(SQL, Context)
),
ok
end,
[]
),
ok.