emqx/apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

440 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_schema_registry_serde).
-feature(maybe_expr, enable).
-behaviour(emqx_rule_funcs).
-include("emqx_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
make_serde/3,
handle_rule_function/2,
schema_check/3,
is_existing_type/1,
is_existing_type/2,
destroy/1
]).
%% Tests
-export([
decode/2,
decode/3,
encode/2,
encode/3,
eval_decode/2,
eval_encode/2
]).
%%------------------------------------------------------------------------------
%% Type definitions
%%------------------------------------------------------------------------------
%% Evaluate `EXPR' for its side effects only: yields `true' when it completes,
%% and `false' when it raises an `error'-class exception (the failure is
%% logged at debug level together with the schema name).  Throws and exits
%% are intentionally not caught here.
-define(BOOL(SerdeName, EXPR),
    try
        _ = EXPR,
        true
    catch
        error:Reason ->
            ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}),
            false
    end
).
-type eval_context() :: term().
-type fingerprint() :: binary().
-type protobuf_cache_key() :: {schema_name(), fingerprint()}.
-export_type([serde_type/0]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec is_existing_type(schema_name()) -> boolean().
%% @doc Same as `is_existing_type(SchemaName, [])': checks only that the
%% schema itself is registered.
is_existing_type(SchemaName) ->
    is_existing_type(SchemaName, []).

-spec is_existing_type(schema_name(), [binary()]) -> boolean().
%% @doc Returns `true' when `SchemaName' is registered and, for a non-empty
%% `Path', when the schema contains that inner type (see `has_inner_type/3').
is_existing_type(SchemaName, Path) ->
    case emqx_schema_registry:get_serde(SchemaName) of
        {ok, #serde{type = SerdeType, eval_context = EvalContext}} ->
            has_inner_type(SerdeType, EvalContext, Path);
        _ ->
            false
    end.
-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
%% @doc Entry point for the rule-engine schema functions.  The sparkplug
%% variants delegate to the generic schema functions using the well-known
%% sparkplug schema name, defaulting the message type to `<<"Payload">>'
%% when the caller supplies no extra arguments.
handle_rule_function(sparkplug_decode, [Data]) ->
    handle_rule_function(sparkplug_decode, [Data, <<"Payload">>]);
handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
    handle_rule_function(
        schema_decode,
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
    );
handle_rule_function(sparkplug_encode, [Term]) ->
    handle_rule_function(sparkplug_encode, [Term, <<"Payload">>]);
handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
    handle_rule_function(
        schema_encode,
        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
    );
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
    decode(SchemaId, Data, MoreArgs);
handle_rule_function(schema_decode, Args) ->
    error({args_count_error, {schema_decode, Args}});
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
    %% encode outputs iolists, but when the rule actions process those
    %% it might wrongly encode them as JSON lists, so we force them to
    %% binaries here.
    iolist_to_binary(encode(SchemaId, Term, MoreArgs));
handle_rule_function(schema_encode, Args) ->
    error({args_count_error, {schema_encode, Args}});
handle_rule_function(schema_check, [SchemaId, Data | MoreArgs]) ->
    schema_check(SchemaId, Data, MoreArgs);
handle_rule_function(_, _) ->
    {error, no_match_for_function}.
%% @doc Check `Data' against a registered schema without returning the
%% converted payload: binary input is run through the decoder, map input
%% through the encoder.  Returns `true' on success, `false' on an
%% error-class failure (see the `?BOOL' macro).
%%
%% NOTE: the previous spec declared `decoded_data()' as the return type,
%% but `?BOOL' always yields a boolean.
-spec schema_check(schema_name(), decoded_data() | encoded_data(), [term()]) -> boolean().
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_binary(Data) ->
    with_serde(
        SerdeName,
        fun(Serde) ->
            ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs]))
        end
    );
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_map(Data) ->
    with_serde(
        SerdeName,
        fun(Serde) ->
            ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs]))
        end
    ).
-spec decode(schema_name(), encoded_data()) -> decoded_data().
%% @doc Decode `RawData' with the named schema and no extra arguments.
decode(SerdeName, RawData) ->
    decode(SerdeName, RawData, []).

-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
%% @doc Decode `RawData' with the named schema; `VarArgs' carries
%% schema-type-specific extras (e.g. the protobuf message type).
decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
    with_serde(
        SerdeName,
        fun(Serde) -> eval_decode(Serde, [RawData | VarArgs]) end
    ).
-spec encode(schema_name(), decoded_data()) -> encoded_data().
%% @doc Encode `RawData' with the named schema and no extra arguments.
encode(SerdeName, RawData) ->
    encode(SerdeName, RawData, []).

-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
%% @doc Encode `Data' with the named schema; `VarArgs' carries
%% schema-type-specific extras (e.g. the protobuf message name).
encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
    with_serde(SerdeName, fun(Serde) -> eval_encode(Serde, [Data | VarArgs]) end).
%% Look up `Name' in the schema registry and run `F' with the found serde.
%% While `F' runs, `schema_name' is merged into the logger process metadata
%% so that log events (and `eval_decode' error reports) can reference it.
%% Crashes with `{serde_not_found, Name}' when the schema is not registered.
with_serde(Name, F) ->
    case emqx_schema_registry:get_serde(Name) of
        {ok, Serde} ->
            PrevMeta = logger:get_process_metadata(),
            logger:update_process_metadata(#{schema_name => Name}),
            try
                F(Serde)
            after
                %% Restore the exact previous state: blindly calling
                %% `set_process_metadata(#{})' would leave an empty map
                %% behind when there was no metadata before the call.
                case PrevMeta of
                    undefined -> logger:unset_process_metadata();
                    _ -> logger:set_process_metadata(PrevMeta)
                end
            end;
        {error, not_found} ->
            error({serde_not_found, Name})
    end.
%% Build a `#serde{}' for the given schema type from its textual source.
%% Crashes on invalid sources (e.g. `{invalid_json_schema, _}' or
%% `{invalid_protobuf_schema, _}' from `make_protobuf_serde_mod/2').
-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().
make_serde(avro, Name, Source) ->
    Store0 = avro_schema_store:new([map]),
    %% import the schema into the map store with an assigned name
    %% if it's a named schema (e.g. struct), then Name is added as alias
    Store = avro_schema_store:import_schema_json(Name, Source, Store0),
    #serde{
        name = Name,
        type = avro,
        eval_context = Store
    };
make_serde(protobuf, Name, Source) ->
    %% Compiles (or fetches from cache) a gpb module for this schema; the
    %% cache key is kept in `extra' so `destroy/1' can evict it later.
    {CacheKey, SerdeMod} = make_protobuf_serde_mod(Name, Source),
    #serde{
        name = Name,
        type = protobuf,
        eval_context = SerdeMod,
        extra = #{cache_key => CacheKey}
    };
make_serde(json, Name, Source) ->
    case json_decode(Source) of
        SchemaObj when is_map(SchemaObj) ->
            %% jesse:add_schema adds any map() without further validation
            %% if it's not a map, then case_clause
            ok = jesse_add_schema(Name, SchemaObj),
            #serde{name = Name, type = json};
        _NotMap ->
            error({invalid_json_schema, bad_schema_object})
    end.
%% Decode an encoded payload according to the serde type.  Runs inside
%% `with_serde/2', which puts `schema_name' into the logger process metadata
%% (relied upon by the protobuf error clause below).
eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
    %% Decode Avro records and maps into Erlang maps.
    Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),
    avro_binary_decoder:decode(Data, Name, Store, Opts);
eval_decode(#serde{type = protobuf}, [#{} = DecodedData, MessageType]) ->
    %% Already decoded, so it's an user error.
    throw(
        {schema_decode_error, #{
            error_type => decoding_failure,
            data => DecodedData,
            message_type => MessageType,
            explain =>
                <<
                    "Attempted to schema decode an already decoded message."
                    " Check your rules or transformation pipeline."
                >>
        }}
    );
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageType0]) ->
    %% `binary_to_existing_atom' avoids minting new atoms from external input.
    MessageType = binary_to_existing_atom(MessageType0, utf8),
    try
        Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageType]),
        %% gpb produces atom-keyed maps; convert the keys to binaries.
        emqx_utils_maps:binary_key_map(Decoded)
    catch
        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
            %% `schema_name' was set by `with_serde/2'.
            #{schema_name := SchemaName} = logger:get_process_metadata(),
            throw(
                {schema_decode_error, #{
                    error_type => decoding_failure,
                    data => EncodedData,
                    message_type => MessageType,
                    schema_name => SchemaName,
                    explain =>
                        <<"The given data could not be decoded. Please check the input data and the schema.">>
                }}
            )
    end;
eval_decode(#serde{type = json, name = Name}, [Data]) ->
    %% JSON input must already be a binary; validate the decoded term
    %% against the registered jesse schema and return it.
    true = is_binary(Data),
    Term = json_decode(Data),
    {ok, NewTerm} = jesse_validate(Name, Term),
    NewTerm.
%% Encode a decoded term back into its wire form according to the serde type.
eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
    avro_binary_encoder:encode(Store, Name, Data);
eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
    %% gpb expects atom keys and an atom message name; `existing_atom'
    %% conversions avoid minting new atoms from external input.
    DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
    MessageName = binary_to_existing_atom(MessageName0, utf8),
    apply(SerdeMod, encode_msg, [DecodedData, MessageName]);
eval_encode(#serde{type = json, name = Name}, [Map]) ->
    %% The input Map may not be a valid JSON term for jesse
    Data = iolist_to_binary(emqx_utils_json:encode(Map)),
    NewMap = json_decode(Data),
    %% Validate the round-tripped term; return the encoded binary on success.
    case jesse_validate(Name, NewMap) of
        {ok, _} ->
            Data;
        {error, Reason} ->
            error(Reason)
    end.
%% Release any resources held on behalf of a serde.  Avro keeps its schema
%% store inside the record itself, so only a trace event is emitted.
destroy(#serde{type = avro, name = _Name}) ->
    ?tp(serde_destroyed, #{type => avro, name => _Name}),
    ok;
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod} = Serde) ->
    %% Unload the generated gpb module and delete its cached binary.
    unload_code(SerdeMod),
    destroy_protobuf_code(Serde),
    ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
    ok;
destroy(#serde{type = json, name = Name}) ->
    %% Remove the schema from jesse's store.
    ok = jesse_del_schema(Name),
    ?tp(serde_destroyed, #{type => json, name => Name}),
    ok.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
%% Decode a JSON binary into maps (not proplists).
json_decode(Data) ->
    emqx_utils_json:decode(Data, [return_maps]).

%% Thin wrappers around jesse; schema ids are normalized via `jesse_name/1'.
jesse_add_schema(Name, Obj) ->
    jesse:add_schema(jesse_name(Name), Obj).
jesse_del_schema(Name) ->
    jesse:del_schema(jesse_name(Name)).
jesse_validate(Name, Map) ->
    jesse:validate(jesse_name(Name), Map, []).
%% Normalize a schema name (binary or other chardata) to a charlist, the
%% form used as jesse schema id in the wrappers above.
jesse_name(Name) ->
    unicode:characters_to_list(Name).
%% Compile (or fetch from the cluster-wide cache) the gpb module for a
%% protobuf schema, load it into the code server, and return the cache key
%% together with the module name.  Crashes with `{invalid_protobuf_schema, _}'
%% when compilation fails.
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> {protobuf_cache_key(), module()}.
make_protobuf_serde_mod(Name, Source) ->
    {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
    case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
        {ok, SerdeMod, ModBinary} ->
            load_code(SerdeMod, SerdeModFileName, ModBinary),
            CacheKey = protobuf_cache_key(Name, Source),
            {CacheKey, SerdeMod};
        {error, #{error := Error, warnings := Warnings}} ->
            ?SLOG(
                warning,
                #{
                    msg => "error_generating_protobuf_code",
                    error => Error,
                    warnings => Warnings
                }
            ),
            error({invalid_protobuf_schema, Error})
    end.
-spec protobuf_serde_mod_name(schema_name()) -> {module(), string()}.
%% Derive the generated gpb module name and the pseudo file name used by
%% `code:load_binary'.  Both must be flat strings (lists).
%% NOTE(review): `list_to_atom/1' mints a new atom per schema name; assumed
%% bounded by operator-defined schemas -- confirm.
protobuf_serde_mod_name(Name) ->
    ModNameStr = "$schema_parser_" ++ binary_to_list(Name),
    ModName = list_to_atom(ModNameStr),
    {ModName, ModNameStr ++ ".memory"}.
%% Fixme: we cannot uncomment the following typespec because Dialyzer complains that
%% `Source' should be `string()' due to `gpb_compile:string/3', but it does work fine with
%% binaries...
%% -spec protobuf_cache_key(schema_name(), schema_source()) -> {schema_name(), fingerprint()}.
%% Cache key for the compiled-protobuf table: fingerprints the raw source,
%% so a changed schema body maps to a different cache entry.
protobuf_cache_key(Name, Source) ->
    Fingerprint = erlang:md5(Source),
    {Name, Fingerprint}.
%% Compile the schema only if no node has done so yet; see the comment in
%% the transaction body below.
-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
    %% We run this inside a transaction with locks to avoid running
    %% the compile on all nodes; only one will get the lock, compile
    %% the schema, and other nodes will simply read the final result.
    {atomic, Res} = mria:transaction(
        ?SCHEMA_REGISTRY_SHARD,
        fun lazy_generate_protobuf_code_trans/3,
        [Name, SerdeMod0, Source]
    ),
    Res.
%% Transaction body for `lazy_generate_protobuf_code/3': takes a write lock
%% on the cache entry, returns the cached module if present, otherwise
%% compiles the schema and stores the result.
-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
    {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
    CacheKey = protobuf_cache_key(Name, Source),
    _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, CacheKey}, write),
    case mnesia:read(?PROTOBUF_CACHE_TAB, CacheKey) of
        [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
            ?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
            {ok, SerdeMod, ModBinary};
        [] ->
            ?tp(schema_registry_protobuf_cache_miss, #{name => Name}),
            %% gpb may succeed with or without warnings; both cases are
            %% cached identically (warnings are discarded here).
            case generate_protobuf_code(SerdeMod0, Source) of
                {ok, SerdeMod, ModBinary} ->
                    cache_compiled_protobuf(CacheKey, SerdeMod, ModBinary);
                {ok, SerdeMod, ModBinary, _Warnings} ->
                    cache_compiled_protobuf(CacheKey, SerdeMod, ModBinary);
                error ->
                    {error, #{error => undefined, warnings => []}};
                {error, Error} ->
                    {error, #{error => Error, warnings => []}};
                {error, Error, Warnings} ->
                    {error, #{error => Error, warnings => Warnings}}
            end
    end.

%% Write a freshly compiled module into the protobuf cache table (runs
%% inside the surrounding mnesia transaction) and return it.
cache_compiled_protobuf(CacheKey, SerdeMod, ModBinary) ->
    CacheEntry = #protobuf_cache{
        fingerprint = CacheKey,
        module = SerdeMod,
        module_binary = ModBinary
    },
    ok = mnesia:write(?PROTOBUF_CACHE_TAB, CacheEntry, write),
    {ok, SerdeMod, ModBinary}.
%% Compile the protobuf source in-memory with gpb: `binary' returns the
%% compiled module as a binary (no .beam file on disk), maps-based decoding,
%% and always-on verification of encoded messages.
generate_protobuf_code(SerdeMod, Source) ->
    gpb_compile:string(
        SerdeMod,
        Source,
        [
            binary,
            strings_as_binaries,
            {maps, true},
            %% Fixme: currently, some bug in `gpb' prevents this
            %% option from working with `oneof' types... We're then
            %% forced to use atom key maps.
            %% {maps_key_type, binary},
            {maps_oneof, flat},
            {verify, always},
            {maps_unset_optional, omitted}
        ]
    ).
%% Purge any previous version of the generated module, then load the new
%% binary into the code server under the given pseudo file name.
-spec load_code(module(), string(), binary()) -> ok.
load_code(SerdeMod, SerdeModFileName, ModBinary) ->
    _ = code:purge(SerdeMod),
    {module, SerdeMod} = code:load_binary(SerdeMod, SerdeModFileName, ModBinary),
    ok.
-spec unload_code(module()) -> ok.
%% Remove a generated module from the code server: purge any old version,
%% then delete the current one.  Both calls return booleans that are
%% deliberately ignored (the module may not be loaded at all).
unload_code(Mod) ->
    _ = code:purge(Mod),
    _ = code:delete(Mod),
    ok.
%% Delete the cached compiled module for a protobuf serde (cluster-wide,
%% via a mria transaction).  The cache key was stored in `extra' by
%% `make_serde/3'.
-spec destroy_protobuf_code(serde()) -> ok.
destroy_protobuf_code(Serde) ->
    #serde{extra = #{cache_key := CacheKey}} = Serde,
    {atomic, Res} = mria:transaction(
        ?SCHEMA_REGISTRY_SHARD,
        fun destroy_protobuf_code_trans/1,
        [CacheKey]
    ),
    ?tp("schema_registry_protobuf_cache_destroyed", #{name => Serde#serde.name}),
    Res.

%% Transaction body: delete the cache row under the transaction's lock.
-spec destroy_protobuf_code_trans({schema_name(), fingerprint()}) -> ok.
destroy_protobuf_code_trans(CacheKey) ->
    mnesia:delete(?PROTOBUF_CACHE_TAB, CacheKey, write).
-spec has_inner_type(serde_type(), eval_context(), [binary()]) ->
    boolean().
%% Whether the serde contains the inner type addressed by `Path'.  An empty
%% path always refers to the (existing) root; only protobuf schemas expose
%% one level of named inner message types.
has_inner_type(protobuf, SerdeMod, [MessageTypeBin]) ->
    %% `get_msg_names/0' is exported by gpb-generated modules; any failure
    %% (e.g. module not loaded) counts as "type not present".
    try apply(SerdeMod, get_msg_names, []) of
        MsgNames ->
            lists:member(
                MessageTypeBin,
                [atom_to_binary(MsgName, utf8) || MsgName <- MsgNames]
            )
    catch
        _:_ ->
            false
    end;
has_inner_type(protobuf, _SerdeMod, [_, _ | _]) ->
    %% Protobuf only has one level of message types.
    false;
has_inner_type(_SerdeType, _EvalContext, []) ->
    %% This function is only called if we already found a serde, so the root does exist.
    true;
has_inner_type(_SerdeType, _EvalContext, _Path) ->
    false.