test: add test cases for sparkplug_decode and sparkplug_encode

This commit is contained in:
Kjell Winblad 2023-06-30 08:12:52 +02:00
parent 039e27a153
commit 01ff3d10d6
1 changed files with 130 additions and 5 deletions

View File

@ -21,13 +21,16 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
[{group, avro}, {group, protobuf}]. [
{group, avro},
{group, protobuf}
] ++ sparkplug_tests().
groups() -> groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
ProtobufOnlyTCs = protobuf_only_tcs(), ProtobufOnlyTCs = protobuf_only_tcs(),
TCs = AllTCs -- ProtobufOnlyTCs, TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
[{avro, TCs}, {protobuf, AllTCs}]. [{avro, TCs}, {protobuf, AllTCsExceptSP}].
protobuf_only_tcs() -> protobuf_only_tcs() ->
[ [
@ -35,6 +38,13 @@ protobuf_only_tcs() ->
t_protobuf_union_decode t_protobuf_union_decode
]. ].
sparkplug_tests() ->
[
t_sparkplug_decode,
t_sparkplug_encode,
t_sparkplug_decode_encode_with_message_name
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS), emqx_mgmt_api_test_util:init_suite(?APPS),
@ -411,13 +421,18 @@ serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->
protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) -> protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
#{nodes := Nodes} = Res, #{nodes := Nodes} = Res,
CacheEvents = ?of_kind( CacheEvents0 = ?of_kind(
[ [
schema_registry_protobuf_cache_hit, schema_registry_protobuf_cache_hit,
schema_registry_protobuf_cache_miss schema_registry_protobuf_cache_miss
], ],
Trace Trace
), ),
CacheEvents = [
Event
|| Event <- CacheEvents0,
maps:get(name, Event, no_name) =/= ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME
],
?assertMatch( ?assertMatch(
[ [
schema_registry_protobuf_cache_hit, schema_registry_protobuf_cache_hit,
@ -731,3 +746,113 @@ t_import_config(_Config) ->
{ok, #{root_key => schema_registry, changed => [Path]}}, {ok, #{root_key => schema_registry, changed => [Path]}},
emqx_ee_schema_registry:import_config(RawConf1) emqx_ee_schema_registry:import_config(RawConf1)
). ).
sparkplug_example_data_base64() ->
<<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>.
sparkplug_example_data() ->
#{
<<"metrics">> =>
[
#{
<<"datatype">> => 2,
<<"int_value">> => 424,
<<"name">> => <<"counter_group1/counter1_1sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 2,
<<"int_value">> => 84,
<<"name">> => <<"counter_group1/counter1_5sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 2,
<<"int_value">> => 42,
<<"name">> => <<"counter_group1/counter1_10sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 5,
<<"int_value">> => 1,
<<"name">> => <<"counter_group1/counter1_run">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 5,
<<"int_value">> => 0,
<<"name">> => <<"counter_group1/counter1_reset">>,
<<"timestamp">> => 1678094561525
}
],
<<"seq">> => 88,
<<"timestamp">> => 1678094561521
}.
wait_for_sparkplug_schema_registered() ->
wait_for_sparkplug_schema_registered(100).
wait_for_sparkplug_schema_registered(0) ->
ct:fail("Timed out waiting for sparkplug schema to be registered");
wait_for_sparkplug_schema_registered(AttemptsLeft) ->
case ets:info(?SERDE_TAB, size) of
0 ->
timer:sleep(100),
wait_for_sparkplug_schema_registered(AttemptsLeft - 1);
_ ->
ok
end.
t_sparkplug_decode(_Config) ->
SQL =
<<
"select\n"
" sparkplug_decode(payload) as decoded\n"
"from t\n"
>>,
PayloadBase64 = sparkplug_example_data_base64(),
{ok, _} = create_rule_http(#{sql => SQL}),
PayloadBin = base64:decode(PayloadBase64),
ExpectedRuleOutput =
#{<<"decoded">> => sparkplug_example_data()},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
t_sparkplug_encode(Config) ->
%% Default message name field is 'Payload'
SQL =
<<
"select\n"
" sparkplug_encode(json_decode(payload)) as encoded\n"
"from t\n"
>>,
PayloadJSONBin = emqx_utils_json:encode(sparkplug_example_data()),
{ok, _} = create_rule_http(#{sql => SQL}),
ExpectedRuleOutput =
#{<<"encoded">> => base64:decode(sparkplug_example_data_base64())},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadJSONBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
t_sparkplug_decode_encode_with_message_name(_Config) ->
SQL =
<<
"select\n"
" sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded\n"
"from t\n"
>>,
PayloadBase64 = sparkplug_example_data_base64(),
PayloadBin = base64:decode(PayloadBase64),
{ok, _} = create_rule_http(#{sql => SQL}),
ExpectedRuleOutput =
#{<<"encoded">> => PayloadBin},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.