feat: implement schema registry for 5.0 (avro)

Part of https://emqx.atlassian.net/browse/EMQX-9251

This ports part of the Schema Registry app from 4.x to 5.0.  Here,
only support for Avro is added.  Subsequent PRs will follow to add
support for other formats.
This commit is contained in:
Thales Macedo Garitezi 2023-04-04 08:55:43 -03:00
parent eeb7b32bc8
commit 33100ecca6
29 changed files with 1941 additions and 30 deletions

View File

@ -44,6 +44,7 @@
-type port_number() :: 1..65536.
-type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}.
-type url() :: binary().
-type json_binary() :: binary().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@ -58,6 +59,7 @@
-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
-typerefl_from_string({url/0, emqx_schema, to_url}).
-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}).
-export([
validate_heap_size/1,
@ -84,7 +86,8 @@
to_ip_port/1,
to_erl_cipher_suite/1,
to_comma_separated_atoms/1,
to_url/1
to_url/1,
to_json_binary/1
]).
-export([
@ -112,7 +115,8 @@
ip_port/0,
cipher/0,
comma_separated_atoms/0,
url/0
url/0,
json_binary/0
]).
-export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
@ -2576,6 +2580,14 @@ to_url(Str) ->
Error
end.
to_json_binary(Str) ->
case emqx_json:safe_decode(Str) of
{ok, _} ->
{ok, iolist_to_binary(Str)};
Error ->
Error
end.
to_bar_separated_list(Str) ->
{ok, string:tokens(Str, "| ")}.

View File

@ -676,7 +676,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
]
);
slave ->
slave:start_link(host(), Name, ebin_path())
Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_",
slave:start_link(host(), Name, ebin_path() ++ Env)
end
end,
case DoStart() of
@ -749,6 +750,20 @@ setup_node(Node, Opts) when is_map(Opts) ->
%% `emqx_conf' app and correctly catch up the config.
StartAutocluster = maps:get(start_autocluster, Opts, false),
ct:pal(
"setting up node ~p:\n ~p",
[
Node,
#{
start_autocluster => StartAutocluster,
load_apps => LoadApps,
apps => Apps,
env => Env,
start_apps => StartApps
}
]
),
%% Load env before doing anything to avoid overriding
[ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]],
@ -773,10 +788,7 @@ setup_node(Node, Opts) when is_map(Opts) ->
end,
%% Setting env before starting any applications
[
ok = rpc:call(Node, application, set_env, [Application, Key, Value])
|| {Application, Key, Value} <- Env
],
set_envs(Node, Env),
%% Here we start the apps
EnvHandlerForRpc =
@ -794,8 +806,9 @@ setup_node(Node, Opts) when is_map(Opts) ->
node(),
integer_to_list(erlang:unique_integer())
]),
Cookie = atom_to_list(erlang:get_cookie()),
os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
os:putenv("EMQX_NODE__COOKIE", atom_to_list(erlang:get_cookie())),
os:putenv("EMQX_NODE__COOKIE", Cookie),
emqx_config:init_load(SchemaMod),
os:unsetenv("EMQX_NODE__DATA_DIR"),
os:unsetenv("EMQX_NODE__COOKIE"),
@ -826,7 +839,15 @@ setup_node(Node, Opts) when is_map(Opts) ->
ok;
_ ->
StartAutocluster andalso
(ok = rpc:call(Node, emqx_machine_boot, start_autocluster, [])),
begin
%% Note: we need to re-set the env because
%% starting the apps apparently make some of them
%% to be lost... This is particularly useful for
%% setting extra apps to be restarted after
%% joining.
set_envs(Node, Env),
ok = erpc:call(Node, emqx_machine_boot, start_autocluster, [])
end,
case rpc:call(Node, ekka, join, [JoinTo]) of
ok ->
ok;
@ -883,6 +904,14 @@ merge_opts(Opts1, Opts2) ->
Opts2
).
set_envs(Node, Env) ->
lists:foreach(
fun({Application, Key, Value}) ->
ok = rpc:call(Node, application, set_env, [Application, Key, Value])
end,
Env
).
erl_flags() ->
%% One core and redirecting logs to master
"+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path().

View File

@ -755,6 +755,8 @@ typename_to_spec("initial()", _Mod) ->
#{type => string, example => <<"0MB">>};
typename_to_spec("bucket_name()", _Mod) ->
#{type => string, example => <<"retainer">>};
typename_to_spec("json_binary()", _Mod) ->
#{type => string, example => <<"{\"a\": [1,true]}">>};
typename_to_spec(Name, Mod) ->
Spec = range(Name),
Spec1 = remote_module_type(Spec, Name, Mod),

View File

@ -1097,26 +1097,27 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) ->
%% Here the emqx_rule_funcs module acts as a proxy, forwarding
%% the function handling to the worker module.
%% @end
% '$handle_undefined_function'(schema_decode, [SchemaId, Data|MoreArgs]) ->
% emqx_schema_parser:decode(SchemaId, Data, MoreArgs);
% '$handle_undefined_function'(schema_decode, Args) ->
% error({args_count_error, {schema_decode, Args}});
% '$handle_undefined_function'(schema_encode, [SchemaId, Term|MoreArgs]) ->
% emqx_schema_parser:encode(SchemaId, Term, MoreArgs);
% '$handle_undefined_function'(schema_encode, Args) ->
% error({args_count_error, {schema_encode, Args}});
% '$handle_undefined_function'(sprintf, [Format|Args]) ->
% erlang:apply(fun sprintf_s/2, [Format, Args]);
% '$handle_undefined_function'(Fun, Args) ->
% error({sql_function_not_supported, function_literal(Fun, Args)}).
-if(?EMQX_RELEASE_EDITION == ee).
%% EE
'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) ->
emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs);
'$handle_undefined_function'(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs);
'$handle_undefined_function'(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-else.
%% CE
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-endif.
map_path(Key) ->
{path, [{key, P} || P <- string:split(Key, ".", all)]}.

View File

@ -0,0 +1,3 @@
Add schema registry feature.
With schema registry, one can encode and decode special serialization formats in payloads when transforming messages in Rule Engine. Currently, only [Apache Avro](https://avro.apache.org/) is supported.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_conf, [
{description, "EMQX Enterprise Edition configuration schema"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -8,7 +8,7 @@
-export([namespace/0, roots/0, fields/1, translations/0, translation/1]).
-define(EE_SCHEMA_MODULES, [emqx_license_schema]).
-define(EE_SCHEMA_MODULES, [emqx_license_schema, emqx_ee_schema_registry_schema]).
namespace() ->
emqx_conf_schema:namespace().

View File

@ -0,0 +1,19 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -0,0 +1,69 @@
# EMQX Schema Registry
EMQX Schema Registry for managing various of schemas for
decoding/encoding messages.
To use schema in rule engine, a schema name should be passed to the
SQL functions that decode/encode data, like:
```sql
SELECT
schema_decode('sensor_notify', payload) as payload
FROM
"message.publish"
WHERE
topic = 't/1'
```
## Using schema registry with rule engine
```
+---------------------------+
| |
Events/Msgs | | Events/Msgs
--------------------> EMQX |------------------>
| |
| |
+-------------|-------------+
|
HOOK |
|
+-------------v-------------+ +----------+
| | Data | |
| Rule Engine ------------- Backends |
| | | |
+------|-------------|------+ +----------+
|^ |^
Decode|| ||Encode
|| ||
+------v|------------v|-----+
| |
| Schema Registry |
| |
+---------------------------+
```
## Architecture
```
| |
Decode | [APIs] | Encode
| |
| | [Registry]
+------v--------------v------+
REGISTER SCHEMA | |
-------------------> | +--------+
| | | |
[Management APIs] | Schema Registry ------ Schema |
| | | |
-------------------> | +--------+
LOAD PARSERS | |
+----------------------------+
/ | \
+---/---+ +---|----+ +---\---+
| | | | | |
[Decoders] | Avro | |ProtoBuf| |Others |
| | | | | |
+-------+ +--------+ +-------+
```

View File

@ -0,0 +1,39 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_EE_SCHEMA_REGISTRY_HRL).
-define(EMQX_EE_SCHEMA_REGISTRY_HRL, true).
-define(CONF_KEY_ROOT, schema_registry).
-define(CONF_KEY_PATH, [?CONF_KEY_ROOT]).
-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard).
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
-type schema_name() :: binary().
-type schema_source() :: binary().
-type encoded_data() :: iodata().
-type decoded_data() :: map().
-type serializer() :: fun((decoded_data()) -> encoded_data()).
-type deserializer() :: fun((encoded_data()) -> decoded_data()).
-type destructor() :: fun(() -> ok).
-type serde_type() :: avro.
-type serde_opts() :: map().
-record(serde, {
name :: schema_name(),
serializer :: serializer(),
deserializer :: deserializer(),
destructor :: destructor()
}).
-type serde() :: #serde{}.
-type serde_map() :: #{
name := schema_name(),
serializer := serializer(),
deserializer := deserializer(),
destructor := destructor()
}.
-endif.

View File

@ -0,0 +1,12 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../../apps/emqx"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_ee_schema_registry]}
]}.

View File

@ -0,0 +1,15 @@
{application, emqx_ee_schema_registry, [
{description, "EMQX Schema Registry"},
{vsn, "0.1.0"},
{registered, [emqx_ee_schema_registry_sup]},
{mod, {emqx_ee_schema_registry_app, []}},
{applications, [
kernel,
stdlib,
erlavro
]},
{env, []},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,242 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry).
-behaviour(gen_server).
-behaviour(emqx_config_handler).
-include("emqx_ee_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
start_link/0,
get_serde/1,
add_schema/2,
delete_schema/1,
list_schemas/0
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
terminate/2
]).
%% `emqx_config_handler' API
-export([post_config_update/5]).
-type schema() :: #{
type := serde_type(),
source := binary(),
description => binary()
}.
%%-------------------------------------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}.
get_serde(SchemaName) ->
case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of
[] ->
{error, not_found};
[Serde] ->
{ok, serde_to_map(Serde)}
end.
-spec add_schema(schema_name(), schema()) -> ok | {error, term()}.
add_schema(Name, Schema) ->
RawSchema = emqx_map_lib:binary_key_map(Schema),
Res = emqx_conf:update(
[?CONF_KEY_ROOT, schemas, Name],
RawSchema,
#{override_to => cluster}
),
case Res of
{ok, _} ->
ok;
Error ->
Error
end.
-spec delete_schema(schema_name()) -> ok | {error, term()}.
delete_schema(Name) ->
Res = emqx_conf:remove(
[?CONF_KEY_ROOT, schemas, Name],
#{override_to => cluster}
),
case Res of
{ok, _} ->
ok;
Error ->
Error
end.
-spec list_schemas() -> #{schema_name() => schema()}.
list_schemas() ->
emqx_config:get([?CONF_KEY_ROOT, schemas], #{}).
%%-------------------------------------------------------------------------------------------------
%% `emqx_config_handler' API
%%-------------------------------------------------------------------------------------------------
post_config_update(
[?CONF_KEY_ROOT, schemas] = _Path,
_Cmd,
NewConf = #{schemas := NewSchemas},
OldConf = #{},
_AppEnvs
) ->
OldSchemas = maps:get(schemas, OldConf, #{}),
#{
added := Added,
changed := Changed0,
removed := Removed
} = emqx_map_lib:diff_maps(NewSchemas, OldSchemas),
Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
RemovedNames = maps:keys(Removed),
case RemovedNames of
[] ->
ok;
_ ->
async_delete_serdes(RemovedNames)
end,
SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
case build_serdes(SchemasToBuild) of
ok ->
{ok, NewConf};
{error, Reason, SerdesToRollback} ->
lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
{error, Reason}
end;
post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
{ok, NewConf}.
%%-------------------------------------------------------------------------------------------------
%% `gen_server' API
%%-------------------------------------------------------------------------------------------------
init(_) ->
process_flag(trap_exit, true),
create_tables(),
Schemas = emqx_conf:get([?CONF_KEY_ROOT, schemas], #{}),
async_build_serdes(Schemas),
State = #{},
{ok, State}.
handle_call(_Call, _From, State) ->
{reply, {error, unknown_call}, State}.
handle_cast({delete_serdes, Names}, State) ->
lists:foreach(fun ensure_serde_absent/1, Names),
?tp(schema_registry_serdes_deleted, #{}),
{noreply, State};
handle_cast({build_serdes, Schemas}, State) ->
do_build_serdes(Schemas),
{noreply, State};
handle_cast(_Cast, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
create_tables() ->
ok = mria:create_table(?SERDE_TAB, [
{type, ordered_set},
{rlog_shard, ?SCHEMA_REGISTRY_SHARD},
{storage, ram_copies},
{record_name, serde},
{attributes, record_info(fields, serde)}
]),
ok = mria:wait_for_tables([?SERDE_TAB]),
ok.
do_build_serdes(Schemas) ->
%% TODO: use some kind of mutex to make each core build a
%% different serde to avoid duplicate work. Maybe ekka_locker?
maps:foreach(fun do_build_serde/2, Schemas),
?tp(schema_registry_serdes_built, #{}).
build_serdes(Serdes) ->
build_serdes(Serdes, []).
build_serdes([{Name, Params} | Rest], Acc0) ->
Acc = [Name | Acc0],
case do_build_serde(Name, Params) of
ok ->
build_serdes(Rest, Acc);
{error, Error} ->
{error, Error, Acc}
end;
build_serdes([], _Acc) ->
ok.
do_build_serde(Name0, #{type := Type, source := Source}) ->
try
Name = to_bin(Name0),
{Serializer, Deserializer, Destructor} =
emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
Serde = #serde{
name = Name,
serializer = Serializer,
deserializer = Deserializer,
destructor = Destructor
},
ok = mria:dirty_write(?SERDE_TAB, Serde),
ok
catch
Kind:Error:Stacktrace ->
?SLOG(
error,
#{
msg => "error_building_serde",
name => Name0,
type => Type,
kind => Kind,
error => Error,
stacktrace => Stacktrace
}
),
{error, Error}
end.
ensure_serde_absent(Name) ->
case get_serde(Name) of
{ok, #{destructor := Destructor}} ->
Destructor(),
ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name));
{error, not_found} ->
ok
end.
async_build_serdes(Schemas) ->
gen_server:cast(?MODULE, {build_serdes, Schemas}).
async_delete_serdes(Names) ->
gen_server:cast(?MODULE, {delete_serdes, Names}).
to_bin(A) when is_atom(A) -> atom_to_binary(A);
to_bin(B) when is_binary(B) -> B.
-spec serde_to_map(serde()) -> serde_map().
serde_to_map(#serde{} = Serde) ->
#{
name => Serde#serde.name,
serializer => Serde#serde.serializer,
deserializer => Serde#serde.deserializer,
destructor => Serde#serde.destructor
}.

View File

@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_app).
-behaviour(application).
-include("emqx_ee_schema_registry.hrl").
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
emqx_ee_schema_registry_sup:start_link().
stop(_State) ->
emqx_conf:remove_handler(?CONF_KEY_PATH),
ok.

View File

@ -0,0 +1,251 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_http_api).
-behaviour(minirest_api).
-include("emqx_ee_schema_registry.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_api_lib.hrl").
-export([
namespace/0,
api_spec/0,
paths/0,
schema/1
]).
-export([
'/schema_registry'/2,
'/schema_registry/:name'/2
]).
%%-------------------------------------------------------------------------------------------------
%% `minirest' and `minirest_trails' API
%%-------------------------------------------------------------------------------------------------
namespace() -> "schema_registry_http_api".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/schema_registry",
"/schema_registry/:name"
].
schema("/schema_registry") ->
#{
'operationId' => '/schema_registry',
get => #{
tags => [<<"schema_registry">>],
summary => <<"List registered schemas">>,
description => ?DESC("desc_schema_registry_api_list"),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
hoconsc:array(emqx_ee_schema_registry_schema:api_schema("get")),
#{
sample =>
#{value => sample_list_schemas_response()}
}
)
}
},
post => #{
tags => [<<"schema_registry">>],
summary => <<"Register a new schema">>,
description => ?DESC("desc_schema_registry_api_post"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("post"),
post_examples()
),
responses =>
#{
201 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("post"),
post_examples()
),
400 => error_schema('ALREADY_EXISTS', "Schema already exists")
}
}
};
schema("/schema_registry/:name") ->
#{
'operationId' => '/schema_registry/:name',
get => #{
tags => [<<"schema_registry">>],
summary => <<"Get registered schema">>,
description => ?DESC("desc_schema_registry_api_get"),
parameters => [param_path_schema_name()],
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("get"),
get_examples()
),
404 => error_schema('NOT_FOUND', "Schema not found")
}
},
put => #{
tags => [<<"schema_registry">>],
summary => <<"Update a schema">>,
description => ?DESC("desc_schema_registry_api_put"),
parameters => [param_path_schema_name()],
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("put"),
post_examples()
),
responses =>
#{
200 =>
emqx_dashboard_swagger:schema_with_examples(
emqx_ee_schema_registry_schema:api_schema("put"),
put_examples()
),
404 => error_schema('NOT_FOUND', "Schema not found")
}
},
delete => #{
tags => [<<"schema_registry">>],
summary => <<"Delete registered schema">>,
description => ?DESC("desc_schema_registry_api_delete"),
parameters => [param_path_schema_name()],
responses =>
#{
204 => <<"Schema deleted">>,
404 => error_schema('NOT_FOUND', "Schema not found")
}
}
}.
%%-------------------------------------------------------------------------------------------------
%% API
%%-------------------------------------------------------------------------------------------------
'/schema_registry'(get, _Params) ->
Schemas = emqx_ee_schema_registry:list_schemas(),
Response =
lists:map(
fun({Name, Params}) ->
Params#{name => Name}
end,
maps:to_list(Schemas)
),
?OK(Response);
'/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) ->
Params = maps:without([<<"name">>], Params0),
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
undefined ->
case emqx_ee_schema_registry:add_schema(Name, Params) of
ok ->
Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]),
{201, Res#{name => Name}};
{error, Error} ->
?BAD_REQUEST(Error)
end;
_ ->
?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>)
end.
'/schema_registry/:name'(get, #{bindings := #{name := Name}}) ->
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
undefined ->
?NOT_FOUND(<<"Schema not found">>);
Res ->
?OK(Res#{name => Name})
end;
'/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) ->
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
undefined ->
?NOT_FOUND(<<"Schema not found">>);
_ ->
case emqx_ee_schema_registry:add_schema(Name, Params) of
ok ->
Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]),
?OK(Res#{name => Name});
{error, Error} ->
?BAD_REQUEST(Error)
end
end;
'/schema_registry/:name'(delete, #{bindings := #{name := Name}}) ->
case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of
undefined ->
?NOT_FOUND(<<"Schema not found">>);
_ ->
case emqx_ee_schema_registry:delete_schema(Name) of
ok ->
?NO_CONTENT;
{error, Error} ->
?BAD_REQUEST(Error)
end
end.
%%-------------------------------------------------------------------------------------------------
%% Examples
%%-------------------------------------------------------------------------------------------------
sample_list_schemas_response() ->
[sample_get_schema_response(avro)].
sample_get_schema_response(avro) ->
#{
type => <<"avro">>,
name => <<"my_avro_schema">>,
description => <<"My Avro Schema">>,
source => <<
"{\"type\":\"record\","
"\"fields\":[{\"type\":\"int\",\"name\":\"i\"},"
"{\"type\":\"string\",\"name\":\"s\"}]}"
>>
}.
put_examples() ->
post_examples().
post_examples() ->
get_examples().
get_examples() ->
#{
<<"avro_schema">> =>
#{
summary => <<"Avro">>,
value => sample_get_schema_response(avro)
}
}.
%%-------------------------------------------------------------------------------------------------
%% Schemas and hocon types
%%-------------------------------------------------------------------------------------------------
param_path_schema_name() ->
{name,
mk(
binary(),
#{
in => path,
required => true,
example => <<"my_schema">>,
desc => ?DESC("desc_param_path_schema_name")
}
)}.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
error_schema(Code, Message) when is_atom(Code) ->
error_schema([Code], Message);
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).

View File

@ -0,0 +1,127 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ee_schema_registry.hrl").
%% `hocon_schema' API
-export([
roots/0,
fields/1,
desc/1,
tags/0,
union_member_selector/1
]).
%% `minirest_trails' API
-export([
api_schema/1
]).
%%------------------------------------------------------------------------------
%% `hocon_schema' APIs
%%------------------------------------------------------------------------------
roots() ->
[{?CONF_KEY_ROOT, mk(ref(?CONF_KEY_ROOT), #{required => false})}].
tags() ->
[<<"Schema Registry">>].
fields(?CONF_KEY_ROOT) ->
[
{schemas,
mk(
hoconsc:map(
name,
hoconsc:union(fun union_member_selector/1)
),
#{
default => #{},
desc => ?DESC("schema_registry_schemas")
}
)}
];
fields(avro) ->
[
{type, mk(hoconsc:enum([avro]), #{required => true, desc => ?DESC("schema_type")})},
{source,
mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
{description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
];
fields("get_avro") ->
[{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
fields("put_avro") ->
fields(avro);
fields("post_" ++ Type) ->
fields("get_" ++ Type).
desc(?CONF_KEY_ROOT) ->
?DESC("schema_registry_root");
desc(avro) ->
?DESC("avro_type");
desc(_) ->
undefined.
union_member_selector(all_union_members) ->
refs();
union_member_selector({value, V}) ->
refs(V).
union_member_selector_get_api(all_union_members) ->
refs_get_api();
union_member_selector_get_api({value, V}) ->
refs_get_api(V).
%%------------------------------------------------------------------------------
%% `minirest_trails' "APIs"
%%------------------------------------------------------------------------------
api_schema("get") ->
hoconsc:union(fun union_member_selector_get_api/1);
api_schema("post") ->
api_schema("get");
api_schema("put") ->
hoconsc:union(fun union_member_selector/1).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Name) -> hoconsc:ref(?MODULE, Name).
supported_serde_types() ->
[avro].
refs() ->
[ref(Type) || Type <- supported_serde_types()].
refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs(#{<<"type">> := <<"avro">>}) ->
[ref(avro)];
refs(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
field_name => type,
expected => Expected
}).
refs_get_api() ->
[ref("get_avro")].
refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
refs_get_api(#{<<"type">> := <<"avro">>}) ->
[ref("get_avro")];
refs_get_api(_) ->
Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
throw(#{
field_name => type,
expected => Expected
}).

View File

@ -0,0 +1,70 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_serde).
-include("emqx_ee_schema_registry.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API
-export([
decode/2,
decode/3,
encode/2,
encode/3,
make_serde/3
]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec decode(schema_name(), encoded_data()) -> decoded_data().
decode(SerdeName, RawData) ->
decode(SerdeName, RawData, []).
-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
case emqx_ee_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{deserializer := Deserializer}} ->
apply(Deserializer, [RawData | VarArgs])
end.
-spec encode(schema_name(), decoded_data()) -> encoded_data().
encode(SerdeName, RawData) ->
encode(SerdeName, RawData, []).
-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
case emqx_ee_schema_registry:get_serde(SerdeName) of
{error, not_found} ->
error({serde_not_found, SerdeName});
{ok, #{serializer := Serializer}} ->
apply(Serializer, [EncodedData | VarArgs])
end.
-spec make_serde(serde_type(), schema_name(), schema_source()) ->
{serializer(), deserializer(), destructor()}.
make_serde(avro, Name, Source0) ->
Source = inject_avro_name(Name, Source0),
Serializer = avro:make_simple_encoder(Source, _Opts = []),
Deserializer = avro:make_simple_decoder(Source, [{map_type, map}, {record_type, map}]),
Destructor = fun() ->
?tp(serde_destroyed, #{type => avro, name => Name}),
ok
end,
{Serializer, Deserializer, Destructor}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec inject_avro_name(schema_name(), schema_source()) -> schema_source().
inject_avro_name(Name, Source0) ->
%% The schema checks that the source is a valid JSON when
%% typechecking, so we shouldn't need to validate here.
Schema0 = emqx_json:decode(Source0, [return_maps]),
Schema = Schema0#{<<"name">> => Name},
emqx_json:encode(Schema).

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 100
},
ChildSpecs = [child_spec(emqx_ee_schema_registry)],
{ok, {SupFlags, ChildSpecs}}.
child_spec(Mod) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5_000,
type => worker,
modules => [Mod]
}.

View File

@ -0,0 +1,433 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_ee_schema_registry.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
[{group, avro}].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[{avro, TCs}].
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_group(avro, Config) ->
[{serde_type, avro} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
clear_schemas(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
trace_rule(Data, Envs, _Args) ->
Now = erlang:monotonic_time(),
ets:insert(recorded_actions, {Now, #{data => Data, envs => Envs}}),
TestPid = persistent_term:get({?MODULE, test_pid}),
TestPid ! {action, #{data => Data, envs => Envs}},
ok.
make_trace_fn_action() ->
persistent_term:put({?MODULE, test_pid}, self()),
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
emqx_tables:new(recorded_actions, [named_table, public, ordered_set]),
#{function => Fn, args => #{}}.
create_rule_http(RuleParams) ->
RepublishTopic = <<"republish/schema_registry">>,
emqx:subscribe(RepublishTopic),
DefaultParams = #{
enable => true,
actions => [
make_trace_fn_action(),
#{
<<"function">> => <<"republish">>,
<<"args">> =>
#{
<<"topic">> => RepublishTopic,
<<"payload">> => <<>>,
<<"qos">> => 0,
<<"retain">> => false,
<<"user_properties">> => <<>>
}
}
]
},
Params = maps:merge(DefaultParams, RuleParams),
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
Error -> Error
end.
schema_params(avro) ->
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
#{type => avro, source => SourceBin}.
create_serde(SerdeType, SerdeName) ->
Schema = schema_params(SerdeType),
ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema),
ok.
sql_for(avro, encode_decode1) ->
<<
"select\n"
" schema_decode('my_serde',\n"
" schema_encode('my_serde', json_decode(payload))) as decoded,\n"
" decoded.i as decoded_int,\n"
" decoded.s as decoded_string\n"
" from t"
>>;
sql_for(avro, encode1) ->
<<
"select\n"
" schema_encode('my_serde', json_decode(payload)) as encoded\n"
" from t"
>>;
sql_for(avro, decode1) ->
<<
"select\n"
" schema_decode('my_serde', payload) as decoded\n"
" from t"
>>;
sql_for(Type, Name) ->
ct:fail("unimplemented: ~p", [{Type, Name}]).
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
receive_action_results() ->
receive
{action, #{data := _} = Res} ->
Res
after 1_000 ->
ct:fail("action didn't run")
end.
receive_published(Line) ->
receive
{deliver, _Topic, Msg} ->
MsgMap = emqx_message:to_map(Msg),
maps:update_with(
payload,
fun(Raw) ->
case emqx_json:safe_decode(Raw, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> Raw
end
end,
MsgMap
)
after 1_000 ->
ct:pal("mailbox: ~p", [process_info(self(), messages)]),
ct:fail("publish not received, line ~b", [Line])
end.
cluster(Config) ->
PrivDataDir = ?config(priv_dir, Config),
PeerModule =
case os:getenv("IS_CI") of
false ->
slave;
_ ->
ct_slave
end,
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
{apps, ?APPS},
{listener_ports, []},
{peer_mod, PeerModule},
{priv_data_dir, PrivDataDir},
{load_schema, true},
{start_autocluster, true},
{schema_mod, emqx_ee_conf_schema},
%% need to restart schema registry app in the tests so
%% that it re-registers the config handler that is lost
%% when emqx_conf restarts during join.
{env, [{emqx_machine, applications, [emqx_ee_schema_registry]}]},
{load_apps, [emqx_machine | ?APPS]},
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
ok;
(emqx_conf) ->
ok;
(_) ->
ok
end}
]
),
ct:pal("cluster:\n ~p", [Cluster]),
Cluster.
start_cluster(Cluster) ->
Nodes = [
emqx_common_test_helpers:start_slave(Name, Opts)
|| {Name, Opts} <- Cluster
],
on_exit(fun() ->
emqx_misc:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)
end,
Nodes
)
end),
erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]),
Nodes.
wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after
%% restarting during the cluster join.
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_unknown_calls(_Config) ->
Ref = monitor(process, emqx_ee_schema_registry),
%% for coverage
emqx_ee_schema_registry ! unknown,
gen_server:cast(emqx_ee_schema_registry, unknown),
?assertEqual({error, unknown_call}, gen_server:call(emqx_ee_schema_registry, unknown)),
receive
{'DOWN', Ref, process, _, _} ->
ct:fail("registry shouldn't have died")
after 500 ->
ok
end.
t_encode_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadBin = emqx_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Res = receive_action_results(),
?assertMatch(
#{
data :=
#{
<<"decoded">> :=
#{
<<"i">> := 10,
<<"s">> := <<"text">>
},
<<"decoded_int">> := 10,
<<"decoded_string">> := <<"text">>
}
},
Res
),
ok.
t_delete_serde(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
?check_trace(
begin
ok = create_serde(SerdeType, SerdeName),
{ok, {ok, _}} =
?wait_async_action(
emqx_ee_schema_registry:delete_schema(SerdeName),
#{?snk_kind := schema_registry_serdes_deleted},
1_000
),
ok
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(schema_registry_serdes_deleted, Trace)),
?assertMatch([#{type := SerdeType}], ?of_kind(serde_destroyed, Trace)),
ok
end
),
ok.
t_encode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
PayloadBin = emqx_json:encode(Payload),
emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"encoded">> := _}},
Published
),
#{payload := #{<<"encoded">> := Encoded}} = Published,
{ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
?assertEqual(Payload, Deserializer(Encoded)),
ok.
t_decode(Config) ->
SerdeType = ?config(serde_type, Config),
SerdeName = my_serde,
ok = create_serde(SerdeType, SerdeName),
{ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
{ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName),
EncodedBin = Serializer(Payload),
emqx:publish(emqx_message:make(<<"t">>, EncodedBin)),
Published = receive_published(?LINE),
?assertMatch(
#{payload := #{<<"decoded">> := _}},
Published
),
#{payload := #{<<"decoded">> := Decoded}} = Published,
?assertEqual(Payload, Decoded),
ok.
t_fail_rollback(Config) ->
SerdeType = ?config(serde_type, Config),
OkSchema = emqx_map_lib:binary_key_map(schema_params(SerdeType)),
BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
%% hopefully, for this small map, the key order is used.
Serdes = #{
<<"a">> => OkSchema,
<<"z">> => BrokenSchema
},
?assertMatch(
{error, _},
emqx_conf:update(
[?CONF_KEY_ROOT, schemas],
Serdes,
#{}
)
),
%% no serdes should be in the table
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)),
?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)),
ok.
t_cluster_serde_build(Config) ->
SerdeType = ?config(serde_type, Config),
Cluster = cluster(Config),
SerdeName = my_serde,
Schema = schema_params(SerdeType),
?check_trace(
begin
Nodes = [N1, N2 | _] = start_cluster(Cluster),
NumNodes = length(Nodes),
wait_for_cluster_rpc(N2),
?assertEqual(
ok,
erpc:call(N2, emqx_ee_schema_registry, add_schema, [SerdeName, Schema])
),
%% check that we can serialize/deserialize in all nodes
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
Res0 = emqx_ee_schema_registry:get_serde(SerdeName),
?assertMatch({ok, #{}}, Res0, #{node => N}),
{ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0,
Payload = #{<<"i">> => 10, <<"s">> => <<"text">>},
?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}),
ok
end)
end,
Nodes
),
%% now we delete and check it's removed from the table
?tp(will_delete_schema, #{}),
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := schema_registry_serdes_deleted}),
NumNodes,
5_000
),
?assertEqual(
ok,
erpc:call(N1, emqx_ee_schema_registry, delete_schema, [SerdeName])
),
{ok, _} = snabbkaffe:receive_events(SRef1),
lists:foreach(
fun(N) ->
erpc:call(N, fun() ->
?assertMatch(
{error, not_found},
emqx_ee_schema_registry:get_serde(SerdeName),
#{node => N}
),
ok
end)
end,
Nodes
),
ok
end,
fun(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := will_delete_schema},
#{?snk_kind := serde_destroyed, type := SerdeType},
Trace
)
),
ok
end
),
ok.

View File

@ -0,0 +1,250 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_http_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(APPS, [emqx_conf, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_testcase(_TestCase, Config) ->
clear_schemas(),
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TestCase, _Config) ->
clear_schemas(),
ok = snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
request(get) ->
do_request(get, _Parts = [], _Body = []);
request({get, Name}) ->
do_request(get, _Parts = [Name], _Body = []);
request({delete, Name}) ->
do_request(delete, _Parts = [Name], _Body = []);
request({put, Name, Params}) ->
do_request(put, _Parts = [Name], Params);
request({post, Params}) ->
do_request(post, _Parts = [], Params).
do_request(Method, PathParts, Body) ->
Header = emqx_common_test_http:default_auth_header(),
URI = uri(["schema_registry" | PathParts]),
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
Res0 = emqx_mgmt_api_test_util:request_api(Method, URI, [], Header, Body, Opts),
case Res0 of
{ok, Code, <<>>} ->
{ok, Code, <<>>};
{ok, Code, Res1} ->
Res2 = emqx_json:decode(Res1, [return_maps]),
Res3 = try_decode_error_message(Res2),
{ok, Code, Res3};
Error ->
Error
end.
try_decode_error_message(#{<<"message">> := Msg0} = Res0) ->
case emqx_json:safe_decode(Msg0, [return_maps]) of
{ok, Msg} ->
Res0#{<<"message">> := Msg};
{error, _} ->
Res0
end;
try_decode_error_message(Res) ->
Res.
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_crud(_Config) ->
SchemaName = <<"my_avro_schema">>,
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
Params = #{
<<"type">> => <<"avro">>,
<<"source">> => SourceBin,
<<"name">> => SchemaName,
<<"description">> => <<"My schema">>
},
UpdateParams = maps:without([<<"name">>], Params),
%% no schemas at first
?assertMatch({ok, 200, []}, request(get)),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({get, SchemaName})
),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({put, SchemaName, UpdateParams})
),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({delete, SchemaName})
),
%% create a schema
?assertMatch(
{ok, 201, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}},
request({post, Params})
),
?assertMatch(
{ok, 200, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}},
request({get, SchemaName})
),
?assertMatch(
{ok, 200, [
#{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My schema">>
}
]},
request(get)
),
UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>},
?assertMatch(
{ok, 200, #{
<<"type">> := <<"avro">>,
<<"source">> := SourceBin,
<<"name">> := SchemaName,
<<"description">> := <<"My new schema">>
}},
request({put, SchemaName, UpdateParams1})
),
?assertMatch(
{ok, 400, #{
<<"code">> := <<"ALREADY_EXISTS">>,
<<"message">> := <<"Schema already exists">>
}},
request({post, Params})
),
%% typechecks, but is invalid
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
}},
request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}})
),
?assertMatch(
{ok, 204, <<>>},
request({delete, SchemaName})
),
%% doesn't typecheck
lists:foreach(
fun(Field) ->
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := #{<<"reason">> := <<"required_field">>}
}},
request({post, maps:without([Field], Params)}),
#{field => Field}
)
end,
[<<"name">>, <<"source">>]
),
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
#{
<<"expected">> := [_ | _],
<<"field_name">> := <<"type">>
}
}},
request({post, maps:without([<<"type">>], Params)}),
#{field => <<"type">>}
),
%% typechecks, but is invalid
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
<<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">>
}},
request({post, Params#{<<"source">> := <<"{}">>}})
),
%% unknown serde type
?assertMatch(
{ok, 400, #{
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> :=
#{
<<"expected">> := [_ | _],
<<"field_name">> := <<"type">>
}
}},
request({post, Params#{<<"type">> := <<"foo">>}})
),
ok.

View File

@ -0,0 +1,121 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_serde_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_ee_schema_registry.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
emqx_mgmt_api_test_util:init_suite(?APPS),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
clear_schemas(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
clear_schemas() ->
maps:foreach(
fun(Name, _Schema) ->
ok = emqx_ee_schema_registry:delete_schema(Name)
end,
emqx_ee_schema_registry:list_schemas()
).
schema_params(avro) ->
Source = #{
type => record,
fields => [
#{name => <<"i">>, type => <<"int">>},
#{name => <<"s">>, type => <<"string">>}
]
},
SourceBin = emqx_json:encode(Source),
#{type => avro, source => SourceBin}.
assert_roundtrip(SerdeName, Original) ->
Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original),
Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded),
?assertEqual(Original, Decoded, #{original => Original}).
assert_roundtrip(SerdeName, Original, ArgsSerialize, ArgsDeserialize) ->
Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original, ArgsSerialize),
Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded, ArgsDeserialize),
?assertEqual(Original, Decoded, #{original => Original}).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_roundtrip_avro(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
ok = emqx_ee_schema_registry:add_schema(SerdeName, Params),
Original = #{<<"i">> => 10, <<"s">> => <<"hi">>},
%% for coverage
assert_roundtrip(SerdeName, Original, _ArgsSerialize = [], _ArgsDeserialize = []),
assert_roundtrip(SerdeName, Original),
ok.
t_avro_invalid_json_schema(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
WrongParams = Params#{source := <<"{">>},
?assertMatch(
{error, #{reason := #{expected_type := _}}},
emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
),
ok.
t_avro_invalid_schema(_Config) ->
SerdeName = my_serde,
Params = schema_params(avro),
WrongParams = Params#{source := <<"{}">>},
?assertMatch(
{error, {post_config_update, _, {not_found, <<"type">>}}},
emqx_ee_schema_registry:add_schema(SerdeName, WrongParams)
),
ok.
t_serde_not_found(_Config) ->
%% for coverage
NonexistentSerde = <<"nonexistent">>,
Original = #{},
?assertError(
{serde_not_found, NonexistentSerde},
emqx_ee_schema_registry_serde:encode(NonexistentSerde, Original)
),
?assertError(
{serde_not_found, NonexistentSerde},
emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original)
),
ok.

View File

@ -83,6 +83,8 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqx and observer_cli
{:recon, github: "ferd/recon", tag: "2.5.1", override: true},
{:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true},
# in conflict by erlavro and rocketmq
{:jsone, github: "emqx/jsone", tag: "1.7.1", override: true},
# dependencies of dependencies; we choose specific refs to match
# what rebar3 chooses.
# in conflict by gun and emqtt
@ -307,7 +309,8 @@ defmodule EMQXUmbrella.MixProject do
emqx_license: :permanent,
emqx_ee_conf: :load,
emqx_ee_connector: :permanent,
emqx_ee_bridge: :permanent
emqx_ee_bridge: :permanent,
emqx_ee_schema_registry: :permanent
],
else: []
)

View File

@ -81,6 +81,8 @@
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
, {telemetry, "1.1.0"}
, {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}}
%% in conflict by erlavro and rocketmq
, {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}}
]}.
{xref_ignores,

View File

@ -422,7 +422,8 @@ relx_apps_per_edition(ee) ->
emqx_license,
{emqx_ee_conf, load},
emqx_ee_connector,
emqx_ee_bridge
emqx_ee_bridge,
emqx_ee_schema_registry
];
relx_apps_per_edition(ce) ->
[].

View File

@ -57,7 +57,7 @@ emqx_bridge_api {
desc_api1 {
desc {
en: """List all created bridges"""
zh: """列出所有 Birdge"""
zh: """列出所有 Bridge"""
}
label: {
en: "List All Bridges"

View File

@ -0,0 +1,69 @@
emqx_ee_schema_registry_http_api {
# apis
desc_schema_registry_api_list {
desc {
en: "List all registered schemas"
zh: "列出所有注册的模式"
}
label {
en: "List schemas"
zh: "列表模式"
}
}
desc_schema_registry_api_get {
desc {
en: "Get a schema by its name"
zh: "通过名称获取模式"
}
label {
en: "Get schema"
zh: "获取模式"
}
}
desc_schema_registry_api_post {
desc {
en: "Register a new schema"
zh: "注册一个新的模式"
}
label {
en: "Register schema"
zh: "注册模式"
}
}
desc_schema_registry_api_put {
desc {
en: "Update an existing schema"
zh: "更新一个现有的模式"
}
label {
en: "Update schema"
zh: "更新模式"
}
}
desc_schema_registry_api_delete {
desc {
en: "Delete a schema"
zh: "删除一个模式"
}
label {
en: "Delete schema"
zh: "删除模式"
}
}
# params
desc_param_path_schema_name {
desc {
en: "The schema name"
zh: "模式名称"
}
label {
en: "Schema name"
zh: "模式名称"
}
}
}

View File

@ -0,0 +1,78 @@
emqx_ee_schema_registry_schema {
schema_registry_root {
desc {
en: "Schema registry configurations."
zh: "模式注册表的配置。"
}
label {
en: "Schema registry"
zh: "模式注册表"
}
}
schema_registry_schemas {
desc {
en: "Registered schemas."
zh: "注册的模式。"
}
label {
en: "Registered schemas"
zh: "注册的模式"
}
}
schema_name {
desc {
en: "A name for the schema that will serve as its identifier."
zh: "模式的一个名称,将作为其标识符。"
}
label {
en: "Schema name"
zh: "模式名称"
}
}
schema_type {
desc {
en: "Schema type."
zh: "模式类型。"
}
label {
en: "Schema type"
zh: "模式类型"
}
}
schema_source {
desc {
en: "Source text for the schema."
zh: "模式的源文本。"
}
label {
en: "Schema source"
zh: "模式来源"
}
}
schema_description {
desc {
en: "A description for this schema."
zh: "对该模式的描述。"
}
label {
en: "Schema description"
zh: "模式描述"
}
}
avro_type {
desc {
en: "[Apache Avro](https://avro.apache.org/) serialization format."
zh: "[阿帕奇-阿夫罗](https://avro.apache.org/) 序列化格式。"
}
label {
en: "Apache Avro"
zh: "阿帕奇-阿夫罗"
}
}
}

View File

@ -1,6 +1,7 @@
ACL
AES
APIs
Avro
BPAPI
BSON
Backplane