diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 67aeeca41..32034f774 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -54,6 +54,7 @@ check_deps_and_remove/3, check_deps_and_remove/4 ]). +-export([lookup_action/2, lookup_source/2]). %% Operations @@ -222,6 +223,12 @@ unload_bridges(ConfRooKey) -> lookup(Type, Name) -> lookup(?ROOT_KEY_ACTIONS, Type, Name). +lookup_action(Type, Name) -> + lookup(?ROOT_KEY_ACTIONS, Type, Name). + +lookup_source(Type, Name) -> + lookup(?ROOT_KEY_SOURCES, Type, Name). + -spec lookup(root_cfg_key(), bridge_v2_type(), bridge_v2_name()) -> {ok, bridge_v2_info()} | {error, not_found}. lookup(ConfRootName, Type, Name) -> @@ -900,9 +907,11 @@ do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) -> parse_id(Id) -> case binary:split(Id, <<":">>, [global]) of [Type, Name] -> - {Type, Name}; + #{kind => undefined, type => Type, name => Name}; [<<"action">>, Type, Name | _] -> - {Type, Name}; + #{kind => action, type => Type, name => Name}; + [<<"source">>, Type, Name | _] -> + #{kind => source, type => Type, name => Name}; _X -> error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))}) end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 88788d6e2..7fef33115 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -442,6 +442,23 @@ try_decode_error(Body0) -> Body0 end. +create_rule_api(Opts) -> + #{ + sql := SQL, + actions := RuleActions + } = Opts, + Params = #{ + enable => true, + sql => SQL, + actions => RuleActions + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + ct:pal("create rule:\n ~p", [Params]), + Method = post, + Res = request(Method, Path, Params), + ct:pal("create rule results:\n ~p", [Res]), + Res. + create_rule_and_action_http(BridgeType, RuleTopic, Config) -> create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 7caab1d87..459e259d2 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -135,7 +135,7 @@ create_producers_for_bridge_v2( KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), - {_BridgeType, BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), + #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), IsDryRun = case TestIdStart of diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl index f765581f9..b4c2b63ba 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_pubsub_schema.erl @@ -82,6 +82,7 @@ fields("mqtt_subscriber_source") -> fields(ingress_parameters) -> Fields0 = emqx_bridge_mqtt_connector_schema:fields("ingress"), Fields1 = proplists:delete(pool_size, Fields0), + %% FIXME: should we make `local` hidden? Fields1; fields(action_resource_opts) -> UnsupportedOpts = [enable_batch, batch_size, batch_time], @@ -120,8 +121,6 @@ desc(ingress_parameters) -> ?DESC(ingress_parameters); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; -desc("config_connector") -> - ?DESC("desc_config"); desc("http_action") -> ?DESC("desc_config"); desc("parameters_opts") -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl index fde15a1b6..5569a826b 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_v2_subscriber_SUITE.erl @@ -18,6 +18,7 @@ -compile(export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("emqx/include/asserts.hrl"). @@ -75,6 +76,11 @@ init_per_testcase(TestCase, Config) -> | Config ]. +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok. + %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ @@ -85,6 +91,7 @@ connector_config() -> <<"enable">> => true, <<"description">> => <<"my connector">>, <<"pool_size">> => 3, + <<"proto_ver">> => <<"v5">>, <<"server">> => <<"127.0.0.1:1883">>, <<"resource_opts">> => #{ <<"health_check_interval">> => <<"15s">>, @@ -105,13 +112,6 @@ source_config(Overrides0) -> #{ <<"topic">> => <<"remote/topic">>, <<"qos">> => 2 - }, - <<"local">> => - #{ - <<"topic">> => <<"local/topic">>, - <<"qos">> => 2, - <<"retain">> => false, - <<"payload">> => <<"${payload}">> } }, <<"resource_opts">> => #{ @@ -134,6 +134,15 @@ source_config(Overrides0) -> replace(Key, Value, Proplist) -> lists:keyreplace(Key, 1, Proplist, {Key, Value}). +bridge_id(Config) -> + Type = ?config(source_type, Config), + Name = ?config(source_name, Config), + emqx_bridge_resource:bridge_id(Type, Name). + +hookpoint(Config) -> + BridgeId = bridge_id(Config), + emqx_bridge_resource:bridge_hookpoint(BridgeId). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -151,6 +160,11 @@ t_create_via_http(Config) -> ]}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1() ), + ?assertMatch( + {ok, {{_, 200, _}, _, [#{<<"enable">> := true}]}}, + emqx_bridge_v2_testlib:list_connectors_http_api() + ), + NewSourceName = <<"my_other_source">>, {ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_kind_api( @@ -173,3 +187,71 @@ t_create_via_http(Config) -> t_start_stop(Config) -> ok = emqx_bridge_v2_testlib:t_start_stop(Config, mqtt_connector_stopped), ok. + +t_receive_via_rule(Config) -> + SourceConfig = ?config(source_config, Config), + ?check_trace( + begin + {ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_connector_api(Config), + {ok, {{_, 201, _}, _, _}} = emqx_bridge_v2_testlib:create_kind_api(Config), + Hookpoint = hookpoint(Config), + RepublishTopic = <<"rep/t">>, + RemoteTopic = emqx_utils_maps:deep_get( + [<<"parameters">>, <<"remote">>, <<"topic">>], + SourceConfig + ), + RuleOpts = #{ + sql => <<"select * from \"", Hookpoint/binary, "\"">>, + actions => [ + %% #{function => console}, + #{ + function => republish, + args => #{ + topic => RepublishTopic, + payload => <<"${.}">>, + qos => 0, + retain => false, + user_properties => <<"${.pub_props.'User-Property'}">> + } + } + ] + }, + {ok, {{_, 201, _}, _, #{<<"id">> := RuleId}}} = + emqx_bridge_v2_testlib:create_rule_api(RuleOpts), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + {ok, Client} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client), + {ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(Client, RepublishTopic), + ok = emqtt:publish( + Client, + RemoteTopic, + #{'User-Property' => [{<<"key">>, <<"value">>}]}, + <<"mypayload">>, + _Opts = [] + ), + {publish, Msg} = + ?assertReceive( + {publish, #{ + topic := RepublishTopic, + retain := false, + qos := 0, + properties := #{'User-Property' := [{<<"key">>, <<"value">>}]} + }} + ), + Payload = emqx_utils_json:decode(maps:get(payload, Msg), [return_maps]), + ?assertMatch( + #{ + <<"event">> := Hookpoint, + <<"payload">> := <<"mypayload">> + }, + Payload + ), + emqtt:stop(Client), + ok + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("action_references_nonexistent_bridges", Trace)), + ok + end + ), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 83d387cd7..aae913001 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -326,7 +326,7 @@ schema("/connectors_probe") -> create_connector(ConnectorType, ConnectorName, Conf) end; '/connectors'(get, _Params) -> - Nodes = mria:running_nodes(), + Nodes = emqx:running_nodes(), NodeReplies = emqx_connector_proto_v1:list_connectors_on_nodes(Nodes), case is_ok(NodeReplies) of {ok, NodeConnectors} -> @@ -674,7 +674,10 @@ unpack_connector_conf(Type, PackedConf) -> RawConf. format_action(ActionId) -> - element(2, emqx_bridge_v2:parse_id(ActionId)). + case emqx_bridge_v2:parse_id(ActionId) of + #{name := Name} -> + Name + end. is_ok(ok) -> ok; diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index cd8d597de..30e60df2b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -241,6 +241,12 @@ parse_user_properties(<<"${pub_props.'User-Property'}">>) -> %% we do not want to force users to select the value %% the value will be taken from Env.pub_props directly ?ORIGINAL_USER_PROPERTIES; +parse_user_properties(<<"${.pub_props.'User-Property'}">>) -> + %% keep the original + %% avoid processing this special variable because + %% we do not want to force users to select the value + %% the value will be taken from Env.pub_props directly + ?ORIGINAL_USER_PROPERTIES; parse_user_properties(<<"${", _/binary>> = V) -> %% use a variable emqx_template:parse(V); diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 70a7fc32c..b58d877e3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -23,6 +23,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([start_link/0]). @@ -482,8 +483,7 @@ with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat ok -> ok; {error, NonExistentBridgeIDs} -> - ?SLOG(error, #{ - msg => "action_references_nonexistent_bridges", + ?tp(error, "action_references_nonexistent_bridges", #{ rule_id => RuleId, nonexistent_bridge_ids => NonExistentBridgeIDs, hint => "this rule will be disabled" @@ -626,7 +626,7 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul {Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}), case emqx_action_info:is_action_type(Type) of - true -> {action, Type, Name}; + true -> {source, Type, Name}; false -> {bridge_v1, Type, Name} end end, @@ -646,7 +646,8 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul fun({Kind, Type, Name}) -> LookupFn = case Kind of - action -> fun emqx_bridge_v2:lookup/2; + action -> fun emqx_bridge_v2:lookup_action/2; + source -> fun emqx_bridge_v2:lookup_source/2; bridge_v1 -> fun emqx_bridge:lookup/2 end, try