diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 4a2cb2ea1..f0640c7dc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -84,6 +84,12 @@ import_config/1 ]). +%% For setting and getting extra rule engine SQL functions module +-export([ + extra_functions_module/0, + set_extra_functions_module/1 +]). + -define(RULE_ENGINE, ?MODULE). -define(T_CALL, infinity). @@ -542,3 +548,21 @@ get_egress_bridges(Actions) -> emqx_bridge_resource:bridge_id(BridgeType, BridgeName) || {bridge, BridgeType, BridgeName, _ResId} <- Actions ]. + +%% For allowing an external application to add extra "built-in" functions to the +%% rule engine SQL like language. The module set by +%% set_extra_functions_module/1 should export a function called +%% handle_rule_function with two parameters (the first being an atom for the +%% the function name and the second a list of arguments). The function should +%% should return the result or {error, no_match_for_function} if it cannot +%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs +%% module. See also callback function declaration in emqx_rule_funcs.erl. + +-spec extra_functions_module() -> module() | undefined. +extra_functions_module() -> + persistent_term:get({?MODULE, extra_functions}, undefined). + +-spec set_extra_functions_module(module()) -> ok. +set_extra_functions_module(Mod) -> + persistent_term:put({?MODULE, extra_functions}, Mod), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 475880ed1..0bad2590b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -19,10 +19,6 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). --if(?EMQX_RELEASE_EDITION == ee). --include_lib("emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl"). --endif. - -elvis([{elvis_style, god_modules, disable}]). %% IoT Funcs @@ -236,6 +232,10 @@ timezone_to_offset_seconds/1 ]). +%% See extra_functions_module/0 and set_extra_functions_module/1 in the +%% emqx_rule_engine module +-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. + %% MongoDB specific date functions. These functions return a date tuple. The %% MongoDB bridge converts such date tuples to a MongoDB date type. The %% following functions are therefore only useful for rules with at least one @@ -1126,56 +1126,27 @@ timezone_to_second(TimeZone) -> timezone_to_offset_seconds(TimeZone) -> emqx_calendar:offset_second(TimeZone). -%% @doc This is for sql funcs that should be handled in the specific modules. -%% Here the emqx_rule_funcs module acts as a proxy, forwarding -%% the function handling to the worker module. -%% @end --if(?EMQX_RELEASE_EDITION == ee). -%% EE +'$handle_undefined_function'(sprintf, [Format | Args]) -> + erlang:apply(fun sprintf_s/2, [Format, Args]); +%% This is for functions that should be handled in another module +%% (currently this module is emqx_ee_schema_registry_serde in the case of EE but +%% could be changed to another module in the future). +'$handle_undefined_function'(FunctionName, Args) -> + case emqx_rule_engine:extra_functions_module() of + undefined -> + throw_sql_function_not_supported(FunctionName, Args); + Mod -> + case Mod:handle_rule_function(FunctionName, Args) of + {error, no_match_for_function} -> + throw_sql_function_not_supported(FunctionName, Args); + Result -> + Result + end + end. -'$handle_undefined_function'(sparkplug_decode, [Data]) -> - '$handle_undefined_function'( - schema_decode, - [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>] - ); -'$handle_undefined_function'(sparkplug_decode, [Data | MoreArgs]) -> - '$handle_undefined_function'( - schema_decode, - [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs] - ); -'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) -> - emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs); -'$handle_undefined_function'(schema_decode, Args) -> - error({args_count_error, {schema_decode, Args}}); -'$handle_undefined_function'(sparkplug_encode, [Term]) -> - '$handle_undefined_function'( - schema_encode, - [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>] - ); -'$handle_undefined_function'(sparkplug_encode, [Term | MoreArgs]) -> - '$handle_undefined_function'( - schema_encode, - [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] - ); -'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> - %% encode outputs iolists, but when the rule actions process those - %% it might wrongly encode them as JSON lists, so we force them to - %% binaries here. - IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs), - iolist_to_binary(IOList); -'$handle_undefined_function'(schema_encode, Args) -> - error({args_count_error, {schema_encode, Args}}); -'$handle_undefined_function'(sprintf, [Format | Args]) -> - erlang:apply(fun sprintf_s/2, [Format, Args]); -'$handle_undefined_function'(Fun, Args) -> - error({sql_function_not_supported, function_literal(Fun, Args)}). --else. -%% CE -'$handle_undefined_function'(sprintf, [Format | Args]) -> - erlang:apply(fun sprintf_s/2, [Format, Args]); -'$handle_undefined_function'(Fun, Args) -> - error({sql_function_not_supported, function_literal(Fun, Args)}). --endif. +-spec throw_sql_function_not_supported(atom(), list()) -> no_return(). +throw_sql_function_not_supported(FunctionName, Args) -> + error({sql_function_not_supported, function_literal(FunctionName, Args)}). map_path(Key) -> {path, [{key, P} || P <- string:split(Key, ".", all)]}. diff --git a/lib-ee/emqx_ee_schema_registry/rebar.config b/lib-ee/emqx_ee_schema_registry/rebar.config index e42ff7278..309d9cdf8 100644 --- a/lib-ee/emqx_ee_schema_registry/rebar.config +++ b/lib-ee/emqx_ee_schema_registry/rebar.config @@ -4,6 +4,7 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}}, + {emqx_rule_engine, {path, "../../apps/emqx_rule_engine"}}, {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}, {gpb, "4.19.7"} ]}. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index c0073c5c2..a20cfec82 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -7,7 +7,8 @@ kernel, stdlib, erlavro, - gpb + gpb, + emqx_rule_engine ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 1d35d8ffa..df294bcbb 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -239,7 +239,7 @@ create_tables() -> do_build_serdes(Schemas) -> %% We build a special serde for the Sparkplug B payload. This serde is used %% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1. - maybe_build_sparkplug_b_serde(), + ok = maybe_build_sparkplug_b_serde(), %% TODO: use some kind of mutex to make each core build a %% different serde to avoid duplicate work. Maybe ekka_locker? maps:foreach(fun do_build_serde/2, Schemas), diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index 85d35be1f..f9cd5810e 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -10,6 +10,9 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + %% Register rule engine extra functions module so that we can handle decode + %% and encode functions called from the rule engine SQL like language + ok = emqx_rule_engine:set_extra_functions_module(emqx_ee_schema_registry_serde), ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), %% HTTP API handler emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl index fa3b66c22..15a254e8f 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_ee_schema_registry_serde). +-behaviour(emqx_rule_funcs). + -include("emqx_ee_schema_registry.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -15,13 +17,50 @@ decode/3, encode/2, encode/3, - make_serde/3 + make_serde/3, + handle_rule_function/2 ]). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ +-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. +handle_rule_function(sparkplug_decode, [Data]) -> + handle_rule_function( + schema_decode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>] + ); +handle_rule_function(sparkplug_decode, [Data | MoreArgs]) -> + handle_rule_function( + schema_decode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs] + ); +handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) -> + decode(SchemaId, Data, MoreArgs); +handle_rule_function(schema_decode, Args) -> + error({args_count_error, {schema_decode, Args}}); +handle_rule_function(sparkplug_encode, [Term]) -> + handle_rule_function( + schema_encode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>] + ); +handle_rule_function(sparkplug_encode, [Term | MoreArgs]) -> + handle_rule_function( + schema_encode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] + ); +handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) -> + %% encode outputs iolists, but when the rule actions process those + %% it might wrongly encode them as JSON lists, so we force them to + %% binaries here. + IOList = encode(SchemaId, Term, MoreArgs), + iolist_to_binary(IOList); +handle_rule_function(schema_encode, Args) -> + error({args_count_error, {schema_encode, Args}}); +handle_rule_function(_, _) -> + {error, no_match_for_function}. + -spec decode(schema_name(), encoded_data()) -> decoded_data(). decode(SerdeName, RawData) -> decode(SerdeName, RawData, []).