diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 0636806de..11caa15c6 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -127,23 +127,18 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config). end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ok = emqx_config:delete_override_conf_files(), - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - emqx_bridge_v2_testlib:delete_all_bridges(), - stop_consumer(Config), - %% in CI, apparently this needs more time since the - %% machines struggle with all the containers running... - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - flush_consumed(), - ok - end. + ok = emqx_config:delete_override_conf_files(), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges(), + stop_consumer(Config), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + flush_consumed(), + ok. common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), @@ -160,6 +155,10 @@ common_init_per_testcase(TestCase, Config0) -> ok = snabbkaffe:start_trace(), Config. +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + create_connector(Name, Config) -> Connector = pulsar_connector(Config), {ok, _} = emqx_connector:create(?TYPE, Name, Connector). @@ -174,69 +173,6 @@ create_action(Name, Config) -> delete_action(Name) -> ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). -%%------------------------------------------------------------------------------ -%% Testcases -%%------------------------------------------------------------------------------ - -t_action_probe(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - Action = pulsar_action(Config), - {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), - ?assertMatch({{_, 204, _}, _, _}, Res0), - ok. - -t_action(Config) -> - Name = atom_to_binary(?FUNCTION_NAME), - create_action(Name, Config), - Actions = emqx_bridge_v2:list(actions), - Any = fun(#{name := BName}) -> BName =:= Name end, - ?assert(lists:any(Any, Actions), Actions), - Topic = <<"lkadfdaction">>, - {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( - #{ - sql => <<"select * from \"", Topic/binary, "\"">>, - id => atom_to_binary(?FUNCTION_NAME), - actions => [<<"pulsar:", Name/binary>>], - description => <<"bridge_v2 send msg to pulsar action">> - } - ), - on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), - MQTTClientID = <<"pulsar_mqtt_clientid">>, - {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), - {ok, _} = emqtt:connect(C1), - ReqPayload = payload(), - ReqPayloadBin = emqx_utils_json:encode(ReqPayload), - {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), - [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), - ?assertEqual(MQTTClientID, ClientID), - ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), - ok = emqtt:disconnect(C1), - InstanceId = instance_id(actions, Name), - ?retry( - 100, - 20, - ?assertMatch( - #{ - counters := #{ - dropped := 0, - success := 1, - matched := 1, - failed := 0, - received := 0 - } - }, - emqx_resource:get_metrics(InstanceId) - ) - ), - ok = delete_action(Name), - ActionsAfterDelete = emqx_bridge_v2:list(actions), - ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), - ok. - -%%------------------------------------------------------------------------------ -%% Helper fns -%%------------------------------------------------------------------------------ - pulsar_connector(Config) -> PulsarHost = ?config(pulsar_host, Config), PulsarPort = ?config(pulsar_port, Config), @@ -455,3 +391,158 @@ maybe_skip_without_ci() -> _ -> {skip, no_pulsar} end. + +assert_status_api(Line, Type, Name, Status) -> + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := Status, + <<"node_status">> := [#{<<"status">> := Status}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(Type, Name), + #{line => Line, name => Name, expected_status => Status} + ). +-define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_action_probe(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Action = pulsar_action(Config), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), + ?assertMatch({{_, 204, _}, _, _}, Res0), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name, Config), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"pulsar:", Name/binary>>], + description => <<"bridge_v2 send msg to pulsar action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + MQTTClientID = <<"pulsar_mqtt_clientid">>, + {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), + {ok, _} = emqtt:connect(C1), + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), + [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), + ?assertEqual(MQTTClientID, ClientID), + ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), + ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + ?retry( + 100, + 20, + ?assertMatch( + #{ + counters := #{ + dropped := 0, + success := 1, + matched := 1, + failed := 0, + received := 0 + } + }, + emqx_resource:get_metrics(InstanceId) + ) + ), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ok. + +%% Tests that deleting/disabling an action that share the same Pulsar topic with other +%% actions do not disturb the latter. +t_multiple_actions_sharing_topic(Config) -> + Type = ?TYPE, + ConnectorName = <<"c">>, + ConnectorConfig = pulsar_connector(Config), + ActionConfig = pulsar_action(Config), + ?check_trace( + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName1 = <<"a1">>, + ActionParams1 = [ + {action_config, ActionConfig}, + {action_name, ActionName1}, + {action_type, Type} + ], + ActionName2 = <<"a2">>, + ActionParams2 = [ + {action_config, ActionConfig}, + {action_name, ActionName2}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams1), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams2), + + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + RuleTopic = <<"t/a2">>, + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [ + {bridge_name, ActionName2} + ]), + {ok, C} = emqtt:start_link([]), + {ok, _} = emqtt:connect(C), + SendMessage = fun() -> + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C, RuleTopic, #{}, ReqPayloadBin, [ + {qos, 1}, {retain, false} + ]), + ok + end, + + %% Disabling a1 shouldn't disturb a2. + ?assertMatch( + {204, _}, emqx_bridge_v2_testlib:disable_kind_api(action, Type, ActionName1) + ), + + ?assertStatusAPI(Type, ActionName1, <<"disconnected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch(ok, SendMessage()), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:enable_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName1, <<"connected">>), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(ok, SendMessage()), + + %% Deleting also shouldn't disrupt a2. + ?assertMatch( + {204, _}, + emqx_bridge_v2_testlib:delete_kind_api(action, Type, ActionName1) + ), + ?assertStatusAPI(Type, ActionName2, <<"connected">>), + ?assertMatch(ok, SendMessage()), + + ok + end, + [] + ), + ok.