diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 3d06aee52..0b8d20f15 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -435,7 +435,12 @@ t_write_failure(Config) -> #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result), + case Result of + {error, Reason} when Reason =:= econnrefused; Reason =:= closed -> + ok; + _ -> + throw({unexpected, Result}) + end, ok end), ok. diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index db55c7032..76c3e8bb9 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.22"}, + {vsn, "0.1.23"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5cafd2d50..bb8cc00d1 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -112,7 +112,7 @@ bridge_spec(Config) -> id => Name, start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, restart => temporary, - shutdown => 5000 + shutdown => 1000 }. -spec bridges() -> [{_Name, _Status}]. @@ -181,7 +181,7 @@ on_stop(_InstId, #{name := InstanceId}) -> ok; {error, Reason} -> ?SLOG(error, #{ - msg => "stop_mqtt_connector", + msg => "stop_mqtt_connector_error", connector => InstanceId, reason => Reason }) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 880a99313..e49603e51 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -202,13 +202,13 @@ connect(Name) -> Error end; {error, Reason} = Error -> - ?SLOG(error, #{ + ?SLOG(warning, #{ msg => "client_connect_failed", - reason => Reason + reason => Reason, + name => Name }), Error end. - subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> emqtt:subscribe(ref(Ref), FromTopic, QoS); subscribe_remote_topics(_Ref, undefined) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index 104ad7ade..4b85fe92f 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -115,7 +115,8 @@ ensure_worker_started(ResId, Idx, Opts) -> id => ?CHILD_ID(Mod, ResId, Idx), start => {Mod, start_link, [ResId, Idx, Opts]}, restart => transient, - shutdown => 5000, + %% if we delay shutdown, when the pool is big, it will take a long time + shutdown => brutal_kill, type => worker, modules => [Mod] }, @@ -130,13 +131,12 @@ ensure_worker_removed(ResId, Idx) -> ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx), case supervisor:terminate_child(?SERVER, ChildId) of ok -> - Res = supervisor:delete_child(?SERVER, ChildId), - _ = gproc_pool:remove_worker(ResId, {ResId, Idx}), - Res; - {error, not_found} -> + _ = supervisor:delete_child(?SERVER, ChildId), + %% no need to remove worker from the pool, + %% because the entire pool will be force deleted later ok; - {error, Reason} -> - {error, Reason} + {error, not_found} -> + ok end. ensure_disk_queue_dir_absent(ResourceId, Index) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index cd858bda3..97ac355f4 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,18 @@ % State record -record(data, { - id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid + id, + group, + mod, + callback_mode, + query_mode, + config, + opts, + status, + state, + error, + pid, + extra }). -type data() :: #data{}. @@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). + ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), + try + safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + after + %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process + %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. + %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. + emqx_resource_manager_sup:delete_child(ResourceManagerPid) + end. %% @doc Stops and then starts an instance that was already running -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. @@ -439,8 +458,10 @@ health_check_actions(Data) -> [{state_timeout, health_check_interval(Data#data.opts), health_check}]. handle_remove_event(From, ClearMetrics, Data) -> - _ = stop_resource(Data), + %% stop the buffer workers first, brutal_kill, so it should be fast ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), + %% now stop the resource, this can be slow + _ = stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 2f442cd56..73f1988c6 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,7 +17,7 @@ -behaviour(supervisor). --export([ensure_child/5]). +-export([ensure_child/5, delete_child/1]). -export([start_link/0]). @@ -27,6 +27,11 @@ ensure_child(ResId, Group, ResourceType, Config, Opts) -> _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), ok. +delete_child(Pid) -> + _ = supervisor:terminate_child(?MODULE, Pid), + _ = supervisor:delete_child(?MODULE, Pid), + ok. + start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -36,7 +41,10 @@ init([]) -> id => emqx_resource_manager, start => {emqx_resource_manager, start_link, []}, restart => transient, - shutdown => brutal_kill, + %% never force kill a resource manager. + %% becasue otherwise it may lead to release leak, + %% resource_manager's terminate callback calls resource on_stop + shutdown => infinity, type => worker, modules => [emqx_resource_manager] } diff --git a/changes/ce/fix-10755.en.md b/changes/ce/fix-10755.en.md new file mode 100644 index 000000000..6c887166b --- /dev/null +++ b/changes/ce/fix-10755.en.md @@ -0,0 +1,10 @@ +Fixed data bridge resource update race condition. + +In the 'delete + create' process for EMQX resource updates, +long bridge creation times could cause dashboard request timeouts. +If a bridge resource update was initiated before completion of its creation, +it led to an erroneous deletion from the runtime, despite being present in the config file. + +This fix addresses the race condition in bridge resource updates, +ensuring the accurate identification and addition of new resources, +maintaining consistency between runtime and configuration file statuses.