fix(schema registry): clear protobuf code cache when deleting/updating serde

Fixes https://emqx.atlassian.net/browse/EMQX-12789
This commit is contained in:
Thales Macedo Garitezi 2024-07-30 12:06:24 -03:00
parent c347c2c285
commit fd961f9da7
5 changed files with 112 additions and 16 deletions

View File

@ -34,7 +34,7 @@
type :: serde_type(),
eval_context :: term(),
%% for future use
extra = []
extra = #{}
}).
-type serde() :: #serde{}.

View File

@ -148,14 +148,19 @@ post_config_update(
post_config_update(
[?CONF_KEY_ROOT, schemas, NewName],
_Cmd,
NewSchemas,
%% undefined or OldSchemas
_,
NewSchema,
OldSchema,
_AppEnvs
) ->
case build_serdes([{NewName, NewSchemas}]) of
case OldSchema of
undefined ->
ok;
_ ->
ensure_serde_absent(NewName)
end,
case build_serdes([{NewName, NewSchema}]) of
ok ->
{ok, #{NewName => NewSchemas}};
{ok, #{NewName => NewSchema}};
{error, Reason, SerdesToRollback} ->
lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
{error, Reason}
@ -176,6 +181,7 @@ post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, Old
async_delete_serdes(RemovedNames)
end,
SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
ok = lists:foreach(fun ensure_serde_absent/1, [N || {N, _} <- SchemasToBuild]),
case build_serdes(SchemasToBuild) of
ok ->
{ok, NewConf};

View File

@ -48,6 +48,10 @@
-type eval_context() :: term().
-type fingerprint() :: binary().
-type protobuf_cache_key() :: {schema_name(), fingerprint()}.
-export_type([serde_type/0]).
%%------------------------------------------------------------------------------
@ -175,11 +179,12 @@ make_serde(avro, Name, Source) ->
eval_context = Store
};
make_serde(protobuf, Name, Source) ->
SerdeMod = make_protobuf_serde_mod(Name, Source),
{CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source),
#serde{
name = Name,
type = protobuf,
eval_context = SerdeMod
eval_context = SerdeMod,
extra = #{cache_key => CacheKey}
};
make_serde(json, Name, Source) ->
case json_decode(Source) of
@ -254,8 +259,9 @@ eval_encode(#serde{type = json, name = Name}, [Map]) ->
destroy(#serde{type = avro, name = _Name}) ->
?tp(serde_destroyed, #{type => avro, name => _Name}),
ok;
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod} = Serde) ->
unload_code(SerdeMod),
destroy_protobuf_code(Serde),
?tp(serde_destroyed, #{type => protobuf, name => _Name}),
ok;
destroy(#serde{type = json, name = Name}) ->
@ -282,13 +288,14 @@ jesse_validate(Name, Map) ->
jesse_name(Str) ->
unicode:characters_to_list(Str).
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}.
make_protobuf_serde_mod(Name, Source) ->
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
{ok, SerdeMod, ModBinary} ->
load_code(SerdeMod, SerdeModFileName, ModBinary),
SerdeMod;
CacheKey = protobuf_cache_key(Name, Source),
{CacheKey, SerdeMod};
{error, #{error := Error, warnings := Warnings}} ->
?SLOG(
warning,
@ -310,6 +317,13 @@ protobuf_serde_mod_name(Name) ->
SerdeModFileName = SerdeModName ++ ".memory",
{SerdeMod, SerdeModFileName}.
%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that
%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with
%% binaries...
%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}.
protobuf_cache_key(Name, Source) ->
{Name, erlang:md5(Source)}.
-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
@ -326,9 +340,9 @@ lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
Fingerprint = erlang:md5(Source),
_ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
CacheKey = protobuf_cache_key(Name, Source),
_ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write),
case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of
[#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
{ok, SerdeMod, ModBinary};
@ -337,7 +351,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
case generate_protobuf_code(SerdeMod0, Source) of
{ok, SerdeMod, ModBinary} ->
CacheEntry = #protobuf_cache{
fingerprint = Fingerprint,
fingerprint = CacheKey,
module = SerdeMod,
module_binary = ModBinary
},
@ -345,7 +359,7 @@ lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
{ok, SerdeMod, ModBinary};
{ok, SerdeMod, ModBinary, _Warnings} ->
CacheEntry = #protobuf_cache{
fingerprint = Fingerprint,
fingerprint = CacheKey,
module = SerdeMod,
module_binary = ModBinary
},
@ -390,6 +404,21 @@ unload_code(SerdeMod) ->
_ = code:delete(SerdeMod),
ok.
-spec destroy_protobuf_code(serde()) -> ok.
destroy_protobuf_code(Serde) ->
#serde{extra = #{cache_key := CacheKey}} = Serde,
{atomic, Res} = mria:transaction(
?SCHEMA_REGISTRY_SHARD,
fun destroy_protobuf_code_trans/1,
[CacheKey]
),
?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}),
Res.
-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok.
destroy_protobuf_code_trans(CacheKey) ->
mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write).
-spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
boolean().
has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->

View File

@ -207,6 +207,66 @@ t_protobuf_invalid_schema(_Config) ->
),
ok.
%% Checks that we unload code and clear code generation cache after destroying a protobuf
%% serde.
t_destroy_protobuf(_Config) ->
SerdeName = ?FUNCTION_NAME,
SerdeNameBin = atom_to_binary(SerdeName),
?check_trace(
#{timetrap => 5_000},
begin
Params = schema_params(protobuf),
ok = emqx_schema_registry:add_schema(SerdeName, Params),
{ok, {ok, _}} =
?wait_async_action(
emqx_schema_registry:delete_schema(SerdeName),
#{?snk_kind := serde_destroyed, name := SerdeNameBin}
),
%% Create again to check we don't hit the cache.
ok = emqx_schema_registry:add_schema(SerdeName, Params),
{ok, {ok, _}} =
?wait_async_action(
emqx_schema_registry:delete_schema(SerdeName),
#{?snk_kind := serde_destroyed, name := SerdeNameBin}
),
ok
end,
fun(Trace) ->
?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)),
?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)),
ok
end
),
ok.
%% Checks that we don't leave entries lingering in the protobuf code cache table when
%% updating the source of a serde.
t_update_protobuf_cache(_Config) ->
SerdeName = ?FUNCTION_NAME,
?check_trace(
#{timetrap => 5_000},
begin
#{source := Source0} = Params0 = schema_params(protobuf),
ok = emqx_schema_registry:add_schema(SerdeName, Params0),
%% Now we touch the source so protobuf needs to be recompiled.
Source1 = <<Source0/binary, "\n\n">>,
Params1 = Params0#{source := Source1},
{ok, {ok, _}} =
?wait_async_action(
emqx_schema_registry:add_schema(SerdeName, Params1),
#{?snk_kind := "schema_registry_protobuf_cache_destroyed"}
),
ok
end,
fun(Trace) ->
?assertMatch([], ?of_kind(schema_registry_protobuf_cache_hit, Trace)),
?assertMatch([_, _ | _], ?of_kind(schema_registry_protobuf_cache_miss, Trace)),
?assertMatch([_ | _], ?of_kind("schema_registry_protobuf_cache_destroyed", Trace)),
ok
end
),
ok.
t_json_invalid_schema(_Config) ->
SerdeName = invalid_json,
Params = schema_params(json),

View File

@ -0,0 +1 @@
Fixed an issue where the internal cache for Protobuf schemas in Schema Registry was not properly cleaned up after deleting or updating a schema.