From 7eef86363af3e104b304c82bdaf6411406e8cb4c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 May 2023 17:46:36 +0200 Subject: [PATCH 1/7] test: make erlfmt happy --- .../test/emqx_bridge_tdengine_SUITE.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 3d06aee52..0b8d20f15 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -435,7 +435,12 @@ t_write_failure(Config) -> #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result), + case Result of + {error, Reason} when Reason =:= econnrefused; Reason =:= closed -> + ok; + _ -> + throw({unexpected, Result}) + end, ok end), ok. From 5bbcf4b712439c9f29069bd6371bc964bbb5a299 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 17 May 2023 21:53:35 +0200 Subject: [PATCH 2/7] fix(mqtt-connector): faster shutdown --- apps/emqx_connector/src/emqx_connector.app.src | 2 +- apps/emqx_connector/src/emqx_connector_mqtt.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index db55c7032..76c3e8bb9 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.22"}, + {vsn, "0.1.23"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5cafd2d50..bb8cc00d1 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ 
b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -112,7 +112,7 @@ bridge_spec(Config) -> id => Name, start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, restart => temporary, - shutdown => 5000 + shutdown => 1000 }. -spec bridges() -> [{_Name, _Status}]. @@ -181,7 +181,7 @@ on_stop(_InstId, #{name := InstanceId}) -> ok; {error, Reason} -> ?SLOG(error, #{ - msg => "stop_mqtt_connector", + msg => "stop_mqtt_connector_error", connector => InstanceId, reason => Reason }) From be90c63c786ba4445196a33f4849dd2c4f7d9651 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 18 May 2023 11:54:14 +0200 Subject: [PATCH 3/7] chore(mqtt-connector): refine logging level connect failure should be at warning level but not error, the connecting state is visible from dashboard also the resource manager logs connection failures in general at warning level --- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 880a99313..e49603e51 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -202,13 +202,13 @@ connect(Name) -> Error end; {error, Reason} = Error -> - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "client_connect_failed", - reason => Reason + reason => Reason, + name => Name }), Error end. 
- subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> emqtt:subscribe(ref(Ref), FromTopic, QoS); subscribe_remote_topics(_Ref, undefined) -> From 21de0f8274851bb11ba0456e308c106c923c3e8d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 17 May 2023 22:42:15 +0200 Subject: [PATCH 4/7] fix(buffer-worker-sup): fast stop the timeout shutdown in child spec may significantly slow down the deletion of a resource this commit changes the shutdown to brutal kill also, the pool worker removal code has been deleted because it's not necessary since the entire pool is going to be force-deleted later anyway --- .../src/emqx_resource_buffer_worker_sup.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 104ad7ade..e4fa04d36 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -115,7 +115,8 @@ ensure_worker_started(ResId, Idx, Opts) -> id => ?CHILD_ID(Mod, ResId, Idx), start => {Mod, start_link, [ResId, Idx, Opts]}, restart => transient, - shutdown => 5000, + %% if we delay shutdown, when the pool is big, it will take a long time + shutdown => brutal_kill, type => worker, modules => [Mod] }, @@ -130,13 +131,12 @@ ensure_worker_removed(ResId, Idx) -> ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx), case supervisor:terminate_child(?SERVER, ChildId) of ok -> - Res = supervisor:delete_child(?SERVER, ChildId), - _ = gproc_pool:remove_worker(ResId, {ResId, Idx}), - Res; - {error, not_found} -> + _ = supervisor:delete_child(?SERVER, ChildId), + %% no need to remove worker from the pool, + %% because the entire pool will be forece deleted later ok; - {error, Reason} -> - {error, Reason} + {error, not_found} -> + ok end. 
ensure_disk_queue_dir_absent(ResourceId, Index) -> From f5e5c59763b1805326c8e70dc5d271c4626708d2 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 May 2023 18:02:38 +0200 Subject: [PATCH 5/7] refactor(resource-manager-sup): do not force kill resource manager the shutdown timeout is now set to infinity so it will never force kill a resource manager, otherwise there will be resource leaks --- .../src/emqx_resource_buffer_worker_sup.erl | 2 +- apps/emqx_resource/src/emqx_resource_manager_sup.erl | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index e4fa04d36..4b85fe92f 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -133,7 +133,7 @@ ensure_worker_removed(ResId, Idx) -> ok -> _ = supervisor:delete_child(?SERVER, ChildId), %% no need to remove worker from the pool, - %% because the entire pool will be forece deleted later + %% because the entire pool will be force deleted later ok; {error, not_found} -> ok diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 2f442cd56..73f1988c6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,7 +17,7 @@ -behaviour(supervisor). --export([ensure_child/5]). +-export([ensure_child/5, delete_child/1]). -export([start_link/0]). @@ -27,6 +27,11 @@ ensure_child(ResId, Group, ResourceType, Config, Opts) -> _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), ok. +delete_child(Pid) -> + _ = supervisor:terminate_child(?MODULE, Pid), + _ = supervisor:delete_child(?MODULE, Pid), + ok. + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
@@ -36,7 +41,10 @@ init([]) -> id => emqx_resource_manager, start => {emqx_resource_manager, start_link, []}, restart => transient, - shutdown => brutal_kill, + %% never force kill a resource manager. + %% becasue otherwise it may lead to release leak, + %% resource_manager's terminate callback calls resource on_stop + shutdown => infinity, type => worker, modules => [emqx_resource_manager] } From 0d8ffc0d59b8abca7f0b5330ecd0eacf2da5ac5b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 May 2023 18:28:22 +0200 Subject: [PATCH 6/7] fix(resource-manager): ensure no false creation Update is implemented as remove + create. If a delete call is made while the create is in progress the remove call is likely to timeout too. This causes the following creation to falsely succeed, because there is already a running child under the supervisor. As a result, the resource is permanently removed after resource_manager eventually handles the remove call. --- .../src/emqx_resource_manager.erl | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index cd858bda3..9eb08e80d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,18 @@ % State record -record(data, { - id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid + id, + group, + mod, + callback_mode, + query_mode, + config, + opts, + status, + state, + error, + pid, + extra }). -type data() :: #data{}. @@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). 
+ ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), + try + safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + after + %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process + %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. + %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. + emqx_resource_manager_sup:delete_child(ResourceManagerPid) + end. %% @doc Stops and then starts an instance that was already running -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. @@ -439,8 +458,10 @@ health_check_actions(Data) -> [{state_timeout, health_check_interval(Data#data.opts), health_check}]. handle_remove_event(From, ClearMetrics, Data) -> - _ = stop_resource(Data), + %% stop the buffer workers first, brutal_kill, so it should be fast ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), + %% no stop the resource, this can be slow + _ = stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok From cb76e5a2418120d8ebfc8ac704a206dcb9aebde9 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 May 2023 18:11:46 +0200 Subject: [PATCH 7/7] docs: add changelog for 10755 --- apps/emqx_resource/src/emqx_resource_manager.erl | 2 +- changes/ce/fix-10755.en.md | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-10755.en.md diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 9eb08e80d..97ac355f4 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -460,7 +460,7 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> %% stop the buffer workers first, brutal_kill, so it should be fast ok = 
emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), - %% no stop the resource, this can be slow + %% now stop the resource, this can be slow _ = stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); diff --git a/changes/ce/fix-10755.en.md b/changes/ce/fix-10755.en.md new file mode 100644 index 000000000..6c887166b --- /dev/null +++ b/changes/ce/fix-10755.en.md @@ -0,0 +1,10 @@ +Fixed data bridge resource update race condition. + +In the 'delete + create' process for EMQX resource updates, +long bridge creation times could cause dashboard request timeouts. +If a bridge resource update was initiated before completion of its creation, +it led to an erroneous deletion from the runtime, despite being present in the config file. + +This fix addresses the race condition in bridge resource updates, +ensuring the accurate identification and addition of new resources, +maintaining consistency between runtime and configuration file statuses.