diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index e831ec7d3..853eddcc0 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -10,32 +10,29 @@ %% Note: this has the `_ee_' segment for backwards compatibility. -define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard). --define(SERDE_TAB, emqx_ee_schema_registry_serde_tab). -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab). +%% ETS table for serde build results. +-define(SERDE_TAB, emqx_schema_registry_serde_tab). + -define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, <<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">> ). -type schema_name() :: binary(). -type schema_source() :: binary(). +-type serde_args() :: list(). + -type encoded_data() :: iodata(). -type decoded_data() :: map(). --type serializer() :: - fun((decoded_data()) -> encoded_data()) - | fun((decoded_data(), term()) -> encoded_data()). --type deserializer() :: - fun((encoded_data()) -> decoded_data()) - | fun((encoded_data(), term()) -> decoded_data()). --type destructor() :: fun(() -> ok). --type serde_type() :: avro. + +-type serde_type() :: avro | protobuf. -type serde_opts() :: map(). -record(serde, { name :: schema_name(), - serializer :: serializer(), - deserializer :: deserializer(), - destructor :: destructor() + type :: serde_type(), + eval_context :: term() }). -type serde() :: #serde{}. @@ -50,11 +47,4 @@ module_binary :: binary() }. --type serde_map() :: #{ - name := schema_name(), - serializer := serializer(), - deserializer := deserializer(), - destructor := destructor() -}. - -endif. diff --git a/apps/emqx_schema_registry/rebar.config b/apps/emqx_schema_registry/rebar.config index 342737ee6..44082865b 100644 --- a/apps/emqx_schema_registry/rebar.config +++ b/apps/emqx_schema_registry/rebar.config @@ -5,7 +5,7 @@ {emqx, {path, "../emqx"}}, {emqx_utils, {path, "../emqx_utils"}}, {emqx_rule_engine, {path, "../emqx_rule_engine"}}, - {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}, + {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}}, {gpb, "4.19.9"} ]}. diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src index f4089fdc1..21430c5ff 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.app.src +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.8"}, + {vsn, "0.2.0"}, {registered, [emqx_schema_registry_sup]}, {mod, {emqx_schema_registry_app, []}}, {included_applications, [ diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index 925351258..fdc94f64c 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -14,7 +14,6 @@ %% API -export([ start_link/0, - get_serde/1, add_schema/2, get_schema/1, delete_schema/1, @@ -38,6 +37,11 @@ import_config/1 ]). +%% for testing +-export([ + get_serde/1 +]). + -type schema() :: #{ type := serde_type(), source := binary(), @@ -51,13 +55,13 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}. +-spec get_serde(schema_name()) -> {ok, serde()} | {error, not_found}. get_serde(SchemaName) -> case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of [] -> {error, not_found}; [Serde] -> - {ok, serde_to_map(Serde)} + {ok, Serde} end. -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. @@ -214,13 +218,7 @@ terminate(_Reason, _State) -> %%------------------------------------------------------------------------------------------------- create_tables() -> - ok = mria:create_table(?SERDE_TAB, [ - {type, ordered_set}, - {rlog_shard, ?SCHEMA_REGISTRY_SHARD}, - {storage, ram_copies}, - {record_name, serde}, - {attributes, record_info(fields, serde)} - ]), + ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]), ok = mria:create_table(?PROTOBUF_CACHE_TAB, [ {type, set}, {rlog_shard, ?SCHEMA_REGISTRY_SHARD}, @@ -228,7 +226,7 @@ create_tables() -> {record_name, protobuf_cache}, {attributes, record_info(fields, protobuf_cache)} ]), - ok = mria:wait_for_tables([?SERDE_TAB, ?PROTOBUF_CACHE_TAB]), + ok = mria:wait_for_tables([?PROTOBUF_CACHE_TAB]), ok. do_build_serdes(Schemas) -> @@ -290,15 +288,8 @@ do_build_serde(Name, Serde) when not is_binary(Name) -> do_build_serde(to_bin(Name), Serde); do_build_serde(Name, #{type := Type, source := Source}) -> try - {Serializer, Deserializer, Destructor} = - emqx_schema_registry_serde:make_serde(Type, Name, Source), - Serde = #serde{ - name = Name, - serializer = Serializer, - deserializer = Deserializer, - destructor = Destructor - }, - ok = mria:dirty_write(?SERDE_TAB, Serde), + Serde = emqx_schema_registry_serde:make_serde(Type, Name, Source), + true = ets:insert(?SERDE_TAB, Serde), ok catch Kind:Error:Stacktrace -> @@ -320,9 +311,9 @@ ensure_serde_absent(Name) when not is_binary(Name) -> ensure_serde_absent(to_bin(Name)); ensure_serde_absent(Name) -> case get_serde(Name) of - {ok, #{destructor := Destructor}} -> - Destructor(), - ok = mria:dirty_delete(?SERDE_TAB, Name); + {ok, Serde} -> + _ = ets:delete(?SERDE_TAB, Name), + ok = emqx_schema_registry_serde:destroy(Serde); {error, not_found} -> ok end. @@ -346,12 +337,3 @@ schema_name_bin_to_atom(Bin) when size(Bin) > 255 -> ); schema_name_bin_to_atom(Bin) -> binary_to_atom(Bin, utf8). - --spec serde_to_map(serde()) -> serde_map(). -serde_to_map(#serde{} = Serde) -> - #{ - name => Serde#serde.name, - serializer => Serde#serde.serializer, - deserializer => Serde#serde.deserializer, - destructor => Serde#serde.destructor - }. diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 3cfd468eb..2be00a0b9 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -9,8 +9,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_schema_registry_serde]}}]). - %% API -export([ decode/2, @@ -18,7 +16,13 @@ encode/2, encode/3, make_serde/3, - handle_rule_function/2 + handle_rule_function/2, + destroy/1 +]). + +-export([ + eval_decode/2, + eval_encode/2 ]). %%------------------------------------------------------------------------------ @@ -70,8 +74,8 @@ decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) -> case emqx_schema_registry:get_serde(SerdeName) of {error, not_found} -> error({serde_not_found, SerdeName}); - {ok, #{deserializer := Deserializer}} -> - apply(Deserializer, [RawData | VarArgs]) + {ok, Serde} -> + eval_decode(Serde, [RawData | VarArgs]) end. -spec encode(schema_name(), decoded_data()) -> encoded_data(). @@ -83,55 +87,56 @@ encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) -> case emqx_schema_registry:get_serde(SerdeName) of {error, not_found} -> error({serde_not_found, SerdeName}); - {ok, #{serializer := Serializer}} -> - apply(Serializer, [EncodedData | VarArgs]) + {ok, Serde} -> + eval_encode(Serde, [EncodedData | VarArgs]) end. --spec make_serde(serde_type(), schema_name(), schema_source()) -> - {serializer(), deserializer(), destructor()}. -make_serde(avro, Name, Source0) -> - Source = inject_avro_name(Name, Source0), - Serializer = avro:make_simple_encoder(Source, _Opts = []), - Deserializer = avro:make_simple_decoder(Source, [{map_type, map}, {record_type, map}]), - Destructor = fun() -> - ?tp(serde_destroyed, #{type => avro, name => Name}), - ok - end, - {Serializer, Deserializer, Destructor}; +-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde(). +make_serde(avro, Name, Source) -> + Store0 = avro_schema_store:new([map]), + %% import the schema into the map store with an assigned name + %% if it's a named schema (e.g. struct), then Name is added as alias + Store = avro_schema_store:import_schema_json(Name, Source, Store0), + #serde{ + name = Name, + type = avro, + eval_context = Store + }; make_serde(protobuf, Name, Source) -> SerdeMod = make_protobuf_serde_mod(Name, Source), - Serializer = - fun(DecodedData0, MessageName0) -> - DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0), - MessageName = binary_to_existing_atom(MessageName0, utf8), - SerdeMod:encode_msg(DecodedData, MessageName) - end, - Deserializer = - fun(EncodedData, MessageName0) -> - MessageName = binary_to_existing_atom(MessageName0, utf8), - Decoded = SerdeMod:decode_msg(EncodedData, MessageName), - emqx_utils_maps:binary_key_map(Decoded) - end, - Destructor = - fun() -> - unload_code(SerdeMod), - ?tp(serde_destroyed, #{type => protobuf, name => Name}), - ok - end, - {Serializer, Deserializer, Destructor}. + #serde{ + name = Name, + type = protobuf, + eval_context = SerdeMod + }. + +eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> + Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]), + avro_binary_decoder:decode(Data, Name, Store, Opts); +eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) -> + MessageName = binary_to_existing_atom(MessageName0, utf8), + Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]), + emqx_utils_maps:binary_key_map(Decoded). + +eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) -> + avro_binary_encoder:encode(Store, Name, Data); +eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) -> + DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0), + MessageName = binary_to_existing_atom(MessageName0, utf8), + apply(SerdeMod, encode_msg, [DecodedData, MessageName]). + +destroy(#serde{type = avro, name = _Name}) -> + ?tp(serde_destroyed, #{type => avro, name => _Name}), + ok; +destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) -> + unload_code(SerdeMod), + ?tp(serde_destroyed, #{type => protobuf, name => _Name}), + ok. %%------------------------------------------------------------------------------ %% Internal fns %%------------------------------------------------------------------------------ --spec inject_avro_name(schema_name(), schema_source()) -> schema_source(). -inject_avro_name(Name, Source0) -> - %% The schema checks that the source is a valid JSON when - %% typechecking, so we shouldn't need to validate here. - Schema0 = emqx_utils_json:decode(Source0, [return_maps]), - Schema = Schema0#{<<"name">> => Name}, - emqx_utils_json:encode(Schema). - -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl index 3a4fc5607..ab311c578 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl @@ -131,6 +131,8 @@ create_rule_http(RuleParams, Overrides) -> schema_params(avro) -> Source = #{ type => record, + name => <<"test">>, + namespace => <<"emqx.com">>, fields => [ #{name => <<"i">>, type => <<"int">>}, #{name => <<"s">>, type => <<"string">>} @@ -403,16 +405,6 @@ wait_for_cluster_rpc(Node) -> true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler])) ). -serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) -> - ?assert( - ?strict_causality( - #{?snk_kind := will_delete_schema}, - #{?snk_kind := serde_destroyed, type := SerdeType}, - Trace - ) - ), - ok. - protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) -> #{nodes := Nodes} = Res, CacheEvents0 = ?of_kind( @@ -516,8 +508,8 @@ t_encode(Config) -> Published ), #{payload := Encoded} = Published, - {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName), - ?assertEqual(Payload, apply(Deserializer, [Encoded | ExtraArgs])), + {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), + ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])), ok. t_decode(Config) -> @@ -530,8 +522,8 @@ t_decode(Config) -> extra_args := ExtraArgs } = test_params_for(SerdeType, decode1), {ok, _} = create_rule_http(#{sql => SQL}), - {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName), - EncodedBin = apply(Serializer, [Payload | ExtraArgs]), + {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), + EncodedBin = eval_encode(Serde, [Payload | ExtraArgs]), emqx:publish(emqx_message:make(<<"t">>, EncodedBin)), Published = receive_published(?LINE), ?assertMatch( @@ -553,9 +545,9 @@ t_protobuf_union_encode(Config) -> extra_args := ExtraArgs } = test_params_for(SerdeType, union1), {ok, _} = create_rule_http(#{sql => SQL}), - {ok, #{serializer := Serializer}} = emqx_schema_registry:get_serde(SerdeName), + {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), - EncodedBinA = apply(Serializer, [PayloadA | ExtraArgs]), + EncodedBinA = eval_encode(Serde, [PayloadA | ExtraArgs]), emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)), PublishedA = receive_published(?LINE), ?assertMatch( @@ -565,7 +557,7 @@ t_protobuf_union_encode(Config) -> #{payload := #{<<"decoded">> := DecodedA}} = PublishedA, ?assertEqual(PayloadA, DecodedA), - EncodedBinB = apply(Serializer, [PayloadB | ExtraArgs]), + EncodedBinB = eval_encode(Serde, [PayloadB | ExtraArgs]), emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)), PublishedB = receive_published(?LINE), ?assertMatch( @@ -588,7 +580,7 @@ t_protobuf_union_decode(Config) -> extra_args := ExtraArgs } = test_params_for(SerdeType, union2), {ok, _} = create_rule_http(#{sql => SQL}), - {ok, #{deserializer := Deserializer}} = emqx_schema_registry:get_serde(SerdeName), + {ok, Serde} = emqx_schema_registry:get_serde(SerdeName), EncodedBinA = emqx_utils_json:encode(PayloadA), emqx:publish(emqx_message:make(<<"t">>, EncodedBinA)), @@ -598,7 +590,7 @@ t_protobuf_union_decode(Config) -> PublishedA ), #{payload := #{<<"encoded">> := EncodedA}} = PublishedA, - ?assertEqual(PayloadA, apply(Deserializer, [EncodedA | ExtraArgs])), + ?assertEqual(PayloadA, eval_decode(Serde, [EncodedA | ExtraArgs])), EncodedBinB = emqx_utils_json:encode(PayloadB), emqx:publish(emqx_message:make(<<"t">>, EncodedBinB)), @@ -608,7 +600,7 @@ t_protobuf_union_decode(Config) -> PublishedB ), #{payload := #{<<"encoded">> := EncodedB}} = PublishedB, - ?assertEqual(PayloadB, apply(Deserializer, [EncodedB | ExtraArgs])), + ?assertEqual(PayloadB, eval_decode(Serde, [EncodedB | ExtraArgs])), ok. @@ -633,7 +625,7 @@ t_fail_rollback(Config) -> #{} ) ), - ?assertMatch({ok, #{name := <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)), + ?assertMatch({ok, #serde{name = <<"a">>}}, emqx_schema_registry:get_serde(<<"a">>)), %% no z serdes should be in the table ?assertEqual({error, not_found}, emqx_schema_registry:get_serde(<<"z">>)), ok. @@ -659,16 +651,13 @@ t_cluster_serde_build(Config) -> %% check that we can serialize/deserialize in all nodes lists:foreach( fun(N) -> - erpc:call(N, fun() -> + ok = erpc:call(N, fun() -> Res0 = emqx_schema_registry:get_serde(SerdeName), - ?assertMatch({ok, #{}}, Res0, #{node => N}), - {ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0, + ?assertMatch({ok, #serde{}}, Res0, #{node => N}), + {ok, Serde} = Res0, ?assertEqual( Payload, - apply( - Deserializer, - [apply(Serializer, [Payload | ExtraArgs]) | ExtraArgs] - ), + encode_then_decode(Serde, Payload, ExtraArgs), #{node => N} ), ok @@ -676,8 +665,6 @@ t_cluster_serde_build(Config) -> end, Nodes ), - %% now we delete and check it's removed from the table - ?tp(will_delete_schema, #{}), {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := schema_registry_serdes_deleted}), NumNodes, @@ -687,7 +674,9 @@ t_cluster_serde_build(Config) -> ok, erpc:call(N1, emqx_schema_registry, delete_schema, [SerdeName]) ), - {ok, _} = snabbkaffe:receive_events(SRef1), + {ok, DeleteEvents} = snabbkaffe:receive_events(SRef1), + %% expect all nodes to delete (local) serdes + ?assertEqual(NumNodes, length(DeleteEvents)), lists:foreach( fun(N) -> erpc:call(N, fun() -> @@ -704,7 +693,6 @@ t_cluster_serde_build(Config) -> #{serde_type => SerdeType, nodes => Nodes} end, [ - {"destructor is always called", fun ?MODULE:serde_deletion_calls_destructor_spec/2}, {"protobuf is only built on one node", fun ?MODULE:protobuf_unique_cache_hit_spec/2} ] ), @@ -723,6 +711,8 @@ t_import_config(_Config) -> emqx_utils_json:encode( #{ type => <<"record">>, + name => <<"ct">>, + namespace => <<"emqx.com">>, fields => [ #{type => <<"int">>, name => <<"i">>}, #{type => <<"string">>, name => <<"s">>} @@ -869,3 +859,13 @@ t_sparkplug_decode_encode_with_message_name(_Config) -> Res = receive_action_results(), ?assertMatch(#{data := ExpectedRuleOutput}, Res), ok. + +eval_encode(Serde, Args) -> + emqx_schema_registry_serde:eval_encode(Serde, Args). + +eval_decode(Serde, Args) -> + emqx_schema_registry_serde:eval_decode(Serde, Args). + +encode_then_decode(Serde, Payload, ExtraArgs) -> + Encoded = eval_encode(Serde, [Payload | ExtraArgs]), + eval_decode(Serde, [Encoded | ExtraArgs]). diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl index 4a01beb93..77d940191 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl @@ -45,6 +45,7 @@ end_per_suite(_Config) -> init_per_group(avro, Config) -> Source = #{ type => record, + name => <<"apitest">>, fields => [ #{name => <<"i">>, type => <<"int">>}, #{name => <<"s">>, type => <<"string">>} diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index 58c2ac864..1fa7124ac 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -54,6 +54,7 @@ clear_schemas() -> schema_params(avro) -> Source = #{ type => record, + name => <<"n1">>, fields => [ #{name => <<"i">>, type => <<"int">>}, #{name => <<"s">>, type => <<"string">>}