Merge pull request #10755 from zmstone/0517-fix-bridge-update-timeout-issue

0517 fix bridge update timeout issue
This commit is contained in:
Zaiming (Stone) Shi 2023-05-20 06:19:26 +02:00 committed by GitHub
commit 3e98b3b050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 19 deletions

View File

@ -435,7 +435,12 @@ t_write_failure(Config) ->
#{?snk_kind := buffer_worker_flush_ack}, #{?snk_kind := buffer_worker_flush_ack},
2_000 2_000
), ),
?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result), case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= closed ->
ok;
_ ->
throw({unexpected, Result})
end,
ok ok
end), end),
ok. ok.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "EMQX Data Integration Connectors"}, {description, "EMQX Data Integration Connectors"},
{vsn, "0.1.22"}, {vsn, "0.1.23"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [

View File

@ -112,7 +112,7 @@ bridge_spec(Config) ->
id => Name, id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
restart => temporary, restart => temporary,
shutdown => 5000 shutdown => 1000
}. }.
-spec bridges() -> [{_Name, _Status}]. -spec bridges() -> [{_Name, _Status}].
@ -181,7 +181,7 @@ on_stop(_InstId, #{name := InstanceId}) ->
ok; ok;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "stop_mqtt_connector", msg => "stop_mqtt_connector_error",
connector => InstanceId, connector => InstanceId,
reason => Reason reason => Reason
}) })

View File

@ -202,13 +202,13 @@ connect(Name) ->
Error Error
end; end;
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG(error, #{ ?SLOG(warning, #{
msg => "client_connect_failed", msg => "client_connect_failed",
reason => Reason reason => Reason,
name => Name
}), }),
Error Error
end. end.
subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
emqtt:subscribe(ref(Ref), FromTopic, QoS); emqtt:subscribe(ref(Ref), FromTopic, QoS);
subscribe_remote_topics(_Ref, undefined) -> subscribe_remote_topics(_Ref, undefined) ->

View File

@ -115,7 +115,8 @@ ensure_worker_started(ResId, Idx, Opts) ->
id => ?CHILD_ID(Mod, ResId, Idx), id => ?CHILD_ID(Mod, ResId, Idx),
start => {Mod, start_link, [ResId, Idx, Opts]}, start => {Mod, start_link, [ResId, Idx, Opts]},
restart => transient, restart => transient,
shutdown => 5000, %% if we delay shutdown, when the pool is big, it will take a long time
shutdown => brutal_kill,
type => worker, type => worker,
modules => [Mod] modules => [Mod]
}, },
@ -130,13 +131,12 @@ ensure_worker_removed(ResId, Idx) ->
ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx), ChildId = ?CHILD_ID(emqx_resource_buffer_worker, ResId, Idx),
case supervisor:terminate_child(?SERVER, ChildId) of case supervisor:terminate_child(?SERVER, ChildId) of
ok -> ok ->
Res = supervisor:delete_child(?SERVER, ChildId), _ = supervisor:delete_child(?SERVER, ChildId),
_ = gproc_pool:remove_worker(ResId, {ResId, Idx}), %% no need to remove worker from the pool,
Res; %% because the entire pool will be force deleted later
{error, not_found} ->
ok; ok;
{error, Reason} -> {error, not_found} ->
{error, Reason} ok
end. end.
ensure_disk_queue_dir_absent(ResourceId, Index) -> ensure_disk_queue_dir_absent(ResourceId, Index) ->

View File

@ -53,7 +53,18 @@
% State record % State record
-record(data, { -record(data, {
id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid id,
group,
mod,
callback_mode,
query_mode,
config,
opts,
status,
state,
error,
pid,
extra
}). }).
-type data() :: #data{}. -type data() :: #data{}.
@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) ->
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) -> remove(ResId, ClearMetrics) when is_binary(ResId) ->
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). ResourceManagerPid = gproc:whereis_name(?NAME(ResId)),
try
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition.
%% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long.
emqx_resource_manager_sup:delete_child(ResourceManagerPid)
end.
%% @doc Stops and then starts an instance that was already running %% @doc Stops and then starts an instance that was already running
-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
@ -439,8 +458,10 @@ health_check_actions(Data) ->
[{state_timeout, health_check_interval(Data#data.opts), health_check}]. [{state_timeout, health_check_interval(Data#data.opts), health_check}].
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data), %% stop the buffer workers first, brutal_kill, so it should be fast
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
%% now stop the resource, this can be slow
_ = stop_resource(Data),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok false -> ok

View File

@ -17,7 +17,7 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ensure_child/5]). -export([ensure_child/5, delete_child/1]).
-export([start_link/0]). -export([start_link/0]).
@ -27,6 +27,11 @@ ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
ok. ok.
delete_child(Pid) ->
_ = supervisor:terminate_child(?MODULE, Pid),
_ = supervisor:delete_child(?MODULE, Pid),
ok.
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -36,7 +41,10 @@ init([]) ->
id => emqx_resource_manager, id => emqx_resource_manager,
start => {emqx_resource_manager, start_link, []}, start => {emqx_resource_manager, start_link, []},
restart => transient, restart => transient,
shutdown => brutal_kill, %% never force kill a resource manager.
%% becasue otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop
shutdown => infinity,
type => worker, type => worker,
modules => [emqx_resource_manager] modules => [emqx_resource_manager]
} }

View File

@ -0,0 +1,10 @@
Fixed data bridge resource update race condition.
In the 'delete + create' process for EMQX resource updates,
long bridge creation times could cause dashboard request timeouts.
If a bridge resource update was initiated before completion of its creation,
it led to an erroneous deletion from the runtime, despite being present in the config file.
This fix addresses the race condition in bridge resource updates,
ensuring the accurate identification and addition of new resources,
maintaining consistency between runtime and configuration file statuses.