diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5b488825b..5cafd2d50 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -248,13 +248,12 @@ make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> undefined; -make_sub_confs(SubRemoteConf, Conf, InstanceId) -> - ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId), +make_sub_confs(SubRemoteConf, Conf, ResourceId) -> case maps:find(hookpoint, Conf) of error -> error({no_hookpoint_provided, Conf}); {ok, HookPoint} -> - MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, + MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, SubRemoteConf#{on_message_received => MFA} end. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d8b91942b..6f72f8a16 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -113,7 +113,10 @@ -export([apply_reply_fun/2]). --export_type([resource_data/0]). +-export_type([ + resource_id/0, + resource_data/0 +]). -optional_callbacks([ on_query/3, @@ -362,7 +365,7 @@ is_buffer_supported(Module) -> false end. --spec call_start(manager_id(), module(), resource_config()) -> +-spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> try @@ -374,7 +377,7 @@ call_start(MgrId, Mod, Config) -> {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} end. --spec call_health_check(manager_id(), module(), resource_state()) -> +-spec call_health_check(resource_id(), module(), resource_state()) -> resource_status() | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()} @@ -382,7 +385,7 @@ call_start(MgrId, Mod, Config) -> call_health_check(MgrId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). --spec call_stop(manager_id(), module(), resource_state()) -> term(). +-spec call_stop(resource_id(), module(), resource_state()) -> term(). call_stop(MgrId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index ae30c3927..104ad7ade 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -52,6 +52,7 @@ init([]) -> ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. +-spec start_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok. start_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), @@ -63,6 +64,7 @@ start_workers(ResId, Opts) -> lists:seq(1, WorkerPoolSize) ). +-spec stop_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok. stop_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), lists:foreach( @@ -75,6 +77,7 @@ stop_workers(ResId, Opts) -> ensure_worker_pool_removed(ResId), ok. +-spec worker_pids(emqx_resource:resource_id()) -> [pid()]. worker_pids(ResId) -> lists:map( fun({_Name, Pid}) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f6a0ebebf..c8be34f87 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -42,19 +42,18 @@ ]). -export([ - set_resource_status_connecting/1, - manager_id_to_resource_id/1 + set_resource_status_connecting/1 ]). % Server --export([start_link/6]). +-export([start_link/5]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record -record(data, { - id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid + id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid }). -type data() :: #data{}. @@ -69,13 +68,6 @@ %% API %%------------------------------------------------------------------------------ -make_manager_id(ResId) -> - emqx_resource:generate_id(ResId). - -manager_id_to_resource_id(MgrId) -> - [ResId, _Index] = string:split(MgrId, ":", trailing), - ResId. - %% @doc Called from emqx_resource when starting a resource instance. %% %% Triggers the emqx_resource_manager_sup supervisor to actually create @@ -92,8 +84,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - MgrId = set_new_owner(ResId), - create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) + create_and_return_data(ResId, Group, ResourceType, Config, Opts) end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist @@ -103,23 +94,22 @@ recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(ResId, false), - MgrId = set_new_owner(ResId), - create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts); + create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts); {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. -create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> - _ = create(MgrId, ResId, Group, ResourceType, Config, Opts), +create_and_return_data(ResId, Group, ResourceType, Config, Opts) -> + _ = create(ResId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(ResId), {ok, Data}. %% @doc Create a resource_manager and wait until it is running -create(MgrId, ResId, Group, ResourceType, Config, Opts) -> +create(ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts), ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, @@ -164,15 +154,12 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> ResId = make_test_id(), - MgrId = set_new_owner(ResId), Opts = case is_map(Config) of true -> maps:get(resource_opts, Config, #{}); false -> #{} end, - ok = emqx_resource_manager_sup:ensure_child( - MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts - ), + ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), case wait_for_ready(ResId, 5000) of ok -> remove(ResId); @@ -283,10 +270,9 @@ health_check(ResId) -> %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server -start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> +start_link(ResId, Group, ResourceType, Config, Opts) -> Data = #data{ id = ResId, - manager_id = MgrId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), @@ -320,7 +306,7 @@ terminate({shutdown, removed}, _State, _Data) -> ok; terminate(_Reason, _State, Data) -> _ = maybe_stop_resource(Data), - ok = delete_cache(Data#data.id, Data#data.manager_id), + ok = delete_cache(Data#data.id), ok. %% Behavior callback @@ -345,9 +331,6 @@ handle_event({call, From}, start, State, Data) when start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; -% Called when the resource received a `quit` message -handle_event(info, quit, _State, _Data) -> - {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; @@ -429,20 +412,8 @@ log_cache_consistency({_, DataCached}, Data) -> %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ -insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> - case get_owner(ResId) of - not_found -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}); - MgrId -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}); - _ -> - ?SLOG(error, #{ - msg => get_resource_owner_failed, - resource_id => ResId, - action => quit_resource - }), - self() ! quit - end. +insert_cache(ResId, Group, Data = #data{}) -> + ets:insert(?ETS_TABLE, {ResId, Group, Data}). read_cache(ResId) -> case ets:lookup(?ETS_TABLE, ResId) of @@ -450,37 +421,14 @@ read_cache(ResId) -> [] -> not_found end. -delete_cache(ResId, MgrId) -> - case get_owner(ResId) of - MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> - do_delete_cache(ResId); - _ -> - ok - end. - -do_delete_cache(<> = ResId) -> +delete_cache(<> = ResId) -> true = ets:delete(?ETS_TABLE, {owner, ResId}), true = ets:delete(?ETS_TABLE, ResId), ok; -do_delete_cache(ResId) -> +delete_cache(ResId) -> true = ets:delete(?ETS_TABLE, ResId), ok. -set_new_owner(ResId) -> - MgrId = make_manager_id(ResId), - ok = set_owner(ResId, MgrId), - MgrId. - -set_owner(ResId, MgrId) -> - ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}), - ok. - -get_owner(ResId) -> - case ets:lookup(?ETS_TABLE, {owner, ResId}) of - [{_, MgrId}] -> MgrId; - [] -> not_found - end. - retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> @@ -494,7 +442,7 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> _ = stop_resource(Data), - ok = delete_cache(Data#data.id, Data#data.manager_id), + ok = delete_cache(Data#data.id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); @@ -504,7 +452,7 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of + case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> UpdatedData = Data#data{status = connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -535,7 +483,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> %% is returned. case ResState /= undefined of true -> - emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); + emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState); false -> ok end, @@ -589,7 +537,7 @@ with_health_check(#data{state = undefined} = Data, Func) -> Func(disconnected, Data); with_health_check(#data{error = PrevError} = Data, Func) -> ResId = Data#data.id, - HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), + HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, Status), diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 5b731d6cf..b27c46739 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,14 +17,14 @@ -behaviour(supervisor). --export([ensure_child/6]). +-export([ensure_child/5]). -export([start_link/0]). -export([init/1]). -ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]), +ensure_child(ResId, Group, ResourceType, Config, Opts) -> + _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), ok. start_link() -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 3def35920..6833b50c3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -502,11 +502,6 @@ resource_id(Config) -> Name = ?config(influxdb_name, Config), emqx_bridge_resource:resource_id(Type, Name). -instance_id(Config) -> - ResourceId = resource_id(Config), - [{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}), - InstanceId. - %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -581,14 +576,14 @@ t_start_already_started(Config) -> {ok, _}, create_bridge(Config) ), - InstanceId = instance_id(Config), + ResourceId = resource_id(Config), TypeAtom = binary_to_atom(Type), NameAtom = binary_to_atom(Name), {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( emqx_bridge_schema, InfluxDBConfigString ), ?check_trace( - emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap), + emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap), fun(Result, Trace) -> ?assertMatch({ok, _}, Result), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 2e1730b52..da72590d4 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -267,9 +267,8 @@ apply_template([{Key, _} | _] = Reqs, Templates) -> [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] end. -client_id(InstanceId) -> - Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), - erlang:binary_to_atom(Name, utf8). +client_id(ResourceId) -> + erlang:binary_to_atom(ResourceId, utf8). redact(Msg) -> emqx_utils:redact(Msg, fun is_sensitive_key/1).