feat(resman): stop adding uniqueness to manager ids

Before this change, a separate `manager_id` / `instance_id` was used
as the resource manager id, which made the connector interface somewhat
inconsistent: some function calls to the connector implementation
used the instance id as the first argument, while the rest used the
resource id itself.
This commit is contained in:
Andrew Mayorov 2023-04-10 22:46:18 +03:00
parent 2c4fd98ce5
commit aaef95b1da
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
7 changed files with 39 additions and 92 deletions

View File

@ -248,13 +248,12 @@ make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined; undefined;
make_sub_confs(undefined, _Conf, _) -> make_sub_confs(undefined, _Conf, _) ->
undefined; undefined;
make_sub_confs(SubRemoteConf, Conf, InstanceId) -> make_sub_confs(SubRemoteConf, Conf, ResourceId) ->
ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId),
case maps:find(hookpoint, Conf) of case maps:find(hookpoint, Conf) of
error -> error ->
error({no_hookpoint_provided, Conf}); error({no_hookpoint_provided, Conf});
{ok, HookPoint} -> {ok, HookPoint} ->
MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
SubRemoteConf#{on_message_received => MFA} SubRemoteConf#{on_message_received => MFA}
end. end.

View File

@ -113,7 +113,10 @@
-export([apply_reply_fun/2]). -export([apply_reply_fun/2]).
-export_type([resource_data/0]). -export_type([
resource_id/0,
resource_data/0
]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
@ -362,7 +365,7 @@ is_buffer_supported(Module) ->
false false
end. end.
-spec call_start(manager_id(), module(), resource_config()) -> -spec call_start(resource_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(MgrId, Mod, Config) -> call_start(MgrId, Mod, Config) ->
try try
@ -374,7 +377,7 @@ call_start(MgrId, Mod, Config) ->
{error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}}
end. end.
-spec call_health_check(manager_id(), module(), resource_state()) -> -spec call_health_check(resource_id(), module(), resource_state()) ->
resource_status() resource_status()
| {resource_status(), resource_state()} | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()} | {resource_status(), resource_state(), term()}
@ -382,7 +385,7 @@ call_start(MgrId, Mod, Config) ->
call_health_check(MgrId, Mod, ResourceState) -> call_health_check(MgrId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)).
-spec call_stop(manager_id(), module(), resource_state()) -> term(). -spec call_stop(resource_id(), module(), resource_state()) -> term().
call_stop(MgrId, Mod, ResourceState) -> call_stop(MgrId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)).

View File

@ -52,6 +52,7 @@ init([]) ->
ChildSpecs = [], ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
-spec start_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok.
start_workers(ResId, Opts) -> start_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
_ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]),
@ -63,6 +64,7 @@ start_workers(ResId, Opts) ->
lists:seq(1, WorkerPoolSize) lists:seq(1, WorkerPoolSize)
). ).
-spec stop_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok.
stop_workers(ResId, Opts) -> stop_workers(ResId, Opts) ->
WorkerPoolSize = worker_pool_size(Opts), WorkerPoolSize = worker_pool_size(Opts),
lists:foreach( lists:foreach(
@ -75,6 +77,7 @@ stop_workers(ResId, Opts) ->
ensure_worker_pool_removed(ResId), ensure_worker_pool_removed(ResId),
ok. ok.
-spec worker_pids(emqx_resource:resource_id()) -> [pid()].
worker_pids(ResId) -> worker_pids(ResId) ->
lists:map( lists:map(
fun({_Name, Pid}) -> fun({_Name, Pid}) ->

View File

@ -42,19 +42,18 @@
]). ]).
-export([ -export([
set_resource_status_connecting/1, set_resource_status_connecting/1
manager_id_to_resource_id/1
]). ]).
% Server % Server
-export([start_link/6]). -export([start_link/5]).
% Behaviour % Behaviour
-export([init/1, callback_mode/0, handle_event/4, terminate/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
% State record % State record
-record(data, { -record(data, {
id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
}). }).
-type data() :: #data{}. -type data() :: #data{}.
@ -69,13 +68,6 @@
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
make_manager_id(ResId) ->
emqx_resource:generate_id(ResId).
manager_id_to_resource_id(MgrId) ->
[ResId, _Index] = string:split(MgrId, ":", trailing),
ResId.
%% @doc Called from emqx_resource when starting a resource instance. %% @doc Called from emqx_resource when starting a resource instance.
%% %%
%% Triggers the emqx_resource_manager_sup supervisor to actually create %% Triggers the emqx_resource_manager_sup supervisor to actually create
@ -92,8 +84,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
{ok, _Group, Data} -> {ok, _Group, Data} ->
{ok, Data}; {ok, Data};
{error, not_found} -> {error, not_found} ->
MgrId = set_new_owner(ResId), create_and_return_data(ResId, Group, ResourceType, Config, Opts)
create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts)
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist %% @doc Called from emqx_resource when recreating a resource which may or may not exist
@ -103,23 +94,22 @@ recreate(ResId, ResourceType, NewConfig, Opts) ->
case lookup(ResId) of case lookup(ResId) of
{ok, Group, #{mod := ResourceType, status := _} = _Data} -> {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
_ = remove(ResId, false), _ = remove(ResId, false),
MgrId = set_new_owner(ResId), create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts);
create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts);
{ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type}; {error, updating_to_incorrect_resource_type};
{error, not_found} -> {error, not_found} ->
{error, not_found} {error, not_found}
end. end.
create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> create_and_return_data(ResId, Group, ResourceType, Config, Opts) ->
_ = create(MgrId, ResId, Group, ResourceType, Config, Opts), _ = create(ResId, Group, ResourceType, Config, Opts),
{ok, _Group, Data} = lookup(ResId), {ok, _Group, Data} = lookup(ResId),
{ok, Data}. {ok, Data}.
%% @doc Create a resource_manager and wait until it is running %% @doc Create a resource_manager and wait until it is running
create(MgrId, ResId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
% The state machine will make the actual call to the callback/resource module after init % The state machine will make the actual call to the callback/resource module after init
ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts),
ok = emqx_metrics_worker:create_metrics( ok = emqx_metrics_worker:create_metrics(
?RES_METRICS, ?RES_METRICS,
ResId, ResId,
@ -164,15 +154,12 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) -> create_dry_run(ResourceType, Config) ->
ResId = make_test_id(), ResId = make_test_id(),
MgrId = set_new_owner(ResId),
Opts = Opts =
case is_map(Config) of case is_map(Config) of
true -> maps:get(resource_opts, Config, #{}); true -> maps:get(resource_opts, Config, #{});
false -> #{} false -> #{}
end, end,
ok = emqx_resource_manager_sup:ensure_child( ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts),
MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts
),
case wait_for_ready(ResId, 5000) of case wait_for_ready(ResId, 5000) of
ok -> ok ->
remove(ResId); remove(ResId);
@ -283,10 +270,9 @@ health_check(ResId) ->
%% Server start/stop callbacks %% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server %% @doc Function called from the supervisor to actually start the server
start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> start_link(ResId, Group, ResourceType, Config, Opts) ->
Data = #data{ Data = #data{
id = ResId, id = ResId,
manager_id = MgrId,
group = Group, group = Group,
mod = ResourceType, mod = ResourceType,
callback_mode = emqx_resource:get_callback_mode(ResourceType), callback_mode = emqx_resource:get_callback_mode(ResourceType),
@ -320,7 +306,7 @@ terminate({shutdown, removed}, _State, _Data) ->
ok; ok;
terminate(_Reason, _State, Data) -> terminate(_Reason, _State, Data) ->
_ = maybe_stop_resource(Data), _ = maybe_stop_resource(Data),
ok = delete_cache(Data#data.id, Data#data.manager_id), ok = delete_cache(Data#data.id),
ok. ok.
%% Behavior callback %% Behavior callback
@ -345,9 +331,6 @@ handle_event({call, From}, start, State, Data) when
start_resource(Data, From); start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
% Called when the resource received a `quit` message
handle_event(info, quit, _State, _Data) ->
{stop, {shutdown, quit}};
% Called when the resource is to be stopped % Called when the resource is to be stopped
handle_event({call, From}, stop, stopped, _Data) -> handle_event({call, From}, stop, stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
@ -429,20 +412,8 @@ log_cache_consistency({_, DataCached}, Data) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> insert_cache(ResId, Group, Data = #data{}) ->
case get_owner(ResId) of ets:insert(?ETS_TABLE, {ResId, Group, Data}).
not_found ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
MgrId ->
ets:insert(?ETS_TABLE, {ResId, Group, Data});
_ ->
?SLOG(error, #{
msg => get_resource_owner_failed,
resource_id => ResId,
action => quit_resource
}),
self() ! quit
end.
read_cache(ResId) -> read_cache(ResId) ->
case ets:lookup(?ETS_TABLE, ResId) of case ets:lookup(?ETS_TABLE, ResId) of
@ -450,37 +421,14 @@ read_cache(ResId) ->
[] -> not_found [] -> not_found
end. end.
delete_cache(ResId, MgrId) -> delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
case get_owner(ResId) of
MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId ->
do_delete_cache(ResId);
_ ->
ok
end.
do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
true = ets:delete(?ETS_TABLE, {owner, ResId}), true = ets:delete(?ETS_TABLE, {owner, ResId}),
true = ets:delete(?ETS_TABLE, ResId), true = ets:delete(?ETS_TABLE, ResId),
ok; ok;
do_delete_cache(ResId) -> delete_cache(ResId) ->
true = ets:delete(?ETS_TABLE, ResId), true = ets:delete(?ETS_TABLE, ResId),
ok. ok.
set_new_owner(ResId) ->
MgrId = make_manager_id(ResId),
ok = set_owner(ResId, MgrId),
MgrId.
set_owner(ResId, MgrId) ->
ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}),
ok.
get_owner(ResId) ->
case ets:lookup(?ETS_TABLE, {owner, ResId}) of
[{_, MgrId}] -> MgrId;
[] -> not_found
end.
retry_actions(Data) -> retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
undefined -> undefined ->
@ -494,7 +442,7 @@ health_check_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data), _ = stop_resource(Data),
ok = delete_cache(Data#data.id, Data#data.manager_id), ok = delete_cache(Data#data.id),
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
@ -504,7 +452,7 @@ handle_remove_event(From, ClearMetrics, Data) ->
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData = Data#data{status = connecting, state = ResourceState}, UpdatedData = Data#data{status = connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
@ -535,7 +483,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% is returned. %% is returned.
case ResState /= undefined of case ResState /= undefined of
true -> true ->
emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState);
false -> false ->
ok ok
end, end,
@ -589,7 +537,7 @@ with_health_check(#data{state = undefined} = Data, Func) ->
Func(disconnected, Data); Func(disconnected, Data);
with_health_check(#data{error = PrevError} = Data, Func) -> with_health_check(#data{error = PrevError} = Data, Func) ->
ResId = Data#data.id, ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data), {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
_ = maybe_alarm(Status, ResId, Err, PrevError), _ = maybe_alarm(Status, ResId, Err, PrevError),
ok = maybe_resume_resource_workers(ResId, Status), ok = maybe_resume_resource_workers(ResId, Status),

View File

@ -17,14 +17,14 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ensure_child/6]). -export([ensure_child/5]).
-export([start_link/0]). -export([start_link/0]).
-export([init/1]). -export([init/1]).
ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) -> ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]), _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]),
ok. ok.
start_link() -> start_link() ->

View File

@ -502,11 +502,6 @@ resource_id(Config) ->
Name = ?config(influxdb_name, Config), Name = ?config(influxdb_name, Config),
emqx_bridge_resource:resource_id(Type, Name). emqx_bridge_resource:resource_id(Type, Name).
instance_id(Config) ->
ResourceId = resource_id(Config),
[{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}),
InstanceId.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -581,14 +576,14 @@ t_start_already_started(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
InstanceId = instance_id(Config), ResourceId = resource_id(Config),
TypeAtom = binary_to_atom(Type), TypeAtom = binary_to_atom(Type),
NameAtom = binary_to_atom(Name), NameAtom = binary_to_atom(Name),
{ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
emqx_bridge_schema, InfluxDBConfigString emqx_bridge_schema, InfluxDBConfigString
), ),
?check_trace( ?check_trace(
emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap), emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap),
fun(Result, Trace) -> fun(Result, Trace) ->
?assertMatch({ok, _}, Result), ?assertMatch({ok, _}, Result),
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),

View File

@ -267,9 +267,8 @@ apply_template([{Key, _} | _] = Reqs, Templates) ->
[emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs]
end. end.
client_id(InstanceId) -> client_id(ResourceId) ->
Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), erlang:binary_to_atom(ResourceId, utf8).
erlang:binary_to_atom(Name, utf8).
redact(Msg) -> redact(Msg) ->
emqx_utils:redact(Msg, fun is_sensitive_key/1). emqx_utils:redact(Msg, fun is_sensitive_key/1).