fix(connector): add testcase for binding ingress mqtt bridge to rules

This commit is contained in:
Shawn 2022-01-01 03:07:31 +08:00
parent 7643564ef1
commit 9a7452e1c5
4 changed files with 125 additions and 54 deletions

View File

@ -81,12 +81,12 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of case maps:get(sys, Flags, false) of
false -> false ->
Msg = emqx_rule_events:eventmsg_publish(Message), Msg = emqx_rule_events:eventmsg_publish(Message),
send_to_egress_matched_bridges(Topic, Msg); send_to_matched_egress_bridges(Topic, Msg);
true -> ok true -> ok
end, end,
{ok, Message}. {ok, Message}.
send_to_egress_matched_bridges(Topic, Msg) -> send_to_matched_egress_bridges(Topic, Msg) ->
lists:foreach(fun (Id) -> lists:foreach(fun (Id) ->
try send_message(Id, Msg) of try send_message(Id, Msg) of
ok -> ok; ok -> ok;

View File

@ -165,7 +165,8 @@ handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured", "_'ingress'_is_not_configured",
message => Msg}); message => Msg});
handle_publish(Msg, Vars) -> handle_publish(Msg0, Vars) ->
Msg = format_msg_received(Msg0),
?SLOG(debug, #{msg => "publish_to_local_broker", ?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
case Vars of case Vars of
@ -173,27 +174,11 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok _ -> ok
end, end,
maybe_publish_to_local_broker(Msg, Vars). maybe_publish_to_local_broker(Msg0, Vars).
handle_disconnected(Reason, Parent) -> handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}. Parent ! {disconnected, self(), Reason}.
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
%% local topic is not set, discard it
ok;
_ ->
case emqx_topic:match(Topic, SubTopic) of
true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
ok;
false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
message => Msg, subscribed => SubTopic, got_topic => Topic})
end
end.
make_hdlr(Parent, Vars) -> make_hdlr(Parent, Vars) ->
#{puback => {fun ?MODULE:handle_puback/2, [Parent]}, #{puback => {fun ?MODULE:handle_puback/2, [Parent]},
publish => {fun ?MODULE:handle_publish/2, [Vars]}, publish => {fun ?MODULE:handle_publish/2, [Vars]},
@ -209,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
process_config(Config) -> process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
ok; %% local topic is not set, discard it
_ ->
case emqx_topic:match(Topic, SubTopic) of
true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
ok;
false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
message => Msg, subscribed => SubTopic, got_topic => Topic})
end
end.
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
dup => Dup,
retain => Retain,
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond)
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).

View File

@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}. Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),
@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
flags => #{dup => Dup, retain => Retain},
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond),
node => node()
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
process_payload([], Msg) -> process_payload([], Msg) ->
emqx_json:encode(Msg); emqx_json:encode(Msg);
process_payload(Tks, Msg) -> process_payload(Tks, Msg) ->

View File

@ -22,7 +22,10 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"connectors: {}">>). %% output functions
-export([ inspect/3
]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_NAME, <<"test_connector">>).
@ -67,6 +70,9 @@
<<"failed">> := FAILED, <<"rate">> := SPEED, <<"failed">> := FAILED, <<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
inspect(Selected, _Envs, _Args) ->
persistent_term:put(?MODULE, #{inspect => Selected}).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -89,13 +95,15 @@ init_per_suite(Config) ->
%% some testcases (may from other app) already get emqx_connector started %% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource), _ = application:stop(emqx_resource),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>),
ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
ok. ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
@ -223,7 +231,6 @@ t_mqtt_conn_bridge_ingress(_) ->
%% PUBLISH a message to the 'remote' broker, as we have only one broker, %% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one. %% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)), emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic %% we should receive a message on the local broker, with specified topic
?assert( ?assert(
receive receive
@ -435,6 +442,71 @@ t_mqtt_conn_testing(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}). }).
t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
{ok, 201, Rule} = request(post, uri(["rules"]),
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
<<"enable">> => true,
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% and also the rule should be matched, with matched + 1:
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId
, <<"metrics">> := #{<<"matched">> := 1}
} = jsx:decode(Rule1),
%% we also check if the outputs of the rule is triggered
?assertMatch(#{inspect := #{
event := '$bridges/mqtt',
id := MsgId,
payload := Payload,
topic := RemoteTopic,
qos := 0,
dup := false,
retain := false,
pub_props := #{},
timestamp := _
}} when is_binary(MsgId), persistent_term:get(?MODULE)),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% HTTP Request %% HTTP Request
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------