test(pulsar): add testcase for different producers using the same topic
This commit is contained in:
parent
ed5e6599d9
commit
164a507899
|
@ -127,23 +127,18 @@ init_per_testcase(TestCase, Config) ->
|
||||||
common_init_per_testcase(TestCase, Config).
|
common_init_per_testcase(TestCase, Config).
|
||||||
|
|
||||||
end_per_testcase(_Testcase, Config) ->
|
end_per_testcase(_Testcase, Config) ->
|
||||||
case proplists:get_bool(skip_does_not_apply, Config) of
|
ok = emqx_config:delete_override_conf_files(),
|
||||||
true ->
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
ok;
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
false ->
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
ok = emqx_config:delete_override_conf_files(),
|
emqx_bridge_v2_testlib:delete_all_bridges(),
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
stop_consumer(Config),
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
%% in CI, apparently this needs more time since the
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
%% machines struggle with all the containers running...
|
||||||
emqx_bridge_v2_testlib:delete_all_bridges(),
|
emqx_common_test_helpers:call_janitor(60_000),
|
||||||
stop_consumer(Config),
|
ok = snabbkaffe:stop(),
|
||||||
%% in CI, apparently this needs more time since the
|
flush_consumed(),
|
||||||
%% machines struggle with all the containers running...
|
ok.
|
||||||
emqx_common_test_helpers:call_janitor(60_000),
|
|
||||||
ok = snabbkaffe:stop(),
|
|
||||||
flush_consumed(),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
common_init_per_testcase(TestCase, Config0) ->
|
common_init_per_testcase(TestCase, Config0) ->
|
||||||
ct:timetrap(timer:seconds(60)),
|
ct:timetrap(timer:seconds(60)),
|
||||||
|
@ -160,6 +155,10 @@ common_init_per_testcase(TestCase, Config0) ->
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
create_connector(Name, Config) ->
|
create_connector(Name, Config) ->
|
||||||
Connector = pulsar_connector(Config),
|
Connector = pulsar_connector(Config),
|
||||||
{ok, _} = emqx_connector:create(?TYPE, Name, Connector).
|
{ok, _} = emqx_connector:create(?TYPE, Name, Connector).
|
||||||
|
@ -174,69 +173,6 @@ create_action(Name, Config) ->
|
||||||
delete_action(Name) ->
|
delete_action(Name) ->
|
||||||
ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
|
ok = emqx_bridge_v2:remove(actions, ?TYPE, Name).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Testcases
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_action_probe(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
Action = pulsar_action(Config),
|
|
||||||
{ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
|
|
||||||
?assertMatch({{_, 204, _}, _, _}, Res0),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_action(Config) ->
|
|
||||||
Name = atom_to_binary(?FUNCTION_NAME),
|
|
||||||
create_action(Name, Config),
|
|
||||||
Actions = emqx_bridge_v2:list(actions),
|
|
||||||
Any = fun(#{name := BName}) -> BName =:= Name end,
|
|
||||||
?assert(lists:any(Any, Actions), Actions),
|
|
||||||
Topic = <<"lkadfdaction">>,
|
|
||||||
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
|
||||||
#{
|
|
||||||
sql => <<"select * from \"", Topic/binary, "\"">>,
|
|
||||||
id => atom_to_binary(?FUNCTION_NAME),
|
|
||||||
actions => [<<"pulsar:", Name/binary>>],
|
|
||||||
description => <<"bridge_v2 send msg to pulsar action">>
|
|
||||||
}
|
|
||||||
),
|
|
||||||
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
|
||||||
MQTTClientID = <<"pulsar_mqtt_clientid">>,
|
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]),
|
|
||||||
{ok, _} = emqtt:connect(C1),
|
|
||||||
ReqPayload = payload(),
|
|
||||||
ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
|
|
||||||
{ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]),
|
|
||||||
[#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000),
|
|
||||||
?assertEqual(MQTTClientID, ClientID),
|
|
||||||
?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)),
|
|
||||||
ok = emqtt:disconnect(C1),
|
|
||||||
InstanceId = instance_id(actions, Name),
|
|
||||||
?retry(
|
|
||||||
100,
|
|
||||||
20,
|
|
||||||
?assertMatch(
|
|
||||||
#{
|
|
||||||
counters := #{
|
|
||||||
dropped := 0,
|
|
||||||
success := 1,
|
|
||||||
matched := 1,
|
|
||||||
failed := 0,
|
|
||||||
received := 0
|
|
||||||
}
|
|
||||||
},
|
|
||||||
emqx_resource:get_metrics(InstanceId)
|
|
||||||
)
|
|
||||||
),
|
|
||||||
ok = delete_action(Name),
|
|
||||||
ActionsAfterDelete = emqx_bridge_v2:list(actions),
|
|
||||||
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% Helper fns
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
pulsar_connector(Config) ->
|
pulsar_connector(Config) ->
|
||||||
PulsarHost = ?config(pulsar_host, Config),
|
PulsarHost = ?config(pulsar_host, Config),
|
||||||
PulsarPort = ?config(pulsar_port, Config),
|
PulsarPort = ?config(pulsar_port, Config),
|
||||||
|
@ -455,3 +391,158 @@ maybe_skip_without_ci() ->
|
||||||
_ ->
|
_ ->
|
||||||
{skip, no_pulsar}
|
{skip, no_pulsar}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
assert_status_api(Line, Type, Name, Status) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_, 200, _}, _, #{
|
||||||
|
<<"status">> := Status,
|
||||||
|
<<"node_status">> := [#{<<"status">> := Status}]
|
||||||
|
}}},
|
||||||
|
emqx_bridge_v2_testlib:get_bridge_api(Type, Name),
|
||||||
|
#{line => Line, name => Name, expected_status => Status}
|
||||||
|
).
|
||||||
|
-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_action_probe(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Action = pulsar_action(Config),
|
||||||
|
{ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
|
||||||
|
?assertMatch({{_, 204, _}, _, _}, Res0),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_action(Config) ->
|
||||||
|
Name = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
create_action(Name, Config),
|
||||||
|
Actions = emqx_bridge_v2:list(actions),
|
||||||
|
Any = fun(#{name := BName}) -> BName =:= Name end,
|
||||||
|
?assert(lists:any(Any, Actions), Actions),
|
||||||
|
Topic = <<"lkadfdaction">>,
|
||||||
|
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => <<"select * from \"", Topic/binary, "\"">>,
|
||||||
|
id => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
actions => [<<"pulsar:", Name/binary>>],
|
||||||
|
description => <<"bridge_v2 send msg to pulsar action">>
|
||||||
|
}
|
||||||
|
),
|
||||||
|
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end),
|
||||||
|
MQTTClientID = <<"pulsar_mqtt_clientid">>,
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
ReqPayload = payload(),
|
||||||
|
ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
|
||||||
|
{ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]),
|
||||||
|
[#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000),
|
||||||
|
?assertEqual(MQTTClientID, ClientID),
|
||||||
|
?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)),
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
InstanceId = instance_id(actions, Name),
|
||||||
|
?retry(
|
||||||
|
100,
|
||||||
|
20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
dropped := 0,
|
||||||
|
success := 1,
|
||||||
|
matched := 1,
|
||||||
|
failed := 0,
|
||||||
|
received := 0
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_resource:get_metrics(InstanceId)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok = delete_action(Name),
|
||||||
|
ActionsAfterDelete = emqx_bridge_v2:list(actions),
|
||||||
|
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% Tests that deleting/disabling an action that share the same Pulsar topic with other
|
||||||
|
%% actions do not disturb the latter.
|
||||||
|
t_multiple_actions_sharing_topic(Config) ->
|
||||||
|
Type = ?TYPE,
|
||||||
|
ConnectorName = <<"c">>,
|
||||||
|
ConnectorConfig = pulsar_connector(Config),
|
||||||
|
ActionConfig = pulsar_action(Config),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
ConnectorParams = [
|
||||||
|
{connector_config, ConnectorConfig},
|
||||||
|
{connector_name, ConnectorName},
|
||||||
|
{connector_type, Type}
|
||||||
|
],
|
||||||
|
ActionName1 = <<"a1">>,
|
||||||
|
ActionParams1 = [
|
||||||
|
{action_config, ActionConfig},
|
||||||
|
{action_name, ActionName1},
|
||||||
|
{action_type, Type}
|
||||||
|
],
|
||||||
|
ActionName2 = <<"a2">>,
|
||||||
|
ActionParams2 = [
|
||||||
|
{action_config, ActionConfig},
|
||||||
|
{action_name, ActionName2},
|
||||||
|
{action_type, Type}
|
||||||
|
],
|
||||||
|
{ok, {{_, 201, _}, _, #{}}} =
|
||||||
|
emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
|
||||||
|
{ok, {{_, 201, _}, _, #{}}} =
|
||||||
|
emqx_bridge_v2_testlib:create_action_api(ActionParams1),
|
||||||
|
{ok, {{_, 201, _}, _, #{}}} =
|
||||||
|
emqx_bridge_v2_testlib:create_action_api(ActionParams2),
|
||||||
|
|
||||||
|
?assertStatusAPI(Type, ActionName1, <<"connected">>),
|
||||||
|
?assertStatusAPI(Type, ActionName2, <<"connected">>),
|
||||||
|
|
||||||
|
RuleTopic = <<"t/a2">>,
|
||||||
|
{ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [
|
||||||
|
{bridge_name, ActionName2}
|
||||||
|
]),
|
||||||
|
{ok, C} = emqtt:start_link([]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
SendMessage = fun() ->
|
||||||
|
ReqPayload = payload(),
|
||||||
|
ReqPayloadBin = emqx_utils_json:encode(ReqPayload),
|
||||||
|
{ok, _} = emqtt:publish(C, RuleTopic, #{}, ReqPayloadBin, [
|
||||||
|
{qos, 1}, {retain, false}
|
||||||
|
]),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% Disabling a1 shouldn't disturb a2.
|
||||||
|
?assertMatch(
|
||||||
|
{204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertStatusAPI(Type, ActionName1, <<"disconnected">>),
|
||||||
|
?assertStatusAPI(Type, ActionName2, <<"connected">>),
|
||||||
|
|
||||||
|
?assertMatch(ok, SendMessage()),
|
||||||
|
?assertStatusAPI(Type, ActionName2, <<"connected">>),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{204, _},
|
||||||
|
emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1)
|
||||||
|
),
|
||||||
|
?assertStatusAPI(Type, ActionName1, <<"connected">>),
|
||||||
|
?assertStatusAPI(Type, ActionName2, <<"connected">>),
|
||||||
|
?assertMatch(ok, SendMessage()),
|
||||||
|
|
||||||
|
%% Deleting also shouldn't disrupt a2.
|
||||||
|
?assertMatch(
|
||||||
|
{204, _},
|
||||||
|
emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1)
|
||||||
|
),
|
||||||
|
?assertStatusAPI(Type, ActionName2, <<"connected">>),
|
||||||
|
?assertMatch(ok, SendMessage()),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue