test(pulsar): add more test cases for Pulsar Producer bridge
Fixes https://emqx.atlassian.net/browse/EMQX-8400
This commit is contained in:
parent
15fe445c66
commit
633eacad3b
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}},
|
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}},
|
||||||
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||||
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||||
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_pulsar, [
|
{application, emqx_bridge_pulsar, [
|
||||||
{description, "EMQX Pulsar Bridge"},
|
{description, "EMQX Pulsar Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -87,11 +87,14 @@ on_start(InstanceId, Config) ->
|
||||||
},
|
},
|
||||||
case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
|
case pulsar:ensure_supervised_client(ClientId, Servers, ClientOpts) of
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
?SLOG(info, #{
|
?tp(
|
||||||
msg => "pulsar_client_started",
|
info,
|
||||||
|
"pulsar_client_started",
|
||||||
|
#{
|
||||||
instance_id => InstanceId,
|
instance_id => InstanceId,
|
||||||
pulsar_hosts => Servers
|
pulsar_hosts => Servers
|
||||||
});
|
}
|
||||||
|
);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "failed_to_start_pulsar_client",
|
msg => "failed_to_start_pulsar_client",
|
||||||
|
@ -115,7 +118,7 @@ on_stop(_InstanceId, State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
|
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
|
||||||
on_get_status(_InstanceId, State) ->
|
on_get_status(_InstanceId, State = #{}) ->
|
||||||
#{
|
#{
|
||||||
pulsar_client_id := ClientId,
|
pulsar_client_id := ClientId,
|
||||||
producers := Producers
|
producers := Producers
|
||||||
|
@ -135,7 +138,11 @@ on_get_status(_InstanceId, State) ->
|
||||||
end;
|
end;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
disconnected
|
disconnected
|
||||||
end.
|
end;
|
||||||
|
on_get_status(_InstanceId, _State) ->
|
||||||
|
%% If a health check happens while a concurrent request to create
|
||||||
|
%% the bridge has not quite finished, then `State = undefined'.
|
||||||
|
connecting.
|
||||||
|
|
||||||
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
-spec on_query(manager_id(), {send_message, map()}, state()) ->
|
||||||
{ok, term()}
|
{ok, term()}
|
||||||
|
@ -160,6 +167,13 @@ on_query(_InstanceId, {send_message, Message}, State) ->
|
||||||
) ->
|
) ->
|
||||||
{ok, pid()}.
|
{ok, pid()}.
|
||||||
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
|
on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) ->
|
||||||
|
?tp_span(
|
||||||
|
pulsar_producer_on_query_async,
|
||||||
|
#{instance_id => _InstanceId, message => Message},
|
||||||
|
do_on_query_async(Message, AsyncReplyFn, State)
|
||||||
|
).
|
||||||
|
|
||||||
|
do_on_query_async(Message, AsyncReplyFn, State) ->
|
||||||
#{
|
#{
|
||||||
producers := Producers,
|
producers := Producers,
|
||||||
message_template := MessageTemplate
|
message_template := MessageTemplate
|
||||||
|
@ -283,6 +297,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
||||||
drop_if_highmem => MemOLP
|
drop_if_highmem => MemOLP
|
||||||
},
|
},
|
||||||
ProducerName = producer_name(ClientId),
|
ProducerName = producer_name(ClientId),
|
||||||
|
?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}),
|
||||||
MessageTemplate = compile_message_template(MessageTemplateOpts),
|
MessageTemplate = compile_message_template(MessageTemplateOpts),
|
||||||
ProducerOpts0 =
|
ProducerOpts0 =
|
||||||
#{
|
#{
|
||||||
|
@ -298,6 +313,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
||||||
},
|
},
|
||||||
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
|
ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0),
|
||||||
PulsarTopic = binary_to_list(PulsarTopic0),
|
PulsarTopic = binary_to_list(PulsarTopic0),
|
||||||
|
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
|
||||||
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
||||||
{ok, Producers} ->
|
{ok, Producers} ->
|
||||||
State = #{
|
State = #{
|
||||||
|
@ -310,13 +326,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
||||||
{ok, State}
|
{ok, State}
|
||||||
catch
|
catch
|
||||||
Kind:Error:Stacktrace ->
|
Kind:Error:Stacktrace ->
|
||||||
?SLOG(error, #{
|
?tp(
|
||||||
msg => "failed_to_start_pulsar_producer",
|
error,
|
||||||
|
"failed_to_start_pulsar_producer",
|
||||||
|
#{
|
||||||
instance_id => InstanceId,
|
instance_id => InstanceId,
|
||||||
kind => Kind,
|
kind => Kind,
|
||||||
reason => Error,
|
reason => Error,
|
||||||
stacktrace => Stacktrace
|
stacktrace => Stacktrace
|
||||||
}),
|
}
|
||||||
|
),
|
||||||
stop_client(ClientId),
|
stop_client(ClientId),
|
||||||
throw(failed_to_start_pulsar_producer)
|
throw(failed_to_start_pulsar_producer)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -37,7 +37,14 @@ groups() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
only_once_tests() ->
|
only_once_tests() ->
|
||||||
[t_create_via_http].
|
[
|
||||||
|
t_create_via_http,
|
||||||
|
t_start_when_down,
|
||||||
|
t_send_when_down,
|
||||||
|
t_send_when_timeout,
|
||||||
|
t_failure_to_start_producer,
|
||||||
|
t_producer_process_crash
|
||||||
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
@ -753,6 +760,198 @@ t_on_get_status(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_start_when_down(Config) ->
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end),
|
||||||
|
%% Should recover given enough time.
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_send_when_down(Config) ->
|
||||||
|
do_t_send_with_failure(Config, down).
|
||||||
|
|
||||||
|
t_send_when_timeout(Config) ->
|
||||||
|
do_t_send_with_failure(Config, timeout).
|
||||||
|
|
||||||
|
do_t_send_with_failure(Config, FailureType) ->
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
MQTTTopic = ?config(mqtt_topic, Config),
|
||||||
|
QoS = 0,
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
||||||
|
|
||||||
|
{{ok, _}, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
create_bridge(Config),
|
||||||
|
#{?snk_kind := pulsar_producer_bridge_started},
|
||||||
|
10_000
|
||||||
|
),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
emqx_common_test_helpers:with_failure(
|
||||||
|
FailureType, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
{_, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqx:publish(Message0),
|
||||||
|
#{
|
||||||
|
?snk_kind := pulsar_producer_on_query_async,
|
||||||
|
?snk_span := {complete, _}
|
||||||
|
},
|
||||||
|
5_000
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(_Trace) ->
|
||||||
|
%% Should recover given enough time.
|
||||||
|
Data0 = receive_consumed(20_000),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"clientid">> := ClientId,
|
||||||
|
<<"event">> := <<"message.publish">>,
|
||||||
|
<<"payload">> := Payload,
|
||||||
|
<<"topic">> := MQTTTopic
|
||||||
|
}
|
||||||
|
],
|
||||||
|
Data0
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% Check that we correctly terminate the pulsar client when the pulsar
|
||||||
|
%% producer processes fail to start for whatever reason.
|
||||||
|
t_failure_to_start_producer(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?force_ordering(
|
||||||
|
#{?snk_kind := name_registered},
|
||||||
|
#{?snk_kind := pulsar_producer_about_to_start_producers}
|
||||||
|
),
|
||||||
|
spawn_link(fun() ->
|
||||||
|
?tp(will_register_name, #{}),
|
||||||
|
{ok, #{producer_name := ProducerName}} = ?block_until(
|
||||||
|
#{?snk_kind := pulsar_producer_capture_name}, 10_000
|
||||||
|
),
|
||||||
|
true = register(ProducerName, self()),
|
||||||
|
?tp(name_registered, #{name => ProducerName}),
|
||||||
|
%% Just simulating another process so that starting the
|
||||||
|
%% producers fails. Currently it does a gen_server:call
|
||||||
|
%% with `infinity' timeout, so this is just to avoid
|
||||||
|
%% hanging.
|
||||||
|
receive
|
||||||
|
{'$gen_call', From, _Request} ->
|
||||||
|
gen_server:reply(From, {error, im_not, your_producer})
|
||||||
|
end,
|
||||||
|
receive
|
||||||
|
die -> ok
|
||||||
|
end
|
||||||
|
end),
|
||||||
|
{{ok, _}, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
create_bridge(Config),
|
||||||
|
#{?snk_kind := pulsar_bridge_client_stopped},
|
||||||
|
20_000
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% Check the driver recovers itself if one of the producer processes
|
||||||
|
%% dies for whatever reason.
|
||||||
|
t_producer_process_crash(Config) ->
|
||||||
|
MQTTTopic = ?config(mqtt_topic, Config),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
QoS = 0,
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
{{ok, _}, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
create_bridge(
|
||||||
|
Config,
|
||||||
|
#{<<"buffer">> => #{<<"mode">> => <<"disk">>}}
|
||||||
|
),
|
||||||
|
#{?snk_kind := pulsar_producer_bridge_started},
|
||||||
|
10_000
|
||||||
|
),
|
||||||
|
[ProducerPid | _] = [
|
||||||
|
Pid
|
||||||
|
|| {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup),
|
||||||
|
Pid <- element(2, process_info(PS, links)),
|
||||||
|
case proc_lib:initial_call(Pid) of
|
||||||
|
{pulsar_producer, init, _} -> true;
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
],
|
||||||
|
Ref = monitor(process, ProducerPid),
|
||||||
|
exit(ProducerPid, kill),
|
||||||
|
receive
|
||||||
|
{'DOWN', Ref, process, ProducerPid, _Killed} ->
|
||||||
|
ok
|
||||||
|
after 1_000 -> ct:fail("pid didn't die")
|
||||||
|
end,
|
||||||
|
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)),
|
||||||
|
%% Should recover given enough time.
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
|
{_, {ok, _}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqx:publish(Message0),
|
||||||
|
#{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}},
|
||||||
|
5_000
|
||||||
|
),
|
||||||
|
Data0 = receive_consumed(20_000),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"clientid">> := ClientId,
|
||||||
|
<<"event">> := <<"message.publish">>,
|
||||||
|
<<"payload">> := Payload,
|
||||||
|
<<"topic">> := MQTTTopic
|
||||||
|
}
|
||||||
|
],
|
||||||
|
Data0
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_cluster(Config) ->
|
t_cluster(Config) ->
|
||||||
MQTTTopic = ?config(mqtt_topic, Config),
|
MQTTTopic = ?config(mqtt_topic, Config),
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
|
|
Loading…
Reference in New Issue