Merge pull request #6601 from terry-xiaoyu/bridge_bug_fixes_3
fix(connector): add testcase for binding ingress mqtt bridge to rules
This commit is contained in:
commit
6ac0f83aad
|
@ -39,11 +39,14 @@
|
|||
, lookup/3
|
||||
, list/0
|
||||
, list_bridges_by_connector/1
|
||||
, create/2
|
||||
, create/3
|
||||
, recreate/2
|
||||
, recreate/3
|
||||
, create_dry_run/2
|
||||
, remove/1
|
||||
, remove/3
|
||||
, update/2
|
||||
, update/3
|
||||
, start/2
|
||||
, stop/2
|
||||
|
@ -81,12 +84,12 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
|||
case maps:get(sys, Flags, false) of
|
||||
false ->
|
||||
Msg = emqx_rule_events:eventmsg_publish(Message),
|
||||
send_to_egress_matched_bridges(Topic, Msg);
|
||||
send_to_matched_egress_bridges(Topic, Msg);
|
||||
true -> ok
|
||||
end,
|
||||
{ok, Message}.
|
||||
|
||||
send_to_egress_matched_bridges(Topic, Msg) ->
|
||||
send_to_matched_egress_bridges(Topic, Msg) ->
|
||||
lists:foreach(fun (Id) ->
|
||||
try send_message(Id, Msg) of
|
||||
ok -> ok;
|
||||
|
@ -207,6 +210,10 @@ stop(Type, Name) ->
|
|||
restart(Type, Name) ->
|
||||
emqx_resource:restart(resource_id(Type, Name)).
|
||||
|
||||
create(BridgeId, Conf) ->
|
||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||
create(BridgeType, BridgeName, Conf).
|
||||
|
||||
create(Type, Name, Conf) ->
|
||||
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
||||
config => Conf}),
|
||||
|
@ -217,6 +224,10 @@ create(Type, Name, Conf) ->
|
|||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
update(BridgeId, {OldConf, Conf}) ->
|
||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||
update(BridgeType, BridgeName, {OldConf, Conf}).
|
||||
|
||||
update(Type, Name, {OldConf, Conf}) ->
|
||||
%% TODO: sometimes its not necessary to restart the bridge connection.
|
||||
%%
|
||||
|
@ -263,6 +274,10 @@ create_dry_run(Type, Conf) ->
|
|||
Error
|
||||
end.
|
||||
|
||||
remove(BridgeId) ->
|
||||
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||
remove(BridgeType, BridgeName, #{}).
|
||||
|
||||
remove(Type, Name, _Conf) ->
|
||||
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
||||
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
||||
|
|
|
@ -165,7 +165,8 @@ handle_publish(Msg, undefined) ->
|
|||
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
|
||||
"_'ingress'_is_not_configured",
|
||||
message => Msg});
|
||||
handle_publish(Msg, Vars) ->
|
||||
handle_publish(Msg0, Vars) ->
|
||||
Msg = format_msg_received(Msg0),
|
||||
?SLOG(debug, #{msg => "publish_to_local_broker",
|
||||
message => Msg, vars => Vars}),
|
||||
case Vars of
|
||||
|
@ -173,27 +174,11 @@ handle_publish(Msg, Vars) ->
|
|||
_ = erlang:apply(Mod, Func, [Msg | Args]);
|
||||
_ -> ok
|
||||
end,
|
||||
maybe_publish_to_local_broker(Msg, Vars).
|
||||
maybe_publish_to_local_broker(Msg0, Vars).
|
||||
|
||||
handle_disconnected(Reason, Parent) ->
|
||||
Parent ! {disconnected, self(), Reason}.
|
||||
|
||||
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
|
||||
case maps:get(local_topic, Vars, undefined) of
|
||||
undefined ->
|
||||
%% local topic is not set, discard it
|
||||
ok;
|
||||
_ ->
|
||||
case emqx_topic:match(Topic, SubTopic) of
|
||||
true ->
|
||||
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
|
||||
ok;
|
||||
false ->
|
||||
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
|
||||
message => Msg, subscribed => SubTopic, got_topic => Topic})
|
||||
end
|
||||
end.
|
||||
|
||||
make_hdlr(Parent, Vars) ->
|
||||
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
||||
publish => {fun ?MODULE:handle_publish/2, [Vars]},
|
||||
|
@ -209,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
|
|||
|
||||
process_config(Config) ->
|
||||
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
|
||||
|
||||
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
|
||||
case maps:get(local_topic, Vars, undefined) of
|
||||
undefined ->
|
||||
ok; %% local topic is not set, discard it
|
||||
_ ->
|
||||
case emqx_topic:match(Topic, SubTopic) of
|
||||
true ->
|
||||
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
|
||||
ok;
|
||||
false ->
|
||||
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
|
||||
message => Msg, subscribed => SubTopic, got_topic => Topic})
|
||||
end
|
||||
end.
|
||||
|
||||
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
|
||||
qos := QoS, retain := Retain, topic := Topic}) ->
|
||||
#{event => '$bridges/mqtt',
|
||||
id => emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
payload => Payload,
|
||||
topic => Topic,
|
||||
qos => QoS,
|
||||
dup => Dup,
|
||||
retain => Retain,
|
||||
pub_props => printable_maps(Props),
|
||||
timestamp => erlang:system_time(millisecond)
|
||||
}.
|
||||
|
||||
printable_maps(undefined) -> #{};
|
||||
printable_maps(Headers) ->
|
||||
maps:fold(
|
||||
fun ('User-Property', V0, AccIn) when is_list(V0) ->
|
||||
AccIn#{
|
||||
'User-Property' => maps:from_list(V0),
|
||||
'User-Property-Pairs' => [#{
|
||||
key => Key,
|
||||
value => Value
|
||||
} || {Key, Value} <- V0]
|
||||
};
|
||||
(K, V0, AccIn) -> AccIn#{K => V0}
|
||||
end, #{}, Headers).
|
||||
|
|
|
@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
|
|||
Msg#message{topic = topic(Mountpoint, Topic)}.
|
||||
|
||||
%% published from remote node over a MQTT connection
|
||||
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
|
||||
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
|
||||
#{local_topic := TopicToken, payload := PayloadToken,
|
||||
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
|
||||
MapMsg = format_msg_received(MapMsg0),
|
||||
Topic = replace_vars_in_str(TopicToken, MapMsg),
|
||||
Payload = process_payload(PayloadToken, MapMsg),
|
||||
QoS = replace_simple_var(QoSToken, MapMsg),
|
||||
|
@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
|
|||
emqx_message:set_flags(#{dup => Dup, retain => Retain},
|
||||
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
|
||||
|
||||
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
|
||||
qos := QoS, retain := Retain, topic := Topic}) ->
|
||||
#{event => '$bridges/mqtt',
|
||||
id => emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
payload => Payload,
|
||||
topic => Topic,
|
||||
qos => QoS,
|
||||
flags => #{dup => Dup, retain => Retain},
|
||||
pub_props => printable_maps(Props),
|
||||
timestamp => erlang:system_time(millisecond),
|
||||
node => node()
|
||||
}.
|
||||
|
||||
printable_maps(undefined) -> #{};
|
||||
printable_maps(Headers) ->
|
||||
maps:fold(
|
||||
fun ('User-Property', V0, AccIn) when is_list(V0) ->
|
||||
AccIn#{
|
||||
'User-Property' => maps:from_list(V0),
|
||||
'User-Property-Pairs' => [#{
|
||||
key => Key,
|
||||
value => Value
|
||||
} || {Key, Value} <- V0]
|
||||
};
|
||||
(K, V0, AccIn) -> AccIn#{K => V0}
|
||||
end, #{}, Headers).
|
||||
|
||||
process_payload([], Msg) ->
|
||||
emqx_json:encode(Msg);
|
||||
process_payload(Tks, Msg) ->
|
||||
|
|
|
@ -22,7 +22,10 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(CONF_DEFAULT, <<"connectors: {}">>).
|
||||
%% output functions
|
||||
-export([ inspect/3
|
||||
]).
|
||||
|
||||
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
||||
-define(CONNECTR_TYPE, <<"mqtt">>).
|
||||
-define(CONNECTR_NAME, <<"test_connector">>).
|
||||
|
@ -67,6 +70,9 @@
|
|||
<<"failed">> := FAILED, <<"rate">> := SPEED,
|
||||
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
|
||||
|
||||
inspect(Selected, _Envs, _Args) ->
|
||||
persistent_term:put(?MODULE, #{inspect => Selected}).
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
|
@ -89,21 +95,35 @@ init_per_suite(Config) ->
|
|||
%% some testcases (may from other app) already get emqx_connector started
|
||||
_ = application:stop(emqx_resource),
|
||||
_ = application:stop(emqx_connector),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
|
||||
ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
|
||||
emqx_bridge, emqx_dashboard]),
|
||||
ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>),
|
||||
ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
|
||||
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
|
||||
emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||
Config.
|
||||
end_per_testcase(_, _Config) ->
|
||||
clear_resources(),
|
||||
ok.
|
||||
|
||||
clear_resources() ->
|
||||
lists:foreach(fun(#{id := Id}) ->
|
||||
ok = emqx_rule_engine:delete_rule(Id)
|
||||
end, emqx_rule_engine:get_rules()),
|
||||
lists:foreach(fun(#{id := Id}) ->
|
||||
ok = emqx_bridge:remove(Id)
|
||||
end, emqx_bridge:list()),
|
||||
lists:foreach(fun(#{<<"id">> := Id}) ->
|
||||
ok = emqx_connector:delete(Id)
|
||||
end, emqx_connector:list()).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -223,7 +243,6 @@ t_mqtt_conn_bridge_ingress(_) ->
|
|||
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
||||
%% the remote broker is also the local one.
|
||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||
|
||||
%% we should receive a message on the local broker, with specified topic
|
||||
?assert(
|
||||
receive
|
||||
|
@ -435,6 +454,150 @@ t_mqtt_conn_testing(_) ->
|
|||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}).
|
||||
|
||||
t_ingress_mqtt_bridge_with_rules(_) ->
|
||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
||||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
|
||||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_INGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
|
||||
|
||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
|
||||
<<"enable">> => true,
|
||||
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
|
||||
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
|
||||
}),
|
||||
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
|
||||
RemoteTopic = <<"remote_topic/1">>,
|
||||
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(LocalTopic),
|
||||
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
|
||||
%% the remote broker is also the local one.
|
||||
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
|
||||
%% we should receive a message on the local broker, with specified topic
|
||||
?assert(
|
||||
receive
|
||||
{deliver, LocalTopic, #message{payload = Payload}} ->
|
||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
|
||||
true;
|
||||
Msg ->
|
||||
ct:pal("Msg: ~p", [Msg]),
|
||||
false
|
||||
after 100 ->
|
||||
false
|
||||
end),
|
||||
%% and also the rule should be matched, with matched + 1:
|
||||
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
||||
#{ <<"id">> := RuleId
|
||||
, <<"metrics">> := #{<<"matched">> := 1}
|
||||
} = jsx:decode(Rule1),
|
||||
%% we also check if the outputs of the rule is triggered
|
||||
?assertMatch(#{inspect := #{
|
||||
event := '$bridges/mqtt',
|
||||
id := MsgId,
|
||||
payload := Payload,
|
||||
topic := RemoteTopic,
|
||||
qos := 0,
|
||||
dup := false,
|
||||
retain := false,
|
||||
pub_props := #{},
|
||||
timestamp := _
|
||||
}} when is_binary(MsgId), persistent_term:get(?MODULE)),
|
||||
|
||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||
|
||||
t_egress_mqtt_bridge_with_rules(_) ->
|
||||
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
||||
, <<"name">> => ?CONNECTR_NAME
|
||||
}),
|
||||
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||
|
||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
||||
<<"type">> => ?CONNECTR_TYPE,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||
}),
|
||||
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
|
||||
|
||||
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
||||
<<"enable">> => true,
|
||||
<<"outputs">> => [BridgeIDEgress],
|
||||
<<"sql">> => <<"SELECT * from \"t/1\"">>
|
||||
}),
|
||||
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<"local_topic/1">>,
|
||||
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||
%% the remote broker is also the local one.
|
||||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
?assert(
|
||||
receive
|
||||
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
||||
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
||||
true;
|
||||
Msg ->
|
||||
ct:pal("Msg: ~p", [Msg]),
|
||||
false
|
||||
after 100 ->
|
||||
false
|
||||
end),
|
||||
emqx:unsubscribe(RemoteTopic),
|
||||
|
||||
%% PUBLISH a message to the rule.
|
||||
Payload2 = <<"hi">>,
|
||||
RuleTopic = <<"t/1">>,
|
||||
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
||||
emqx:subscribe(RemoteTopic2),
|
||||
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
||||
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
||||
#{ <<"id">> := RuleId
|
||||
, <<"metrics">> := #{<<"matched">> := 1}
|
||||
} = jsx:decode(Rule1),
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
?assert(
|
||||
receive
|
||||
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
|
||||
ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
|
||||
true;
|
||||
Msg ->
|
||||
ct:pal("Msg: ~p", [Msg]),
|
||||
false
|
||||
after 100 ->
|
||||
false
|
||||
end),
|
||||
|
||||
%% verify the metrics of the bridge
|
||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
||||
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
||||
, <<"node_metrics">> :=
|
||||
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
||||
}, jsx:decode(BridgeStr)),
|
||||
|
||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% HTTP Request
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -59,12 +59,10 @@ statsd(put, #{body := Body}) ->
|
|||
Body,
|
||||
#{rawconf_with_defaults => true, override_to => cluster}) of
|
||||
{ok, #{raw_config := NewConfig, config := Config}} ->
|
||||
_ = emqx_statsd_sup:stop_child(?APP),
|
||||
case maps:get(<<"enable">>, Body) of
|
||||
true ->
|
||||
_ = emqx_statsd_sup:stop_child(?APP),
|
||||
emqx_statsd_sup:start_child(?APP, maps:get(config, Config));
|
||||
false ->
|
||||
_ = emqx_statsd_sup:stop_child(?APP)
|
||||
true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config));
|
||||
false -> ok
|
||||
end,
|
||||
{200, NewConfig};
|
||||
{error, Reason} ->
|
||||
|
|
Loading…
Reference in New Issue