diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 9167fed9e..6d724a4e4 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -21,13 +21,16 @@ %%------------------------------------------------------------------------------ all() -> - [{group, avro}, {group, protobuf}]. + [ + {group, avro}, + {group, protobuf} + ] ++ sparkplug_tests(). groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), + AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(), ProtobufOnlyTCs = protobuf_only_tcs(), - TCs = AllTCs -- ProtobufOnlyTCs, - [{avro, TCs}, {protobuf, AllTCs}]. + TCs = AllTCsExceptSP -- ProtobufOnlyTCs, + [{avro, TCs}, {protobuf, AllTCsExceptSP}]. protobuf_only_tcs() -> [ @@ -35,6 +38,13 @@ protobuf_only_tcs() -> t_protobuf_union_decode ]. +sparkplug_tests() -> + [ + t_sparkplug_decode, + t_sparkplug_encode, + t_sparkplug_decode_encode_with_message_name + ]. + init_per_suite(Config) -> emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), emqx_mgmt_api_test_util:init_suite(?APPS), @@ -411,13 +421,18 @@ serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) -> protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) -> #{nodes := Nodes} = Res, - CacheEvents = ?of_kind( + CacheEvents0 = ?of_kind( [ schema_registry_protobuf_cache_hit, schema_registry_protobuf_cache_miss ], Trace ), + CacheEvents = [ + Event + || Event <- CacheEvents0, + maps:get(name, Event, no_name) =/= ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME + ], ?assertMatch( [ schema_registry_protobuf_cache_hit, @@ -731,3 +746,113 @@ t_import_config(_Config) -> {ok, #{root_key => schema_registry, changed => [Path]}}, emqx_ee_schema_registry:import_config(RawConf1) ). + +sparkplug_example_data_base64() -> + <<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>. + +sparkplug_example_data() -> + #{ + <<"metrics">> => + [ + #{ + <<"datatype">> => 2, + <<"int_value">> => 424, + <<"name">> => <<"counter_group1/counter1_1sec">>, + <<"timestamp">> => 1678094561525 + }, + #{ + <<"datatype">> => 2, + <<"int_value">> => 84, + <<"name">> => <<"counter_group1/counter1_5sec">>, + <<"timestamp">> => 1678094561525 + }, + #{ + <<"datatype">> => 2, + <<"int_value">> => 42, + <<"name">> => <<"counter_group1/counter1_10sec">>, + <<"timestamp">> => 1678094561525 + }, + #{ + <<"datatype">> => 5, + <<"int_value">> => 1, + <<"name">> => <<"counter_group1/counter1_run">>, + <<"timestamp">> => 1678094561525 + }, + #{ + <<"datatype">> => 5, + <<"int_value">> => 0, + <<"name">> => <<"counter_group1/counter1_reset">>, + <<"timestamp">> => 1678094561525 + } + ], + <<"seq">> => 88, + <<"timestamp">> => 1678094561521 + }. + +wait_for_sparkplug_schema_registered() -> + wait_for_sparkplug_schema_registered(100). + +wait_for_sparkplug_schema_registered(0) -> + ct:fail("Timed out waiting for sparkplug schema to be registered"); +wait_for_sparkplug_schema_registered(AttemptsLeft) -> + case ets:info(?SERDE_TAB, size) of + 0 -> + timer:sleep(100), + wait_for_sparkplug_schema_registered(AttemptsLeft - 1); + _ -> + ok + end. + +t_sparkplug_decode(_Config) -> + SQL = + << + "select\n" + " sparkplug_decode(payload) as decoded\n" + "from t\n" + >>, + PayloadBase64 = sparkplug_example_data_base64(), + {ok, _} = create_rule_http(#{sql => SQL}), + PayloadBin = base64:decode(PayloadBase64), + ExpectedRuleOutput = + #{<<"decoded">> => sparkplug_example_data()}, + wait_for_sparkplug_schema_registered(), + emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), + Res = receive_action_results(), + ?assertMatch(#{data := ExpectedRuleOutput}, Res), + ok. + +t_sparkplug_encode(Config) -> + %% Default message name field is 'Payload' + SQL = + << + "select\n" + " sparkplug_encode(json_decode(payload)) as encoded\n" + "from t\n" + >>, + PayloadJSONBin = emqx_utils_json:encode(sparkplug_example_data()), + {ok, _} = create_rule_http(#{sql => SQL}), + ExpectedRuleOutput = + #{<<"encoded">> => base64:decode(sparkplug_example_data_base64())}, + wait_for_sparkplug_schema_registered(), + emqx:publish(emqx_message:make(<<"t">>, PayloadJSONBin)), + Res = receive_action_results(), + ?assertMatch(#{data := ExpectedRuleOutput}, Res), + ok. + +t_sparkplug_decode_encode_with_message_name(_Config) -> + SQL = + << + "select\n" + " sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded\n" + "from t\n" + >>, + PayloadBase64 = sparkplug_example_data_base64(), + PayloadBin = base64:decode(PayloadBase64), + {ok, _} = create_rule_http(#{sql => SQL}), + ExpectedRuleOutput = + #{<<"encoded">> => PayloadBin}, + wait_for_sparkplug_schema_registered(), + emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), + Res = receive_action_results(), + ?assertMatch(#{data := ExpectedRuleOutput}, Res), + ok.