feat(schema_registry): add support for protobuf schemas
Fixes https://emqx.atlassian.net/browse/EMQX-9470
This commit is contained in:
parent
14e807adeb
commit
9d15247dd5
5
build
5
build
|
@ -124,10 +124,7 @@ make_docs() {
|
|||
}
|
||||
|
||||
assert_no_compile_time_only_deps() {
|
||||
if [ "$("$FIND" "_build/$PROFILE/rel/emqx/lib/" -maxdepth 1 -name 'gpb-*' -type d)" != "" ]; then
|
||||
echo "gpb should not be included in the release"
|
||||
exit 1
|
||||
fi
|
||||
:
|
||||
}
|
||||
|
||||
make_rel() {
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Add support for [Protocol Buffers](https://protobuf.dev/) schemas in Schema Registry.
|
|
@ -10,14 +10,19 @@
|
|||
|
||||
-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
|
||||
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
|
||||
-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
|
||||
|
||||
-type schema_name() :: binary().
|
||||
-type schema_source() :: binary().
|
||||
|
||||
-type encoded_data() :: iodata().
|
||||
-type decoded_data() :: map().
|
||||
-type serializer() :: fun((decoded_data()) -> encoded_data()).
|
||||
-type deserializer() :: fun((encoded_data()) -> decoded_data()).
|
||||
-type serializer() ::
|
||||
fun((decoded_data()) -> encoded_data())
|
||||
| fun((decoded_data(), term()) -> encoded_data()).
|
||||
-type deserializer() ::
|
||||
fun((encoded_data()) -> decoded_data())
|
||||
| fun((encoded_data(), term()) -> decoded_data()).
|
||||
-type destructor() :: fun(() -> ok).
|
||||
-type serde_type() :: avro.
|
||||
-type serde_opts() :: map().
|
||||
|
@ -29,6 +34,18 @@
|
|||
destructor :: destructor()
|
||||
}).
|
||||
-type serde() :: #serde{}.
|
||||
|
||||
-record(protobuf_cache, {
|
||||
fingerprint,
|
||||
module,
|
||||
module_binary
|
||||
}).
|
||||
-type protobuf_cache() :: #protobuf_cache{
|
||||
fingerprint :: binary(),
|
||||
module :: module(),
|
||||
module_binary :: binary()
|
||||
}.
|
||||
|
||||
-type serde_map() :: #{
|
||||
name := schema_name(),
|
||||
serializer := serializer(),
|
||||
|
|
|
@ -4,7 +4,8 @@
|
|||
{deps, [
|
||||
{emqx, {path, "../../apps/emqx"}},
|
||||
{emqx_utils, {path, "../../apps/emqx_utils"}},
|
||||
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}
|
||||
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
|
||||
{gpb, "4.19.7"}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
|
|
|
@ -6,7 +6,8 @@
|
|||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
erlavro
|
||||
erlavro,
|
||||
gpb
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
|
|
|
@ -176,7 +176,14 @@ create_tables() ->
|
|||
{record_name, serde},
|
||||
{attributes, record_info(fields, serde)}
|
||||
]),
|
||||
ok = mria:wait_for_tables([?SERDE_TAB]),
|
||||
ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
|
||||
{type, set},
|
||||
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
|
||||
{storage, disc_only_copies},
|
||||
{record_name, protobuf_cache},
|
||||
{attributes, record_info(fields, protobuf_cache)}
|
||||
]),
|
||||
ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]),
|
||||
ok.
|
||||
|
||||
do_build_serdes(Schemas) ->
|
||||
|
|
|
@ -53,10 +53,20 @@ fields(avro) ->
|
|||
mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
|
||||
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
|
||||
];
|
||||
fields(protobuf) ->
|
||||
[
|
||||
{type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
|
||||
{source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
|
||||
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
|
||||
];
|
||||
fields("get_avro") ->
|
||||
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
|
||||
fields("get_protobuf") ->
|
||||
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
|
||||
fields("put_avro") ->
|
||||
fields(avro);
|
||||
fields("put_protobuf") ->
|
||||
fields(protobuf);
|
||||
fields("post_" ++ Type) ->
|
||||
fields("get_" ++ Type).
|
||||
|
||||
|
@ -64,6 +74,8 @@ desc(?CONF_KEY_ROOT) ->
|
|||
?DESC("schema_registry_root");
|
||||
desc(avro) ->
|
||||
?DESC("avro_type");
|
||||
desc(protobuf) ->
|
||||
?DESC("protobuf_type");
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
|
@ -96,7 +108,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
|
|||
ref(Name) -> hoconsc:ref(?MODULE, Name).
|
||||
|
||||
supported_serde_types() ->
|
||||
[avro].
|
||||
[avro, protobuf].
|
||||
|
||||
refs() ->
|
||||
[ref(Type) || Type <- supported_serde_types()].
|
||||
|
@ -105,6 +117,8 @@ refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
|
|||
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
|
||||
refs(#{<<"type">> := <<"avro">>}) ->
|
||||
[ref(avro)];
|
||||
refs(#{<<"type">> := <<"protobuf">>}) ->
|
||||
[ref(protobuf)];
|
||||
refs(_) ->
|
||||
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
|
||||
throw(#{
|
||||
|
@ -113,12 +127,14 @@ refs(_) ->
|
|||
}).
|
||||
|
||||
refs_get_api() ->
|
||||
[ref("get_avro")].
|
||||
[ref("get_avro"), ref("get_protobuf")].
|
||||
|
||||
refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
|
||||
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
|
||||
refs_get_api(#{<<"type">> := <<"avro">>}) ->
|
||||
[ref("get_avro")];
|
||||
refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
|
||||
[ref("get_protobuf")];
|
||||
refs_get_api(_) ->
|
||||
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
|
||||
throw(#{
|
||||
|
|
|
@ -4,8 +4,11 @@
|
|||
-module(emqx_ee_schema_registry_serde).
|
||||
|
||||
-include("emqx_ee_schema_registry.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ee_schema_registry_serde]}}]).
|
||||
|
||||
%% API
|
||||
-export([
|
||||
decode/2,
|
||||
|
@ -55,6 +58,27 @@ make_serde(avro, Name, Source0) ->
|
|||
?tp(serde_destroyed, #{type => avro, name => Name}),
|
||||
ok
|
||||
end,
|
||||
{Serializer, Deserializer, Destructor};
|
||||
make_serde(protobuf, Name, Source) ->
|
||||
SerdeMod = make_protobuf_serde_mod(Name, Source),
|
||||
Serializer =
|
||||
fun(DecodedData0, MessageName0) ->
|
||||
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
|
||||
MessageName = binary_to_existing_atom(MessageName0, utf8),
|
||||
SerdeMod:encode_msg(DecodedData, MessageName)
|
||||
end,
|
||||
Deserializer =
|
||||
fun(EncodedData, MessageName0) ->
|
||||
MessageName = binary_to_existing_atom(MessageName0, utf8),
|
||||
Decoded = SerdeMod:decode_msg(EncodedData, MessageName),
|
||||
emqx_utils_maps:binary_key_map(Decoded)
|
||||
end,
|
||||
Destructor =
|
||||
fun() ->
|
||||
unload_code(SerdeMod),
|
||||
?tp(serde_destroyed, #{type => protobuf, name => Name}),
|
||||
ok
|
||||
end,
|
||||
{Serializer, Deserializer, Destructor}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -68,3 +92,111 @@ inject_avro_name(Name, Source0) ->
|
|||
Schema0 = emqx_utils_json:decode(Source0, [return_maps]),
|
||||
Schema = Schema0#{<<"name">> => Name},
|
||||
emqx_utils_json:encode(Schema).
|
||||
|
||||
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
|
||||
make_protobuf_serde_mod(Name, Source) ->
|
||||
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
|
||||
case lazy_generate_protobuf_code(SerdeMod0, Source) of
|
||||
{ok, SerdeMod, ModBinary} ->
|
||||
load_code(SerdeMod, SerdeModFileName, ModBinary),
|
||||
SerdeMod;
|
||||
{error, #{error := Error, warnings := Warnings}} ->
|
||||
?SLOG(
|
||||
warning,
|
||||
#{
|
||||
msg => "error_generating_protobuf_code",
|
||||
error => Error,
|
||||
warnings => Warnings
|
||||
}
|
||||
),
|
||||
error({invalid_protobuf_schema, Error})
|
||||
end.
|
||||
|
||||
-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}.
|
||||
protobuf_serde_mod_name(Name) ->
|
||||
%% must be a string (list)
|
||||
SerdeModName = "$schema_parser_" ++ binary_to_list(Name),
|
||||
SerdeMod = list_to_atom(SerdeModName),
|
||||
%% the "path" to the module, for `code:load_binary'.
|
||||
SerdeModFileName = SerdeModName ++ ".memory",
|
||||
{SerdeMod, SerdeModFileName}.
|
||||
|
||||
-spec lazy_generate_protobuf_code(module(), schema_source()) ->
|
||||
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
|
||||
lazy_generate_protobuf_code(SerdeMod0, Source) ->
|
||||
%% We run this inside a transaction with locks to avoid running
|
||||
%% the compile on all nodes; only one will get the lock, compile
|
||||
%% the schema, and other nodes will simply read the final result.
|
||||
{atomic, Res} = mria:transaction(
|
||||
?SCHEMA_REGISTRY_SHARD,
|
||||
fun lazy_generate_protobuf_code_trans/2,
|
||||
[SerdeMod0, Source]
|
||||
),
|
||||
Res.
|
||||
|
||||
-spec lazy_generate_protobuf_code_trans(module(), schema_source()) ->
|
||||
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
|
||||
lazy_generate_protobuf_code_trans(SerdeMod0, Source) ->
|
||||
Fingerprint = erlang:md5(Source),
|
||||
_ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
|
||||
case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
|
||||
[#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
|
||||
?tp(schema_registry_protobuf_cache_hit, #{}),
|
||||
{ok, SerdeMod, ModBinary};
|
||||
[] ->
|
||||
?tp(schema_registry_protobuf_cache_miss, #{}),
|
||||
case generate_protobuf_code(SerdeMod0, Source) of
|
||||
{ok, SerdeMod, ModBinary} ->
|
||||
CacheEntry = #protobuf_cache{
|
||||
fingerprint = Fingerprint,
|
||||
module = SerdeMod,
|
||||
module_binary = ModBinary
|
||||
},
|
||||
ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
|
||||
{ok, SerdeMod, ModBinary};
|
||||
{ok, SerdeMod, ModBinary, _Warnings} ->
|
||||
CacheEntry = #protobuf_cache{
|
||||
fingerprint = Fingerprint,
|
||||
module = SerdeMod,
|
||||
module_binary = ModBinary
|
||||
},
|
||||
ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
|
||||
{ok, SerdeMod, ModBinary};
|
||||
error ->
|
||||
{error, #{error => undefined, warnings => []}};
|
||||
{error, Error} ->
|
||||
{error, #{error => Error, warnings => []}};
|
||||
{error, Error, Warnings} ->
|
||||
{error, #{error => Error, warnings => Warnings}}
|
||||
end
|
||||
end.
|
||||
|
||||
generate_protobuf_code(SerdeMod, Source) ->
|
||||
gpb_compile:string(
|
||||
SerdeMod,
|
||||
Source,
|
||||
[
|
||||
binary,
|
||||
strings_as_binaries,
|
||||
{maps, true},
|
||||
%% Fixme: currently, some bug in `gpb' prevents this
|
||||
%% option from working with `oneof' types... We're then
|
||||
%% forced to use atom key maps.
|
||||
%% {maps_key_type, binary},
|
||||
{maps_oneof, flat},
|
||||
{verify, always},
|
||||
{maps_unset_optional, omitted}
|
||||
]
|
||||
).
|
||||
|
||||
-spec load_code(module(), string(), binary()) -> ok.
|
||||
load_code(SerdeMod, SerdeModFileName, ModBinary) ->
|
||||
_ = code:purge(SerdeMod),
|
||||
{module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary),
|
||||
ok.
|
||||
|
||||
-spec unload_code(module()) -> ok.
|
||||
unload_code(SerdeMod) ->
|
||||
_ = code:purge(SerdeMod),
|
||||
_ = code:delete(SerdeMod),
|
||||
ok.
|
||||
|
|
|
@ -21,11 +21,19 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
[{group, avro}].
|
||||
[{group, avro}, {group, protobuf}].
|
||||
|
||||
groups() ->
|
||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||
[{avro, TCs}].
|
||||
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||
ProtobufOnlyTCs = protobuf_only_tcs(),
|
||||
TCs = AllTCs -- ProtobufOnlyTCs,
|
||||
[{avro, TCs}, {protobuf, AllTCs}].
|
||||
|
||||
protobuf_only_tcs() ->
|
||||
[
|
||||
t_protobuf_union_encode,
|
||||
t_protobuf_union_decode
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
|
||||
|
@ -38,6 +46,8 @@ end_per_suite(_Config) ->
|
|||
|
||||
init_per_group(avro, Config) ->
|
||||
[{serde_type, avro} | Config];
|
||||
init_per_group(protobuf, Config) ->
|
||||
[{serde_type, protobuf} | Config];
|
||||
init_per_group(_Group, Config) ->
|
||||
Config.
|
||||
|
||||
|
@ -95,8 +105,12 @@ create_rule_http(RuleParams) ->
|
|||
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
||||
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
|
||||
Error -> Error
|
||||
{ok, Res0} ->
|
||||
Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
{ok, Res};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
schema_params(avro) ->
|
||||
|
@ -108,35 +122,174 @@ schema_params(avro) ->
|
|||
]
|
||||
},
|
||||
SourceBin = emqx_utils_json:encode(Source),
|
||||
#{type => avro, source => SourceBin}.
|
||||
#{type => avro, source => SourceBin};
|
||||
schema_params(protobuf) ->
|
||||
SourceBin =
|
||||
<<
|
||||
"message Person {\n"
|
||||
" required string name = 1;\n"
|
||||
" required int32 id = 2;\n"
|
||||
" optional string email = 3;\n"
|
||||
" }\n"
|
||||
"message UnionValue {\n"
|
||||
" oneof u {\n"
|
||||
" int32 a = 1;\n"
|
||||
" string b = 2;\n"
|
||||
" }\n"
|
||||
"}\n"
|
||||
>>,
|
||||
#{type => protobuf, source => SourceBin}.
|
||||
|
||||
create_serde(SerdeType, SerdeName) ->
|
||||
Schema = schema_params(SerdeType),
|
||||
ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
|
||||
ok.
|
||||
|
||||
sql_for(avro, encode_decode1) ->
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde',\n"
|
||||
" schema_encode('my_serde', json_decode(payload))) as decoded,\n"
|
||||
" decoded.i as decoded_int,\n"
|
||||
" decoded.s as decoded_string\n"
|
||||
" from t"
|
||||
>>;
|
||||
sql_for(avro, encode1) ->
|
||||
<<
|
||||
"select\n"
|
||||
" schema_encode('my_serde', json_decode(payload)) as encoded\n"
|
||||
" from t"
|
||||
>>;
|
||||
sql_for(avro, decode1) ->
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde', payload) as decoded\n"
|
||||
" from t"
|
||||
>>;
|
||||
sql_for(Type, Name) ->
|
||||
test_params_for(avro, encode_decode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde',\n"
|
||||
" schema_encode('my_serde', json_decode(payload))) as decoded,\n\n"
|
||||
" decoded.i as decoded_int,\n"
|
||||
" decoded.s as decoded_string\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
ExpectedRuleOutput =
|
||||
#{
|
||||
<<"decoded">> => #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
<<"decoded_int">> => 10,
|
||||
<<"decoded_string">> => <<"text">>
|
||||
},
|
||||
ExtraArgs = [],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
expected_rule_output => ExpectedRuleOutput,
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(avro, encode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_encode('my_serde', json_decode(payload)) as encoded\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
ExtraArgs = [],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(avro, decode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde', payload) as decoded\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
ExtraArgs = [],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(protobuf, encode_decode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde',\n"
|
||||
" schema_encode('my_serde', json_decode(payload), 'Person'),\n"
|
||||
" 'Person') as decoded,\n"
|
||||
" decoded.name as decoded_name,\n"
|
||||
" decoded.email as decoded_email,\n"
|
||||
" decoded.id as decoded_id\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
|
||||
ExpectedRuleOutput =
|
||||
#{
|
||||
<<"decoded">> =>
|
||||
#{
|
||||
<<"email">> => <<"emqx@emqx.io">>,
|
||||
<<"id">> => 10,
|
||||
<<"name">> => <<"some name">>
|
||||
},
|
||||
<<"decoded_email">> => <<"emqx@emqx.io">>,
|
||||
<<"decoded_id">> => 10,
|
||||
<<"decoded_name">> => <<"some name">>
|
||||
},
|
||||
ExtraArgs = [<<"Person">>],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
extra_args => ExtraArgs,
|
||||
expected_rule_output => ExpectedRuleOutput
|
||||
};
|
||||
test_params_for(protobuf, decode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde', payload, 'Person') as decoded\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
|
||||
ExtraArgs = [<<"Person">>],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(protobuf, encode1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_encode('my_serde', json_decode(payload), 'Person') as encoded\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
Payload = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
|
||||
ExtraArgs = [<<"Person">>],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => Payload,
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(protobuf, union1) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_decode('my_serde', payload, 'UnionValue') as decoded,\n"
|
||||
" decoded.a as decoded_a,\n"
|
||||
" decoded.b as decoded_b\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
PayloadA = #{<<"a">> => 10},
|
||||
PayloadB = #{<<"b">> => <<"string">>},
|
||||
ExtraArgs = [<<"UnionValue">>],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => #{a => PayloadA, b => PayloadB},
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(protobuf, union2) ->
|
||||
SQL =
|
||||
<<
|
||||
"select\n"
|
||||
" schema_encode('my_serde', json_decode(payload), 'UnionValue') as encoded\n"
|
||||
"from t\n"
|
||||
>>,
|
||||
PayloadA = #{<<"a">> => 10},
|
||||
PayloadB = #{<<"b">> => <<"string">>},
|
||||
ExtraArgs = [<<"UnionValue">>],
|
||||
#{
|
||||
sql => SQL,
|
||||
payload => #{a => PayloadA, b => PayloadB},
|
||||
extra_args => ExtraArgs
|
||||
};
|
||||
test_params_for(Type, Name) ->
|
||||
ct:fail("unimplemented: ~p", [{Type, Name}]).
|
||||
|
||||
clear_schemas() ->
|
||||
|
@ -238,6 +391,40 @@ wait_for_cluster_rpc(Node) ->
|
|||
true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
|
||||
).
|
||||
|
||||
serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := will_delete_schema},
|
||||
#{?snk_kind := serde_destroyed, type := SerdeType},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
ok.
|
||||
|
||||
protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
|
||||
#{nodes := Nodes} = Res,
|
||||
CacheEvents = ?of_kind(
|
||||
[
|
||||
schema_registry_protobuf_cache_hit,
|
||||
schema_registry_protobuf_cache_miss
|
||||
],
|
||||
Trace
|
||||
),
|
||||
?assertMatch(
|
||||
[
|
||||
schema_registry_protobuf_cache_hit,
|
||||
schema_registry_protobuf_cache_miss
|
||||
],
|
||||
lists:sort(?projection(?snk_kind, CacheEvents))
|
||||
),
|
||||
?assertEqual(
|
||||
lists:usort(Nodes),
|
||||
lists:usort([N || #{?snk_meta := #{node := N}} <- CacheEvents])
|
||||
),
|
||||
ok;
|
||||
protobuf_unique_cache_hit_spec(_Res, _Trace) ->
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -259,27 +446,16 @@ t_encode_decode(Config) ->
|
|||
SerdeType = ?config(serde_type, Config),
|
||||
SerdeName = my_serde,
|
||||
ok = create_serde(SerdeType, SerdeName),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
#{
|
||||
sql := SQL,
|
||||
payload := Payload,
|
||||
expected_rule_output := ExpectedRuleOutput
|
||||
} = test_params_for(SerdeType, encode_decode1),
|
||||
{ok, _} = create_rule_http(#{sql => SQL}),
|
||||
PayloadBin = emqx_utils_json:encode(Payload),
|
||||
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
|
||||
Res = receive_action_results(),
|
||||
?assertMatch(
|
||||
#{
|
||||
data :=
|
||||
#{
|
||||
<<"decoded">> :=
|
||||
#{
|
||||
<<"i">> := 10,
|
||||
<<"s">> := <<"text">>
|
||||
},
|
||||
<<"decoded_int">> := 10,
|
||||
<<"decoded_string">> := <<"text">>
|
||||
}
|
||||
},
|
||||
Res
|
||||
),
|
||||
?assertMatch(#{data := ExpectedRuleOutput}, Res),
|
||||
ok.
|
||||
|
||||
t_delete_serde(Config) ->
|
||||
|
@ -308,9 +484,12 @@ t_encode(Config) ->
|
|||
SerdeType = ?config(serde_type, Config),
|
||||
SerdeName = my_serde,
|
||||
ok = create_serde(SerdeType, SerdeName),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
#{
|
||||
sql := SQL,
|
||||
payload := Payload,
|
||||
extra_args := ExtraArgs
|
||||
} = test_params_for(SerdeType, encode1),
|
||||
{ok, _} = create_rule_http(#{sql => SQL}),
|
||||
PayloadBin = emqx_utils_json:encode(Payload),
|
||||
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
|
||||
Published = receive_published(?LINE),
|
||||
|
@ -320,18 +499,21 @@ t_encode(Config) ->
|
|||
),
|
||||
#{payload := #{<<"encoded">> := Encoded}} = Published,
|
||||
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
||||
?assertEqual(Payload, Deserializer(Encoded)),
|
||||
?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
|
||||
ok.
|
||||
|
||||
t_decode(Config) ->
|
||||
SerdeType = ?config(serde_type, Config),
|
||||
SerdeName = my_serde,
|
||||
ok = create_serde(SerdeType, SerdeName),
|
||||
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
#{
|
||||
sql := SQL,
|
||||
payload := Payload,
|
||||
extra_args := ExtraArgs
|
||||
} = test_params_for(SerdeType, decode1),
|
||||
{ok, _} = create_rule_http(#{sql => SQL}),
|
||||
{ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
||||
EncodedBin = Serializer(Payload),
|
||||
EncodedBin = apply(Serializer, [Payload | ExtraArgs]),
|
||||
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
|
||||
Published = receive_published(?LINE),
|
||||
?assertMatch(
|
||||
|
@ -342,6 +524,76 @@ t_decode(Config) ->
|
|||
?assertEqual(Payload, Decoded),
|
||||
ok.
|
||||
|
||||
t_protobuf_union_encode(Config) ->
|
||||
SerdeType = ?config(serde_type, Config),
|
||||
?assertEqual(protobuf, SerdeType),
|
||||
SerdeName = my_serde,
|
||||
ok = create_serde(SerdeType, SerdeName),
|
||||
#{
|
||||
sql := SQL,
|
||||
payload := #{a := PayloadA, b := PayloadB},
|
||||
extra_args := ExtraArgs
|
||||
} = test_params_for(SerdeType, union1),
|
||||
{ok, _} = create_rule_http(#{sql => SQL}),
|
||||
{ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
||||
|
||||
EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]),
|
||||
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
|
||||
PublishedA = receive_published(?LINE),
|
||||
?assertMatch(
|
||||
#{payload := #{<<"decoded">> := _}},
|
||||
PublishedA
|
||||
),
|
||||
#{payload := #{<<"decoded">> := DecodedA}} = PublishedA,
|
||||
?assertEqual(PayloadA, DecodedA),
|
||||
|
||||
EncodedBinB = apply(Serializer, [PayloadB | ExtraArgs]),
|
||||
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
|
||||
PublishedB = receive_published(?LINE),
|
||||
?assertMatch(
|
||||
#{payload := #{<<"decoded">> := _}},
|
||||
PublishedB
|
||||
),
|
||||
#{payload := #{<<"decoded">> := DecodedB}} = PublishedB,
|
||||
?assertEqual(PayloadB, DecodedB),
|
||||
|
||||
ok.
|
||||
|
||||
t_protobuf_union_decode(Config) ->
|
||||
SerdeType = ?config(serde_type, Config),
|
||||
?assertEqual(protobuf, SerdeType),
|
||||
SerdeName = my_serde,
|
||||
ok = create_serde(SerdeType, SerdeName),
|
||||
#{
|
||||
sql := SQL,
|
||||
payload := #{a := PayloadA, b := PayloadB},
|
||||
extra_args := ExtraArgs
|
||||
} = test_params_for(SerdeType, union2),
|
||||
{ok, _} = create_rule_http(#{sql => SQL}),
|
||||
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
|
||||
|
||||
EncodedBinA = emqx_utils_json:encode(PayloadA),
|
||||
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
|
||||
PublishedA = receive_published(?LINE),
|
||||
?assertMatch(
|
||||
#{payload := #{<<"encoded">> := _}},
|
||||
PublishedA
|
||||
),
|
||||
#{payload := #{<<"encoded">> := EncodedA}} = PublishedA,
|
||||
?assertEqual(PayloadA, apply(Deserializer, [EncodedA | ExtraArgs])),
|
||||
|
||||
EncodedBinB = emqx_utils_json:encode(PayloadB),
|
||||
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
|
||||
PublishedB = receive_published(?LINE),
|
||||
?assertMatch(
|
||||
#{payload := #{<<"encoded">> := _}},
|
||||
PublishedB
|
||||
),
|
||||
#{payload := #{<<"encoded">> := EncodedB}} = PublishedB,
|
||||
?assertEqual(PayloadB, apply(Deserializer, [EncodedB | ExtraArgs])),
|
||||
|
||||
ok.
|
||||
|
||||
t_fail_rollback(Config) ->
|
||||
SerdeType = ?config(serde_type, Config),
|
||||
OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
|
||||
|
@ -369,6 +621,10 @@ t_cluster_serde_build(Config) ->
|
|||
Cluster = cluster(Config),
|
||||
SerdeName = my_serde,
|
||||
Schema = schema_params(SerdeType),
|
||||
#{
|
||||
payload := Payload,
|
||||
extra_args := ExtraArgs
|
||||
} = test_params_for(SerdeType, encode_decode1),
|
||||
?check_trace(
|
||||
begin
|
||||
Nodes = [N1, N2 | _] = start_cluster(Cluster),
|
||||
|
@ -385,8 +641,14 @@ t_cluster_serde_build(Config) ->
|
|||
Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
|
||||
?assertMatch({ok, #{}}, Res0, #{node => N}),
|
||||
{ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
|
||||
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
|
||||
?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}),
|
||||
?assertEqual(
|
||||
Payload,
|
||||
apply(
|
||||
Deserializer,
|
||||
[apply(Serializer, [Payload | ExtraArgs]) | ExtraArgs]
|
||||
),
|
||||
#{node => N}
|
||||
),
|
||||
ok
|
||||
end)
|
||||
end,
|
||||
|
@ -417,17 +679,11 @@ t_cluster_serde_build(Config) ->
|
|||
end,
|
||||
Nodes
|
||||
),
|
||||
ok
|
||||
#{serde_type => SerdeType, nodes => Nodes}
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assert(
|
||||
?strict_causality(
|
||||
#{?snk_kind := will_delete_schema},
|
||||
#{?snk_kind := serde_destroyed, type := SerdeType},
|
||||
Trace
|
||||
)
|
||||
),
|
||||
ok
|
||||
end
|
||||
[
|
||||
{"destructor is always called", fun ?MODULE:serde_deletion_calls_destructor_spec/2},
|
||||
{"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2}
|
||||
]
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -19,7 +19,17 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
[
|
||||
{group, avro},
|
||||
{group, protobuf}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
||||
[
|
||||
{avro, AllTCs},
|
||||
{protobuf, AllTCs}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
|
||||
|
@ -30,6 +40,48 @@ end_per_suite(_Config) ->
|
|||
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
|
||||
ok.
|
||||
|
||||
init_per_group(avro, Config) ->
|
||||
Source = #{
|
||||
type => record,
|
||||
fields => [
|
||||
#{name => <<"i">>, type => <<"int">>},
|
||||
#{name => <<"s">>, type => <<"string">>}
|
||||
]
|
||||
},
|
||||
SourceBin = emqx_utils_json:encode(Source),
|
||||
InvalidSourceBin = <<"{}">>,
|
||||
[
|
||||
{serde_type, avro},
|
||||
{schema_source, SourceBin},
|
||||
{invalid_schema_source, InvalidSourceBin}
|
||||
| Config
|
||||
];
|
||||
init_per_group(protobuf, Config) ->
|
||||
SourceBin =
|
||||
<<
|
||||
"message Person {\n"
|
||||
" required string name = 1;\n"
|
||||
" required int32 id = 2;\n"
|
||||
" optional string email = 3;\n"
|
||||
" }\n"
|
||||
"message UnionValue {\n"
|
||||
" oneof u {\n"
|
||||
" int32 a = 1;\n"
|
||||
" string b = 2;\n"
|
||||
" }\n"
|
||||
"}\n"
|
||||
>>,
|
||||
InvalidSourceBin = <<"xxxx">>,
|
||||
[
|
||||
{serde_type, protobuf},
|
||||
{schema_source, SourceBin},
|
||||
{invalid_schema_source, InvalidSourceBin}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_group(_Group, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
clear_schemas(),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
|
@ -93,18 +145,14 @@ clear_schemas() ->
|
|||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_crud(_Config) ->
|
||||
SchemaName = <<"my_avro_schema">>,
|
||||
Source = #{
|
||||
type => record,
|
||||
fields => [
|
||||
#{name => <<"i">>, type => <<"int">>},
|
||||
#{name => <<"s">>, type => <<"string">>}
|
||||
]
|
||||
},
|
||||
SourceBin = emqx_utils_json:encode(Source),
|
||||
t_crud(Config) ->
|
||||
SerdeType = ?config(serde_type, Config),
|
||||
SourceBin = ?config(schema_source, Config),
|
||||
InvalidSourceBin = ?config(invalid_schema_source, Config),
|
||||
SerdeTypeBin = atom_to_binary(SerdeType),
|
||||
SchemaName = <<"my_schema">>,
|
||||
Params = #{
|
||||
<<"type">> => <<"avro">>,
|
||||
<<"type">> => SerdeTypeBin,
|
||||
<<"source">> => SourceBin,
|
||||
<<"name">> => SchemaName,
|
||||
<<"description">> => <<"My schema">>
|
||||
|
@ -138,7 +186,7 @@ t_crud(_Config) ->
|
|||
%% create a schema
|
||||
?assertMatch(
|
||||
{ok, 201, #{
|
||||
<<"type">> := <<"avro">>,
|
||||
<<"type">> := SerdeTypeBin,
|
||||
<<"source">> := SourceBin,
|
||||
<<"name">> := SchemaName,
|
||||
<<"description">> := <<"My schema">>
|
||||
|
@ -147,7 +195,7 @@ t_crud(_Config) ->
|
|||
),
|
||||
?assertMatch(
|
||||
{ok, 200, #{
|
||||
<<"type">> := <<"avro">>,
|
||||
<<"type">> := SerdeTypeBin,
|
||||
<<"source">> := SourceBin,
|
||||
<<"name">> := SchemaName,
|
||||
<<"description">> := <<"My schema">>
|
||||
|
@ -157,7 +205,7 @@ t_crud(_Config) ->
|
|||
?assertMatch(
|
||||
{ok, 200, [
|
||||
#{
|
||||
<<"type">> := <<"avro">>,
|
||||
<<"type">> := SerdeTypeBin,
|
||||
<<"source">> := SourceBin,
|
||||
<<"name">> := SchemaName,
|
||||
<<"description">> := <<"My schema">>
|
||||
|
@ -168,7 +216,7 @@ t_crud(_Config) ->
|
|||
UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>},
|
||||
?assertMatch(
|
||||
{ok, 200, #{
|
||||
<<"type">> := <<"avro">>,
|
||||
<<"type">> := SerdeTypeBin,
|
||||
<<"source">> := SourceBin,
|
||||
<<"name">> := SchemaName,
|
||||
<<"description">> := <<"My new schema">>
|
||||
|
@ -188,9 +236,9 @@ t_crud(_Config) ->
|
|||
{ok, 400, #{
|
||||
<<"code">> := <<"BAD_REQUEST">>,
|
||||
<<"message">> :=
|
||||
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
|
||||
<<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
|
||||
}},
|
||||
request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}})
|
||||
request({put, SchemaName, UpdateParams#{<<"source">> := InvalidSourceBin}})
|
||||
),
|
||||
|
||||
?assertMatch(
|
||||
|
@ -229,9 +277,9 @@ t_crud(_Config) ->
|
|||
{ok, 400, #{
|
||||
<<"code">> := <<"BAD_REQUEST">>,
|
||||
<<"message">> :=
|
||||
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
|
||||
<<"{post_config_update,emqx_ee_schema_registry,", _/binary>>
|
||||
}},
|
||||
request({post, Params#{<<"source">> := <<"{}">>}})
|
||||
request({post, Params#{<<"source">> := InvalidSourceBin}})
|
||||
),
|
||||
|
||||
%% unknown serde type
|
||||
|
|
|
@ -60,7 +60,25 @@ schema_params(avro) ->
|
|||
]
|
||||
},
|
||||
SourceBin = emqx_utils_json:encode(Source),
|
||||
#{type => avro, source => SourceBin}.
|
||||
#{type => avro, source => SourceBin};
|
||||
schema_params(protobuf) ->
|
||||
SourceBin =
|
||||
<<
|
||||
"\n"
|
||||
" message Person {\n"
|
||||
" required string name = 1;\n"
|
||||
" required int32 id = 2;\n"
|
||||
" optional string email = 3;\n"
|
||||
" }\n"
|
||||
" message UnionValue {\n"
|
||||
" oneof u {\n"
|
||||
" int32 a = 1;\n"
|
||||
" string b = 2;\n"
|
||||
" }\n"
|
||||
" }\n"
|
||||
" "
|
||||
>>,
|
||||
#{type => protobuf, source => SourceBin}.
|
||||
|
||||
assert_roundtrip(SerdeName, Original) ->
|
||||
Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original),
|
||||
|
@ -119,3 +137,36 @@ t_serde_not_found(_Config) ->
|
|||
emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original)
|
||||
),
|
||||
ok.
|
||||
|
||||
t_roundtrip_protobuf(_Config) ->
|
||||
SerdeName = my_serde,
|
||||
Params = schema_params(protobuf),
|
||||
ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),
|
||||
ExtraArgsPerson = [<<"Person">>],
|
||||
|
||||
Original0 = #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
|
||||
assert_roundtrip(SerdeName, Original0, ExtraArgsPerson, ExtraArgsPerson),
|
||||
|
||||
%% removing optional field
|
||||
Original1 = #{<<"name">> => <<"some name">>, <<"id">> => 10},
|
||||
assert_roundtrip(SerdeName, Original1, ExtraArgsPerson, ExtraArgsPerson),
|
||||
|
||||
%% `oneof' fields
|
||||
ExtraArgsUnion = [<<"UnionValue">>],
|
||||
Original2 = #{<<"a">> => 1},
|
||||
assert_roundtrip(SerdeName, Original2, ExtraArgsUnion, ExtraArgsUnion),
|
||||
|
||||
Original3 = #{<<"b">> => <<"string">>},
|
||||
assert_roundtrip(SerdeName, Original3, ExtraArgsUnion, ExtraArgsUnion),
|
||||
|
||||
ok.
|
||||
|
||||
t_protobuf_invalid_schema(_Config) ->
|
||||
SerdeName = my_serde,
|
||||
Params = schema_params(protobuf),
|
||||
WrongParams = Params#{source := <<"xxxx">>},
|
||||
?assertMatch(
|
||||
{error, {post_config_update, _, {invalid_protobuf_schema, _}}},
|
||||
emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
|
||||
),
|
||||
ok.
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -94,7 +94,7 @@ defmodule EMQXUmbrella.MixProject do
|
|||
{:ranch,
|
||||
github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
|
||||
# in conflict by grpc and eetcd
|
||||
{:gpb, "4.19.5", override: true, runtime: false},
|
||||
{:gpb, "4.19.7", override: true, runtime: false},
|
||||
{:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
|
||||
] ++
|
||||
emqx_apps(profile_info, version) ++
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
[ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
|
||||
, {redbug, "2.0.8"}
|
||||
, {covertool, {git, "https://github.com/zmstone/covertool", {tag, "2.0.4.1"}}}
|
||||
, {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||
, {gpb, "4.19.7"}
|
||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
|
||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}}
|
||||
|
|
|
@ -6,6 +6,12 @@ avro_type.desc:
|
|||
avro_type.label:
|
||||
"""Apache Avro"""
|
||||
|
||||
protobuf_type.desc:
|
||||
"""[Protocol Buffers](https://protobuf.dev/) serialization format."""
|
||||
|
||||
protobuf_type.label:
|
||||
"""Protocol Buffers"""
|
||||
|
||||
schema_description.desc:
|
||||
"""A description for this schema."""
|
||||
|
||||
|
|
|
@ -6,6 +6,12 @@ avro_type.desc:
|
|||
avro_type.label:
|
||||
"""阿帕奇-阿夫罗"""
|
||||
|
||||
protobuf_type.desc:
|
||||
"""[协议缓冲器](https://protobuf.dev/) 序列化格式。"""
|
||||
|
||||
protobuf_type.label:
|
||||
"""协议缓冲器"""
|
||||
|
||||
schema_description.desc:
|
||||
"""对该模式的描述。"""
|
||||
|
||||
|
|
Loading…
Reference in New Issue