fix: decouple emqx_rule_engine application from emqx_ee_schema_registry

This commit decouples the emqx_rule_engine application from the
emqx_ee_schema_registry application by making it possible to register a
callback module that defines extra rule engine SQL functions instead of
calling a module in emqx_ee_schema_registry directly from the
emqx_rule_engine application.
This commit is contained in:
Kjell Winblad 2023-06-30 15:31:52 +02:00
parent 4d45de2939
commit 714363bd01
7 changed files with 95 additions and 56 deletions

View File

@ -84,6 +84,12 @@
import_config/1 import_config/1
]). ]).
%% For setting and getting extra rule engine SQL functions module
-export([
extra_functions_module/0,
set_extra_functions_module/1
]).
-define(RULE_ENGINE, ?MODULE). -define(RULE_ENGINE, ?MODULE).
-define(T_CALL, infinity). -define(T_CALL, infinity).
@ -542,3 +548,21 @@ get_egress_bridges(Actions) ->
emqx_bridge_resource:bridge_id(BridgeType, BridgeName) emqx_bridge_resource:bridge_id(BridgeType, BridgeName)
|| {bridge, BridgeType, BridgeName, _ResId} <- Actions || {bridge, BridgeType, BridgeName, _ResId} <- Actions
]. ].
%% For allowing an external application to add extra "built-in" functions to the
%% rule engine SQL like language. The module set by
%% set_extra_functions_module/1 should export a function called
%% handle_rule_function with two parameters (the first being an atom for the
%% the function name and the second a list of arguments). The function should
%% should return the result or {error, no_match_for_function} if it cannot
%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs
%% module. See also callback function declaration in emqx_rule_funcs.erl.
-spec extra_functions_module() -> module() | undefined.
extra_functions_module() ->
persistent_term:get({?MODULE, extra_functions}, undefined).
-spec set_extra_functions_module(module()) -> ok.
set_extra_functions_module(Mod) ->
persistent_term:put({?MODULE, extra_functions}, Mod),
ok.

View File

@ -19,10 +19,6 @@
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-if(?EMQX_RELEASE_EDITION == ee).
-include_lib("emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl").
-endif.
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
%% IoT Funcs %% IoT Funcs
@ -236,6 +232,10 @@
timezone_to_offset_seconds/1 timezone_to_offset_seconds/1
]). ]).
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
%% emqx_rule_engine module
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
%% MongoDB specific date functions. These functions return a date tuple. The %% MongoDB specific date functions. These functions return a date tuple. The
%% MongoDB bridge converts such date tuples to a MongoDB date type. The %% MongoDB bridge converts such date tuples to a MongoDB date type. The
%% following functions are therefore only useful for rules with at least one %% following functions are therefore only useful for rules with at least one
@ -1126,56 +1126,27 @@ timezone_to_second(TimeZone) ->
timezone_to_offset_seconds(TimeZone) -> timezone_to_offset_seconds(TimeZone) ->
emqx_calendar:offset_second(TimeZone). emqx_calendar:offset_second(TimeZone).
%% @doc This is for sql funcs that should be handled in the specific modules. '$handle_undefined_function'(sprintf, [Format | Args]) ->
%% Here the emqx_rule_funcs module acts as a proxy, forwarding erlang:apply(fun sprintf_s/2, [Format, Args]);
%% the function handling to the worker module. %% This is for functions that should be handled in another module
%% @end %% (currently this module is emqx_ee_schema_registry_serde in the case of EE but
-if(?EMQX_RELEASE_EDITION == ee). %% could be changed to another module in the future).
%% EE '$handle_undefined_function'(FunctionName, Args) ->
case emqx_rule_engine:extra_functions_module() of
undefined ->
throw_sql_function_not_supported(FunctionName, Args);
Mod ->
case Mod:handle_rule_function(FunctionName, Args) of
{error, no_match_for_function} ->
throw_sql_function_not_supported(FunctionName, Args);
Result ->
Result
end
end.
'$handle_undefined_function'(sparkplug_decode, [Data]) -> -spec throw_sql_function_not_supported(atom(), list()) -> no_return().
'$handle_undefined_function'( throw_sql_function_not_supported(FunctionName, Args) ->
schema_decode, error({sql_function_not_supported, function_literal(FunctionName, Args)}).
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
);
'$handle_undefined_function'(sparkplug_decode, [Data | MoreArgs]) ->
'$handle_undefined_function'(
schema_decode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
);
'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) ->
emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs);
'$handle_undefined_function'(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
'$handle_undefined_function'(sparkplug_encode, [Term]) ->
'$handle_undefined_function'(
schema_encode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
);
'$handle_undefined_function'(sparkplug_encode, [Term | MoreArgs]) ->
'$handle_undefined_function'(
schema_encode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
);
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
%% encode outputs iolists, but when the rule actions process those
%% it might wrongly encode them as JSON lists, so we force them to
%% binaries here.
IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs),
iolist_to_binary(IOList);
'$handle_undefined_function'(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-else.
%% CE
'$handle_undefined_function'(sprintf, [Format | Args]) ->
erlang:apply(fun sprintf_s/2, [Format, Args]);
'$handle_undefined_function'(Fun, Args) ->
error({sql_function_not_supported, function_literal(Fun, Args)}).
-endif.
map_path(Key) -> map_path(Key) ->
{path, [{key, P} || P <- string:split(Key, ".", all)]}. {path, [{key, P} || P <- string:split(Key, ".", all)]}.

View File

@ -4,6 +4,7 @@
{deps, [ {deps, [
{emqx, {path, "../../apps/emqx"}}, {emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}}, {emqx_utils, {path, "../../apps/emqx_utils"}},
{emqx_rule_engine, {path, "../../apps/emqx_rule_engine"}},
{erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}, {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}},
{gpb, "4.19.7"} {gpb, "4.19.7"}
]}. ]}.

View File

@ -7,7 +7,8 @@
kernel, kernel,
stdlib, stdlib,
erlavro, erlavro,
gpb gpb,
emqx_rule_engine
]}, ]},
{env, []}, {env, []},
{modules, []}, {modules, []},

View File

@ -239,7 +239,7 @@ create_tables() ->
do_build_serdes(Schemas) -> do_build_serdes(Schemas) ->
%% We build a special serde for the Sparkplug B payload. This serde is used %% We build a special serde for the Sparkplug B payload. This serde is used
%% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1. %% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1.
maybe_build_sparkplug_b_serde(), ok = maybe_build_sparkplug_b_serde(),
%% TODO: use some kind of mutex to make each core build a %% TODO: use some kind of mutex to make each core build a
%% different serde to avoid duplicate work. Maybe ekka_locker? %% different serde to avoid duplicate work. Maybe ekka_locker?
maps:foreach(fun do_build_serde/2, Schemas), maps:foreach(fun do_build_serde/2, Schemas),

View File

@ -10,6 +10,9 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
%% Register rule engine extra functions module so that we can handle decode
%% and encode functions called from the rule engine SQL like language
ok = emqx_rule_engine:set_extra_functions_module(emqx_ee_schema_registry_serde),
ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
%% HTTP API handler %% HTTP API handler
emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),

View File

@ -3,6 +3,8 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_schema_registry_serde). -module(emqx_ee_schema_registry_serde).
-behaviour(emqx_rule_funcs).
-include("emqx_ee_schema_registry.hrl"). -include("emqx_ee_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -15,13 +17,50 @@
decode/3, decode/3,
encode/2, encode/2,
encode/3, encode/3,
make_serde/3 make_serde/3,
handle_rule_function/2
]). ]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
handle_rule_function(sparkplug_decode, [Data]) ->
handle_rule_function(
schema_decode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
);
handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
handle_rule_function(
schema_decode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
);
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
decode(SchemaId, Data, MoreArgs);
handle_rule_function(schema_decode, Args) ->
error({args_count_error, {schema_decode, Args}});
handle_rule_function(sparkplug_encode, [Term]) ->
handle_rule_function(
schema_encode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
);
handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
handle_rule_function(
schema_encode,
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
);
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
%% encode outputs iolists, but when the rule actions process those
%% it might wrongly encode them as JSON lists, so we force them to
%% binaries here.
IOList = encode(SchemaId, Term, MoreArgs),
iolist_to_binary(IOList);
handle_rule_function(schema_encode, Args) ->
error({args_count_error, {schema_encode, Args}});
handle_rule_function(_, _) ->
{error, no_match_for_function}.
-spec decode(schema_name(), encoded_data()) -> decoded_data(). -spec decode(schema_name(), encoded_data()) -> decoded_data().
decode(SerdeName, RawData) -> decode(SerdeName, RawData) ->
decode(SerdeName, RawData, []). decode(SerdeName, RawData, []).