%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_schema_registry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_schema_registry.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[
{group, avro},
{group, protobuf},
{group, json}
] ++ sparkplug_tests().
groups() ->
AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
ProtobufOnlyTCs = protobuf_only_tcs(),
TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
[{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].
protobuf_only_tcs() ->
[
t_protobuf_union_encode,
t_protobuf_union_decode
].
sparkplug_tests() ->
[
t_sparkplug_decode,
t_sparkplug_encode,
t_sparkplug_decode_encode_with_message_name,
t_sparkplug_encode_float_to_uint64_key
].
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_group(avro, Config) ->
[{serde_type, avro} | Config];
init_per_group(json, Config) ->
[{serde_type, json} | Config];
init_per_group(protobuf, Config) ->
[{serde_type, protobuf} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
clear_schemas(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
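%% Rule action callback: records the action data and environment in the
%% `recorded_actions' table and forwards them to the test process
%% registered by make_trace_fn_action/0.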
trace_rule(Data, Envs, _Args) ->
Now = erlang:monotonic_time(),
ets:insert(recorded_actions, {Now, #{data => Data, envs => Envs}}),
TestPid = persistent_term:get({?MODULE, test_pid}),
TestPid ! {action, #{data => Data, envs => Envs}},
ok.
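%% Builds an action spec that points at trace_rule/3 above and registers
%% the calling test process as the receiver of the action results.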
make_trace_fn_action() ->
persistent_term:put({?MODULE, test_pid}, self()),
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
emqx_utils_ets:new(recorded_actions, [named_table, public, ordered_set]),
#{function => Fn, args => #{}}.
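%% Creates a rule over the management API with two actions: the trace
%% function above and a republish action.  The test process subscribes to
%% the republish topic, and the rule is deleted on test exit.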
create_rule_http(RuleParams) ->
create_rule_http(RuleParams, _Overrides = #{}).
create_rule_http(RuleParams, Overrides) ->
RepublishTopic = <<"republish/schema_registry">>,
emqx:subscribe(RepublishTopic),
PayloadTemplate = maps:get(payload_template, Overrides, <<>>),
DefaultParams = #{
enable => true,
actions => [
make_trace_fn_action(),
#{
<<"function">> => <<"republish">>,
<<"args">> =>
#{
<<"topic">> => RepublishTopic,
<<"payload">> => PayloadTemplate,
<<"qos">> => 0,
<<"retain">> => false,
<<"user_properties">> => <<>>
}
}
]
},
Params = maps:merge(DefaultParams, RuleParams),
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res0} ->
Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
{ok, Res};
Error ->
Error
end.
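%% Minimal schema source for each serde type: a two-field record/object
%% for avro and json, and two messages (Person, UnionValue) for protobuf.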
schema_params(avro) ->
Source = #{
type => record,
name => <<"test">>,
namespace => <<"emqx.com">>,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_utils_json:encode(Source),
#{type => avro, source => SourceBin};
schema_params(json) ->
Source =
#{
type => object,
properties => #{
i => #{type => integer},
s => #{type => string}
},
required => [<<"i">>, <<"s">>]
},
SourceBin = emqx_utils_json:encode(Source),
#{type => json, source => SourceBin};
schema_params(protobuf) ->
SourceBin =
<<
"message Person {\n"
" required string name = 1;\n"
" required int32 id = 2;\n"
" optional string email = 3;\n"
" }\n"
"message UnionValue {\n"
" oneof u {\n"
" int32 a = 1;\n"
" string b = 2;\n"
" }\n"
"}\n"
>>,
#{type => protobuf, source => SourceBin}.
create_serde(SerdeType, SerdeName) ->
Schema = schema_params(SerdeType),
ok = emqx_schema_registry:add_schema(SerdeName, Schema),
ok.
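%% Per-scenario test parameters: rule SQL, input payload, expected rule
%% output, and extra schema_encode/schema_decode arguments (e.g. the
%% protobuf message name).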
test_params_for(Type, encode_decode1) when Type =:= avro; Type =:= json ->
SQL =
<<
"select\n"
" schema_decode('my_serde',\n"
" schema_encode('my_serde', json_decode(payload))) as decoded,\n\n"
" decoded.i as decoded_int,\n"
" decoded.s as decoded_string\n"
"from t\n"
>>,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
ExpectedRuleOutput =
#{
<<"decoded">> => #{<<"i">> => 10, <<"s">> => <<"text">>},
<<"decoded_int">> => 10,
<<"decoded_string">> => <<"text">>
},
ExtraArgs = [],
#{
sql => SQL,
payload => Payload,
expected_rule_output => ExpectedRuleOutput,
extra_args => ExtraArgs
};
test_params_for(Type, encode1) when Type =:= avro; Type =:= json ->
SQL =
<<
"select\n"
" schema_encode('my_serde', json_decode(payload)) as encoded\n"
"from t\n"
>>,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [],
#{
sql => SQL,
payload => Payload,
payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(Type, decode1) when Type =:= avro; Type =:= json ->
SQL =
<<
"select\n"
" schema_decode('my_serde', payload) as decoded\n"
"from t\n"
>>,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
ExtraArgs = [],
#{
sql => SQL,
payload => Payload,
extra_args => ExtraArgs
};
test_params_for(protobuf, encode_decode1) ->
SQL =
<<
"select\n"
" schema_decode('my_serde',\n"
" schema_encode('my_serde', json_decode(payload), 'Person'),\n"
" 'Person') as decoded,\n"
" decoded.name as decoded_name,\n"
" decoded.email as decoded_email,\n"
" decoded.id as decoded_id\n"
"from t\n"
>>,
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
ExpectedRuleOutput =
#{
<<"decoded">> =>
#{
<<"email">> => <<"emqx@emqx.io">>,
<<"id">> => 10,
<<"name">> => <<"some name">>
},
<<"decoded_email">> => <<"emqx@emqx.io">>,
<<"decoded_id">> => 10,
<<"decoded_name">> => <<"some name">>
},
ExtraArgs = [<<"Person">>],
#{
sql => SQL,
payload => Payload,
extra_args => ExtraArgs,
expected_rule_output => ExpectedRuleOutput
};
test_params_for(protobuf, decode1) ->
SQL =
<<
"select\n"
" schema_decode('my_serde', payload, 'Person') as decoded\n"
"from t\n"
>>,
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
ExtraArgs = [<<"Person">>],
#{
sql => SQL,
payload => Payload,
extra_args => ExtraArgs
};
test_params_for(protobuf, encode1) ->
SQL =
<<
"select\n"
" schema_encode('my_serde', json_decode(payload), 'Person') as encoded\n"
"from t\n"
>>,
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
PayloadTemplate = <<"${.encoded}">>,
ExtraArgs = [<<"Person">>],
#{
sql => SQL,
payload => Payload,
payload_template => PayloadTemplate,
extra_args => ExtraArgs
};
test_params_for(protobuf, union1) ->
SQL =
<<
"select\n"
" schema_decode('my_serde', payload, 'UnionValue') as decoded,\n"
" decoded.a as decoded_a,\n"
" decoded.b as decoded_b\n"
"from t\n"
>>,
PayloadA = #{<<"a">> => 10},
PayloadB = #{<<"b">> => <<"string">>},
ExtraArgs = [<<"UnionValue">>],
#{
sql => SQL,
payload => #{a => PayloadA, b => PayloadB},
extra_args => ExtraArgs
};
test_params_for(protobuf, union2) ->
SQL =
<<
"select\n"
" schema_encode('my_serde', json_decode(payload), 'UnionValue') as encoded\n"
"from t\n"
>>,
PayloadA = #{<<"a">> => 10},
PayloadB = #{<<"b">> => <<"string">>},
ExtraArgs = [<<"UnionValue">>],
#{
sql => SQL,
payload => #{a => PayloadA, b => PayloadB},
extra_args => ExtraArgs
};
test_params_for(Type, Name) ->
ct:fail("unimplemented: ~p", [{Type, Name}]).
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_schema_registry:delete_schema(Name)
end,
emqx_schema_registry:list_schemas()
).
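%% Waits for the results forwarded by the trace_rule/3 action.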
receive_action_results() ->
receive
{action, #{data := _} = Res} ->
Res
after 1_000 ->
ct:fail("action didn't run")
end.
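%% Waits for a message republished by the rule under test; JSON payloads
%% are decoded for convenience, anything else is returned as-is.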
receive_published(Line) ->
receive
{deliver, _Topic, Msg} ->
MsgMap = emqx_message:to_map(Msg),
maps:update_with(
payload,
fun(Raw) ->
case emqx_utils_json:safe_decode(Raw, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> Raw
end
end,
MsgMap
)
after 1_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("publish not received, line ~b", [Line])
end.
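%% Builds a two-node (core, core) cluster spec; used by t_cluster_serde_build.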
cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config),
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, ?APPS},
{listener_ports, []},
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_enterprise_schema},
{load_apps, [emqx_machine]},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker]),
ok;
(emqx_conf) ->
ok;
(_) ->
ok
end}
]
),
ct:pal("cluster:\n ~p", [Cluster]),
Cluster.
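%% Starts the peer nodes, registers their shutdown for cleanup, and waits
%% for the joining nodes to finish booting their applications.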
start_cluster(Cluster) ->
Nodes = [
emqx_common_test_helpers:start_peer(Name, Opts)
|| {Name, Opts} <- Cluster
],
NumNodes = length(Nodes),
on_exit(fun() ->
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_peer(N)
end,
Nodes
)
end),
{ok, _} = snabbkaffe:block_until(
%% -1 because only those that join the first node will emit the event.
?match_n_events(NumNodes - 1, #{?snk_kind := emqx_machine_boot_apps_started}),
30_000
),
Nodes.
wait_for_cluster_rpc(Node) ->
%% We need to wait until the config handler is ready again after it
%% restarts during the cluster join.
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
).
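%% Trace spec for t_cluster_serde_build: for protobuf, the generated-code
%% cache should be built on exactly one node (a miss) and reused on the
%% other (a hit); events for the built-in Sparkplug schema are ignored.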
protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
#{nodes := Nodes} = Res,
CacheEvents0 = ?of_kind(
[
schema_registry_protobuf_cache_hit,
schema_registry_protobuf_cache_miss
],
Trace
),
CacheEvents = [
Event
|| Event <- CacheEvents0,
maps:get(name, Event, no_name) =/= ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME
],
?assertMatch(
[
schema_registry_protobuf_cache_hit,
schema_registry_protobuf_cache_miss
],
lists:sort(?projection(?snk_kind, CacheEvents))
),
?assertEqual(
lists:usort(Nodes),
lists:usort([N || #{?snk_meta := #{node := N}} <- CacheEvents])
),
ok;
protobuf_unique_cache_hit_spec(_Res, _Trace) ->
ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_unknown_calls(_Config) ->
Ref = monitor(process, emqx_schema_registry),
%% for coverage
emqx_schema_registry ! unknown,
gen_server:cast(emqx_schema_registry, unknown),
?assertEqual({error, unknown_call}, gen_server:call(emqx_schema_registry, unknown)),
receive
{'DOWN', Ref, process, _, _} ->
ct:fail("registry shouldn't have died")
after 500 ->
ok
end.
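%% schema_encode followed by schema_decode inside a rule round-trips the payload.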
t_encode_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
#{
sql := SQL,
payload := Payload,
expected_rule_output := ExpectedRuleOutput
} = test_params_for(SerdeType, encode_decode1),
{ok, _} = create_rule_http(#{sql => SQL}),
PayloadBin = emqx_utils_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
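%% Deleting a schema emits the serdes-deleted trace event and destroys the serde.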
t_delete_serde(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
?check_trace(
begin
ok = create_serde(SerdeType, SerdeName),
{ok, {ok, _}} =
?wait_async_action(
emqx_schema_registry:delete_schema(SerdeName),
#{?snk_kind := schema_registry_serdes_deleted},
1_000
),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(schema_registry_serdes_deleted, Trace)),
?assertMatch([#{type := SerdeType}], ?of_kind(serde_destroyed, Trace)),
ok
end
),
ok.
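%% schema_encode in a rule: the republished payload must decode back to
%% the original payload.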
t_encode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
#{
sql := SQL,
payload := Payload,
payload_template := PayloadTemplate,
extra_args := ExtraArgs
} = test_params_for(SerdeType, encode1),
{ok, _} = create_rule_http(#{sql => SQL}, #{payload_template => PayloadTemplate}),
PayloadBin = emqx_utils_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Published = receive_published(?LINE),
case SerdeType of
json ->
%% The republished payload is a binary, but since it happens to be
%% valid JSON it was decoded by `safe_decode' in receive_published.
?assertMatch(#{payload := #{<<"i">> := _, <<"s">> := _}}, Published);
_ ->
?assertMatch(#{payload := B} when is_binary(B), Published),
#{payload := Encoded} = Published,
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs]))
end,
ok.
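%% schema_decode in a rule applied to a locally encoded payload.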
t_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
#{
sql := SQL,
payload := Payload,
extra_args := ExtraArgs
} = test_params_for(SerdeType, decode1),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBin = eval_encode(Serde, [Payload | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"decoded">> := _}},
Published
),
#{payload := #{<<"decoded">> := Decoded}} = Published,
?assertEqual(Payload, Decoded),
ok.
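%% Both alternatives of a protobuf `oneof' field survive a local encode
%% followed by schema_decode in the rule.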
t_protobuf_union_encode(Config) ->
SerdeType = ?config(serde_type, Config),
?assertEqual(protobuf, SerdeType),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
#{
sql := SQL,
payload := #{a := PayloadA, b := PayloadB},
extra_args := ExtraArgs
} = test_params_for(SerdeType, union1),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBinA = eval_encode(Serde, [PayloadA | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
PublishedA = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"decoded">> := _}},
PublishedA
),
#{payload := #{<<"decoded">> := DecodedA}} = PublishedA,
?assertEqual(PayloadA, DecodedA),
EncodedBinB = eval_encode(Serde, [PayloadB | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
PublishedB = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"decoded">> := _}},
PublishedB
),
#{payload := #{<<"decoded">> := DecodedB}} = PublishedB,
?assertEqual(PayloadB, DecodedB),
ok.
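%% Both alternatives of a protobuf `oneof' field survive schema_encode in
%% the rule followed by a local decode.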
t_protobuf_union_decode(Config) ->
SerdeType = ?config(serde_type, Config),
?assertEqual(protobuf, SerdeType),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
#{
sql := SQL,
payload := #{a := PayloadA, b := PayloadB},
extra_args := ExtraArgs
} = test_params_for(SerdeType, union2),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBinA = emqx_utils_json:encode(PayloadA),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
PublishedA = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"encoded">> := _}},
PublishedA
),
#{payload := #{<<"encoded">> := EncodedA}} = PublishedA,
?assertEqual(PayloadA, eval_decode(Serde, [EncodedA | ExtraArgs])),
EncodedBinB = emqx_utils_json:encode(PayloadB),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
PublishedB = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"encoded">> := _}},
PublishedB
),
#{payload := #{<<"encoded">> := EncodedB}} = PublishedB,
?assertEqual(PayloadB, eval_decode(Serde, [EncodedB | ExtraArgs])),
ok.
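%% A broken schema in a config update is rejected and does not affect
%% previously added schemas.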
t_fail_rollback(Config) ->
SerdeType = ?config(serde_type, Config),
OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
BrokenSchema =
case SerdeType of
json ->
OkSchema#{<<"source">> := <<"not a json value">>};
_ ->
OkSchema#{<<"source">> := <<"{}">>}
end,
?assertMatch(
{ok, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas, <<"a">>],
OkSchema,
#{}
)
),
?assertMatch(
{error, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas, <<"z">>],
BrokenSchema,
#{}
)
),
?assertMatch({ok, #serde{name = <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)),
%% no `z' serde should have made it into the table
?assertEqual({error, not_found}, emqx_schema_registry:get_serde(<<"z">>)),
ok.
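%% Adding and deleting a schema builds and destroys the serde on every
%% node of a cluster; for protobuf, the code cache is built on only one
%% node (see protobuf_unique_cache_hit_spec/2).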
t_cluster_serde_build(Config) ->
SerdeType = ?config(serde_type, Config),
Cluster = cluster(Config),
SerdeName = my_serde,
Schema = schema_params(SerdeType),
#{
payload := Payload,
extra_args := ExtraArgs
} = test_params_for(SerdeType, encode_decode1),
?check_trace(
begin
Nodes = [N1, N2 | _] = start_cluster(Cluster),
NumNodes = length(Nodes),
wait_for_cluster_rpc(N2),
?assertMatch(
ok,
erpc:call(N2, emqx_schema_registry, add_schema, [SerdeName, Schema])
),
%% check that we can serialize/deserialize on all nodes
lists:foreach(
fun(N) ->
ok = erpc:call(N, fun() ->
Res0 = emqx_schema_registry:get_serde(SerdeName),
?assertMatch({ok, #serde{}}, Res0, #{node => N}),
{ok, Serde} = Res0,
?assertEqual(
Payload,
encode_then_decode(Serde, Payload, ExtraArgs),
#{node => N}
),
ok
end)
end,
Nodes
),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
NumNodes,
10_000
),
?assertEqual(
ok,
erpc:call(N1, emqx_schema_registry, delete_schema, [SerdeName])
),
{ok, DeleteEvents} = snabbkaffe:receive_events(SRef1),
%% expect all nodes to delete their (local) serdes
?assertEqual(NumNodes, length(DeleteEvents)),
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
?assertMatch(
{error, not_found},
emqx_schema_registry:get_serde(SerdeName),
#{node => N}
),
ok
end)
end,
Nodes
),
#{serde_type => SerdeType, nodes => Nodes}
end,
[
{"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2}
]
),
ok.
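%% import_config/1 reports no changed paths when importing a new schema,
%% and the changed path when re-importing it with a modified description.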
t_import_config(_Config) ->
RawConf = #{
<<"schema_registry">> =>
#{
<<"schemas">> =>
#{
<<"my_avro_schema">> =>
#{
<<"description">> => <<"My Avro Schema">>,
<<"source">> =>
emqx_utils_json:encode(
#{
type => <<"record">>,
name => <<"ct">>,
namespace => <<"emqx.com">>,
fields => [
#{type => <<"int">>, name => <<"i">>},
#{type => <<"string">>, name => <<"s">>}
]
}
),
<<"type">> => <<"avro">>
}
}
}
},
RawConf1 = emqx_utils_maps:deep_put(
[<<"schema_registry">>, <<"schemas">>, <<"my_avro_schema">>, <<"description">>],
RawConf,
<<"Updated description">>
),
Path = [schema_registry, schemas, <<"my_avro_schema">>],
?assertEqual(
{ok, #{root_key => schema_registry, changed => []}},
emqx_schema_registry:import_config(RawConf)
),
?assertEqual(
{ok, #{root_key => schema_registry, changed => [Path]}},
emqx_schema_registry:import_config(RawConf1)
).
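%% Sparkplug B example payload, as base64-encoded protobuf and as its
%% decoded map form.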
sparkplug_example_data_base64() ->
<<
"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgD"
"EikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3Vu"
"dGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3Jv"
"dXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50"
"ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA=="
>>.
sparkplug_example_data() ->
#{
<<"metrics">> =>
[
#{
<<"datatype">> => 2,
<<"int_value">> => 424,
<<"name">> => <<"counter_group1/counter1_1sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 2,
<<"int_value">> => 84,
<<"name">> => <<"counter_group1/counter1_5sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 2,
<<"int_value">> => 42,
<<"name">> => <<"counter_group1/counter1_10sec">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 5,
<<"int_value">> => 1,
<<"name">> => <<"counter_group1/counter1_run">>,
<<"timestamp">> => 1678094561525
},
#{
<<"datatype">> => 5,
<<"int_value">> => 0,
<<"name">> => <<"counter_group1/counter1_reset">>,
<<"timestamp">> => 1678094561525
}
],
<<"seq">> => 88,
<<"timestamp">> => 1678094561521
}.
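%% Waits for the built-in Sparkplug B schema serde to appear in the serde table.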
wait_for_sparkplug_schema_registered() ->
?retry(
100,
100,
[_] = ets:lookup(?SERDE_TAB, ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME)
).
t_sparkplug_decode(_Config) ->
SQL =
<<
"select\n"
" sparkplug_decode(payload) as decoded\n"
"from t\n"
>>,
PayloadBase64 = sparkplug_example_data_base64(),
{ok, _} = create_rule_http(#{sql => SQL}),
PayloadBin = base64:decode(PayloadBase64),
ExpectedRuleOutput =
#{<<"decoded">> => sparkplug_example_data()},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
t_sparkplug_encode(_Config) ->
%% The default message name used by sparkplug_encode/1 is 'Payload'
SQL =
<<
"select\n"
" sparkplug_encode(json_decode(payload)) as encoded\n"
"from t\n"
>>,
PayloadJSONBin = emqx_utils_json:encode(sparkplug_example_data()),
{ok, _} = create_rule_http(#{sql => SQL}),
ExpectedRuleOutput =
#{<<"encoded">> => base64:decode(sparkplug_example_data_base64())},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadJSONBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
t_sparkplug_encode_float_to_uint64_key(_Config) ->
%% Test that the following bug is fixed:
%% https://emqx.atlassian.net/browse/EMQX-10775
%% When one assigns a float value to a uint64 key, one should get a
%% gpb_type_error and not a badarith error.
wait_for_sparkplug_schema_registered(),
?assertException(
error,
{gpb_type_error, _},
emqx_rule_funcs:sparkplug_encode(#{<<"seq">> => 1.5})
).
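%% sparkplug_decode/2 and sparkplug_encode/2 with an explicit message name
%% round-trip the example payload.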
t_sparkplug_decode_encode_with_message_name(_Config) ->
SQL =
<<
"select\n"
" sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded\n"
"from t\n"
>>,
PayloadBase64 = sparkplug_example_data_base64(),
PayloadBin = base64:decode(PayloadBase64),
{ok, _} = create_rule_http(#{sql => SQL}),
ExpectedRuleOutput =
#{<<"encoded">> => PayloadBin},
wait_for_sparkplug_schema_registered(),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
eval_encode(Serde, Args) ->
emqx_schema_registry_serde:eval_encode(Serde, Args).
eval_decode(Serde, Args) ->
emqx_schema_registry_serde:eval_decode(Serde, Args).
encode_then_decode(Serde, Payload, ExtraArgs) ->
Encoded = eval_encode(Serde, [Payload | ExtraArgs]),
eval_decode(Serde, [Encoded | ExtraArgs]).