refactor: change schema registry SERD_TAB to ets

Prior to this change, it's a mria/mnesia table
which may cause nodes to overwrite each other.
This commit is contained in:
Zaiming (Stone) Shi 2024-02-26 09:44:42 +01:00
parent de5e4491f7
commit 10c1245125
8 changed files with 109 additions and 130 deletions

View File

@ -10,32 +10,29 @@
%% Note: this has the `_ee_' segment for backwards compatibility.
-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
%% ETS table for serde build results.
-define(SERDE_TAB, emqx_schema_registry_serde_tab).
-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
<<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">>
).
-type schema_name() :: binary().
-type schema_source() :: binary().
-type serde_args() :: list().
-type encoded_data() :: iodata().
-type decoded_data() :: map().
-type serializer() ::
fun((decoded_data()) -> encoded_data())
| fun((decoded_data(), term()) -> encoded_data()).
-type deserializer() ::
fun((encoded_data()) -> decoded_data())
| fun((encoded_data(), term()) -> decoded_data()).
-type destructor() :: fun(() -> ok).
-type serde_type() :: avro.
-type serde_type() :: avro | protobuf.
-type serde_opts() :: map().
-record(serde, {
name :: schema_name(),
serializer :: serializer(),
deserializer :: deserializer(),
destructor :: destructor()
type :: serde_type(),
eval_context :: term()
}).
-type serde() :: #serde{}.
@ -50,11 +47,4 @@
module_binary :: binary()
}.
-type serde_map() :: #{
name := schema_name(),
serializer := serializer(),
deserializer := deserializer(),
destructor := destructor()
}.
-endif.

View File

@ -5,7 +5,7 @@
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_rule_engine, {path, "../emqx_rule_engine"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
{erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
{gpb, "4.19.9"}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.8"},
{vsn, "0.2.0"},
{registered, [emqx_schema_registry_sup]},
{mod, {emqx_schema_registry_app, []}},
{included_applications, [

View File

@ -14,7 +14,6 @@
%% API
-export([
start_link/0,
get_serde/1,
add_schema/2,
get_schema/1,
delete_schema/1,
@ -38,6 +37,11 @@
import_config/1
]).
%% for testing
-export([
get_serde/1
]).
-type schema() :: #{
type := serde_type(),
source := binary(),
@ -51,13 +55,13 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}.
-spec get_serde(schema_name()) -> {ok, serde()} | {error, not_found}.
get_serde(SchemaName) ->
case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of
[] ->
{error, not_found};
[Serde] ->
{ok, serde_to_map(Serde)}
{ok, Serde}
end.
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
@ -214,13 +218,7 @@ terminate(_Reason, _State) ->
%%-------------------------------------------------------------------------------------------------
create_tables() ->
ok = mria:create_table(?SERDE_TAB, [
{type, ordered_set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, ram_copies},
{record_name, serde},
{attributes, record_info(fields, serde)}
]),
ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]),
ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
{type, set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
@ -228,7 +226,7 @@ create_tables() ->
{record_name, protobuf_cache},
{attributes, record_info(fields, protobuf_cache)}
]),
ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]),
ok = mria:wait_for_tables([?PROTOBUF_CACHE_TAB]),
ok.
do_build_serdes(Schemas) ->
@ -290,15 +288,8 @@ do_build_serde(Name, Serde) when not is_binary(Name) ->
do_build_serde(to_bin(Name), Serde);
do_build_serde(Name, #{type := Type, source := Source}) ->
try
{Serializer, Deserializer, Destructor} =
emqx_schema_registry_serde:make_serde(Type, Name, Source),
Serde = #serde{
name = Name,
serializer = Serializer,
deserializer = Deserializer,
destructor = Destructor
},
ok = mria:dirty_write(?SERDE_TAB, Serde),
Serde = emqx_schema_registry_serde:make_serde(Type, Name, Source),
true = ets:insert(?SERDE_TAB, Serde),
ok
catch
Kind:Error:Stacktrace ->
@ -320,9 +311,9 @@ ensure_serde_absent(Name) when not is_binary(Name) ->
ensure_serde_absent(to_bin(Name));
ensure_serde_absent(Name) ->
case get_serde(Name) of
{ok, #{destructor := Destructor}} ->
Destructor(),
ok = mria:dirty_delete(?SERDE_TAB, Name);
{ok, Serde} ->
_ = ets:delete(?SERDE_TAB, Name),
ok = emqx_schema_registry_serde:destroy(Serde);
{error, not_found} ->
ok
end.
@ -346,12 +337,3 @@ schema_name_bin_to_atom(Bin) when size(Bin) > 255 ->
);
schema_name_bin_to_atom(Bin) ->
binary_to_atom(Bin, utf8).
-spec serde_to_map(serde()) -> serde_map().
serde_to_map(#serde{} = Serde) ->
#{
name => Serde#serde.name,
serializer => Serde#serde.serializer,
deserializer => Serde#serde.deserializer,
destructor => Serde#serde.destructor
}.

View File

@ -9,8 +9,6 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_schema_registry_serde]}}]).
%% API
-export([
decode/2,
@ -18,7 +16,13 @@
encode/2,
encode/3,
make_serde/3,
handle_rule_function/2
handle_rule_function/2,
destroy/1
]).
-export([
eval_decode/2,
eval_encode/2
]).
%%------------------------------------------------------------------------------
@ -70,8 +74,8 @@ decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
case emqx_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{deserializer := Deserializer}} ->
apply(Deserializer, [RawData | VarArgs])
{ok, Serde} ->
eval_decode(Serde, [RawData | VarArgs])
end.
-spec encode(schema_name(), decoded_data()) -> encoded_data().
@ -83,55 +87,56 @@ encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
case emqx_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{serializer := Serializer}} ->
apply(Serializer, [EncodedData | VarArgs])
{ok, Serde} ->
eval_encode(Serde, [EncodedData | VarArgs])
end.
-spec make_serde(serde_type(), schema_name(), schema_source()) ->
{serializer(), deserializer(), destructor()}.
make_serde(avro, Name, Source0) ->
Source = inject_avro_name(Name, Source0),
Serializer = avro:make_simple_encoder(Source, _Opts = []),
Deserializer = avro:make_simple_decoder(Source, [{map_type, map}, {record_type, map}]),
Destructor = fun() ->
?tp(serde_destroyed, #{type => avro, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor};
-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
make_serde(avro, Name, Source) ->
Store0 = avro_schema_store:new([map]),
%% import the schema into the map store with an assigned name
%% if it's a named schema (e.g. struct), then Name is added as alias
Store = avro_schema_store:import_schema_json(Name, Source, Store0),
#serde{
name = Name,
type = avro,
eval_context = Store
};
make_serde(protobuf, Name, Source) ->
SerdeMod = make_protobuf_serde_mod(Name, Source),
Serializer =
fun(DecodedData0, MessageName0) ->
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
MessageName = binary_to_existing_atom(MessageName0, utf8),
SerdeMod:encode_msg(DecodedData, MessageName)
end,
Deserializer =
fun(EncodedData, MessageName0) ->
MessageName = binary_to_existing_atom(MessageName0, utf8),
Decoded = SerdeMod:decode_msg(EncodedData, MessageName),
emqx_utils_maps:binary_key_map(Decoded)
end,
Destructor =
fun() ->
unload_code(SerdeMod),
?tp(serde_destroyed, #{type => protobuf, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor}.
#serde{
name = Name,
type = protobuf,
eval_context = SerdeMod
}.
eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
avro_binary_decoder:decode(Data, Name, Store, Opts);
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
MessageName = binary_to_existing_atom(MessageName0, utf8),
Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
emqx_utils_maps:binary_key_map(Decoded).
eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
avro_binary_encoder:encode(Store, Name, Data);
eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
MessageName = binary_to_existing_atom(MessageName0, utf8),
apply(SerdeMod, encode_msg, [DecodedData, MessageName]).
destroy(#serde{type = avro, name = _Name}) ->
?tp(serde_destroyed, #{type => avro, name => _Name}),
ok;
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
unload_code(SerdeMod),
?tp(serde_destroyed, #{type => protobuf, name => _Name}),
ok.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec inject_avro_name(schema_name(), schema_source()) -> schema_source().
inject_avro_name(Name, Source0) ->
%% The schema checks that the source is a valid JSON when
%% typechecking, so we shouldn't need to validate here.
Schema0 = emqx_utils_json:decode(Source0, [return_maps]),
Schema = Schema0#{<<"name">> => Name},
emqx_utils_json:encode(Schema).
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
make_protobuf_serde_mod(Name, Source) ->
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),

View File

@ -131,6 +131,8 @@ create_rule_http(RuleParams, Overrides) ->
schema_params(avro) ->
Source = #{
type => record,
name => <<"test">>,
namespace => <<"emqx.com">>,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
@ -403,16 +405,6 @@ wait_for_cluster_rpc(Node) ->
true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
).
serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->
?assert(
?strict_causality(
#{?snk_kind := will_delete_schema},
#{?snk_kind := serde_destroyed, type := SerdeType},
Trace
)
),
ok.
protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
#{nodes := Nodes} = Res,
CacheEvents0 = ?of_kind(
@ -516,8 +508,8 @@ t_encode(Config) ->
Published
),
#{payload := Encoded} = Published,
{ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])),
ok.
t_decode(Config) ->
@ -530,8 +522,8 @@ t_decode(Config) ->
extra_args := ExtraArgs
} = test_params_for(SerdeType, decode1),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
EncodedBin = apply(Serializer, [Payload | ExtraArgs]),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBin = eval_encode(Serde, [Payload | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
Published = receive_published(?LINE),
?assertMatch(
@ -553,9 +545,9 @@ t_protobuf_union_encode(Config) ->
extra_args := ExtraArgs
} = test_params_for(SerdeType, union1),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]),
EncodedBinA = eval_encode(Serde, [PayloadA | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
PublishedA = receive_published(?LINE),
?assertMatch(
@ -565,7 +557,7 @@ t_protobuf_union_encode(Config) ->
#{payload := #{<<"decoded">> := DecodedA}} = PublishedA,
?assertEqual(PayloadA, DecodedA),
EncodedBinB = apply(Serializer, [PayloadB | ExtraArgs]),
EncodedBinB = eval_encode(Serde, [PayloadB | ExtraArgs]),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
PublishedB = receive_published(?LINE),
?assertMatch(
@ -588,7 +580,7 @@ t_protobuf_union_decode(Config) ->
extra_args := ExtraArgs
} = test_params_for(SerdeType, union2),
{ok, _} = create_rule_http(#{sql => SQL}),
{ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName),
{ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
EncodedBinA = emqx_utils_json:encode(PayloadA),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)),
@ -598,7 +590,7 @@ t_protobuf_union_decode(Config) ->
PublishedA
),
#{payload := #{<<"encoded">> := EncodedA}} = PublishedA,
?assertEqual(PayloadA, apply(Deserializer, [EncodedA | ExtraArgs])),
?assertEqual(PayloadA, eval_decode(Serde, [EncodedA | ExtraArgs])),
EncodedBinB = emqx_utils_json:encode(PayloadB),
emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)),
@ -608,7 +600,7 @@ t_protobuf_union_decode(Config) ->
PublishedB
),
#{payload := #{<<"encoded">> := EncodedB}} = PublishedB,
?assertEqual(PayloadB, apply(Deserializer, [EncodedB | ExtraArgs])),
?assertEqual(PayloadB, eval_decode(Serde, [EncodedB | ExtraArgs])),
ok.
@ -633,7 +625,7 @@ t_fail_rollback(Config) ->
#{}
)
),
?assertMatch({ok, #{name := <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)),
?assertMatch({ok, #serde{name = <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)),
%% no z serdes should be in the table
?assertEqual({error, not_found}, emqx_schema_registry:get_serde(<<"z">>)),
ok.
@ -659,16 +651,13 @@ t_cluster_serde_build(Config) ->
%% check that we can serialize/deserialize in all nodes
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
ok = erpc:call(N, fun() ->
Res0 = emqx_schema_registry:get_serde(SerdeName),
?assertMatch({ok, #{}}, Res0, #{node => N}),
{ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
?assertMatch({ok, #serde{}}, Res0, #{node => N}),
{ok, Serde} = Res0,
?assertEqual(
Payload,
apply(
Deserializer,
[apply(Serializer, [Payload | ExtraArgs]) | ExtraArgs]
),
encode_then_decode(Serde, Payload, ExtraArgs),
#{node => N}
),
ok
@ -676,8 +665,6 @@ t_cluster_serde_build(Config) ->
end,
Nodes
),
%% now we delete and check it's removed from the table
?tp(will_delete_schema, #{}),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
NumNodes,
@ -687,7 +674,9 @@ t_cluster_serde_build(Config) ->
ok,
erpc:call(N1, emqx_schema_registry, delete_schema, [SerdeName])
),
{ok, _} = snabbkaffe:receive_events(SRef1),
{ok, DeleteEvents} = snabbkaffe:receive_events(SRef1),
%% expect all nodes to delete (local) serdes
?assertEqual(NumNodes, length(DeleteEvents)),
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
@ -704,7 +693,6 @@ t_cluster_serde_build(Config) ->
#{serde_type => SerdeType, nodes => Nodes}
end,
[
{"destructor is always called", fun ?MODULE:serde_deletion_calls_destructor_spec/2},
{"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2}
]
),
@ -723,6 +711,8 @@ t_import_config(_Config) ->
emqx_utils_json:encode(
#{
type => <<"record">>,
name => <<"ct">>,
namespace => <<"emqx.com">>,
fields => [
#{type => <<"int">>, name => <<"i">>},
#{type => <<"string">>, name => <<"s">>}
@ -869,3 +859,13 @@ t_sparkplug_decode_encode_with_message_name(_Config) ->
Res = receive_action_results(),
?assertMatch(#{data := ExpectedRuleOutput}, Res),
ok.
eval_encode(Serde, Args) ->
emqx_schema_registry_serde:eval_encode(Serde, Args).
eval_decode(Serde, Args) ->
emqx_schema_registry_serde:eval_decode(Serde, Args).
encode_then_decode(Serde, Payload, ExtraArgs) ->
Encoded = eval_encode(Serde, [Payload | ExtraArgs]),
eval_decode(Serde, [Encoded | ExtraArgs]).

View File

@ -45,6 +45,7 @@ end_per_suite(_Config) ->
init_per_group(avro, Config) ->
Source = #{
type => record,
name => <<"apitest">>,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}

View File

@ -54,6 +54,7 @@ clear_schemas() ->
schema_params(avro) ->
Source = #{
type => record,
name => <<"n1">>,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}