refactor(pulsar): use macros for allocatable resources
This commit is contained in:
parent
34be8b3a00
commit
42b37690c7
|
@ -60,6 +60,10 @@
|
||||||
sync_timeout := emqx_schema:duration_ms()
|
sync_timeout := emqx_schema:duration_ms()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% Allocatable resources
|
||||||
|
-define(pulsar_client_id, pulsar_client_id).
|
||||||
|
-define(pulsar_producers, pulsar_producers).
|
||||||
|
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
%% `emqx_resource' API
|
%% `emqx_resource' API
|
||||||
%%-------------------------------------------------------------------------------------
|
%%-------------------------------------------------------------------------------------
|
||||||
|
@ -81,7 +85,7 @@ on_start(InstanceId, Config) ->
|
||||||
} = Config,
|
} = Config,
|
||||||
Servers = format_servers(Servers0),
|
Servers = format_servers(Servers0),
|
||||||
ClientId = make_client_id(InstanceId, BridgeName),
|
ClientId = make_client_id(InstanceId, BridgeName),
|
||||||
ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId),
|
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId),
|
||||||
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
SSLOpts = emqx_tls_lib:to_client_opts(SSL),
|
||||||
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
|
ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)),
|
||||||
ClientOpts = #{
|
ClientOpts = #{
|
||||||
|
@ -119,7 +123,7 @@ on_start(InstanceId, Config) ->
|
||||||
-spec on_stop(resource_id(), state()) -> ok.
|
-spec on_stop(resource_id(), state()) -> ok.
|
||||||
on_stop(InstanceId, _State) ->
|
on_stop(InstanceId, _State) ->
|
||||||
case emqx_resource:get_allocated_resources(InstanceId) of
|
case emqx_resource:get_allocated_resources(InstanceId) of
|
||||||
#{pulsar_client_id := ClientId, pulsar_producers := Producers} ->
|
#{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} ->
|
||||||
stop_producers(ClientId, Producers),
|
stop_producers(ClientId, Producers),
|
||||||
stop_client(ClientId),
|
stop_client(ClientId),
|
||||||
?tp(pulsar_bridge_stopped, #{
|
?tp(pulsar_bridge_stopped, #{
|
||||||
|
@ -128,7 +132,7 @@ on_stop(InstanceId, _State) ->
|
||||||
pulsar_producers => Producers
|
pulsar_producers => Producers
|
||||||
}),
|
}),
|
||||||
ok;
|
ok;
|
||||||
#{pulsar_client_id := ClientId} ->
|
#{?pulsar_client_id := ClientId} ->
|
||||||
stop_client(ClientId),
|
stop_client(ClientId),
|
||||||
?tp(pulsar_bridge_stopped, #{
|
?tp(pulsar_bridge_stopped, #{
|
||||||
instance_id => InstanceId,
|
instance_id => InstanceId,
|
||||||
|
@ -340,7 +344,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) ->
|
||||||
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
|
?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}),
|
||||||
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of
|
||||||
{ok, Producers} ->
|
{ok, Producers} ->
|
||||||
ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers),
|
ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers),
|
||||||
?tp(pulsar_producer_producers_allocated, #{}),
|
?tp(pulsar_producer_producers_allocated, #{}),
|
||||||
State = #{
|
State = #{
|
||||||
pulsar_client_id => ClientId,
|
pulsar_client_id => ClientId,
|
||||||
|
|
|
@ -541,8 +541,14 @@ kill_resource_managers() ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({_, Pid, _, _}) ->
|
fun({_, Pid, _, _}) ->
|
||||||
ct:pal("terminating resource manager ~p", [Pid]),
|
ct:pal("terminating resource manager ~p", [Pid]),
|
||||||
%% sys:terminate(Pid, stop),
|
Ref = monitor(process, Pid),
|
||||||
exit(Pid, kill),
|
exit(Pid, kill),
|
||||||
|
receive
|
||||||
|
{'DOWN', Ref, process, Pid, killed} ->
|
||||||
|
ok
|
||||||
|
after 500 ->
|
||||||
|
ct:fail("pid ~p didn't die!", [Pid])
|
||||||
|
end,
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
supervisor:which_children(emqx_resource_manager_sup)
|
supervisor:which_children(emqx_resource_manager_sup)
|
||||||
|
|
|
@ -533,7 +533,7 @@ clean_allocated_resources(ResourceId, ResourceMod) ->
|
||||||
true ->
|
true ->
|
||||||
%% The resource entries in the ETS table are erased inside
|
%% The resource entries in the ETS table are erased inside
|
||||||
%% `call_stop' if the call is successful.
|
%% `call_stop' if the call is successful.
|
||||||
ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
|
ok = call_stop(ResourceId, ResourceMod, _ResourceState = undefined),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
|
|
Loading…
Reference in New Issue