From 9a7452e1c5abe778a89711d582a1bdd8162d3f0a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 03:07:31 +0800 Subject: [PATCH 1/3] fix(connector): add testcase for binding ingress mqtt bridge to rules --- apps/emqx_bridge/src/emqx_bridge.erl | 4 +- .../src/mqtt/emqx_connector_mqtt_mod.erl | 63 ++++++++++---- .../src/mqtt/emqx_connector_mqtt_msg.erl | 30 +------ .../test/emqx_connector_api_SUITE.erl | 82 +++++++++++++++++-- 4 files changed, 125 insertions(+), 54 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3518190c7..0495f00e4 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -81,12 +81,12 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> Msg = emqx_rule_events:eventmsg_publish(Message), - send_to_egress_matched_bridges(Topic, Msg); + send_to_matched_egress_bridges(Topic, Msg); true -> ok end, {ok, Message}. -send_to_egress_matched_bridges(Topic, Msg) -> +send_to_matched_egress_bridges(Topic, Msg) -> lists:foreach(fun (Id) -> try send_message(Id, Msg) of ok -> ok; diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 30a1ccb30..7d5021f82 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,8 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg, Vars) -> +handle_publish(Msg0, Vars) -> + Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -173,27 +174,11 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - maybe_publish_to_local_broker(Msg, Vars). + maybe_publish_to_local_broker(Msg0, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. -maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> - case maps:get(local_topic, Vars, undefined) of - undefined -> - %% local topic is not set, discard it - ok; - _ -> - case emqx_topic:match(Topic, SubTopic) of - true -> - _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), - ok; - false -> - ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", - message => Msg, subscribed => SubTopic, got_topic => Topic}) - end - end. - make_hdlr(Parent, Vars) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/2, [Vars]}, @@ -209,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). + +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + ok; %% local topic is not set, discard it + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index a0dd9eec1..35bcf3de1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> - MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). -format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - flags => #{dup => Dup, retain => Retain}, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond), - node => node() - }. - -printable_maps(undefined) -> #{}; -printable_maps(Headers) -> - maps:fold( - fun ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [#{ - key => Key, - value => Value - } || {Key, Value} <- V0] - }; - (K, V0, AccIn) -> AccIn#{K => V0} - end, #{}, Headers). - process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 1a96a3596..96d793640 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -22,7 +22,10 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CONF_DEFAULT, <<"connectors: {}">>). +%% output functions +-export([ inspect/3 + ]). + -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). @@ -67,6 +70,9 @@ <<"failed">> := FAILED, <<"rate">> := SPEED, <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). +inspect(Selected, _Envs, _Args) -> + persistent_term:put(?MODULE, #{inspect => Selected}). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -89,13 +95,15 @@ init_per_suite(Config) -> %% some testcases (may from other app) already get emqx_connector started _ = application:stop(emqx_resource), _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), - ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), + ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector, + emqx_bridge, emqx_dashboard]), + ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>), + ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]), ok. init_per_testcase(_, Config) -> @@ -223,7 +231,6 @@ t_mqtt_conn_bridge_ingress(_) -> %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic ?assert( receive @@ -435,6 +442,71 @@ t_mqtt_conn_testing(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }). +t_ingress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), + #{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule get messages from a source mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], + <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + %% and also the rule should be matched, with matched + 1: + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we also check if the outputs of the rule is triggered + ?assertMatch(#{inspect := #{ + event := '$bridges/mqtt', + id := MsgId, + payload := Payload, + topic := RemoteTopic, + qos := 0, + dup := false, + retain := false, + pub_props := #{}, + timestamp := _ + }} when is_binary(MsgId), persistent_term:get(?MODULE)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- From 925d46fe86433aef4973424e8403239ddd00673e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 04:12:20 +0800 Subject: [PATCH 2/3] fix(connector): add testcase for binding egress mqtt bridge to rules --- apps/emqx_bridge/src/emqx_bridge.erl | 15 +++ .../test/emqx_connector_api_SUITE.erl | 91 +++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0495f00e4..2e610b2b9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,11 +39,14 @@ , lookup/3 , list/0 , list_bridges_by_connector/1 + , create/2 , create/3 , recreate/2 , recreate/3 , create_dry_run/2 + , remove/1 , remove/3 + , update/2 , update/3 , start/2 , stop/2 @@ -207,6 +210,10 @@ stop(Type, Name) -> restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), @@ -217,6 +224,10 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% @@ -263,6 +274,10 @@ create_dry_run(Type, Conf) -> Error end. +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + remove(Type, Name, _Conf) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 96d793640..936982e75 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -110,8 +110,20 @@ init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, emqx_rule_engine:get_rules()), + lists:foreach(fun(#{id := Id}) -> + ok = emqx_bridge:remove(Id) + end, emqx_bridge:list()), + lists:foreach(fun(#{<<"id">> := Id}) -> + ok = emqx_connector:delete(Id) + end, emqx_connector:list()). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -507,6 +519,85 @@ t_ingress_mqtt_bridge_with_rules(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress + , <<"metrics">> := ?metrics(2, 2, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- From 59e2614574b6cff5837a6524b4c07f98930d8edb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 04:23:51 +0800 Subject: [PATCH 3/3] fix(dialyzer): unmatched results in emqx_statsd_api --- apps/emqx_statsd/src/emqx_statsd_api.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 2278fa492..d545003b0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -59,12 +59,10 @@ statsd(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> + _ = emqx_statsd_sup:stop_child(?APP), case maps:get(<<"enable">>, Body) of - true -> - _ = emqx_statsd_sup:stop_child(?APP), - emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); - false -> - _ = emqx_statsd_sup:stop_child(?APP) + true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); + false -> ok end, {200, NewConfig}; {error, Reason} ->