diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 6bfff38d3..20018b2d5 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -44,6 +44,7 @@ -type port_number() :: 1..65536. -type server_parse_option() :: #{default_port => port_number(), no_port => boolean()}. -type url() :: binary(). +-type json_binary() :: binary(). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -58,6 +59,7 @@ -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). -typerefl_from_string({url/0, emqx_schema, to_url}). +-typerefl_from_string({json_binary/0, emqx_schema, to_json_binary}). -export([ validate_heap_size/1, @@ -84,7 +86,8 @@ to_ip_port/1, to_erl_cipher_suite/1, to_comma_separated_atoms/1, - to_url/1 + to_url/1, + to_json_binary/1 ]). -export([ @@ -112,7 +115,8 @@ ip_port/0, cipher/0, comma_separated_atoms/0, - url/0 + url/0, + json_binary/0 ]). -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]). @@ -2576,6 +2580,14 @@ to_url(Str) -> Error end. +to_json_binary(Str) -> + case emqx_json:safe_decode(Str) of + {ok, _} -> + {ok, iolist_to_binary(Str)}; + Error -> + Error + end. + to_bar_separated_list(Str) -> {ok, string:tokens(Str, "| ")}. 
diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 077ebe138..406183094 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -676,7 +676,8 @@ start_slave(Name, Opts) when is_map(Opts) -> ] ); slave -> - slave:start_link(host(), Name, ebin_path()) + Env = " -env HOCON_ENV_OVERRIDE_PREFIX EMQX_", + slave:start_link(host(), Name, ebin_path() ++ Env) end end, case DoStart() of @@ -749,6 +750,20 @@ setup_node(Node, Opts) when is_map(Opts) -> %% `emqx_conf' app and correctly catch up the config. StartAutocluster = maps:get(start_autocluster, Opts, false), + ct:pal( + "setting up node ~p:\n ~p", + [ + Node, + #{ + start_autocluster => StartAutocluster, + load_apps => LoadApps, + apps => Apps, + env => Env, + start_apps => StartApps + } + ] + ), + %% Load env before doing anything to avoid overriding [ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]], @@ -773,10 +788,7 @@ setup_node(Node, Opts) when is_map(Opts) -> end, %% Setting env before starting any applications - [ - ok = rpc:call(Node, application, set_env, [Application, Key, Value]) - || {Application, Key, Value} <- Env - ], + set_envs(Node, Env), %% Here we start the apps EnvHandlerForRpc = @@ -794,8 +806,9 @@ setup_node(Node, Opts) when is_map(Opts) -> node(), integer_to_list(erlang:unique_integer()) ]), + Cookie = atom_to_list(erlang:get_cookie()), os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir), - os:putenv("EMQX_NODE__COOKIE", atom_to_list(erlang:get_cookie())), + os:putenv("EMQX_NODE__COOKIE", Cookie), emqx_config:init_load(SchemaMod), os:unsetenv("EMQX_NODE__DATA_DIR"), os:unsetenv("EMQX_NODE__COOKIE"), @@ -826,7 +839,15 @@ setup_node(Node, Opts) when is_map(Opts) -> ok; _ -> StartAutocluster andalso - (ok = rpc:call(Node, emqx_machine_boot, start_autocluster, [])), + begin + %% Note: we need to re-set the env because + %% starting the apps apparently 
make some of them + %% to be lost... This is particularly useful for + %% setting extra apps to be restarted after + %% joining. + set_envs(Node, Env), + ok = erpc:call(Node, emqx_machine_boot, start_autocluster, []) + end, case rpc:call(Node, ekka, join, [JoinTo]) of ok -> ok; @@ -883,6 +904,14 @@ merge_opts(Opts1, Opts2) -> Opts2 ). +set_envs(Node, Env) -> + lists:foreach( + fun({Application, Key, Value}) -> + ok = rpc:call(Node, application, set_env, [Application, Key, Value]) + end, + Env + ). + erl_flags() -> %% One core and redirecting logs to master "+S 1:1 -master " ++ atom_to_list(node()) ++ " " ++ ebin_path(). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index e2872c0d7..eb7f6c741 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -755,6 +755,8 @@ typename_to_spec("initial()", _Mod) -> #{type => string, example => <<"0MB">>}; typename_to_spec("bucket_name()", _Mod) -> #{type => string, example => <<"retainer">>}; +typename_to_spec("json_binary()", _Mod) -> + #{type => string, example => <<"{\"a\": [1,true]}">>}; typename_to_spec(Name, Mod) -> Spec = range(Name), Spec1 = remote_module_type(Spec, Name, Mod), diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index b8bfeb84c..79e0406c1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1097,26 +1097,27 @@ date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> %% Here the emqx_rule_funcs module acts as a proxy, forwarding %% the function handling to the worker module. 
%% @end -% '$handle_undefined_function'(schema_decode, [SchemaId, Data|MoreArgs]) -> -% emqx_schema_parser:decode(SchemaId, Data, MoreArgs); -% '$handle_undefined_function'(schema_decode, Args) -> -% error({args_count_error, {schema_decode, Args}}); - -% '$handle_undefined_function'(schema_encode, [SchemaId, Term|MoreArgs]) -> -% emqx_schema_parser:encode(SchemaId, Term, MoreArgs); -% '$handle_undefined_function'(schema_encode, Args) -> -% error({args_count_error, {schema_encode, Args}}); - -% '$handle_undefined_function'(sprintf, [Format|Args]) -> -% erlang:apply(fun sprintf_s/2, [Format, Args]); - -% '$handle_undefined_function'(Fun, Args) -> -% error({sql_function_not_supported, function_literal(Fun, Args)}). - +-if(?EMQX_RELEASE_EDITION == ee). +%% EE +'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) -> + emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs); +'$handle_undefined_function'(schema_decode, Args) -> + error({args_count_error, {schema_decode, Args}}); +'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> + emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs); +'$handle_undefined_function'(schema_encode, Args) -> + error({args_count_error, {schema_encode, Args}}); '$handle_undefined_function'(sprintf, [Format | Args]) -> erlang:apply(fun sprintf_s/2, [Format, Args]); '$handle_undefined_function'(Fun, Args) -> error({sql_function_not_supported, function_literal(Fun, Args)}). +-else. +%% CE +'$handle_undefined_function'(sprintf, [Format | Args]) -> + erlang:apply(fun sprintf_s/2, [Format, Args]); +'$handle_undefined_function'(Fun, Args) -> + error({sql_function_not_supported, function_literal(Fun, Args)}). +-endif. map_path(Key) -> {path, [{key, P} || P <- string:split(Key, ".", all)]}. 
diff --git a/changes/ee/feat-10337.en.md b/changes/ee/feat-10337.en.md new file mode 100644 index 000000000..299933351 --- /dev/null +++ b/changes/ee/feat-10337.en.md @@ -0,0 +1,3 @@ +Add schema registry feature. + +With schema registry, one can encode and decode special serialization formats in payloads when transforming messages in Rule Engine. Currently, only [Apache Avro](https://avro.apache.org/) is supported. diff --git a/lib-ee/emqx_ee_conf/src/emqx_ee_conf.app.src b/lib-ee/emqx_ee_conf/src/emqx_ee_conf.app.src index 324e7e308..771fdcb27 100644 --- a/lib-ee/emqx_ee_conf/src/emqx_ee_conf.app.src +++ b/lib-ee/emqx_ee_conf/src/emqx_ee_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_conf, [ {description, "EMQX Enterprise Edition configuration schema"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_conf/src/emqx_ee_conf_schema.erl b/lib-ee/emqx_ee_conf/src/emqx_ee_conf_schema.erl index 5137574e3..7bf41deb5 100644 --- a/lib-ee/emqx_ee_conf/src/emqx_ee_conf_schema.erl +++ b/lib-ee/emqx_ee_conf/src/emqx_ee_conf_schema.erl @@ -8,7 +8,7 @@ -export([namespace/0, roots/0, fields/1, translations/0, translation/1]). --define(EE_SCHEMA_MODULES, [emqx_license_schema]). +-define(EE_SCHEMA_MODULES, [emqx_license_schema, emqx_ee_schema_registry_schema]). namespace() -> emqx_conf_schema:namespace(). 
diff --git a/lib-ee/emqx_ee_schema_registry/.gitignore b/lib-ee/emqx_ee_schema_registry/.gitignore
new file mode 100644
index 000000000..f1c455451
--- /dev/null
+++ b/lib-ee/emqx_ee_schema_registry/.gitignore
@@ -0,0 +1,19 @@
+.rebar3
+_*
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+_build
+.idea
+*.iml
+rebar3.crashdump
+*~
diff --git a/lib-ee/emqx_ee_schema_registry/README.md b/lib-ee/emqx_ee_schema_registry/README.md
new file mode 100644
index 000000000..9f477208c
--- /dev/null
+++ b/lib-ee/emqx_ee_schema_registry/README.md
@@ -0,0 +1,69 @@
+# EMQX Schema Registry
+
+EMQX Schema Registry manages the various schemas used for
+decoding/encoding messages.
+
+To use a schema in the rule engine, a schema name should be passed to the
+SQL functions that decode/encode data, like:
+
+```sql
+SELECT
+  schema_decode('sensor_notify', payload) as payload
+FROM
+  "message.publish"
+WHERE
+  topic = 't/1'
+```
+
+## Using schema registry with rule engine
+
+```
+                      +---------------------------+
+                      |                           |
+  Events/Msgs         |                           |   Events/Msgs
+  -------------------->           EMQX            |------------------>
+                      |                           |
+                      |                           |
+                      +-------------|-------------+
+                                    |
+                               HOOK |
+                                    |
+                      +-------------v-------------+           +----------+
+                      |                           |   Data    |          |
+                      |        Rule Engine        ------------- Backends |
+                      |                           |           |          |
+                      +------|-------------|------+           +----------+
+                             |^            |^
+                       Decode||            ||Encode
+                             ||            ||
+                      +------v|------------v|-----+
+                      |                           |
+                      |      Schema Registry      |
+                      |                           |
+                      +---------------------------+
+```
+
+## Architecture
+
+```
+                            |              |
+                     Decode |    [APIs]    | Encode
+                            |              |
+                            |              |           [Registry]
+                     +------v--------------v------+
+    REGISTER SCHEMA  |                            |
+  ------------------->                            |    +--------+
+                     |                            |    |        |
+[Management APIs]    |      Schema Registry       ------ Schema |
+                     |                            |    |        |
+  ------------------->                            |    +--------+
+       LOAD PARSERS  |                            |
+                     +----------------------------+
+                        /          |           \
+                   +---/---+   +---|----+   +---\---+
+                   |       |   |        |   |       |
+    [Decoders]     | Avro  |   |ProtoBuf|   |Others |
+                   |       |   |        |   |       |
+                   +-------+   +--------+   +-------+
+
+```
diff --git
a/lib-ee/emqx_ee_schema_registry/etc/emqx_ee_schema_registry.conf b/lib-ee/emqx_ee_schema_registry/etc/emqx_ee_schema_registry.conf new file mode 100644 index 000000000..e69de29bb diff --git a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl new file mode 100644 index 000000000..af49db6dd --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_EE_SCHEMA_REGISTRY_HRL). +-define(EMQX_EE_SCHEMA_REGISTRY_HRL, true). + +-define(CONF_KEY_ROOT, schema_registry). +-define(CONF_KEY_PATH, [?CONF_KEY_ROOT]). + +-define(SCHEMA_REGISTRY_SHARD, emqx_ee_schema_registry_shard). +-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab). + +-type schema_name() :: binary(). +-type schema_source() :: binary(). + +-type encoded_data() :: iodata(). +-type decoded_data() :: map(). +-type serializer() :: fun((decoded_data()) -> encoded_data()). +-type deserializer() :: fun((encoded_data()) -> decoded_data()). +-type destructor() :: fun(() -> ok). +-type serde_type() :: avro. +-type serde_opts() :: map(). + +-record(serde, { + name :: schema_name(), + serializer :: serializer(), + deserializer :: deserializer(), + destructor :: destructor() +}). +-type serde() :: #serde{}. +-type serde_map() :: #{ + name := schema_name(), + serializer := serializer(), + deserializer := deserializer(), + destructor := destructor() +}. + +-endif. diff --git a/lib-ee/emqx_ee_schema_registry/rebar.config b/lib-ee/emqx_ee_schema_registry/rebar.config new file mode 100644 index 000000000..b19fb05ae --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/rebar.config @@ -0,0 +1,12 @@ +%% -*- mode: erlang -*- + +{erl_opts, [debug_info]}. 
+{deps, [ + {emqx, {path, "../../apps/emqx"}}, + {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}} +]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_ee_schema_registry]} +]}. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src new file mode 100644 index 000000000..c40fb808a --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -0,0 +1,15 @@ +{application, emqx_ee_schema_registry, [ + {description, "EMQX Schema Registry"}, + {vsn, "0.1.0"}, + {registered, [emqx_ee_schema_registry_sup]}, + {mod, {emqx_ee_schema_registry_app, []}}, + {applications, [ + kernel, + stdlib, + erlavro + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl new file mode 100644 index 000000000..436777e9f --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -0,0 +1,242 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry). + +-behaviour(gen_server). +-behaviour(emqx_config_handler). + +-include("emqx_ee_schema_registry.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% API +-export([ + start_link/0, + + get_serde/1, + + add_schema/2, + delete_schema/1, + list_schemas/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + terminate/2 +]). + +%% `emqx_config_handler' API +-export([post_config_update/5]). + +-type schema() :: #{ + type := serde_type(), + source := binary(), + description => binary() +}. 
+ +%%------------------------------------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec get_serde(schema_name()) -> {ok, serde_map()} | {error, not_found}. +get_serde(SchemaName) -> + case ets:lookup(?SERDE_TAB, to_bin(SchemaName)) of + [] -> + {error, not_found}; + [Serde] -> + {ok, serde_to_map(Serde)} + end. + +-spec add_schema(schema_name(), schema()) -> ok | {error, term()}. +add_schema(Name, Schema) -> + RawSchema = emqx_map_lib:binary_key_map(Schema), + Res = emqx_conf:update( + [?CONF_KEY_ROOT, schemas, Name], + RawSchema, + #{override_to => cluster} + ), + case Res of + {ok, _} -> + ok; + Error -> + Error + end. + +-spec delete_schema(schema_name()) -> ok | {error, term()}. +delete_schema(Name) -> + Res = emqx_conf:remove( + [?CONF_KEY_ROOT, schemas, Name], + #{override_to => cluster} + ), + case Res of + {ok, _} -> + ok; + Error -> + Error + end. + +-spec list_schemas() -> #{schema_name() => schema()}. +list_schemas() -> + emqx_config:get([?CONF_KEY_ROOT, schemas], #{}). 
+ +%%------------------------------------------------------------------------------------------------- +%% `emqx_config_handler' API +%%------------------------------------------------------------------------------------------------- + +post_config_update( + [?CONF_KEY_ROOT, schemas] = _Path, + _Cmd, + NewConf = #{schemas := NewSchemas}, + OldConf = #{}, + _AppEnvs +) -> + OldSchemas = maps:get(schemas, OldConf, #{}), + #{ + added := Added, + changed := Changed0, + removed := Removed + } = emqx_map_lib:diff_maps(NewSchemas, OldSchemas), + Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0), + RemovedNames = maps:keys(Removed), + case RemovedNames of + [] -> + ok; + _ -> + async_delete_serdes(RemovedNames) + end, + SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), + case build_serdes(SchemasToBuild) of + ok -> + {ok, NewConf}; + {error, Reason, SerdesToRollback} -> + lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), + {error, Reason} + end; +post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) -> + {ok, NewConf}. + +%%------------------------------------------------------------------------------------------------- +%% `gen_server' API +%%------------------------------------------------------------------------------------------------- + +init(_) -> + process_flag(trap_exit, true), + create_tables(), + Schemas = emqx_conf:get([?CONF_KEY_ROOT, schemas], #{}), + async_build_serdes(Schemas), + State = #{}, + {ok, State}. + +handle_call(_Call, _From, State) -> + {reply, {error, unknown_call}, State}. + +handle_cast({delete_serdes, Names}, State) -> + lists:foreach(fun ensure_serde_absent/1, Names), + ?tp(schema_registry_serdes_deleted, #{}), + {noreply, State}; +handle_cast({build_serdes, Schemas}, State) -> + do_build_serdes(Schemas), + {noreply, State}; +handle_cast(_Cast, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. 
+ +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +create_tables() -> + ok = mria:create_table(?SERDE_TAB, [ + {type, ordered_set}, + {rlog_shard, ?SCHEMA_REGISTRY_SHARD}, + {storage, ram_copies}, + {record_name, serde}, + {attributes, record_info(fields, serde)} + ]), + ok = mria:wait_for_tables([?SERDE_TAB]), + ok. + +do_build_serdes(Schemas) -> + %% TODO: use some kind of mutex to make each core build a + %% different serde to avoid duplicate work. Maybe ekka_locker? + maps:foreach(fun do_build_serde/2, Schemas), + ?tp(schema_registry_serdes_built, #{}). + +build_serdes(Serdes) -> + build_serdes(Serdes, []). + +build_serdes([{Name, Params} | Rest], Acc0) -> + Acc = [Name | Acc0], + case do_build_serde(Name, Params) of + ok -> + build_serdes(Rest, Acc); + {error, Error} -> + {error, Error, Acc} + end; +build_serdes([], _Acc) -> + ok. + +do_build_serde(Name0, #{type := Type, source := Source}) -> + try + Name = to_bin(Name0), + {Serializer, Deserializer, Destructor} = + emqx_ee_schema_registry_serde:make_serde(Type, Name, Source), + Serde = #serde{ + name = Name, + serializer = Serializer, + deserializer = Deserializer, + destructor = Destructor + }, + ok = mria:dirty_write(?SERDE_TAB, Serde), + ok + catch + Kind:Error:Stacktrace -> + ?SLOG( + error, + #{ + msg => "error_building_serde", + name => Name0, + type => Type, + kind => Kind, + error => Error, + stacktrace => Stacktrace + } + ), + {error, Error} + end. + +ensure_serde_absent(Name) -> + case get_serde(Name) of + {ok, #{destructor := Destructor}} -> + Destructor(), + ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name)); + {error, not_found} -> + ok + end. + +async_build_serdes(Schemas) -> + gen_server:cast(?MODULE, {build_serdes, Schemas}). + +async_delete_serdes(Names) -> + gen_server:cast(?MODULE, {delete_serdes, Names}). 
+ +to_bin(A) when is_atom(A) -> atom_to_binary(A); +to_bin(B) when is_binary(B) -> B. + +-spec serde_to_map(serde()) -> serde_map(). +serde_to_map(#serde{} = Serde) -> + #{ + name => Serde#serde.name, + serializer => Serde#serde.serializer, + deserializer => Serde#serde.deserializer, + destructor => Serde#serde.destructor + }. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl new file mode 100644 index 000000000..e82ed95bd --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -0,0 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry_app). + +-behaviour(application). + +-include("emqx_ee_schema_registry.hrl"). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), + emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry), + emqx_ee_schema_registry_sup:start_link(). + +stop(_State) -> + emqx_conf:remove_handler(?CONF_KEY_PATH), + ok. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl new file mode 100644 index 000000000..fca66a0b1 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_http_api.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry_http_api). + +-behaviour(minirest_api). + +-include("emqx_ee_schema_registry.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). 
+-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_api_lib.hrl"). + +-export([ + namespace/0, + api_spec/0, + paths/0, + schema/1 +]). + +-export([ + '/schema_registry'/2, + '/schema_registry/:name'/2 +]). + +%%------------------------------------------------------------------------------------------------- +%% `minirest' and `minirest_trails' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> "schema_registry_http_api". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +paths() -> + [ + "/schema_registry", + "/schema_registry/:name" + ]. + +schema("/schema_registry") -> + #{ + 'operationId' => '/schema_registry', + get => #{ + tags => [<<"schema_registry">>], + summary => <<"List registered schemas">>, + description => ?DESC("desc_schema_registry_api_list"), + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + hoconsc:array(emqx_ee_schema_registry_schema:api_schema("get")), + #{ + sample => + #{value => sample_list_schemas_response()} + } + ) + } + }, + post => #{ + tags => [<<"schema_registry">>], + summary => <<"Register a new schema">>, + description => ?DESC("desc_schema_registry_api_post"), + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_ee_schema_registry_schema:api_schema("post"), + post_examples() + ), + responses => + #{ + 201 => + emqx_dashboard_swagger:schema_with_examples( + emqx_ee_schema_registry_schema:api_schema("post"), + post_examples() + ), + 400 => error_schema('ALREADY_EXISTS', "Schema already exists") + } + } + }; +schema("/schema_registry/:name") -> + #{ + 'operationId' => '/schema_registry/:name', + get => #{ + tags => [<<"schema_registry">>], + summary => <<"Get registered schema">>, + description => ?DESC("desc_schema_registry_api_get"), + parameters => [param_path_schema_name()], + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + 
emqx_ee_schema_registry_schema:api_schema("get"), + get_examples() + ), + 404 => error_schema('NOT_FOUND', "Schema not found") + } + }, + put => #{ + tags => [<<"schema_registry">>], + summary => <<"Update a schema">>, + description => ?DESC("desc_schema_registry_api_put"), + parameters => [param_path_schema_name()], + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_ee_schema_registry_schema:api_schema("put"), + post_examples() + ), + responses => + #{ + 200 => + emqx_dashboard_swagger:schema_with_examples( + emqx_ee_schema_registry_schema:api_schema("put"), + put_examples() + ), + 404 => error_schema('NOT_FOUND', "Schema not found") + } + }, + delete => #{ + tags => [<<"schema_registry">>], + summary => <<"Delete registered schema">>, + description => ?DESC("desc_schema_registry_api_delete"), + parameters => [param_path_schema_name()], + responses => + #{ + 204 => <<"Schema deleted">>, + 404 => error_schema('NOT_FOUND', "Schema not found") + } + } + }. + +%%------------------------------------------------------------------------------------------------- +%% API +%%------------------------------------------------------------------------------------------------- + +'/schema_registry'(get, _Params) -> + Schemas = emqx_ee_schema_registry:list_schemas(), + Response = + lists:map( + fun({Name, Params}) -> + Params#{name => Name} + end, + maps:to_list(Schemas) + ), + ?OK(Response); +'/schema_registry'(post, #{body := Params0 = #{<<"name">> := Name}}) -> + Params = maps:without([<<"name">>], Params0), + case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of + undefined -> + case emqx_ee_schema_registry:add_schema(Name, Params) of + ok -> + Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]), + {201, Res#{name => Name}}; + {error, Error} -> + ?BAD_REQUEST(Error) + end; + _ -> + ?BAD_REQUEST('ALREADY_EXISTS', <<"Schema already exists">>) + end. 
+ +'/schema_registry/:name'(get, #{bindings := #{name := Name}}) -> + case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of + undefined -> + ?NOT_FOUND(<<"Schema not found">>); + Res -> + ?OK(Res#{name => Name}) + end; +'/schema_registry/:name'(put, #{bindings := #{name := Name}, body := Params}) -> + case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of + undefined -> + ?NOT_FOUND(<<"Schema not found">>); + _ -> + case emqx_ee_schema_registry:add_schema(Name, Params) of + ok -> + Res = emqx_config:get([?CONF_KEY_ROOT, schemas, Name]), + ?OK(Res#{name => Name}); + {error, Error} -> + ?BAD_REQUEST(Error) + end + end; +'/schema_registry/:name'(delete, #{bindings := #{name := Name}}) -> + case emqx_config:get([?CONF_KEY_ROOT, schemas, Name], undefined) of + undefined -> + ?NOT_FOUND(<<"Schema not found">>); + _ -> + case emqx_ee_schema_registry:delete_schema(Name) of + ok -> + ?NO_CONTENT; + {error, Error} -> + ?BAD_REQUEST(Error) + end + end. + +%%------------------------------------------------------------------------------------------------- +%% Examples +%%------------------------------------------------------------------------------------------------- + +sample_list_schemas_response() -> + [sample_get_schema_response(avro)]. + +sample_get_schema_response(avro) -> + #{ + type => <<"avro">>, + name => <<"my_avro_schema">>, + description => <<"My Avro Schema">>, + source => << + "{\"type\":\"record\"," + "\"fields\":[{\"type\":\"int\",\"name\":\"i\"}," + "{\"type\":\"string\",\"name\":\"s\"}]}" + >> + }. + +put_examples() -> + post_examples(). + +post_examples() -> + get_examples(). + +get_examples() -> + #{ + <<"avro_schema">> => + #{ + summary => <<"Avro">>, + value => sample_get_schema_response(avro) + } + }. 
+ +%%------------------------------------------------------------------------------------------------- +%% Schemas and hocon types +%%------------------------------------------------------------------------------------------------- + +param_path_schema_name() -> + {name, + mk( + binary(), + #{ + in => path, + required => true, + example => <<"my_schema">>, + desc => ?DESC("desc_param_path_schema_name") + } + )}. + +%%------------------------------------------------------------------------------------------------- +%% Internal fns +%%------------------------------------------------------------------------------------------------- + +mk(Type, Meta) -> hoconsc:mk(Type, Meta). + +error_schema(Code, Message) when is_atom(Code) -> + error_schema([Code], Message); +error_schema(Codes, Message) when is_list(Message) -> + error_schema(Codes, list_to_binary(Message)); +error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) -> + emqx_dashboard_swagger:error_codes(Codes, Message). diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl new file mode 100644 index 000000000..01177345a --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl @@ -0,0 +1,127 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_schema_registry_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ee_schema_registry.hrl"). + +%% `hocon_schema' API +-export([ + roots/0, + fields/1, + desc/1, + tags/0, + union_member_selector/1 +]). + +%% `minirest_trails' API +-export([ + api_schema/1 +]). 
+ +%%------------------------------------------------------------------------------ +%% `hocon_schema' APIs +%%------------------------------------------------------------------------------ + +roots() -> + [{?CONF_KEY_ROOT, mk(ref(?CONF_KEY_ROOT), #{required => false})}]. + +tags() -> + [<<"Schema Registry">>]. + +fields(?CONF_KEY_ROOT) -> + [ + {schemas, + mk( + hoconsc:map( + name, + hoconsc:union(fun union_member_selector/1) + ), + #{ + default => #{}, + desc => ?DESC("schema_registry_schemas") + } + )} + ]; +fields(avro) -> + [ + {type, mk(hoconsc:enum([avro]), #{required => true, desc => ?DESC("schema_type")})}, + {source, + mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})}, + {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})} + ]; +fields("get_avro") -> + [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)]; +fields("put_avro") -> + fields(avro); +fields("post_" ++ Type) -> + fields("get_" ++ Type). + +desc(?CONF_KEY_ROOT) -> + ?DESC("schema_registry_root"); +desc(avro) -> + ?DESC("avro_type"); +desc(_) -> + undefined. + +union_member_selector(all_union_members) -> + refs(); +union_member_selector({value, V}) -> + refs(V). + +union_member_selector_get_api(all_union_members) -> + refs_get_api(); +union_member_selector_get_api({value, V}) -> + refs_get_api(V). + +%%------------------------------------------------------------------------------ +%% `minirest_trails' "APIs" +%%------------------------------------------------------------------------------ + +api_schema("get") -> + hoconsc:union(fun union_member_selector_get_api/1); +api_schema("post") -> + api_schema("get"); +api_schema("put") -> + hoconsc:union(fun union_member_selector/1). 
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
+supported_serde_types() ->
+    [avro].
+
+refs() ->
+    [ref(Type) || Type <- supported_serde_types()].
+
+refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
+    % atom type tag: normalise to a binary and re-dispatch on it
+    refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
+refs(#{<<"type">> := <<"avro">>}) ->
+    [ref(avro)];
+refs(_) ->
+    Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
+    throw(#{
+        field_name => type,
+        expected => Expected
+    }).
+
+refs_get_api() ->
+    [ref("get_avro")].
+
+refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
+    refs_get_api(Value#{<<"type">> := atom_to_binary(TypeAtom)}); % stay in the GET-API selector so the result is ref("get_avro"), not ref(avro)
+refs_get_api(#{<<"type">> := <<"avro">>}) ->
+    [ref("get_avro")];
+refs_get_api(_) ->
+    Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
+    throw(#{
+        field_name => type,
+        expected => Expected
+    }).
diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
new file mode 100644
index 000000000..43145fb16
--- /dev/null
+++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
@@ -0,0 +1,70 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_schema_registry_serde).
+
+-include("emqx_ee_schema_registry.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% API
+-export([
+    decode/2,
+    decode/3,
+    encode/2,
+    encode/3,
+    make_serde/3
+]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Decode `Encoded' with the serde registered as `SerdeName', passing no
+%% extra arguments to the deserializer.
+-spec decode(schema_name(), encoded_data()) -> decoded_data().
+decode(SerdeName, Encoded) ->
+    decode(SerdeName, Encoded, []).
+
+%% Decode `Encoded' with the serde registered as `SerdeName'. `ExtraArgs' are
+%% appended after the data when the deserializer fun is applied.
+%% Raises error({serde_not_found, SerdeName}) when the registry has no such serde.
+-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
+decode(SerdeName, Encoded, ExtraArgs) when is_list(ExtraArgs) ->
+    case emqx_ee_schema_registry:get_serde(SerdeName) of
+        {ok, #{deserializer := Deserialize}} ->
+            apply(Deserialize, [Encoded | ExtraArgs]);
+        {error, not_found} ->
+            error({serde_not_found, SerdeName})
+    end.
+
+%% Encode `Decoded' with the serde registered as `SerdeName', passing no
+%% extra arguments to the serializer.
+-spec encode(schema_name(), decoded_data()) -> encoded_data().
+encode(SerdeName, Decoded) ->
+    encode(SerdeName, Decoded, []).
+
+%% Encode `Decoded' with the serde registered as `SerdeName'. `ExtraArgs' are
+%% appended after the data when the serializer fun is applied.
+%% Raises error({serde_not_found, SerdeName}) when the registry has no such serde.
+-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
+encode(SerdeName, Decoded, ExtraArgs) when is_list(ExtraArgs) ->
+    case emqx_ee_schema_registry:get_serde(SerdeName) of
+        {ok, #{serializer := Serialize}} ->
+            apply(Serialize, [Decoded | ExtraArgs]);
+        {error, not_found} ->
+            error({serde_not_found, SerdeName})
+    end.
+
+%% Build the {Serializer, Deserializer, Destructor} triple for a schema.
+%% Only `avro' is handled here. The schema name is injected into the source
+%% before it is handed to erlavro.
+-spec make_serde(serde_type(), schema_name(), schema_source()) ->
+    {serializer(), deserializer(), destructor()}.
+make_serde(avro, Name, RawSource) ->
+    NamedSource = inject_avro_name(Name, RawSource),
+    Serialize = avro:make_simple_encoder(NamedSource, []),
+    DecoderOpts = [{map_type, map}, {record_type, map}],
+    Deserialize = avro:make_simple_decoder(NamedSource, DecoderOpts),
+    %% Nothing to release for avro; the destructor only emits a trace point.
+    Destroy = fun() ->
+        ?tp(serde_destroyed, #{type => avro, name => Name}),
+        ok
+    end,
+    {Serialize, Deserialize, Destroy}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+-spec inject_avro_name(schema_name(), schema_source()) -> schema_source().
+inject_avro_name(Name, Source0) ->
+    %% Sets (or overwrites) the top-level "name" key of the JSON schema source
+    %% and returns the re-encoded JSON.
+    %% The config schema already typechecks the source as valid JSON, so no
+    %% validation is repeated here.
+    %% NOTE(review): the map update below assumes the decoded source is a JSON
+    %% object; a non-object JSON value would fail with `badmap' - confirm that
+    %% callers never pass one.
+    Decoded = emqx_json:decode(Source0, [return_maps]),
+    emqx_json:encode(Decoded#{<<"name">> => Name}).
diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_sup.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_sup.erl
new file mode 100644
index 000000000..0dfc601d3
--- /dev/null
+++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_sup.erl
@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_schema_registry_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%% Start the supervisor, registered locally under the module name.
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% supervisor callback: one_for_one supervision of a single permanent worker,
+%% the schema registry process.
+init([]) ->
+    RegistrySpec = child_spec(emqx_ee_schema_registry),
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 100
+    },
+    {ok, {Flags, [RegistrySpec]}}.
+
+%% Child spec for a permanent worker whose entry point is Module:start_link/0.
+child_spec(Module) ->
+    StartMFA = {Module, start_link, []},
+    #{
+        id => Module,
+        start => StartMFA,
+        restart => permanent,
+        shutdown => 5_000,
+        type => worker,
+        modules => [Module]
+    }.
diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl new file mode 100644 index 000000000..9b2f64c03 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -0,0 +1,433 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-include("emqx_ee_schema_registry.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [{group, avro}]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [{avro, TCs}]. + +init_per_suite(Config) -> + emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), + emqx_mgmt_api_test_util:init_suite(?APPS), + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), + ok. + +init_per_group(avro, Config) -> + [{serde_type, avro} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + emqx_common_test_helpers:call_janitor(), + clear_schemas(), + ok. 
+ +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +trace_rule(Data, Envs, _Args) -> + Now = erlang:monotonic_time(), + ets:insert(recorded_actions, {Now, #{data => Data, envs => Envs}}), + TestPid = persistent_term:get({?MODULE, test_pid}), + TestPid ! {action, #{data => Data, envs => Envs}}, + ok. + +make_trace_fn_action() -> + persistent_term:put({?MODULE, test_pid}, self()), + Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>, + emqx_tables:new(recorded_actions, [named_table, public, ordered_set]), + #{function => Fn, args => #{}}. + +create_rule_http(RuleParams) -> + RepublishTopic = <<"republish/schema_registry">>, + emqx:subscribe(RepublishTopic), + DefaultParams = #{ + enable => true, + actions => [ + make_trace_fn_action(), + #{ + <<"function">> => <<"republish">>, + <<"args">> => + #{ + <<"topic">> => RepublishTopic, + <<"payload">> => <<>>, + <<"qos">> => 0, + <<"retain">> => false, + <<"user_properties">> => <<>> + } + } + ] + }, + Params = maps:merge(DefaultParams, RuleParams), + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +schema_params(avro) -> + Source = #{ + type => record, + fields => [ + #{name => <<"i">>, type => <<"int">>}, + #{name => <<"s">>, type => <<"string">>} + ] + }, + SourceBin = emqx_json:encode(Source), + #{type => avro, source => SourceBin}. + +create_serde(SerdeType, SerdeName) -> + Schema = schema_params(SerdeType), + ok = emqx_ee_schema_registry:add_schema(SerdeName, Schema), + ok. 
+ +sql_for(avro, encode_decode1) -> + << + "select\n" + " schema_decode('my_serde',\n" + " schema_encode('my_serde', json_decode(payload))) as decoded,\n" + " decoded.i as decoded_int,\n" + " decoded.s as decoded_string\n" + " from t" + >>; +sql_for(avro, encode1) -> + << + "select\n" + " schema_encode('my_serde', json_decode(payload)) as encoded\n" + " from t" + >>; +sql_for(avro, decode1) -> + << + "select\n" + " schema_decode('my_serde', payload) as decoded\n" + " from t" + >>; +sql_for(Type, Name) -> + ct:fail("unimplemented: ~p", [{Type, Name}]). + +clear_schemas() -> + maps:foreach( + fun(Name, _Schema) -> + ok = emqx_ee_schema_registry:delete_schema(Name) + end, + emqx_ee_schema_registry:list_schemas() + ). + +receive_action_results() -> + receive + {action, #{data := _} = Res} -> + Res + after 1_000 -> + ct:fail("action didn't run") + end. + +receive_published(Line) -> + receive + {deliver, _Topic, Msg} -> + MsgMap = emqx_message:to_map(Msg), + maps:update_with( + payload, + fun(Raw) -> + case emqx_json:safe_decode(Raw, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> Raw + end + end, + MsgMap + ) + after 1_000 -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("publish not received, line ~b", [Line]) + end. + +cluster(Config) -> + PrivDataDir = ?config(priv_dir, Config), + PeerModule = + case os:getenv("IS_CI") of + false -> + slave; + _ -> + ct_slave + end, + Cluster = emqx_common_test_helpers:emqx_cluster( + [core, core], + [ + {apps, ?APPS}, + {listener_ports, []}, + {peer_mod, PeerModule}, + {priv_data_dir, PrivDataDir}, + {load_schema, true}, + {start_autocluster, true}, + {schema_mod, emqx_ee_conf_schema}, + %% need to restart schema registry app in the tests so + %% that it re-registers the config handler that is lost + %% when emqx_conf restarts during join. 
+ {env, [{emqx_machine, applications, [emqx_ee_schema_registry]}]}, + {load_apps, [emqx_machine | ?APPS]}, + {env_handler, fun + (emqx) -> + application:set_env(emqx, boot_modules, [broker, router]), + ok; + (emqx_conf) -> + ok; + (_) -> + ok + end} + ] + ), + ct:pal("cluster:\n ~p", [Cluster]), + Cluster. + +start_cluster(Cluster) -> + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts) + || {Name, Opts} <- Cluster + ], + on_exit(fun() -> + emqx_misc:pmap( + fun(N) -> + ct:pal("stopping ~p", [N]), + ok = emqx_common_test_helpers:stop_slave(N) + end, + Nodes + ) + end), + erpc:multicall(Nodes, mria_rlog, wait_for_shards, [[?SCHEMA_REGISTRY_SHARD], 30_000]), + Nodes. + +wait_for_cluster_rpc(Node) -> + %% need to wait until the config handler is ready after + %% restarting during the cluster join. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + true = is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler])) + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_unknown_calls(_Config) -> + Ref = monitor(process, emqx_ee_schema_registry), + %% for coverage + emqx_ee_schema_registry ! unknown, + gen_server:cast(emqx_ee_schema_registry, unknown), + ?assertEqual({error, unknown_call}, gen_server:call(emqx_ee_schema_registry, unknown)), + receive + {'DOWN', Ref, process, _, _} -> + ct:fail("registry shouldn't have died") + after 500 -> + ok + end. 
+ +t_encode_decode(Config) -> + SerdeType = ?config(serde_type, Config), + SerdeName = my_serde, + ok = create_serde(SerdeType, SerdeName), + {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode_decode1)}), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + PayloadBin = emqx_json:encode(Payload), + emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), + Res = receive_action_results(), + ?assertMatch( + #{ + data := + #{ + <<"decoded">> := + #{ + <<"i">> := 10, + <<"s">> := <<"text">> + }, + <<"decoded_int">> := 10, + <<"decoded_string">> := <<"text">> + } + }, + Res + ), + ok. + +t_delete_serde(Config) -> + SerdeType = ?config(serde_type, Config), + SerdeName = my_serde, + ?check_trace( + begin + ok = create_serde(SerdeType, SerdeName), + {ok, {ok, _}} = + ?wait_async_action( + emqx_ee_schema_registry:delete_schema(SerdeName), + #{?snk_kind := schema_registry_serdes_deleted}, + 1_000 + ), + ok + end, + fun(Trace) -> + ?assertMatch([_], ?of_kind(schema_registry_serdes_deleted, Trace)), + ?assertMatch([#{type := SerdeType}], ?of_kind(serde_destroyed, Trace)), + ok + end + ), + ok. + +t_encode(Config) -> + SerdeType = ?config(serde_type, Config), + SerdeName = my_serde, + ok = create_serde(SerdeType, SerdeName), + {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, encode1)}), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + PayloadBin = emqx_json:encode(Payload), + emqx:publish(emqx_message:make(<<"t">>, PayloadBin)), + Published = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"encoded">> := _}}, + Published + ), + #{payload := #{<<"encoded">> := Encoded}} = Published, + {ok, #{deserializer := Deserializer}} = emqx_ee_schema_registry:get_serde(SerdeName), + ?assertEqual(Payload, Deserializer(Encoded)), + ok. 
+ +t_decode(Config) -> + SerdeType = ?config(serde_type, Config), + SerdeName = my_serde, + ok = create_serde(SerdeType, SerdeName), + {ok, #{<<"id">> := RuleId}} = create_rule_http(#{sql => sql_for(SerdeType, decode1)}), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + {ok, #{serializer := Serializer}} = emqx_ee_schema_registry:get_serde(SerdeName), + EncodedBin = Serializer(Payload), + emqx:publish(emqx_message:make(<<"t">>, EncodedBin)), + Published = receive_published(?LINE), + ?assertMatch( + #{payload := #{<<"decoded">> := _}}, + Published + ), + #{payload := #{<<"decoded">> := Decoded}} = Published, + ?assertEqual(Payload, Decoded), + ok. + +t_fail_rollback(Config) -> + SerdeType = ?config(serde_type, Config), + OkSchema = emqx_map_lib:binary_key_map(schema_params(SerdeType)), + BrokenSchema = OkSchema#{<<"source">> := <<"{}">>}, + %% hopefully, for this small map, the key order is used. + Serdes = #{ + <<"a">> => OkSchema, + <<"z">> => BrokenSchema + }, + ?assertMatch( + {error, _}, + emqx_conf:update( + [?CONF_KEY_ROOT, schemas], + Serdes, + #{} + ) + ), + %% no serdes should be in the table + ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"a">>)), + ?assertEqual({error, not_found}, emqx_ee_schema_registry:get_serde(<<"z">>)), + ok. 
+ +t_cluster_serde_build(Config) -> + SerdeType = ?config(serde_type, Config), + Cluster = cluster(Config), + SerdeName = my_serde, + Schema = schema_params(SerdeType), + ?check_trace( + begin + Nodes = [N1, N2 | _] = start_cluster(Cluster), + NumNodes = length(Nodes), + wait_for_cluster_rpc(N2), + ?assertEqual( + ok, + erpc:call(N2, emqx_ee_schema_registry, add_schema, [SerdeName, Schema]) + ), + %% check that we can serialize/deserialize in all nodes + lists:foreach( + fun(N) -> + erpc:call(N, fun() -> + Res0 = emqx_ee_schema_registry:get_serde(SerdeName), + ?assertMatch({ok, #{}}, Res0, #{node => N}), + {ok, #{serializer := Serializer, deserializer := Deserializer}} = Res0, + Payload = #{<<"i">> => 10, <<"s">> => <<"text">>}, + ?assertEqual(Payload, Deserializer(Serializer(Payload)), #{node => N}), + ok + end) + end, + Nodes + ), + %% now we delete and check it's removed from the table + ?tp(will_delete_schema, #{}), + {ok, SRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := schema_registry_serdes_deleted}), + NumNodes, + 5_000 + ), + ?assertEqual( + ok, + erpc:call(N1, emqx_ee_schema_registry, delete_schema, [SerdeName]) + ), + {ok, _} = snabbkaffe:receive_events(SRef1), + lists:foreach( + fun(N) -> + erpc:call(N, fun() -> + ?assertMatch( + {error, not_found}, + emqx_ee_schema_registry:get_serde(SerdeName), + #{node => N} + ), + ok + end) + end, + Nodes + ), + ok + end, + fun(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := will_delete_schema}, + #{?snk_kind := serde_destroyed, type := SerdeType}, + Trace + ) + ), + ok + end + ), + ok. 
diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl new file mode 100644 index 000000000..bbb6d5ef0 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl @@ -0,0 +1,250 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry_http_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(APPS, [emqx_conf, emqx_ee_schema_registry]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), + emqx_mgmt_api_test_util:init_suite(?APPS), + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), + ok. + +init_per_testcase(_TestCase, Config) -> + clear_schemas(), + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(_TestCase, _Config) -> + clear_schemas(), + ok = snabbkaffe:stop(), + ok. 
+ +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +request(get) -> + do_request(get, _Parts = [], _Body = []); +request({get, Name}) -> + do_request(get, _Parts = [Name], _Body = []); +request({delete, Name}) -> + do_request(delete, _Parts = [Name], _Body = []); +request({put, Name, Params}) -> + do_request(put, _Parts = [Name], Params); +request({post, Params}) -> + do_request(post, _Parts = [], Params). + +do_request(Method, PathParts, Body) -> + Header = emqx_common_test_http:default_auth_header(), + URI = uri(["schema_registry" | PathParts]), + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + Res0 = emqx_mgmt_api_test_util:request_api(Method, URI, [], Header, Body, Opts), + case Res0 of + {ok, Code, <<>>} -> + {ok, Code, <<>>}; + {ok, Code, Res1} -> + Res2 = emqx_json:decode(Res1, [return_maps]), + Res3 = try_decode_error_message(Res2), + {ok, Code, Res3}; + Error -> + Error + end. + +try_decode_error_message(#{<<"message">> := Msg0} = Res0) -> + case emqx_json:safe_decode(Msg0, [return_maps]) of + {ok, Msg} -> + Res0#{<<"message">> := Msg}; + {error, _} -> + Res0 + end; +try_decode_error_message(Res) -> + Res. + +clear_schemas() -> + maps:foreach( + fun(Name, _Schema) -> + ok = emqx_ee_schema_registry:delete_schema(Name) + end, + emqx_ee_schema_registry:list_schemas() + ). 
+ +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_crud(_Config) -> + SchemaName = <<"my_avro_schema">>, + Source = #{ + type => record, + fields => [ + #{name => <<"i">>, type => <<"int">>}, + #{name => <<"s">>, type => <<"string">>} + ] + }, + SourceBin = emqx_json:encode(Source), + Params = #{ + <<"type">> => <<"avro">>, + <<"source">> => SourceBin, + <<"name">> => SchemaName, + <<"description">> => <<"My schema">> + }, + UpdateParams = maps:without([<<"name">>], Params), + + %% no schemas at first + ?assertMatch({ok, 200, []}, request(get)), + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Schema not found">> + }}, + request({get, SchemaName}) + ), + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Schema not found">> + }}, + request({put, SchemaName, UpdateParams}) + ), + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Schema not found">> + }}, + request({delete, SchemaName}) + ), + + %% create a schema + ?assertMatch( + {ok, 201, #{ + <<"type">> := <<"avro">>, + <<"source">> := SourceBin, + <<"name">> := SchemaName, + <<"description">> := <<"My schema">> + }}, + request({post, Params}) + ), + ?assertMatch( + {ok, 200, #{ + <<"type">> := <<"avro">>, + <<"source">> := SourceBin, + <<"name">> := SchemaName, + <<"description">> := <<"My schema">> + }}, + request({get, SchemaName}) + ), + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := <<"avro">>, + <<"source">> := SourceBin, + <<"name">> := SchemaName, + <<"description">> := <<"My schema">> + } + ]}, + request(get) + ), + UpdateParams1 = UpdateParams#{<<"description">> := <<"My new schema">>}, + ?assertMatch( + {ok, 200, #{ + <<"type">> := <<"avro">>, + <<"source">> := SourceBin, + <<"name">> := SchemaName, + <<"description">> := <<"My new schema">> + }}, + request({put, 
SchemaName, UpdateParams1}) + ), + + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"ALREADY_EXISTS">>, + <<"message">> := <<"Schema already exists">> + }}, + request({post, Params}) + ), + %% typechecks, but is invalid + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">> + }}, + request({put, SchemaName, UpdateParams#{<<"source">> := <<"{}">>}}) + ), + + ?assertMatch( + {ok, 204, <<>>}, + request({delete, SchemaName}) + ), + + %% doesn't typecheck + lists:foreach( + fun(Field) -> + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{<<"reason">> := <<"required_field">>} + }}, + request({post, maps:without([Field], Params)}), + #{field => Field} + ) + end, + [<<"name">>, <<"source">>] + ), + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + #{ + <<"expected">> := [_ | _], + <<"field_name">> := <<"type">> + } + }}, + request({post, maps:without([<<"type">>], Params)}), + #{field => <<"type">>} + ), + %% typechecks, but is invalid + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + <<"{post_config_update,emqx_ee_schema_registry,{not_found,<<\"type\">>}}">> + }}, + request({post, Params#{<<"source">> := <<"{}">>}}) + ), + + %% unknown serde type + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := + #{ + <<"expected">> := [_ | _], + <<"field_name">> := <<"type">> + } + }}, + request({post, Params#{<<"type">> := <<"foo">>}}) + ), + + ok. 
diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl new file mode 100644 index 000000000..be62717d3 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_serde_SUITE.erl @@ -0,0 +1,121 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_schema_registry_serde_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-include("emqx_ee_schema_registry.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(APPS, [emqx_conf, emqx_rule_engine, emqx_ee_schema_registry]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema), + emqx_mgmt_api_test_util:init_suite(?APPS), + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(lists:reverse(?APPS)), + ok. +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), + clear_schemas(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +clear_schemas() -> + maps:foreach( + fun(Name, _Schema) -> + ok = emqx_ee_schema_registry:delete_schema(Name) + end, + emqx_ee_schema_registry:list_schemas() + ). 
+ +schema_params(avro) -> + Source = #{ + type => record, + fields => [ + #{name => <<"i">>, type => <<"int">>}, + #{name => <<"s">>, type => <<"string">>} + ] + }, + SourceBin = emqx_json:encode(Source), + #{type => avro, source => SourceBin}. + +assert_roundtrip(SerdeName, Original) -> + Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original), + Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded), + ?assertEqual(Original, Decoded, #{original => Original}). + +assert_roundtrip(SerdeName, Original, ArgsSerialize, ArgsDeserialize) -> + Encoded = emqx_ee_schema_registry_serde:encode(SerdeName, Original, ArgsSerialize), + Decoded = emqx_ee_schema_registry_serde:decode(SerdeName, Encoded, ArgsDeserialize), + ?assertEqual(Original, Decoded, #{original => Original}). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_roundtrip_avro(_Config) -> + SerdeName = my_serde, + Params = schema_params(avro), + ok = emqx_ee_schema_registry:add_schema(SerdeName, Params), + Original = #{<<"i">> => 10, <<"s">> => <<"hi">>}, + %% for coverage + assert_roundtrip(SerdeName, Original, _ArgsSerialize = [], _ArgsDeserialize = []), + assert_roundtrip(SerdeName, Original), + ok. + +t_avro_invalid_json_schema(_Config) -> + SerdeName = my_serde, + Params = schema_params(avro), + WrongParams = Params#{source := <<"{">>}, + ?assertMatch( + {error, #{reason := #{expected_type := _}}}, + emqx_ee_schema_registry:add_schema(SerdeName, WrongParams) + ), + ok. + +t_avro_invalid_schema(_Config) -> + SerdeName = my_serde, + Params = schema_params(avro), + WrongParams = Params#{source := <<"{}">>}, + ?assertMatch( + {error, {post_config_update, _, {not_found, <<"type">>}}}, + emqx_ee_schema_registry:add_schema(SerdeName, WrongParams) + ), + ok. 
+ +t_serde_not_found(_Config) -> + %% for coverage + NonexistentSerde = <<"nonexistent">>, + Original = #{}, + ?assertError( + {serde_not_found, NonexistentSerde}, + emqx_ee_schema_registry_serde:encode(NonexistentSerde, Original) + ), + ?assertError( + {serde_not_found, NonexistentSerde}, + emqx_ee_schema_registry_serde:decode(NonexistentSerde, Original) + ), + ok. diff --git a/mix.exs b/mix.exs index 600218e52..712e1f4e2 100644 --- a/mix.exs +++ b/mix.exs @@ -83,6 +83,8 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqx and observer_cli {:recon, github: "ferd/recon", tag: "2.5.1", override: true}, {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}, + # in conflict by erlavro and rocketmq + {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}, # dependencies of dependencies; we choose specific refs to match # what rebar3 chooses. # in conflict by gun and emqtt @@ -307,7 +309,8 @@ defmodule EMQXUmbrella.MixProject do emqx_license: :permanent, emqx_ee_conf: :load, emqx_ee_connector: :permanent, - emqx_ee_bridge: :permanent + emqx_ee_bridge: :permanent, + emqx_ee_schema_registry: :permanent ], else: [] ) diff --git a/rebar.config b/rebar.config index 50a8124be..8ad8133fd 100644 --- a/rebar.config +++ b/rebar.config @@ -81,6 +81,8 @@ , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {telemetry, "1.1.0"} , {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}} + %% in conflict by erlavro and rocketmq + , {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}} ]}. {xref_ignores, diff --git a/rebar.config.erl b/rebar.config.erl index 98cd30570..e00fe730d 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -422,7 +422,8 @@ relx_apps_per_edition(ee) -> emqx_license, {emqx_ee_conf, load}, emqx_ee_connector, - emqx_ee_bridge + emqx_ee_bridge, + emqx_ee_schema_registry ]; relx_apps_per_edition(ce) -> []. 
diff --git a/rel/i18n/emqx_bridge_api.hocon b/rel/i18n/emqx_bridge_api.hocon index f5d372128..66960619a 100644 --- a/rel/i18n/emqx_bridge_api.hocon +++ b/rel/i18n/emqx_bridge_api.hocon @@ -57,7 +57,7 @@ emqx_bridge_api { desc_api1 { desc { en: """List all created bridges""" - zh: """列出所有 Birdge""" + zh: """列出所有 Bridge""" } label: { en: "List All Bridges" diff --git a/rel/i18n/emqx_ee_schema_registry_http_api.hocon b/rel/i18n/emqx_ee_schema_registry_http_api.hocon new file mode 100644 index 000000000..058796a66 --- /dev/null +++ b/rel/i18n/emqx_ee_schema_registry_http_api.hocon @@ -0,0 +1,69 @@ +emqx_ee_schema_registry_http_api { + # apis + desc_schema_registry_api_list { + desc { + en: "List all registered schemas" + zh: "列出所有注册的模式" + } + label { + en: "List schemas" + zh: "列表模式" + } + } + + desc_schema_registry_api_get { + desc { + en: "Get a schema by its name" + zh: "通过名称获取模式" + } + label { + en: "Get schema" + zh: "获取模式" + } + } + + desc_schema_registry_api_post { + desc { + en: "Register a new schema" + zh: "注册一个新的模式" + } + label { + en: "Register schema" + zh: "注册模式" + } + } + + desc_schema_registry_api_put { + desc { + en: "Update an existing schema" + zh: "更新一个现有的模式" + } + label { + en: "Update schema" + zh: "更新模式" + } + } + + desc_schema_registry_api_delete { + desc { + en: "Delete a schema" + zh: "删除一个模式" + } + label { + en: "Delete schema" + zh: "删除模式" + } + } + + # params + desc_param_path_schema_name { + desc { + en: "The schema name" + zh: "模式名称" + } + label { + en: "Schema name" + zh: "模式名称" + } + } +} diff --git a/rel/i18n/emqx_ee_schema_registry_schema.hocon b/rel/i18n/emqx_ee_schema_registry_schema.hocon new file mode 100644 index 000000000..1538fe5f9 --- /dev/null +++ b/rel/i18n/emqx_ee_schema_registry_schema.hocon @@ -0,0 +1,78 @@ +emqx_ee_schema_registry_schema { + schema_registry_root { + desc { + en: "Schema registry configurations." 
+ zh: "模式注册表的配置。" + } + label { + en: "Schema registry" + zh: "模式注册表" + } + } + + schema_registry_schemas { + desc { + en: "Registered schemas." + zh: "注册的模式。" + } + label { + en: "Registered schemas" + zh: "注册的模式" + } + } + + schema_name { + desc { + en: "A name for the schema that will serve as its identifier." + zh: "模式的一个名称,将作为其标识符。" + } + label { + en: "Schema name" + zh: "模式名称" + } + } + + schema_type { + desc { + en: "Schema type." + zh: "模式类型。" + } + label { + en: "Schema type" + zh: "模式类型" + } + } + + schema_source { + desc { + en: "Source text for the schema." + zh: "模式的源文本。" + } + label { + en: "Schema source" + zh: "模式来源" + } + } + + schema_description { + desc { + en: "A description for this schema." + zh: "对该模式的描述。" + } + label { + en: "Schema description" + zh: "模式描述" + } + } + + avro_type { + desc { + en: "[Apache Avro](https://avro.apache.org/) serialization format." + zh: "[阿帕奇-阿夫罗](https://avro.apache.org/) 序列化格式。" + } + label { + en: "Apache Avro" + zh: "阿帕奇-阿夫罗" + } + } +} diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 79c8b7e3a..168275e1e 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -1,6 +1,7 @@ ACL AES APIs +Avro BPAPI BSON Backplane