Merge pull request #10908 from lafirest/feat/rocketmq_on_stop

feat(rocketmq): refactored bridge to avoid leaking resources during crashes at creation
This commit is contained in:
lafirest 2023-06-05 15:00:32 +08:00 committed by GitHub
commit d51c658a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 44 deletions

View File

@ -102,22 +102,23 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
),
ClientId = client_id(InstanceId),
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
ProducersMapPID = create_producers_map(ClientId),
State = #{
client_id => ClientId,
topic => Topic,
topic_tokens => TopicTks,
sync_timeout => SyncTimeout,
templates => Templates,
producers_map_pid => ProducersMapPID,
producers_opts => ProducerOpts
},
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
create_producers_map(ClientId),
case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of
{ok, _Pid} ->
{ok, State};
@ -130,23 +131,22 @@ on_start(
{error, Reason}
end.
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
on_stop(InstanceId, _State) ->
?SLOG(info, #{
msg => "stopping_rocketmq_connector",
connector => InstanceId
}),
Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}),
lists:foreach(
fun([Topic, Producer]) ->
ets:delete(ClientId, {RawTopic, Topic}),
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
fun
({_, client_id, ClientId}) ->
destory_producers_map(ClientId),
ok = rocketmq:stop_and_delete_supervised_client(ClientId);
({_, _Topic, Producer}) ->
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
Producers
),
Pid ! ok,
ok = rocketmq:stop_and_delete_supervised_client(ClientId).
emqx_resource:get_allocated_resources_list(InstanceId)
).
on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, send_sync, State).
@ -179,7 +179,6 @@ do_query(
#{
templates := Templates,
client_id := ClientId,
topic := RawTopic,
topic_tokens := TopicTks,
producers_opts := ProducerOpts,
sync_timeout := RequestTimeout
@ -191,7 +190,7 @@ do_query(
#{connector => InstanceId, query => Query, state => State}
),
TopicKey = get_topic_key(Query, RawTopic, TopicTks),
TopicKey = get_topic_key(Query, TopicTks),
Data = apply_template(Query, Templates),
Result = safe_do_produce(
@ -220,7 +219,7 @@ do_query(
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
try
Producers = get_producers(ClientId, TopicKey, ProducerOpts),
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
catch
_Type:Reason ->
@ -249,10 +248,10 @@ parse_template([{Key, H} | T], Templates) ->
parse_template([], Templates) ->
Templates.
get_topic_key({_, Msg}, RawTopic, TopicTks) ->
{RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)};
get_topic_key([Query | _], RawTopic, TopicTks) ->
get_topic_key(Query, RawTopic, TopicTks).
get_topic_key({_, Msg}, TopicTks) ->
emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg);
get_topic_key([Query | _], TopicTks) ->
get_topic_key(Query, TopicTks).
apply_template({Key, Msg} = _Req, Templates) ->
case maps:get(Key, Templates, undefined) of
@ -317,29 +316,29 @@ acl_info(_, _, _) ->
#{}.
create_producers_map(ClientId) ->
erlang:spawn(fun() ->
case ets:whereis(ClientId) of
undefined ->
_ = ets:new(ClientId, [public, named_table]),
ok;
_ ->
ok
end,
receive
_Msg ->
ok
end
end).
_ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]),
ok.
get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
case ets:lookup(ClientId, TopicKey) of
[{_, Producers0}] ->
Producers0;
_ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]),
{ok, Producers0} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic1, ProducerOpts
),
ets:insert(ClientId, {TopicKey, Producers0}),
Producers0
%% The resource manager will not terminate when restarting a resource,
%% so manually destroying the ets table is necessary.
destory_producers_map(ClientId) ->
case ets:whereis(ClientId) of
undefined ->
ok;
Tid ->
ets:delete(Tid)
end.
get_producers(InstanceId, ClientId, Topic, ProducerOpts) ->
case ets:lookup(ClientId, Topic) of
[{_, Producers}] ->
Producers;
_ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]),
{ok, Producers} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic, ProducerOpts
),
ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers),
ets:insert(ClientId, {Topic, Producers}),
Producers
end.

View File

@ -85,6 +85,7 @@
allocate_resource/3,
has_allocated_resources/1,
get_allocated_resources/1,
get_allocated_resources_list/1,
forget_allocated_resources/1
]).
@ -519,6 +520,10 @@ get_allocated_resources(InstanceId) ->
Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]).
-spec get_allocated_resources_list(resource_id()) -> list(tuple()).
get_allocated_resources_list(InstanceId) ->
ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId).
-spec forget_allocated_resources(resource_id()) -> ok.
forget_allocated_resources(InstanceId) ->
true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId),

View File

@ -0,0 +1 @@
Refactored the RocketMQ bridge to avoid leaking resources during crashes at creation.