feat(mqtt_consumer): add support for rule engine `FROM`

This commit is contained in:
Thales Macedo Garitezi 2024-01-10 15:12:10 -03:00
parent 28de7c89c7
commit cc24fe6e93
8 changed files with 135 additions and 18 deletions

View File

@ -54,6 +54,7 @@
check_deps_and_remove/3, check_deps_and_remove/3,
check_deps_and_remove/4 check_deps_and_remove/4
]). ]).
-export([lookup_action/2, lookup_source/2]).
%% Operations %% Operations
@ -222,6 +223,12 @@ unload_bridges(ConfRooKey) ->
lookup(Type, Name) -> lookup(Type, Name) ->
lookup(?ROOT_KEY_ACTIONS, Type, Name). lookup(?ROOT_KEY_ACTIONS, Type, Name).
lookup_action(Type, Name) ->
lookup(?ROOT_KEY_ACTIONS, Type, Name).
lookup_source(Type, Name) ->
lookup(?ROOT_KEY_SOURCES, Type, Name).
-spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) -> -spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) ->
{ok, bridge_v2_info()} | {error, not_found}. {ok, bridge_v2_info()} | {error, not_found}.
lookup(ConfRootName, Type, Name) -> lookup(ConfRootName, Type, Name) ->
@ -900,9 +907,11 @@ do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
parse_id(Id) -> parse_id(Id) ->
case binary:split(Id, <<":">>, [global]) of case binary:split(Id, <<":">>, [global]) of
[Type, Name] -> [Type, Name] ->
{Type, Name}; #{kind => undefined, type => Type, name => Name};
[<<"action">>, Type, Name | _] -> [<<"action">>, Type, Name | _] ->
{Type, Name}; #{kind => action, type => Type, name => Name};
[<<"source">>, Type, Name | _] ->
#{kind => source, type => Type, name => Name};
_X -> _X ->
error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))}) error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))})
end. end.

View File

@ -442,6 +442,23 @@ try_decode_error(Body0) ->
Body0 Body0
end. end.
create_rule_api(Opts) ->
#{
sql := SQL,
actions := RuleActions
} = Opts,
Params = #{
enable => true,
sql => SQL,
actions => RuleActions
},
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
ct:pal("create rule:\n ~p", [Params]),
Method = post,
Res = request(Method, Path, Params),
ct:pal("create rule results:\n ~p", [Res]),
Res.
create_rule_and_action_http(BridgeType, RuleTopic, Config) -> create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).

View File

@ -135,7 +135,7 @@ create_producers_for_bridge_v2(
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
{_BridgeType, BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
IsDryRun = IsDryRun =
case TestIdStart of case TestIdStart of

View File

@ -82,6 +82,7 @@ fields("mqtt_subscriber_source") ->
fields(ingress_parameters) -> fields(ingress_parameters) ->
Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"), Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"),
Fields1 = proplists:delete(pool_size, Fields0), Fields1 = proplists:delete(pool_size, Fields0),
%% FIXME: should we make `local` hidden?
Fields1; Fields1;
fields(action_resource_opts) -> fields(action_resource_opts) ->
UnsupportedOpts = [enable_batch, batch_size, batch_time], UnsupportedOpts = [enable_batch, batch_size, batch_time],
@ -120,8 +121,6 @@ desc(ingress_parameters) ->
?DESC(ingress_parameters); ?DESC(ingress_parameters);
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for WebHook using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc("config_connector") ->
?DESC("desc_config");
desc("http_action") -> desc("http_action") ->
?DESC("desc_config"); ?DESC("desc_config");
desc("parameters_opts") -> desc("parameters_opts") ->

View File

@ -18,6 +18,7 @@
-compile(export_all). -compile(export_all).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/asserts.hrl").
@ -75,6 +76,11 @@ init_per_testcase(TestCase, Config) ->
| Config | Config
]. ].
end_per_testcase(_TestCase, _Config) ->
emqx_common_test_helpers:call_janitor(),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -85,6 +91,7 @@ connector_config() ->
<<"enable">> => true, <<"enable">> => true,
<<"description">> => <<"my connector">>, <<"description">> => <<"my connector">>,
<<"pool_size">> => 3, <<"pool_size">> => 3,
<<"proto_ver">> => <<"v5">>,
<<"server">> => <<"127.0.0.1:1883">>, <<"server">> => <<"127.0.0.1:1883">>,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
@ -105,13 +112,6 @@ source_config(Overrides0) ->
#{ #{
<<"topic">> => <<"remote/topic">>, <<"topic">> => <<"remote/topic">>,
<<"qos">> => 2 <<"qos">> => 2
},
<<"local">> =>
#{
<<"topic">> => <<"local/topic">>,
<<"qos">> => 2,
<<"retain">> => false,
<<"payload">> => <<"${payload}">>
} }
}, },
<<"resource_opts">> => #{ <<"resource_opts">> => #{
@ -134,6 +134,15 @@ source_config(Overrides0) ->
replace(Key, Value, Proplist) -> replace(Key, Value, Proplist) ->
lists:keyreplace(Key, 1, Proplist, {Key, Value}). lists:keyreplace(Key, 1, Proplist, {Key, Value}).
bridge_id(Config) ->
Type = ?config(source_type, Config),
Name = ?config(source_name, Config),
emqx_bridge_resource:bridge_id(Type, Name).
hookpoint(Config) ->
BridgeId = bridge_id(Config),
emqx_bridge_resource:bridge_hookpoint(BridgeId).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -151,6 +160,11 @@ t_create_via_http(Config) ->
]}}, ]}},
emqx_bridge_v2_testlib:list_bridges_http_api_v1() emqx_bridge_v2_testlib:list_bridges_http_api_v1()
), ),
?assertMatch(
{ok, {{_, 200, _}, _, [#{<<"enable">> := true}]}},
emqx_bridge_v2_testlib:list_connectors_http_api()
),
NewSourceName = <<"my_other_source">>, NewSourceName = <<"my_other_source">>,
{ok, {{_, 201, _}, _, _}} = {ok, {{_, 201, _}, _, _}} =
emqx_bridge_v2_testlib:create_kind_api( emqx_bridge_v2_testlib:create_kind_api(
@ -173,3 +187,71 @@ t_create_via_http(Config) ->
t_start_stop(Config) -> t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, mqtt_connector_stopped), ok = emqx_bridge_v2_testlib:t_start_stop(Config, mqtt_connector_stopped),
ok. ok.
t_receive_via_rule(Config) ->
SourceConfig = ?config(source_config, Config),
?check_trace(
begin
{ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_connector_api(Config),
{ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_kind_api(Config),
Hookpoint = hookpoint(Config),
RepublishTopic = <<"rep/t">>,
RemoteTopic = emqx_utils_maps:deep_get(
[<<"parameters">>, <<"remote">>, <<"topic">>],
SourceConfig
),
RuleOpts = #{
sql => <<"select * from \"", Hookpoint/binary, "\"">>,
actions => [
%% #{function => console},
#{
function => republish,
args => #{
topic => RepublishTopic,
payload => <<"${.}">>,
qos => 0,
retain => false,
user_properties => <<"${.pub_props.'User-Property'}">>
}
}
]
},
{ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} =
emqx_bridge_v2_testlib:create_rule_api(RuleOpts),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
{ok, Client} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client),
{ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(Client, RepublishTopic),
ok = emqtt:publish(
Client,
RemoteTopic,
#{'User-Property' => [{<<"key">>, <<"value">>}]},
<<"mypayload">>,
_Opts = []
),
{publish, Msg} =
?assertReceive(
{publish, #{
topic := RepublishTopic,
retain := false,
qos := 0,
properties := #{'User-Property' := [{<<"key">>, <<"value">>}]}
}}
),
Payload = emqx_utils_json:decode(maps:get(payload, Msg), [return_maps]),
?assertMatch(
#{
<<"event">> := Hookpoint,
<<"payload">> := <<"mypayload">>
},
Payload
),
emqtt:stop(Client),
ok
end,
fun(Trace) ->
?assertEqual([], ?of_kind("action_references_nonexistent_bridges", Trace)),
ok
end
),
ok.

View File

@ -326,7 +326,7 @@ schema("/connectors_probe") ->
create_connector(ConnectorType, ConnectorName, Conf) create_connector(ConnectorType, ConnectorName, Conf)
end; end;
'/connectors'(get, _Params) -> '/connectors'(get, _Params) ->
Nodes = mria:running_nodes(), Nodes = emqx:running_nodes(),
NodeReplies = emqx_connector_proto_v1:list_connectors_on_nodes(Nodes), NodeReplies = emqx_connector_proto_v1:list_connectors_on_nodes(Nodes),
case is_ok(NodeReplies) of case is_ok(NodeReplies) of
{ok, NodeConnectors} -> {ok, NodeConnectors} ->
@ -674,7 +674,10 @@ unpack_connector_conf(Type, PackedConf) ->
RawConf. RawConf.
format_action(ActionId) -> format_action(ActionId) ->
element(2, emqx_bridge_v2:parse_id(ActionId)). case emqx_bridge_v2:parse_id(ActionId) of
#{name := Name} ->
Name
end.
is_ok(ok) -> is_ok(ok) ->
ok; ok;

View File

@ -241,6 +241,12 @@ parse_user_properties(<<"${pub_props.'User-Property'}">>) ->
%% we do not want to force users to select the value %% we do not want to force users to select the value
%% the value will be taken from Env.pub_props directly %% the value will be taken from Env.pub_props directly
?ORIGINAL_USER_PROPERTIES; ?ORIGINAL_USER_PROPERTIES;
parse_user_properties(<<"${.pub_props.'User-Property'}">>) ->
%% keep the original
%% avoid processing this special variable because
%% we do not want to force users to select the value
%% the value will be taken from Env.pub_props directly
?ORIGINAL_USER_PROPERTIES;
parse_user_properties(<<"${", _/binary>> = V) -> parse_user_properties(<<"${", _/binary>> = V) ->
%% use a variable %% use a variable
emqx_template:parse(V); emqx_template:parse(V);

View File

@ -23,6 +23,7 @@
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([start_link/0]). -export([start_link/0]).
@ -482,8 +483,7 @@ with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat
ok -> ok ->
ok; ok;
{error, NonExistentBridgeIDs} -> {error, NonExistentBridgeIDs} ->
?SLOG(error, #{ ?tp(error, "action_references_nonexistent_bridges", #{
msg => "action_references_nonexistent_bridges",
rule_id => RuleId, rule_id => RuleId,
nonexistent_bridge_ids => NonExistentBridgeIDs, nonexistent_bridge_ids => NonExistentBridgeIDs,
hint => "this rule will be disabled" hint => "this rule will be disabled"
@ -626,7 +626,7 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul
{Type, Name} = {Type, Name} =
emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}), emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
case emqx_action_info:is_action_type(Type) of case emqx_action_info:is_action_type(Type) of
true -> {action, Type, Name}; true -> {source, Type, Name};
false -> {bridge_v1, Type, Name} false -> {bridge_v1, Type, Name}
end end
end, end,
@ -646,7 +646,8 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul
fun({Kind, Type, Name}) -> fun({Kind, Type, Name}) ->
LookupFn = LookupFn =
case Kind of case Kind of
action -> fun emqx_bridge_v2:lookup/2; action -> fun emqx_bridge_v2:lookup_action/2;
source -> fun emqx_bridge_v2:lookup_source/2;
bridge_v1 -> fun emqx_bridge:lookup/2 bridge_v1 -> fun emqx_bridge:lookup/2
end, end,
try try