diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl new file mode 100644 index 000000000..69b384ab8 --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -0,0 +1,451 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_pulsar_v2_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, <<"pulsar">>). +-define(APPS, [emqx_conf, emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]). +-define(RULE_TOPIC, "pulsar/rule"). +-define(RULE_TOPIC_BIN, <>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, plain}, + {group, tls} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {plain, AllTCs}, + {tls, AllTCs} + ]. + +init_per_suite(Config) -> + %% Ensure enterprise bridge module is loaded + _ = emqx_bridge_enterprise:module_info(), + {ok, Cwd} = file:get_cwd(), + PrivDir = ?config(priv_dir, Config), + WorkDir = emqx_utils_fs:find_relpath(filename:join(PrivDir, "ebp"), Cwd), + Apps = emqx_cth_suite:start( + lists:flatten([ + ?APPS, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ]), + #{work_dir => WorkDir} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)). + +init_per_group(plain = Type, Config) -> + PulsarHost = os:getenv("PULSAR_PLAIN_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_PLAIN_PORT", "6652")), + ProxyName = "pulsar_plain", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + NewConfig = + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, false} + | Config1 ++ Config + ], + create_connector(?MODULE, NewConfig), + NewConfig; + false -> + maybe_skip_without_ci() + end; +init_per_group(tls = Type, Config) -> + PulsarHost = os:getenv("PULSAR_TLS_HOST", "toxiproxy"), + PulsarPort = list_to_integer(os:getenv("PULSAR_TLS_PORT", "6653")), + ProxyName = "pulsar_tls", + case emqx_common_test_helpers:is_tcp_server_available(PulsarHost, PulsarPort) of + true -> + Config1 = common_init_per_group(), + NewConfig = + [ + {proxy_name, ProxyName}, + {pulsar_host, PulsarHost}, + {pulsar_port, PulsarPort}, + {pulsar_type, Type}, + {use_tls, true} + | Config1 ++ Config + ], + create_connector(?MODULE, NewConfig), + NewConfig; + false -> + maybe_skip_without_ci() + end. + +end_per_group(Group, Config) when + Group =:= plain; + Group =:= tls +-> + common_end_per_group(Config), + ok. + +common_init_per_group() -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + UniqueNum = integer_to_binary(erlang:unique_integer()), + MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {mqtt_topic, MQTTTopic} + ]. + +common_end_per_group(Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok. + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +end_per_testcase(_Testcase, Config) -> + case proplists:get_bool(skip_does_not_apply, Config) of + true -> + ok; + false -> + ok = emqx_config:delete_override_conf_files(), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges(), + stop_consumer(Config), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + flush_consumed(), + ok + end. + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + emqx_bridge_v2_testlib:delete_all_bridges(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + PulsarTopic = + << + (atom_to_binary(TestCase))/binary, + UniqueNum/binary + >>, + Config1 = [{pulsar_topic, PulsarTopic} | Config0], + ConsumerConfig = start_consumer(TestCase, Config1), + Config = ConsumerConfig ++ Config1, + ok = snabbkaffe:start_trace(), + Config. + +create_connector(Name, Config) -> + Connector = pulsar_connector(Config), + {ok, _} = emqx_connector:create(?TYPE, Name, Connector). + +delete_connector(Name) -> + ok = emqx_connector:remove(?TYPE, Name). + +create_action(Name, Config) -> + Action = pulsar_action(Config), + {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). + +delete_action(Name) -> + ok = emqx_bridge_v2:remove(actions, ?TYPE, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_action_probe(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Action = pulsar_action(Config), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), + ?assertMatch({{_, 204, _}, _, _}, Res0), + ok. + +t_action(Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name, Config), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfdaction">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"pulsar:", Name/binary>>], + description => <<"bridge_v2 send msg to pulsar action">> + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + MQTTClientID = <<"pulsar_mqtt_clientid">>, + {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, MQTTClientID}]), + {ok, _} = emqtt:connect(C1), + ReqPayload = payload(), + ReqPayloadBin = emqx_utils_json:encode(ReqPayload), + {ok, _} = emqtt:publish(C1, Topic, #{}, ReqPayloadBin, [{qos, 1}, {retain, false}]), + [#{<<"clientid">> := ClientID, <<"payload">> := RespPayload}] = receive_consumed(5000), + ?assertEqual(MQTTClientID, ClientID), + ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), + ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + #{counters := Counters} = emqx_resource:get_metrics(InstanceId), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ?assertMatch( + #{ + dropped := 0, + success := 1, + matched := 1, + failed := 0, + received := 0 + }, + Counters + ), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +pulsar_connector(Config) -> + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = atom_to_binary(?MODULE), + Prefix = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + ServerURL = iolist_to_binary([ + Prefix, + PulsarHost, + ":", + integer_to_binary(PulsarPort) + ]), + Connector = #{ + <<"connectors">> => #{ + <<"pulsar">> => #{ + Name => #{ + <<"enable">> => true, + <<"ssl">> => #{ + <<"enable">> => UseTLS, + <<"verify">> => <<"verify_none">>, + <<"server_name_indication">> => <<"auto">> + }, + <<"authentication">> => <<"none">>, + <<"servers">> => ServerURL + } + } + } + }, + parse_and_check(<<"connectors">>, emqx_connector_schema, Connector, Name). + +pulsar_action(Config) -> + Name = atom_to_binary(?MODULE), + Action = #{ + <<"actions">> => #{ + <<"pulsar">> => #{ + Name => #{ + <<"connector">> => Name, + <<"enable">> => true, + <<"parameters">> => #{ + <<"retention_period">> => <<"infinity">>, + <<"max_batch_bytes">> => <<"1MB">>, + <<"batch_size">> => 100, + <<"strategy">> => <<"random">>, + <<"buffer">> => #{ + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">>, + <<"memory_overload_protection">> => true + }, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"value">> => <<"${.}">> + }, + <<"pulsar_topic">> => ?config(pulsar_topic, Config) + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">> + } + } + } + } + }, + parse_and_check(<<"actions">>, emqx_bridge_v2_schema, Action, Name). + +parse_and_check(Key, Mod, Conf, Name) -> + ConfStr = hocon_pp:do(Conf, #{}), + ct:pal(ConfStr), + {ok, RawConf} = hocon:binary(ConfStr, #{format => map}), + hocon_tconf:check_plain(Mod, RawConf, #{required => false, atom_key => false}), + #{Key := #{<<"pulsar">> := #{Name := RetConf}}} = RawConf, + RetConf. + +instance_id(Type, Name) -> + ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), + BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name), + TypeBin = + case Type of + sources -> <<"source:">>; + actions -> <<"action:">> + end, + <>. + +start_consumer(TestCase, Config) -> + PulsarHost = ?config(pulsar_host, Config), + PulsarPort = ?config(pulsar_port, Config), + PulsarTopic = ?config(pulsar_topic, Config), + UseTLS = ?config(use_tls, Config), + Scheme = + case UseTLS of + true -> <<"pulsar+ssl://">>; + false -> <<"pulsar://">> + end, + URL = + binary_to_list( + <> + ), + ConsumerClientId = list_to_atom( + atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()) + ), + CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), + SSLOpts = #{ + enable => UseTLS, + keyfile => filename:join([CertsPath, "key.pem"]), + certfile => filename:join([CertsPath, "cert.pem"]), + cacertfile => filename:join([CertsPath, "cacert.pem"]) + }, + Opts = #{enable_ssl => UseTLS, ssl_opts => emqx_tls_lib:to_client_opts(SSLOpts)}, + {ok, _ClientPid} = pulsar:ensure_supervised_client(ConsumerClientId, [URL], Opts), + ConsumerOpts = Opts#{ + cb_init_args => #{send_to => self()}, + cb_module => pulsar_echo_consumer, + sub_type => 'Shared', + subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()), + max_consumer_num => 1, + %% Note! This must not coincide with the client + %% id, or else weird bugs will happen, like the + %% consumer never starts... + name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())), + consumer_id => 1 + }, + {ok, Consumer} = pulsar:ensure_supervised_consumers( + ConsumerClientId, + PulsarTopic, + ConsumerOpts + ), + %% since connection is async, and there's currently no way to + %% specify the subscription initial position as `Earliest', we + %% need to wait until the consumer is connected to avoid + %% flakiness. + ok = wait_until_consumer_connected(Consumer), + [ + {consumer_client_id, ConsumerClientId}, + {pulsar_consumer, Consumer} + ]. + +stop_consumer(Config) -> + ConsumerClientId = ?config(consumer_client_id, Config), + Consumer = ?config(pulsar_consumer, Config), + ok = pulsar:stop_and_delete_supervised_consumers(Consumer), + ok = pulsar:stop_and_delete_supervised_client(ConsumerClientId), + ok. + +wait_until_consumer_connected(Consumer) -> + ?retry( + _Sleep = 300, + _Attempts0 = 20, + true = pulsar_consumers:all_connected(Consumer) + ), + ok. + +wait_until_producer_connected() -> + wait_until_connected(pulsar_producers_sup, pulsar_producer). + +wait_until_connected(SupMod, Mod) -> + Pids = get_pids(SupMod, Mod), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + begin + true = length(Pids) > 0, + lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids) + end + ), + ok. + +get_pulsar_producers() -> + get_pids(pulsar_producers_sup, pulsar_producer). + +get_pids(SupMod, Mod) -> + [ + P + || {_Name, SupPid, _Type, _Mods} <- supervisor:which_children(SupMod), + P <- element(2, process_info(SupPid, links)), + case proc_lib:initial_call(P) of + {Mod, init, _} -> true; + _ -> false + end + ]. + +receive_consumed(Timeout) -> + receive + {pulsar_message, #{payloads := Payloads}} -> + lists:map(fun try_decode_json/1, Payloads) + after Timeout -> + ct:pal("mailbox: ~p", [process_info(self(), messages)]), + ct:fail("no message consumed") + end. + +flush_consumed() -> + receive + {pulsar_message, _} -> flush_consumed() + after 0 -> ok + end. + +try_decode_json(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {error, _} -> + Payload; + {ok, JSON} -> + JSON + end. + +payload() -> + #{<<"key">> => 42, <<"data">> => <<"pulsar">>, <<"timestamp">> => 10000}. + +maybe_skip_without_ci() -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_pulsar); + _ -> + {skip, no_pulsar} + end.