From 9d15247dd54b6eb9565f0d019442971388c34d35 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 17:33:25 -0300 Subject: [PATCH] feat(schema_registry): add support for protobuf schemas Fixes https://emqx.atlassian.net/browse/EMQX-9470 --- build | 5 +- changes/ee/feat-10409.en.md | 1 + .../include/emqx_ee_schema_registry.hrl | 21 +- lib-ee/emqx_ee_schema_registry/rebar.config | 3 +- .../src/emqx_ee_schema_registry.app.src | 3 +- .../src/emqx_ee_schema_registry.erl | 9 +- .../src/emqx_ee_schema_registry_schema.erl | 20 +- .../src/emqx_ee_schema_registry_serde.erl | 132 ++++++ .../test/emqx_ee_schema_registry_SUITE.erl | 390 +++++++++++++++--- ...emqx_ee_schema_registry_http_api_SUITE.erl | 88 +++- .../emqx_ee_schema_registry_serde_SUITE.erl | 53 ++- mix.exs | 2 +- rebar.config | 2 +- rel/i18n/emqx_ee_schema_registry_schema.hocon | 6 + .../zh/emqx_ee_schema_registry_schema.hocon | 6 + 15 files changed, 640 insertions(+), 101 deletions(-) create mode 100644 changes/ee/feat-10409.en.md diff --git a/build b/build index 021cc80a5..77d4dbfc8 100755 --- a/build +++ b/build @@ -124,10 +124,7 @@ make_docs() { } assert_no_compile_time_only_deps() { - if [ "$("$FIND" "_build/$PROFILE/rel/emqx/lib/" -maxdepth 1 -name 'gpb-*' -type d)" != "" ]; then - echo "gpb should not be included in the release" - exit 1 - fi + : } make_rel() { diff --git a/changes/ee/feat-10409.en.md b/changes/ee/feat-10409.en.md new file mode 100644 index 000000000..dfa9bfa76 --- /dev/null +++ b/changes/ee/feat-10409.en.md @@ -0,0 +1 @@ +Add support for [Protocol Buffers](https://protobuf.dev/) schemas in Schema Registry. diff --git a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl index af49db6dd..058abf007 100644 --- a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl +++ b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl @@ -10,14 +10,19 @@ -define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard). -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab). +-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab). -type schema_name() :: binary(). -type schema_source() :: binary(). -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serializer() :: fun((decoded_data()) -> encoded_data()). --type deserializer() :: fun((encoded_data()) -> decoded_data()). +-type serializer() :: + fun((decoded_data()) -> encoded_data()) + | fun((decoded_data(), term()) -> encoded_data()). +-type deserializer() :: + fun((encoded_data()) -> decoded_data()) + | fun((encoded_data(), term()) -> decoded_data()). -type destructor() :: fun(() -> ok). -type serde_type() :: avro. -type serde_opts() :: map(). @@ -29,6 +34,18 @@ destructor :: destructor() }). -type serde() :: #serde{}. + +-record(protobuf_cache, { + fingerprint, + module, + module_binary +}). +-type protobuf_cache() :: #protobuf_cache{ + fingerprint :: binary(), + module :: module(), + module_binary :: binary() +}. + -type serde_map() :: #{ name := schema_name(), serializer := serializer(), diff --git a/lib-ee/emqx_ee_schema_registry/rebar.config b/lib-ee/emqx_ee_schema_registry/rebar.config index 223ebf533..e42ff7278 100644 --- a/lib-ee/emqx_ee_schema_registry/rebar.config +++ b/lib-ee/emqx_ee_schema_registry/rebar.config @@ -4,7 +4,8 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}}, - {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}} + {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}, + {gpb, "4.19.7"} ]}. {shell, [ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index c40fb808a..21f51b361 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -6,7 +6,8 @@ {applications, [ kernel, stdlib, - erlavro + erlavro, + gpb ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 4c00903d5..59a224fc7 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -176,7 +176,14 @@ create_tables() -> {record_name, serde}, {attributes, record_info(fields, serde)} ]), - ok = mria:wait_for_tables([?SERDE_TAB]), + ok = mria:create_table(?PROTOBUF_CACHE_TAB, [ + {type, set}, + {rlog_shard, ?SCHEMA_REGISTRY_SHARD}, + {storage, disc_only_copies}, + {record_name, protobuf_cache}, + {attributes, record_info(fields, protobuf_cache)} + ]), + ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]), ok. do_build_serdes(Schemas) -> diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl index bcdc63166..237ec706f 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl @@ -53,10 +53,20 @@ fields(avro) -> mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})}, {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} ]; +fields(protobuf) -> + [ + {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})}, + {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})}, + {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} + ]; fields("get_avro") -> [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)]; +fields("get_protobuf") -> + [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)]; fields("put_avro") -> fields(avro); +fields("put_protobuf") -> + fields(protobuf); fields("post_" ++ Type) -> fields("get_" ++ Type). @@ -64,6 +74,8 @@ desc(?CONF_KEY_ROOT) -> ?DESC("schema_registry_root"); desc(avro) -> ?DESC("avro_type"); +desc(protobuf) -> + ?DESC("protobuf_type"); desc(_) -> undefined. @@ -96,7 +108,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Name) -> hoconsc:ref(?MODULE, Name). supported_serde_types() -> - [avro]. + [avro, protobuf]. refs() -> [ref(Type) || Type <- supported_serde_types()]. @@ -105,6 +117,8 @@ refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) -> refs(Value#{<<"type">> := atom_to_binary(TypeAtom)}); refs(#{<<"type">> := <<"avro">>}) -> [ref(avro)]; +refs(#{<<"type">> := <<"protobuf">>}) -> + [ref(protobuf)]; refs(_) -> Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]), throw(#{ @@ -113,12 +127,14 @@ refs(_) -> }). refs_get_api() -> - [ref("get_avro")]. + [ref("get_avro"), ref("get_protobuf")]. refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) -> refs(Value#{<<"type">> := atom_to_binary(TypeAtom)}); refs_get_api(#{<<"type">> := <<"avro">>}) -> [ref("get_avro")]; +refs_get_api(#{<<"type">> := <<"protobuf">>}) -> + [ref("get_protobuf")]; refs_get_api(_) -> Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]), throw(#{ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl index 9835ec7c2..c65574032 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl @@ -4,8 +4,11 @@ -module(emqx_ee_schema_registry_serde). -include("emqx_ee_schema_registry.hrl"). +-include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ee_schema_registry_serde]}}]). + %% API -export([ decode/2, @@ -55,6 +58,27 @@ make_serde(avro, Name, Source0) -> ?tp(serde_destroyed, #{type => avro, name => Name}), ok end, + {Serializer, Deserializer, Destructor}; +make_serde(protobuf, Name, Source) -> + SerdeMod = make_protobuf_serde_mod(Name, Source), + Serializer = + fun(DecodedData0, MessageName0) -> + DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0), + MessageName = binary_to_existing_atom(MessageName0, utf8), + SerdeMod:encode_msg(DecodedData, MessageName) + end, + Deserializer = + fun(EncodedData, MessageName0) -> + MessageName = binary_to_existing_atom(MessageName0, utf8), + Decoded = SerdeMod:decode_msg(EncodedData, MessageName), + emqx_utils_maps:binary_key_map(Decoded) + end, + Destructor = + fun() -> + unload_code(SerdeMod), + ?tp(serde_destroyed, #{type => protobuf, name => Name}), + ok + end, {Serializer, Deserializer, Destructor}. %%------------------------------------------------------------------------------ @@ -68,3 +92,111 @@ inject_avro_name(Name, Source0) -> Schema0 = emqx_utils_json:decode(Source0, [return_maps]), Schema = Schema0#{<<"name">> => Name}, emqx_utils_json:encode(Schema). + +-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). +make_protobuf_serde_mod(Name, Source) -> + {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), + case lazy_generate_protobuf_code(SerdeMod0, Source) of + {ok, SerdeMod, ModBinary} -> + load_code(SerdeMod, SerdeModFileName, ModBinary), + SerdeMod; + {error, #{error := Error, warnings := Warnings}} -> + ?SLOG( + warning, + #{ + msg => "error_generating_protobuf_code", + error => Error, + warnings => Warnings + } + ), + error({invalid_protobuf_schema, Error}) + end. + +-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}. +protobuf_serde_mod_name(Name) -> + %% must be a string (list) + SerdeModName = "$schema_parser_" ++ binary_to_list(Name), + SerdeMod = list_to_atom(SerdeModName), + %% the "path" to the module, for `code:load_binary'. + SerdeModFileName = SerdeModName ++ ".memory", + {SerdeMod, SerdeModFileName}. + +-spec lazy_generate_protobuf_code(module(), schema_source()) -> + {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. +lazy_generate_protobuf_code(SerdeMod0, Source) -> + %% We run this inside a transaction with locks to avoid running + %% the compile on all nodes; only one will get the lock, compile + %% the schema, and other nodes will simply read the final result. + {atomic, Res} = mria:transaction( + ?SCHEMA_REGISTRY_SHARD, + fun lazy_generate_protobuf_code_trans/2, + [SerdeMod0, Source] + ), + Res. + +-spec lazy_generate_protobuf_code_trans(module(), schema_source()) -> + {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. +lazy_generate_protobuf_code_trans(SerdeMod0, Source) -> + Fingerprint = erlang:md5(Source), + _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write), + case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of + [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] -> + ?tp(schema_registry_protobuf_cache_hit, #{}), + {ok, SerdeMod, ModBinary}; + [] -> + ?tp(schema_registry_protobuf_cache_miss, #{}), + case generate_protobuf_code(SerdeMod0, Source) of + {ok, SerdeMod, ModBinary} -> + CacheEntry = #protobuf_cache{ + fingerprint = Fingerprint, + module = SerdeMod, + module_binary = ModBinary + }, + ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write), + {ok, SerdeMod, ModBinary}; + {ok, SerdeMod, ModBinary, _Warnings} -> + CacheEntry = #protobuf_cache{ + fingerprint = Fingerprint, + module = SerdeMod, + module_binary = ModBinary + }, + ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write), + {ok, SerdeMod, ModBinary}; + error -> + {error, #{error => undefined, warnings => []}}; + {error, Error} -> + {error, #{error => Error, warnings => []}}; + {error, Error, Warnings} -> + {error, #{error => Error, warnings => Warnings}} + end + end. + +generate_protobuf_code(SerdeMod, Source) -> + gpb_compile:string( + SerdeMod, + Source, + [ + binary, + strings_as_binaries, + {maps, true}, + %% Fixme: currently, some bug in `gpb' prevents this + %% option from working with `oneof' types... We're then + %% forced to use atom key maps. + %% {maps_key_type, binary}, + {maps_oneof, flat}, + {verify, always}, + {maps_unset_optional, omitted} + ] + ). + +-spec load_code(module(), string(), binary()) -> ok. +load_code(SerdeMod, SerdeModFileName, ModBinary) -> + _ = code:purge(SerdeMod), + {module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary), + ok. + +-spec unload_code(module()) -> ok. +unload_code(SerdeMod) -> + _ = code:purge(SerdeMod), + _ = code:delete(SerdeMod), + ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 5bfba34b3..7ad01fa06 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -21,11 +21,19 @@ %%------------------------------------------------------------------------------ all() -> - [{group, avro}]. + [{group, avro}, {group, protobuf}]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), - [{avro, TCs}]. + AllTCs = emqx_common_test_helpers:all(?MODULE), + ProtobufOnlyTCs = protobuf_only_tcs(), + TCs = AllTCs -- ProtobufOnlyTCs, + [{avro, TCs}, {protobuf, AllTCs}]. + +protobuf_only_tcs() -> + [ + t_protobuf_union_encode, + t_protobuf_union_decode + ]. init_per_suite(Config) -> emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), @@ -38,6 +46,8 @@ end_per_suite(_Config) -> init_per_group(avro, Config) -> [{serde_type, avro} | Config]; +init_per_group(protobuf, Config) -> + [{serde_type, protobuf} | Config]; init_per_group(_Group, Config) -> Config. @@ -95,8 +105,12 @@ create_rule_http(RuleParams) -> Path = emqx_mgmt_api_test_util:api_path(["rules"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error + {ok, Res0} -> + Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + {ok, Res}; + Error -> + Error end. schema_params(avro) -> @@ -108,35 +122,174 @@ schema_params(avro) -> ] }, SourceBin = emqx_utils_json:encode(Source), - #{type => avro, source => SourceBin}. + #{type => avro, source => SourceBin}; +schema_params(protobuf) -> + SourceBin = + << + "message Person {\n" + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + "message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + "}\n" + >>, + #{type => protobuf, source => SourceBin}. create_serde(SerdeType, SerdeName) -> Schema = schema_params(SerdeType), ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema), ok. -sql_for(avro, encode_decode1) -> - << - "select\n" - " schema_decode('my_serde',\n" - " schema_encode('my_serde', json_decode(payload))) as decoded,\n" - " decoded.i as decoded_int,\n" - " decoded.s as decoded_string\n" - " from t" - >>; -sql_for(avro, encode1) -> - << - "select\n" - " schema_encode('my_serde', json_decode(payload)) as encoded\n" - " from t" - >>; -sql_for(avro, decode1) -> - << - "select\n" - " schema_decode('my_serde', payload) as decoded\n" - " from t" - >>; -sql_for(Type, Name) -> +test_params_for(avro, encode_decode1) -> + SQL = + << + "select\n" + " schema_decode('my_serde',\n" + " schema_encode('my_serde', json_decode(payload))) as decoded,\n\n" + " decoded.i as decoded_int,\n" + " decoded.s as decoded_string\n" + "from t\n" + >>, + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + ExpectedRuleOutput = + #{ + <<"decoded">> => #{<<"i">> => 10, <<"s">> => <<"text">>}, + <<"decoded_int">> => 10, + <<"decoded_string">> => <<"text">> + }, + ExtraArgs = [], + #{ + sql => SQL, + payload => Payload, + expected_rule_output => ExpectedRuleOutput, + extra_args => ExtraArgs + }; +test_params_for(avro, encode1) -> + SQL = + << + "select\n" + " schema_encode('my_serde', json_decode(payload)) as encoded\n" + "from t\n" + >>, + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + ExtraArgs = [], + #{ + sql => SQL, + payload => Payload, + extra_args => ExtraArgs + }; +test_params_for(avro, decode1) -> + SQL = + << + "select\n" + " schema_decode('my_serde', payload) as decoded\n" + "from t\n" + >>, + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + ExtraArgs = [], + #{ + sql => SQL, + payload => Payload, + extra_args => ExtraArgs + }; +test_params_for(protobuf, encode_decode1) -> + SQL = + << + "select\n" + " schema_decode('my_serde',\n" + " schema_encode('my_serde', json_decode(payload), 'Person'),\n" + " 'Person') as decoded,\n" + " decoded.name as decoded_name,\n" + " decoded.email as decoded_email,\n" + " decoded.id as decoded_id\n" + "from t\n" + >>, + Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + ExpectedRuleOutput = + #{ + <<"decoded">> => + #{ + <<"email">> => <<"emqx@emqx.io">>, + <<"id">> => 10, + <<"name">> => <<"some name">> + }, + <<"decoded_email">> => <<"emqx@emqx.io">>, + <<"decoded_id">> => 10, + <<"decoded_name">> => <<"some name">> + }, + ExtraArgs = [<<"Person">>], + #{ + sql => SQL, + payload => Payload, + extra_args => ExtraArgs, + expected_rule_output => ExpectedRuleOutput + }; +test_params_for(protobuf, decode1) -> + SQL = + << + "select\n" + " schema_decode('my_serde', payload, 'Person') as decoded\n" + "from t\n" + >>, + Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + ExtraArgs = [<<"Person">>], + #{ + sql => SQL, + payload => Payload, + extra_args => ExtraArgs + }; +test_params_for(protobuf, encode1) -> + SQL = + << + "select\n" + " schema_encode('my_serde', json_decode(payload), 'Person') as encoded\n" + "from t\n" + >>, + Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + ExtraArgs = [<<"Person">>], + #{ + sql => SQL, + payload => Payload, + extra_args => ExtraArgs + }; +test_params_for(protobuf, union1) -> + SQL = + << + "select\n" + " schema_decode('my_serde', payload, 'UnionValue') as decoded,\n" + " decoded.a as decoded_a,\n" + " decoded.b as decoded_b\n" + "from t\n" + >>, + PayloadA = #{<<"a">> => 10}, + PayloadB = #{<<"b">> => <<"string">>}, + ExtraArgs = [<<"UnionValue">>], + #{ + sql => SQL, + payload => #{a => PayloadA, b => PayloadB}, + extra_args => ExtraArgs + }; +test_params_for(protobuf, union2) -> + SQL = + << + "select\n" + " schema_encode('my_serde', json_decode(payload), 'UnionValue') as encoded\n" + "from t\n" + >>, + PayloadA = #{<<"a">> => 10}, + PayloadB = #{<<"b">> => <<"string">>}, + ExtraArgs = [<<"UnionValue">>], + #{ + sql => SQL, + payload => #{a => PayloadA, b => PayloadB}, + extra_args => ExtraArgs + }; +test_params_for(Type, Name) -> ct:fail("unimplemented: ~p", [{Type, Name}]). clear_schemas() -> @@ -238,6 +391,40 @@ wait_for_cluster_rpc(Node) -> true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler])) ). +serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := will_delete_schema}, + #{?snk_kind := serde_destroyed, type := SerdeType}, + Trace + ) + ), + ok. + +protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) -> + #{nodes := Nodes} = Res, + CacheEvents = ?of_kind( + [ + schema_registry_protobuf_cache_hit, + schema_registry_protobuf_cache_miss + ], + Trace + ), + ?assertMatch( + [ + schema_registry_protobuf_cache_hit, + schema_registry_protobuf_cache_miss + ], + lists:sort(?projection(?snk_kind, CacheEvents)) + ), + ?assertEqual( + lists:usort(Nodes), + lists:usort([N || #{?snk_meta := #{node := N}} <- CacheEvents]) + ), + ok; +protobuf_unique_cache_hit_spec(_Res, _Trace) -> + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -259,27 +446,16 @@ t_encode_decode(Config) -> SerdeType = ?config(serde_type, Config), SerdeName = my_serde, ok = create_serde(SerdeType, SerdeName), - {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}), - on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + #{ + sql := SQL, + payload := Payload, + expected_rule_output := ExpectedRuleOutput + } = test_params_for(SerdeType, encode_decode1), + {ok, _} = create_rule_http(#{sql => SQL}), PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Res = receive_action_results(), - ?assertMatch( - #{ - data := - #{ - <<"decoded">> := - #{ - <<"i">> := 10, - <<"s">> := <<"text">> - }, - <<"decoded_int">> := 10, - <<"decoded_string">> := <<"text">> - } - }, - Res - ), + ?assertMatch(#{data := ExpectedRuleOutput}, Res), ok. t_delete_serde(Config) -> @@ -308,9 +484,12 @@ t_encode(Config) -> SerdeType = ?config(serde_type, Config), SerdeName = my_serde, ok = create_serde(SerdeType, SerdeName), - {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}), - on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + #{ + sql := SQL, + payload := Payload, + extra_args := ExtraArgs + } = test_params_for(SerdeType, encode1), + {ok, _} = create_rule_http(#{sql => SQL}), PayloadBin = emqx_utils_json:encode(Payload), emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), Published = receive_published(?LINE), @@ -320,18 +499,21 @@ t_encode(Config) -> ), #{payload := #{<<"encoded">> := Encoded}} = Published, {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), - ?assertEqual(Payload, Deserializer(Encoded)), + ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])), ok. t_decode(Config) -> SerdeType = ?config(serde_type, Config), SerdeName = my_serde, ok = create_serde(SerdeType, SerdeName), - {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}), - on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + #{ + sql := SQL, + payload := Payload, + extra_args := ExtraArgs + } = test_params_for(SerdeType, decode1), + {ok, _} = create_rule_http(#{sql => SQL}), {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName), - EncodedBin = Serializer(Payload), + EncodedBin = apply(Serializer, [Payload | ExtraArgs]), emqx:publish(emqx_message:make(<<"t">>, EncodedBin)), Published = receive_published(?LINE), ?assertMatch( @@ -342,6 +524,76 @@ t_decode(Config) -> ?assertEqual(Payload, Decoded), ok. +t_protobuf_union_encode(Config) -> + SerdeType = ?config(serde_type, Config), + ?assertEqual(protobuf, SerdeType), + SerdeName = my_serde, + ok = create_serde(SerdeType, SerdeName), + #{ + sql := SQL, + payload := #{a := PayloadA, b := PayloadB}, + extra_args := ExtraArgs + } = test_params_for(SerdeType, union1), + {ok, _} = create_rule_http(#{sql => SQL}), + {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName), + + EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]), + emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)), + PublishedA = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"decoded">> := _}}, + PublishedA + ), + #{payload := #{<<"decoded">> := DecodedA}} = PublishedA, + ?assertEqual(PayloadA, DecodedA), + + EncodedBinB = apply(Serializer, [PayloadB | ExtraArgs]), + emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)), + PublishedB = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"decoded">> := _}}, + PublishedB + ), + #{payload := #{<<"decoded">> := DecodedB}} = PublishedB, + ?assertEqual(PayloadB, DecodedB), + + ok. + +t_protobuf_union_decode(Config) -> + SerdeType = ?config(serde_type, Config), + ?assertEqual(protobuf, SerdeType), + SerdeName = my_serde, + ok = create_serde(SerdeType, SerdeName), + #{ + sql := SQL, + payload := #{a := PayloadA, b := PayloadB}, + extra_args := ExtraArgs + } = test_params_for(SerdeType, union2), + {ok, _} = create_rule_http(#{sql => SQL}), + {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), + + EncodedBinA = emqx_utils_json:encode(PayloadA), + emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)), + PublishedA = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"encoded">> := _}}, + PublishedA + ), + #{payload := #{<<"encoded">> := EncodedA}} = PublishedA, + ?assertEqual(PayloadA, apply(Deserializer, [EncodedA | ExtraArgs])), + + EncodedBinB = emqx_utils_json:encode(PayloadB), + emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)), + PublishedB = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"encoded">> := _}}, + PublishedB + ), + #{payload := #{<<"encoded">> := EncodedB}} = PublishedB, + ?assertEqual(PayloadB, apply(Deserializer, [EncodedB | ExtraArgs])), + + ok. + t_fail_rollback(Config) -> SerdeType = ?config(serde_type, Config), OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)), @@ -369,6 +621,10 @@ t_cluster_serde_build(Config) -> Cluster = cluster(Config), SerdeName = my_serde, Schema = schema_params(SerdeType), + #{ + payload := Payload, + extra_args := ExtraArgs + } = test_params_for(SerdeType, encode_decode1), ?check_trace( begin Nodes = [N1, N2 | _] = start_cluster(Cluster), @@ -385,8 +641,14 @@ t_cluster_serde_build(Config) -> Res0 = emqx_ee_schema_registry:get_serde(SerdeName), ?assertMatch({ok, #{}}, Res0, #{node => N}), {ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0, - Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, - ?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}), + ?assertEqual( + Payload, + apply( + Deserializer, + [apply(Serializer, [Payload | ExtraArgs]) | ExtraArgs] + ), + #{node => N} + ), ok end) end, @@ -417,17 +679,11 @@ t_cluster_serde_build(Config) -> end, Nodes ), - ok + #{serde_type => SerdeType, nodes => Nodes} end, - fun(Trace) -> - ?assert( - ?strict_causality( - #{?snk_kind := will_delete_schema}, - #{?snk_kind := serde_destroyed, type := SerdeType}, - Trace - ) - ), - ok - end + [ + {"destructor is always called", fun ?MODULE:serde_deletion_calls_destructor_spec/2}, + {"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2} + ] ), ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl index e7034d562..ee6a693db 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl @@ -19,7 +19,17 @@ %%------------------------------------------------------------------------------ all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, avro}, + {group, protobuf} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {avro, AllTCs}, + {protobuf, AllTCs} + ]. init_per_suite(Config) -> emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), @@ -30,6 +40,48 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), ok. +init_per_group(avro, Config) -> + Source = #{ + type => record, + fields => [ + #{name => <<"i">>, type => <<"int">>}, + #{name => <<"s">>, type => <<"string">>} + ] + }, + SourceBin = emqx_utils_json:encode(Source), + InvalidSourceBin = <<"{}">>, + [ + {serde_type, avro}, + {schema_source, SourceBin}, + {invalid_schema_source, InvalidSourceBin} + | Config + ]; +init_per_group(protobuf, Config) -> + SourceBin = + << + "message Person {\n" + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + "message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + "}\n" + >>, + InvalidSourceBin = <<"xxxx">>, + [ + {serde_type, protobuf}, + {schema_source, SourceBin}, + {invalid_schema_source, InvalidSourceBin} + | Config + ]. + +end_per_group(_Group, _Config) -> + ok. + init_per_testcase(_TestCase, Config) -> clear_schemas(), ok = snabbkaffe:start_trace(), @@ -93,18 +145,14 @@ clear_schemas() -> %% Testcases %%------------------------------------------------------------------------------ -t_crud(_Config) -> - SchemaName = <<"my_avro_schema">>, - Source = #{ - type => record, - fields => [ - #{name => <<"i">>, type => <<"int">>}, - #{name => <<"s">>, type => <<"string">>} - ] - }, - SourceBin = emqx_utils_json:encode(Source), +t_crud(Config) -> + SerdeType = ?config(serde_type, Config), + SourceBin = ?config(schema_source, Config), + InvalidSourceBin = ?config(invalid_schema_source, Config), + SerdeTypeBin = atom_to_binary(SerdeType), + SchemaName = <<"my_schema">>, Params = #{ - <<"type">> => <<"avro">>, + <<"type">> => SerdeTypeBin, <<"source">> => SourceBin, <<"name">> => SchemaName, <<"description">> => <<"My schema">> @@ -138,7 +186,7 @@ t_crud(_Config) -> %% create a schema ?assertMatch( {ok, 201, #{ - <<"type">> := <<"avro">>, + <<"type">> := SerdeTypeBin, <<"source">> := SourceBin, <<"name">> := SchemaName, <<"description">> := <<"My schema">> @@ -147,7 +195,7 @@ t_crud(_Config) -> ), ?assertMatch( {ok, 200, #{ - <<"type">> := <<"avro">>, + <<"type">> := SerdeTypeBin, <<"source">> := SourceBin, <<"name">> := SchemaName, <<"description">> := <<"My schema">> @@ -157,7 +205,7 @@ t_crud(_Config) -> ?assertMatch( {ok, 200, [ #{ - <<"type">> := <<"avro">>, + <<"type">> := SerdeTypeBin, <<"source">> := SourceBin, <<"name">> := SchemaName, <<"description">> := <<"My schema">> @@ -168,7 +216,7 @@ t_crud(_Config) -> UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>}, ?assertMatch( {ok, 200, #{ - <<"type">> := <<"avro">>, + <<"type">> := SerdeTypeBin, <<"source">> := SourceBin, <<"name">> := SchemaName, <<"description">> := <<"My new schema">> @@ -188,9 +236,9 @@ t_crud(_Config) -> {ok, 400, #{ <<"code">> := <<"BAD_REQUEST">>, <<"message">> := - <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">> + <<"{post_config_update,emqx_ee_schema_registry,", _/binary>> }}, - request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}}) + request({put, SchemaName, UpdateParams#{<<"source">> := InvalidSourceBin}}) ), ?assertMatch( @@ -229,9 +277,9 @@ t_crud(_Config) -> {ok, 400, #{ <<"code">> := <<"BAD_REQUEST">>, <<"message">> := - <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">> + <<"{post_config_update,emqx_ee_schema_registry,", _/binary>> }}, - request({post, Params#{<<"source">> := <<"{}">>}}) + request({post, Params#{<<"source">> := InvalidSourceBin}}) ), %% unknown serde type diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl index 12798c6a2..1ab5e3c01 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl @@ -60,7 +60,25 @@ schema_params(avro) -> ] }, SourceBin = emqx_utils_json:encode(Source), - #{type => avro, source => SourceBin}. + #{type => avro, source => SourceBin}; +schema_params(protobuf) -> + SourceBin = + << + "\n" + " message Person {\n" + " required string name = 1;\n" + " required int32 id = 2;\n" + " optional string email = 3;\n" + " }\n" + " message UnionValue {\n" + " oneof u {\n" + " int32 a = 1;\n" + " string b = 2;\n" + " }\n" + " }\n" + " " + >>, + #{type => protobuf, source => SourceBin}. assert_roundtrip(SerdeName, Original) -> Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original), @@ -119,3 +137,36 @@ t_serde_not_found(_Config) -> emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original) ), ok. + +t_roundtrip_protobuf(_Config) -> + SerdeName = my_serde, + Params = schema_params(protobuf), + ok = emqx_ee_schema_registry:add_schema(SerdeName, Params), + ExtraArgsPerson = [<<"Person">>], + + Original0 = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>}, + assert_roundtrip(SerdeName, Original0, ExtraArgsPerson, ExtraArgsPerson), + + %% removing optional field + Original1 = #{<<"name">> => <<"some name">>, <<"id">> => 10}, + assert_roundtrip(SerdeName, Original1, ExtraArgsPerson, ExtraArgsPerson), + + %% `oneof' fields + ExtraArgsUnion = [<<"UnionValue">>], + Original2 = #{<<"a">> => 1}, + assert_roundtrip(SerdeName, Original2, ExtraArgsUnion, ExtraArgsUnion), + + Original3 = #{<<"b">> => <<"string">>}, + assert_roundtrip(SerdeName, Original3, ExtraArgsUnion, ExtraArgsUnion), + + ok. + +t_protobuf_invalid_schema(_Config) -> + SerdeName = my_serde, + Params = schema_params(protobuf), + WrongParams = Params#{source := <<"xxxx">>}, + ?assertMatch( + {error, {post_config_update, _, {invalid_protobuf_schema, _}}}, + emqx_ee_schema_registry:add_schema(SerdeName, WrongParams) + ), + ok. diff --git a/mix.exs b/mix.exs index e358edfcc..36ea9e157 100644 --- a/mix.exs +++ b/mix.exs @@ -94,7 +94,7 @@ defmodule EMQXUmbrella.MixProject do {:ranch, github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true}, # in conflict by grpc and eetcd - {:gpb, "4.19.5", override: true, runtime: false}, + {:gpb, "4.19.7", override: true, runtime: false}, {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true} ] ++ emqx_apps(profile_info, version) ++ diff --git a/rebar.config b/rebar.config index 040b1a7c0..edb544298 100644 --- a/rebar.config +++ b/rebar.config @@ -53,7 +53,7 @@ [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} , {redbug, "2.0.8"} , {covertool, {git, "https://github.com/zmstone/covertool", {tag, "2.0.4.1"}}} - , {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps + , {gpb, "4.19.7"} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}} diff --git a/rel/i18n/emqx_ee_schema_registry_schema.hocon b/rel/i18n/emqx_ee_schema_registry_schema.hocon index 667c4c0a4..3d1ce4072 100644 --- a/rel/i18n/emqx_ee_schema_registry_schema.hocon +++ b/rel/i18n/emqx_ee_schema_registry_schema.hocon @@ -6,6 +6,12 @@ avro_type.desc: avro_type.label: """Apache Avro""" +protobuf_type.desc: +"""[Protocol Buffers](https://protobuf.dev/) serialization format.""" + +protobuf_type.label: +"""Protocol Buffers""" + schema_description.desc: """A description for this schema.""" diff --git a/rel/i18n/zh/emqx_ee_schema_registry_schema.hocon b/rel/i18n/zh/emqx_ee_schema_registry_schema.hocon index 3bf0a7dc8..2f4c972ec 100644 --- a/rel/i18n/zh/emqx_ee_schema_registry_schema.hocon +++ b/rel/i18n/zh/emqx_ee_schema_registry_schema.hocon @@ -6,6 +6,12 @@ avro_type.desc: avro_type.label: """阿帕奇-阿夫罗""" +protobuf_type.desc: +"""[协议缓冲器](https://protobuf.dev/) 序列化格式。""" + +protobuf_type.label: +"""协议缓冲器""" + schema_description.desc: """对该模式的描述。"""