From fd961f9da7a973422680e0659dfda96e8d0ee392 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Jul 2024 12:06:24 -0300 Subject: [PATCH] fix(schema registry): clear protobuf code cache when deleting/updating serde Fixes https://emqx.atlassian.net/browse/EMQX-12789 --- .../include/emqx_schema_registry.hrl | 2 +- .../src/emqx_schema_registry.erl | 16 +++-- .../src/emqx_schema_registry_serde.erl | 49 +++++++++++---- .../test/emqx_schema_registry_serde_SUITE.erl | 60 +++++++++++++++++++ changes/ee/fix-13543.en.md | 1 + 5 files changed, 112 insertions(+), 16 deletions(-) create mode 100644 changes/ee/fix-13543.en.md diff --git a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl index b25042c20..289b167fd 100644 --- a/apps/emqx_schema_registry/include/emqx_schema_registry.hrl +++ b/apps/emqx_schema_registry/include/emqx_schema_registry.hrl @@ -34,7 +34,7 @@ type :: serde_type(), eval_context :: term(), %% for future use - extra = [] + extra = #{} }). -type serde() :: #serde{}. 
diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry.erl b/apps/emqx_schema_registry/src/emqx_schema_registry.erl index f8d760ddc..3919cd7be 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry.erl @@ -148,14 +148,19 @@ post_config_update( post_config_update( [?CONF_KEY_ROOT, schemas, NewName], _Cmd, - NewSchemas, - %% undefined or OldSchemas - _, + NewSchema, + OldSchema, _AppEnvs ) -> - case build_serdes([{NewName, NewSchemas}]) of + case OldSchema of + undefined -> + ok; + _ -> + ensure_serde_absent(NewName) + end, + case build_serdes([{NewName, NewSchema}]) of ok -> - {ok, #{NewName => NewSchemas}}; + {ok, #{NewName => NewSchema}}; {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} @@ -176,6 +181,7 @@ post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, Old async_delete_serdes(RemovedNames) end, SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), + ok = lists:foreach(fun ensure_serde_absent/1, [N || {N, _} <- SchemasToBuild]), case build_serdes(SchemasToBuild) of ok -> {ok, NewConf}; diff --git a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl index 0e661c356..b5944a68e 100644 --- a/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl +++ b/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl @@ -48,6 +48,10 @@ -type eval_context() :: term(). +-type fingerprint() :: binary(). + +-type protobuf_cache_key() :: {schema_name(), fingerprint()}. + -export_type([serde_type/0]). 
%%------------------------------------------------------------------------------ @@ -175,11 +179,12 @@ make_serde(avro, Name, Source) -> eval_context = Store }; make_serde(protobuf, Name, Source) -> - SerdeMod = make_protobuf_serde_mod(Name, Source), + {CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source), #serde{ name = Name, type = protobuf, - eval_context = SerdeMod + eval_context = SerdeMod, + extra = #{cache_key => CacheKey} }; make_serde(json, Name, Source) -> case json_decode(Source) of @@ -254,8 +259,9 @@ eval_encode(#serde{type = json, name = Name}, [Map]) -> destroy(#serde{type = avro, name = _Name}) -> ?tp(serde_destroyed, #{type => avro, name => _Name}), ok; -destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) -> +destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod} = Serde) -> unload_code(SerdeMod), + destroy_protobuf_code(Serde), ?tp(serde_destroyed, #{type => protobuf, name => _Name}), ok; destroy(#serde{type = json, name = Name}) -> @@ -282,13 +288,14 @@ jesse_validate(Name, Map) -> jesse_name(Str) -> unicode:characters_to_list(Str). --spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). +-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}. make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> load_code(SerdeMod, SerdeModFileName, ModBinary), - SerdeMod; + CacheKey = protobuf_cache_key(Name, Source), + {CacheKey, SerdeMod}; {error, #{error := Error, warnings := Warnings}} -> ?SLOG( warning, @@ -310,6 +317,13 @@ protobuf_serde_mod_name(Name) -> SerdeModFileName = SerdeModName ++ ".memory", {SerdeMod, SerdeModFileName}. 
+%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that +%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with +%% binaries... +%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}. +protobuf_cache_key(Name, Source) -> + {Name, erlang:md5(Source)}. + -spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> @@ -326,9 +340,9 @@ lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> -spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> - Fingerprint = erlang:md5(Source), - _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write), - case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of + CacheKey = protobuf_cache_key(Name, Source), + _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write), + case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] -> ?tp(schema_registry_protobuf_cache_hit, #{name => Name}), {ok, SerdeMod, ModBinary}; @@ -337,7 +351,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> case generate_protobuf_code(SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> CacheEntry = #protobuf_cache{ - fingerprint = Fingerprint, + fingerprint = CacheKey, module = SerdeMod, module_binary = ModBinary }, @@ -345,7 +359,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> {ok, SerdeMod, ModBinary}; {ok, SerdeMod, ModBinary, _Warnings} -> CacheEntry = #protobuf_cache{ - fingerprint = Fingerprint, + fingerprint = CacheKey, module = SerdeMod, module_binary = ModBinary }, @@ -390,6 +404,21 @@ unload_code(SerdeMod) -> _ = 
code:delete(SerdeMod), ok. +-spec destroy_protobuf_code(serde()) -> ok. +destroy_protobuf_code(Serde) -> + #serde{extra = #{cache_key := CacheKey}} = Serde, + {atomic, Res} = mria:transaction( + ?SCHEMA_REGISTRY_SHARD, + fun destroy_protobuf_code_trans/1, + [CacheKey] + ), + ?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}), + Res. + +-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok. +destroy_protobuf_code_trans(CacheKey) -> + mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write). + -spec has_inner_type(serde_type(), eval_context(), [binary()]) -> boolean(). has_inner_type(protobuf, _SerdeMod, [_, _ | _]) -> diff --git a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl index bdc083736..685e152e6 100644 --- a/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl +++ b/apps/emqx_schema_registry/test/emqx_schema_registry_serde_SUITE.erl @@ -207,6 +207,66 @@ t_protobuf_invalid_schema(_Config) -> ), ok. +%% Checks that we unload code and clear code generation cache after destroying a protobuf +%% serde. +t_destroy_protobuf(_Config) -> + SerdeName = ?FUNCTION_NAME, + SerdeNameBin = atom_to_binary(SerdeName), + ?check_trace( + #{timetrap => 5_000}, + begin + Params = schema_params(protobuf), + ok = emqx_schema_registry:add_schema(SerdeName, Params), + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:delete_schema(SerdeName), + #{?snk_kind := serde_destroyed, name := SerdeNameBin} + ), + %% Create again to check we don't hit the cache. 
+ ok = emqx_schema_registry:add_schema(SerdeName, Params), + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:delete_schema(SerdeName), + #{?snk_kind := serde_destroyed, name := SerdeNameBin} + ), + ok + end, + fun(Trace) -> + ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)), + ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)), + ok + end + ), + ok. + +%% Checks that we don't leave entries lingering in the protobuf code cache table when +%% updating the source of a serde. +t_update_protobuf_cache(_Config) -> + SerdeName = ?FUNCTION_NAME, + ?check_trace( + #{timetrap => 5_000}, + begin + #{source := Source0} = Params0 = schema_params(protobuf), + ok = emqx_schema_registry:add_schema(SerdeName, Params0), + %% Now we touch the source so protobuf needs to be recompiled. + Source1 = <<Source0/binary, "\n\n">>, + Params1 = Params0#{source := Source1}, + {ok, {ok, _}} = + ?wait_async_action( + emqx_schema_registry:add_schema(SerdeName, Params1), + #{?snk_kind := "schema_registry_protobuf_cache_destroyed"} + ), + ok + end, + fun(Trace) -> + ?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)), + ?assertMatch([_, _ | _], ?of_kind(schema_registry_protobuf_cache_miss, Trace)), + ?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)), + ok + end + ), + ok. + t_json_invalid_schema(_Config) -> SerdeName = invalid_json, Params = schema_params(json), diff --git a/changes/ee/fix-13543.en.md b/changes/ee/fix-13543.en.md new file mode 100644 index 000000000..f9f56a5a6 --- /dev/null +++ b/changes/ee/fix-13543.en.md @@ -0,0 +1 @@ +Fixed an issue where the internal cache for Protobuf schemas in Schema Registry was not properly cleaned up after deleting or updating a schema.