Merge pull request #10559 from thalesmg/pulsar-more-tests-v50

test(pulsar): add more test cases for Pulsar Producer bridge
This commit is contained in:
Thales Macedo Garitezi 2023-05-02 10:16:17 -03:00 committed by GitHub
commit 2c4fd98ce5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 235 additions and 17 deletions

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}}, {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -87,11 +87,14 @@ on_start(InstanceId, Config) ->
}, },
case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
{ok, _Pid} -> {ok, _Pid} ->
?SLOG(info, #{ ?tp(
msg => "pulsar_client_started", info,
"pulsar_client_started",
#{
instance_id => InstanceId, instance_id => InstanceId,
pulsar_hosts => Servers pulsar_hosts => Servers
}); }
);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_start_pulsar_client", msg => "failed_to_start_pulsar_client",
@ -115,7 +118,7 @@ on_stop(_InstanceId, State) ->
ok. ok.
-spec on_get_status(manager_id(), state()) -> connected | disconnected. -spec on_get_status(manager_id(), state()) -> connected | disconnected.
on_get_status(_InstanceId, State) -> on_get_status(_InstanceId, State = #{}) ->
#{ #{
pulsar_client_id := ClientId, pulsar_client_id := ClientId,
producers := Producers producers := Producers
@ -135,7 +138,11 @@ on_get_status(_InstanceId, State) ->
end; end;
{error, _} -> {error, _} ->
disconnected disconnected
end. end;
on_get_status(_InstanceId, _State) ->
%% If a health check happens just after a concurrent request to
%% create the bridge is not quite finished, `State = undefined'.
connecting.
-spec on_query(manager_id(), {send_message, map()}, state()) -> -spec on_query(manager_id(), {send_message, map()}, state()) ->
{ok, term()} {ok, term()}
@ -160,6 +167,13 @@ on_query(_InstanceId, {send_message, Message}, State) ->
) -> ) ->
{ok, pid()}. {ok, pid()}.
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
?tp_span(
pulsar_producer_on_query_async,
#{instance_id => _InstanceId, message => Message},
do_on_query_async(Message, AsyncReplyFn, State)
).
do_on_query_async(Message, AsyncReplyFn, State) ->
#{ #{
producers := Producers, producers := Producers,
message_template := MessageTemplate message_template := MessageTemplate
@ -283,6 +297,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
drop_if_highmem => MemOLP drop_if_highmem => MemOLP
}, },
ProducerName = producer_name(ClientId), ProducerName = producer_name(ClientId),
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
MessageTemplate = compile_message_template(MessageTemplateOpts), MessageTemplate = compile_message_template(MessageTemplateOpts),
ProducerOpts0 = ProducerOpts0 =
#{ #{
@ -298,6 +313,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
}, },
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
PulsarTopic = binary_to_list(PulsarTopic0), PulsarTopic = binary_to_list(PulsarTopic0),
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
{ok, Producers} -> {ok, Producers} ->
State = #{ State = #{
@ -310,13 +326,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
{ok, State} {ok, State}
catch catch
Kind:Error:Stacktrace -> Kind:Error:Stacktrace ->
?SLOG(error, #{ ?tp(
msg => "failed_to_start_pulsar_producer", error,
"failed_to_start_pulsar_producer",
#{
instance_id => InstanceId, instance_id => InstanceId,
kind => Kind, kind => Kind,
reason => Error, reason => Error,
stacktrace => Stacktrace stacktrace => Stacktrace
}), }
),
stop_client(ClientId), stop_client(ClientId),
throw(failed_to_start_pulsar_producer) throw(failed_to_start_pulsar_producer)
end. end.

View File

@ -37,7 +37,14 @@ groups() ->
]. ].
only_once_tests() -> only_once_tests() ->
[t_create_via_http]. [
t_create_via_http,
t_start_when_down,
t_send_when_down,
t_send_when_timeout,
t_failure_to_start_producer,
t_producer_process_crash
].
init_per_suite(Config) -> init_per_suite(Config) ->
Config. Config.
@ -753,6 +760,198 @@ t_on_get_status(Config) ->
), ),
ok. ok.
t_start_when_down(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
ok
end),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
ok
end,
[]
),
ok.
t_send_when_down(Config) ->
do_t_send_with_failure(Config, down).
t_send_when_timeout(Config) ->
do_t_send_with_failure(Config, timeout).
do_t_send_with_failure(Config, FailureType) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
MQTTTopic = ?config(mqtt_topic, Config),
QoS = 0,
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
{{ok, _}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_producer_bridge_started},
10_000
),
?check_trace(
begin
emqx_common_test_helpers:with_failure(
FailureType, ProxyName, ProxyHost, ProxyPort, fun() ->
{_, {ok, _}} =
?wait_async_action(
emqx:publish(Message0),
#{
?snk_kind := pulsar_producer_on_query_async,
?snk_span := {complete, _}
},
5_000
),
ok
end
),
ok
end,
fun(_Trace) ->
%% Should recover given enough time.
Data0 = receive_consumed(20_000),
?assertMatch(
[
#{
<<"clientid">> := ClientId,
<<"event">> := <<"message.publish">>,
<<"payload">> := Payload,
<<"topic">> := MQTTTopic
}
],
Data0
),
ok
end
),
ok.
%% Check that we correctly terminate the pulsar client when the pulsar
%% producer processes fail to start for whatever reason.
t_failure_to_start_producer(Config) ->
?check_trace(
begin
?force_ordering(
#{?snk_kind := name_registered},
#{?snk_kind := pulsar_producer_about_to_start_producers}
),
spawn_link(fun() ->
?tp(will_register_name, #{}),
{ok, #{producer_name := ProducerName}} = ?block_until(
#{?snk_kind := pulsar_producer_capture_name}, 10_000
),
true = register(ProducerName, self()),
?tp(name_registered, #{name => ProducerName}),
%% Just simulating another process so that starting the
%% producers fail. Currently it does a gen_server:call
%% with `infinity' timeout, so this is just to avoid
%% hanging.
receive
{'$gen_call', From, _Request} ->
gen_server:reply(From, {error, im_not, your_producer})
end,
receive
die -> ok
end
end),
{{ok, _}, {ok, _}} =
?wait_async_action(
create_bridge(Config),
#{?snk_kind := pulsar_bridge_client_stopped},
20_000
),
ok
end,
[]
),
ok.
%% Check the driver recovers itself if one of the producer processes
%% die for whatever reason.
t_producer_process_crash(Config) ->
MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config),
QoS = 0,
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
?check_trace(
begin
{{ok, _}, {ok, _}} =
?wait_async_action(
create_bridge(
Config,
#{<<"buffer">> => #{<<"mode">> => <<"disk">>}}
),
#{?snk_kind := pulsar_producer_bridge_started},
10_000
),
[ProducerPid | _] = [
Pid
|| {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup),
Pid <- element(2, process_info(PS, links)),
case proc_lib:initial_call(Pid) of
{pulsar_producer, init, _} -> true;
_ -> false
end
],
Ref = monitor(process, ProducerPid),
exit(ProducerPid, kill),
receive
{'DOWN', Ref, process, ProducerPid, _Killed} ->
ok
after 1_000 -> ct:fail("pid didn't die")
end,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
%% Should recover given enough time.
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
{_, {ok, _}} =
?wait_async_action(
emqx:publish(Message0),
#{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}},
5_000
),
Data0 = receive_consumed(20_000),
?assertMatch(
[
#{
<<"clientid">> := ClientId,
<<"event">> := <<"message.publish">>,
<<"payload">> := Payload,
<<"topic">> := MQTTTopic
}
],
Data0
),
ok
end,
[]
),
ok.
t_cluster(Config) -> t_cluster(Config) ->
MQTTTopic = ?config(mqtt_topic, Config), MQTTTopic = ?config(mqtt_topic, Config),
ResourceId = resource_id(Config), ResourceId = resource_id(Config),