From 42b37690c7666a3def04659b687aa5b6da23cffa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 16:27:31 -0300 Subject: [PATCH] refactor(pulsar): use macros for allocatable resources --- .../src/emqx_bridge_pulsar_impl_producer.erl | 12 ++++++++---- .../test/emqx_bridge_pulsar_impl_producer_SUITE.erl | 8 +++++++- apps/emqx_resource/src/emqx_resource.erl | 2 +- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 5906cc57a..b8157d4fc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -60,6 +60,10 @@ sync_timeout := emqx_schema:duration_ms() }. +%% Allocatable resources +-define(pulsar_client_id, pulsar_client_id). +-define(pulsar_producers, pulsar_producers). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -81,7 +85,7 @@ on_start(InstanceId, Config) -> } = Config, Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), - ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ @@ -119,7 +123,7 @@ on_start(InstanceId, Config) -> -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> case emqx_resource:get_allocated_resources(InstanceId) of - #{pulsar_client_id := ClientId, pulsar_producers := Producers} -> + #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} -> stop_producers(ClientId, Producers), stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ @@ -128,7 +132,7 @@ on_stop(InstanceId, _State) -> pulsar_producers => Producers }), ok; - #{pulsar_client_id := ClientId} -> + #{?pulsar_client_id := ClientId} -> stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ instance_id => InstanceId, @@ -340,7 +344,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers), ?tp(pulsar_producer_producers_allocated, #{}), State = #{ pulsar_client_id => ClientId, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 3605baaab..4dc318205 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -541,8 +541,14 @@ kill_resource_managers() -> lists:foreach( fun({_, Pid, _, _}) -> ct:pal("terminating resource manager ~p", [Pid]), - %% sys:terminate(Pid, stop), + Ref = monitor(process, Pid), exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, ok end, supervisor:which_children(emqx_resource_manager_sup) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 10f1de6c4..840c6cfec 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -533,7 +533,7 @@ clean_allocated_resources(ResourceId, ResourceMod) -> true -> %% The resource entries in the ETS table are erased inside %% `call_stop' if the call is successful. - ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined), + ok = call_stop(ResourceId, ResourceMod, _ResourceState = undefined), ok; false -> ok