diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 57d56b339..1ccb5ca71 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -260,7 +260,7 @@ query(ResId, Request) -> -spec query(resource_id(), Request :: term(), query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> - case emqx_resource_manager:ets_lookup(ResId) of + case emqx_resource_manager:lookup_cached(ResId) of {ok, _Group, #{query_mode := QM, mod := Module}} -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of @@ -311,7 +311,7 @@ set_resource_status_connecting(ResId) -> -spec get_instance(resource_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(ResId) -> - emqx_resource_manager:ets_lookup(ResId, [metrics]). + emqx_resource_manager:lookup_cached(ResId, [metrics]). -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 711833963..8bfd77e61 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -885,7 +885,7 @@ handle_async_worker_down(Data0, Pid) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), - case emqx_resource_manager:ets_lookup(Id) of + case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, Resource} -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 0983dff8d..40f9fe1ab 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -36,8 +36,8 @@ lookup/1, list_all/0, list_group/1, - ets_lookup/1, - ets_lookup/2, + lookup_cached/1, + lookup_cached/2, get_metrics/1, reset_metrics/1 ]). @@ -231,21 +231,21 @@ set_resource_status_connecting(ResId) -> -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(ResId) -> case safe_call(ResId, lookup, ?T_LOOKUP) of - {error, timeout} -> ets_lookup(ResId, [metrics]); + {error, timeout} -> lookup_cached(ResId, [metrics]); Result -> Result end. %% @doc Lookup the group and data of a resource from the cache --spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -ets_lookup(ResId) -> - ets_lookup(ResId, []). +-spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +lookup_cached(ResId) -> + lookup_cached(ResId, []). %% @doc Lookup the group and data of a resource from the cache --spec ets_lookup(resource_id(), [Option]) -> +-spec lookup_cached(resource_id(), [Option]) -> {ok, resource_group(), resource_data()} | {error, not_found} when Option :: metrics. -ets_lookup(ResId, Options) -> +lookup_cached(ResId, Options) -> NeedMetrics = lists:member(metrics, Options), case read_cache(ResId) of {Group, Data} when NeedMetrics ->