fix(connector): add testcase for binding egress mqtt bridge to rules
This commit is contained in:
parent
9a7452e1c5
commit
925d46fe86
|
@ -39,11 +39,14 @@
|
||||||
, lookup/3
|
, lookup/3
|
||||||
, list/0
|
, list/0
|
||||||
, list_bridges_by_connector/1
|
, list_bridges_by_connector/1
|
||||||
|
, create/2
|
||||||
, create/3
|
, create/3
|
||||||
, recreate/2
|
, recreate/2
|
||||||
, recreate/3
|
, recreate/3
|
||||||
, create_dry_run/2
|
, create_dry_run/2
|
||||||
|
, remove/1
|
||||||
, remove/3
|
, remove/3
|
||||||
|
, update/2
|
||||||
, update/3
|
, update/3
|
||||||
, start/2
|
, start/2
|
||||||
, stop/2
|
, stop/2
|
||||||
|
@ -207,6 +210,10 @@ stop(Type, Name) ->
|
||||||
restart(Type, Name) ->
|
restart(Type, Name) ->
|
||||||
emqx_resource:restart(resource_id(Type, Name)).
|
emqx_resource:restart(resource_id(Type, Name)).
|
||||||
|
|
||||||
|
create(BridgeId, Conf) ->
|
||||||
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
|
create(BridgeType, BridgeName, Conf).
|
||||||
|
|
||||||
create(Type, Name, Conf) ->
|
create(Type, Name, Conf) ->
|
||||||
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
|
@ -217,6 +224,10 @@ create(Type, Name, Conf) ->
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
update(BridgeId, {OldConf, Conf}) ->
|
||||||
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
|
update(BridgeType, BridgeName, {OldConf, Conf}).
|
||||||
|
|
||||||
update(Type, Name, {OldConf, Conf}) ->
|
update(Type, Name, {OldConf, Conf}) ->
|
||||||
%% TODO: sometimes its not necessary to restart the bridge connection.
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
||||||
%%
|
%%
|
||||||
|
@ -263,6 +274,10 @@ create_dry_run(Type, Conf) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
remove(BridgeId) ->
|
||||||
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
|
remove(BridgeType, BridgeName, #{}).
|
||||||
|
|
||||||
remove(Type, Name, _Conf) ->
|
remove(Type, Name, _Conf) ->
|
||||||
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
|
||||||
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
case emqx_resource:remove_local(resource_id(Type, Name)) of
|
||||||
|
|
|
@ -110,8 +110,20 @@ init_per_testcase(_, Config) ->
|
||||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
Config.
|
Config.
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
|
clear_resources(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
clear_resources() ->
|
||||||
|
lists:foreach(fun(#{id := Id}) ->
|
||||||
|
ok = emqx_rule_engine:delete_rule(Id)
|
||||||
|
end, emqx_rule_engine:get_rules()),
|
||||||
|
lists:foreach(fun(#{id := Id}) ->
|
||||||
|
ok = emqx_bridge:remove(Id)
|
||||||
|
end, emqx_bridge:list()),
|
||||||
|
lists:foreach(fun(#{<<"id">> := Id}) ->
|
||||||
|
ok = emqx_connector:delete(Id)
|
||||||
|
end, emqx_connector:list()).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcases
|
%% Testcases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -507,6 +519,85 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
|
||||||
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||||
|
|
||||||
|
t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
|
||||||
|
, <<"name">> => ?CONNECTR_NAME
|
||||||
|
}),
|
||||||
|
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
|
||||||
|
|
||||||
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||||
|
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
|
||||||
|
<<"type">> => ?CONNECTR_TYPE,
|
||||||
|
<<"name">> => ?BRIDGE_NAME_EGRESS
|
||||||
|
}),
|
||||||
|
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
|
||||||
|
|
||||||
|
{ok, 201, Rule} = request(post, uri(["rules"]),
|
||||||
|
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"outputs">> => [BridgeIDEgress],
|
||||||
|
<<"sql">> => <<"SELECT * from \"t/1\"">>
|
||||||
|
}),
|
||||||
|
#{<<"id">> := RuleId} = jsx:decode(Rule),
|
||||||
|
|
||||||
|
%% we now test if the bridge works as expected
|
||||||
|
LocalTopic = <<"local_topic/1">>,
|
||||||
|
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
emqx:subscribe(RemoteTopic),
|
||||||
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
|
%% the remote broker is also the local one.
|
||||||
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
|
?assert(
|
||||||
|
receive
|
||||||
|
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
||||||
|
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
||||||
|
true;
|
||||||
|
Msg ->
|
||||||
|
ct:pal("Msg: ~p", [Msg]),
|
||||||
|
false
|
||||||
|
after 100 ->
|
||||||
|
false
|
||||||
|
end),
|
||||||
|
emqx:unsubscribe(RemoteTopic),
|
||||||
|
|
||||||
|
%% PUBLISH a message to the rule.
|
||||||
|
Payload2 = <<"hi">>,
|
||||||
|
RuleTopic = <<"t/1">>,
|
||||||
|
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
|
||||||
|
emqx:subscribe(RemoteTopic2),
|
||||||
|
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
|
||||||
|
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
|
||||||
|
#{ <<"id">> := RuleId
|
||||||
|
, <<"metrics">> := #{<<"matched">> := 1}
|
||||||
|
} = jsx:decode(Rule1),
|
||||||
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
|
?assert(
|
||||||
|
receive
|
||||||
|
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
|
||||||
|
ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
|
||||||
|
true;
|
||||||
|
Msg ->
|
||||||
|
ct:pal("Msg: ~p", [Msg]),
|
||||||
|
false
|
||||||
|
after 100 ->
|
||||||
|
false
|
||||||
|
end),
|
||||||
|
|
||||||
|
%% verify the metrics of the bridge
|
||||||
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
?assertMatch(#{ <<"id">> := BridgeIDEgress
|
||||||
|
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
|
||||||
|
, <<"node_metrics">> :=
|
||||||
|
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
|
||||||
|
}, jsx:decode(BridgeStr)),
|
||||||
|
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% HTTP Request
|
%% HTTP Request
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue