diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index dfb1f3def..254d6b39a 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -102,22 +102,23 @@ on_start( emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) ), ClientId = client_id(InstanceId), - TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), - ProducersMapPID = create_producers_map(ClientId), + State = #{ client_id => ClientId, topic => Topic, topic_tokens => TopicTks, sync_timeout => SyncTimeout, templates => Templates, - producers_map_pid => ProducersMapPID, producers_opts => ProducerOpts }, + ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), + create_producers_map(ClientId), + case rocketmq:ensure_supervised_client(ClientId, Servers, ClientCfg) of {ok, _Pid} -> {ok, State}; @@ -130,23 +131,22 @@ on_start( {error, Reason} end. -on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", connector => InstanceId }), - Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}), lists:foreach( - fun([Topic, Producer]) -> - ets:delete(ClientId, {RawTopic, Topic}), - _ = rocketmq:stop_and_delete_supervised_producers(Producer) + fun + ({_, client_id, ClientId}) -> + destory_producers_map(ClientId), + ok = rocketmq:stop_and_delete_supervised_client(ClientId); + ({_, _Topic, Producer}) -> + _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, - Producers - ), - - Pid ! ok, - ok = rocketmq:stop_and_delete_supervised_client(ClientId). + emqx_resource:get_allocated_resources_list(InstanceId) + ). on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, send_sync, State). @@ -179,7 +179,6 @@ do_query( #{ templates := Templates, client_id := ClientId, - topic := RawTopic, topic_tokens := TopicTks, producers_opts := ProducerOpts, sync_timeout := RequestTimeout @@ -191,7 +190,7 @@ do_query( #{connector => InstanceId, query => Query, state => State} ), - TopicKey = get_topic_key(Query, RawTopic, TopicTks), + TopicKey = get_topic_key(Query, TopicTks), Data = apply_template(Query, Templates), Result = safe_do_produce( @@ -220,7 +219,7 @@ do_query( safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> try - Producers = get_producers(ClientId, TopicKey, ProducerOpts), + Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) catch _Type:Reason -> @@ -249,10 +248,10 @@ parse_template([{Key, H} | T], Templates) -> parse_template([], Templates) -> Templates. -get_topic_key({_, Msg}, RawTopic, TopicTks) -> - {RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)}; -get_topic_key([Query | _], RawTopic, TopicTks) -> - get_topic_key(Query, RawTopic, TopicTks). +get_topic_key({_, Msg}, TopicTks) -> + emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg); +get_topic_key([Query | _], TopicTks) -> + get_topic_key(Query, TopicTks). apply_template({Key, Msg} = _Req, Templates) -> case maps:get(Key, Templates, undefined) of @@ -317,29 +316,29 @@ acl_info(_, _, _) -> #{}. create_producers_map(ClientId) -> - erlang:spawn(fun() -> - case ets:whereis(ClientId) of - undefined -> - _ = ets:new(ClientId, [public, named_table]), - ok; - _ -> - ok - end, - receive - _Msg -> - ok - end - end). + _ = ets:new(ClientId, [public, named_table, {read_concurrency, true}]), + ok. -get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> - case ets:lookup(ClientId, TopicKey) of - [{_, Producers0}] -> - Producers0; - _ -> - ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), - {ok, Producers0} = rocketmq:ensure_supervised_producers( - ClientId, ProducerGroup, Topic1, ProducerOpts - ), - ets:insert(ClientId, {TopicKey, Producers0}), - Producers0 +%% The resource manager will not terminate when restarting a resource, +%% so manually destroying the ets table is necessary. +destory_producers_map(ClientId) -> + case ets:whereis(ClientId) of + undefined -> + ok; + Tid -> + ets:delete(Tid) + end. + +get_producers(InstanceId, ClientId, Topic, ProducerOpts) -> + case ets:lookup(ClientId, Topic) of + [{_, Producers}] -> + Producers; + _ -> + ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]), + {ok, Producers} = rocketmq:ensure_supervised_producers( + ClientId, ProducerGroup, Topic, ProducerOpts + ), + ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers), + ets:insert(ClientId, {Topic, Producers}), + Producers end. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37d7b1696..0dbc3067f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,6 +85,7 @@ allocate_resource/3, has_allocated_resources/1, get_allocated_resources/1, + get_allocated_resources_list/1, forget_allocated_resources/1 ]). @@ -519,6 +520,10 @@ get_allocated_resources(InstanceId) -> Objects = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId), maps:from_list([{K, V} || {_InstanceId, K, V} <- Objects]). +-spec get_allocated_resources_list(resource_id()) -> list(tuple()). +get_allocated_resources_list(InstanceId) -> + ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId). + -spec forget_allocated_resources(resource_id()) -> ok. forget_allocated_resources(InstanceId) -> true = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId), diff --git a/changes/ee/feat-10908.en.md b/changes/ee/feat-10908.en.md new file mode 100644 index 000000000..ee350226c --- /dev/null +++ b/changes/ee/feat-10908.en.md @@ -0,0 +1 @@ +Refactored the RocketMQ bridge to avoid leaking resources during crashes at creation.