From 633eacad3b77d2d0860b75e9fc3d1490882770af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Apr 2023 09:42:05 -0300 Subject: [PATCH 1/5] test(pulsar): add more test cases for Pulsar Producer bridge Fixes https://emqx.atlassian.net/browse/EMQX-8400 --- apps/emqx_bridge_pulsar/rebar.config | 2 +- .../src/emqx_bridge_pulsar.app.src | 2 +- .../src/emqx_bridge_pulsar_impl_producer.erl | 47 ++-- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 201 +++++++++++++++++- 4 files changed, 235 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge_pulsar/rebar.config b/apps/emqx_bridge_pulsar/rebar.config index be5f282df..d5a63f320 100644 --- a/apps/emqx_bridge_pulsar/rebar.config +++ b/apps/emqx_bridge_pulsar/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.0"}}}, + {pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.1"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index ead7cb715..b169aa2c4 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 2bd44d16a..27d50f077 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -87,11 +87,14 @@ on_start(InstanceId, Config) -> }, case pulsar:ensure_supervised_client(ClientId, 
Servers, ClientOpts) of {ok, _Pid} -> - ?SLOG(info, #{ - msg => "pulsar_client_started", - instance_id => InstanceId, - pulsar_hosts => Servers - }); + ?tp( + info, + "pulsar_client_started", + #{ + instance_id => InstanceId, + pulsar_hosts => Servers + } + ); {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_pulsar_client", @@ -115,7 +118,7 @@ on_stop(_InstanceId, State) -> ok. -spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(_InstanceId, State) -> +on_get_status(_InstanceId, State = #{}) -> #{ pulsar_client_id := ClientId, producers := Producers @@ -135,7 +138,11 @@ on_get_status(_InstanceId, State) -> end; {error, _} -> disconnected - end. + end; +on_get_status(_InstanceId, _State) -> + %% If a health check happens while a concurrent request to + %% create the bridge has not quite finished, `State = undefined'. + connecting. -spec on_query(manager_id(), {send_message, map()}, state()) -> {ok, term()} @@ -160,6 +167,13 @@ on_query(_InstanceId, {send_message, Message}, State) -> ) -> {ok, pid()}. on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> + ?tp_span( + pulsar_producer_on_query_async, + #{instance_id => _InstanceId, message => Message}, + do_on_query_async(Message, AsyncReplyFn, State) + ). 
+ +do_on_query_async(Message, AsyncReplyFn, State) -> #{ producers := Producers, message_template := MessageTemplate @@ -283,6 +297,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> drop_if_highmem => MemOLP }, ProducerName = producer_name(ClientId), + ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}), MessageTemplate = compile_message_template(MessageTemplateOpts), ProducerOpts0 = #{ @@ -298,6 +313,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), PulsarTopic = binary_to_list(PulsarTopic0), + ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> State = #{ @@ -310,13 +326,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> {ok, State} catch Kind:Error:Stacktrace -> - ?SLOG(error, #{ - msg => "failed_to_start_pulsar_producer", - instance_id => InstanceId, - kind => Kind, - reason => Error, - stacktrace => Stacktrace - }), + ?tp( + error, + "failed_to_start_pulsar_producer", + #{ + instance_id => InstanceId, + kind => Kind, + reason => Error, + stacktrace => Stacktrace + } + ), stop_client(ClientId), throw(failed_to_start_pulsar_producer) end. diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index d254b01fc..be38f6625 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -37,7 +37,14 @@ groups() -> ]. only_once_tests() -> - [t_create_via_http]. + [ + t_create_via_http, + t_start_when_down, + t_send_when_down, + t_send_when_timeout, + t_failure_to_start_producer, + t_producer_process_crash + ]. init_per_suite(Config) -> Config. @@ -753,6 +760,198 @@ t_on_get_status(Config) -> ), ok. 
+t_start_when_down(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ResourceId = resource_id(Config), + ?check_trace( + begin + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ok + end), + %% Should recover given enough time. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ok + end, + [] + ), + ok. + +t_send_when_down(Config) -> + do_t_send_with_failure(Config, down). + +t_send_when_timeout(Config) -> + do_t_send_with_failure(Config, timeout). + +do_t_send_with_failure(Config, FailureType) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + MQTTTopic = ?config(mqtt_topic, Config), + QoS = 0, + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), + + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_producer_bridge_started}, + 10_000 + ), + ?check_trace( + begin + emqx_common_test_helpers:with_failure( + FailureType, ProxyName, ProxyHost, ProxyPort, fun() -> + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message0), + #{ + ?snk_kind := pulsar_producer_on_query_async, + ?snk_span := {complete, _} + }, + 5_000 + ), + ok + end + ), + ok + end, + fun(_Trace) -> + %% Should recover given enough time. + Data0 = receive_consumed(20_000), + ?assertMatch( + [ + #{ + <<"clientid">> := ClientId, + <<"event">> := <<"message.publish">>, + <<"payload">> := Payload, + <<"topic">> := MQTTTopic + } + ], + Data0 + ), + ok + end + ), + ok. + +%% Check that we correctly terminate the pulsar client when the pulsar +%% producer processes fail to start for whatever reason. 
+t_failure_to_start_producer(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := name_registered}, + #{?snk_kind := pulsar_producer_about_to_start_producers} + ), + spawn_link(fun() -> + ?tp(will_register_name, #{}), + {ok, #{producer_name := ProducerName}} = ?block_until( + #{?snk_kind := pulsar_producer_capture_name}, 10_000 + ), + true = register(ProducerName, self()), + ?tp(name_registered, #{name => ProducerName}), + %% Just simulating another process so that starting the + %% producers fails. Currently it does a gen_server:call + %% with `infinity' timeout, so this is just to avoid + %% hanging. + receive + {'$gen_call', From, _Request} -> + gen_server:reply(From, {error, im_not, your_producer}) + end, + receive + die -> ok + end + end), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := pulsar_bridge_client_stopped}, + 20_000 + ), + ok + end, + [] + ), + ok. + +%% Check that the driver recovers if one of the producer processes +%% dies for whatever reason. 
+t_producer_process_crash(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + QoS = 0, + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload), + ?check_trace( + begin + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge( + Config, + #{<<"buffer">> => #{<<"mode">> => <<"disk">>}} + ), + #{?snk_kind := pulsar_producer_bridge_started}, + 10_000 + ), + [ProducerPid | _] = [ + Pid + || {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup), + Pid <- element(2, process_info(PS, links)), + case proc_lib:initial_call(Pid) of + {pulsar_producer, init, _} -> true; + _ -> false + end + ], + Ref = monitor(process, ProducerPid), + exit(ProducerPid, kill), + receive + {'DOWN', Ref, process, ProducerPid, _Killed} -> + ok + after 1_000 -> ct:fail("pid didn't die") + end, + ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)), + %% Should recover given enough time. + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message0), + #{?snk_kind := pulsar_producer_on_query_async, ?snk_span := {complete, _}}, + 5_000 + ), + Data0 = receive_consumed(20_000), + ?assertMatch( + [ + #{ + <<"clientid">> := ClientId, + <<"event">> := <<"message.publish">>, + <<"payload">> := Payload, + <<"topic">> := MQTTTopic + } + ], + Data0 + ), + ok + end, + [] + ), + ok. 
+ t_cluster(Config) -> MQTTTopic = ?config(mqtt_topic, Config), ResourceId = resource_id(Config), From aaef95b1daf14ba84958084290a1cc2c697f8f18 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 10 Apr 2023 22:46:18 +0300 Subject: [PATCH 2/5] feat(resman): stop adding uniqueness to manager ids Before this change, a separate `manager_id` / `instance_id` was used as the resource manager id, which made the connector interface somewhat inconsistent: some function calls to the connector implementation used the instance id as the first argument while the rest used the resource id itself. --- .../src/emqx_connector_mqtt.erl | 5 +- apps/emqx_resource/src/emqx_resource.erl | 11 ++- .../src/emqx_resource_buffer_worker_sup.erl | 3 + .../src/emqx_resource_manager.erl | 92 ++++--------------- .../src/emqx_resource_manager_sup.erl | 6 +- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 9 +- .../src/emqx_ee_connector_rocketmq.erl | 5 +- 7 files changed, 39 insertions(+), 92 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5b488825b..5cafd2d50 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -248,13 +248,12 @@ make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> undefined; -make_sub_confs(SubRemoteConf, Conf, InstanceId) -> - ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId), +make_sub_confs(SubRemoteConf, Conf, ResourceId) -> case maps:find(hookpoint, Conf) of error -> error({no_hookpoint_provided, Conf}); {ok, HookPoint} -> - MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, + MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, SubRemoteConf#{on_message_received => MFA} end. 
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d8b91942b..6f72f8a16 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -113,7 +113,10 @@ -export([apply_reply_fun/2]). --export_type([resource_data/0]). +-export_type([ + resource_id/0, + resource_data/0 +]). -optional_callbacks([ on_query/3, @@ -362,7 +365,7 @@ is_buffer_supported(Module) -> false end. --spec call_start(manager_id(), module(), resource_config()) -> +-spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> try @@ -374,7 +377,7 @@ call_start(MgrId, Mod, Config) -> {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} end. --spec call_health_check(manager_id(), module(), resource_state()) -> +-spec call_health_check(resource_id(), module(), resource_state()) -> resource_status() | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()} @@ -382,7 +385,7 @@ call_start(MgrId, Mod, Config) -> call_health_check(MgrId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). --spec call_stop(manager_id(), module(), resource_state()) -> term(). +-spec call_stop(resource_id(), module(), resource_state()) -> term(). call_stop(MgrId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl index ae30c3927..104ad7ade 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker_sup.erl @@ -52,6 +52,7 @@ init([]) -> ChildSpecs = [], {ok, {SupFlags, ChildSpecs}}. +-spec start_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok. 
start_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), _ = ensure_worker_pool(ResId, hash, [{size, WorkerPoolSize}]), @@ -63,6 +64,7 @@ start_workers(ResId, Opts) -> lists:seq(1, WorkerPoolSize) ). +-spec stop_workers(emqx_resource:resource_id(), _Opts :: #{atom() => _}) -> ok. stop_workers(ResId, Opts) -> WorkerPoolSize = worker_pool_size(Opts), lists:foreach( @@ -75,6 +77,7 @@ stop_workers(ResId, Opts) -> ensure_worker_pool_removed(ResId), ok. +-spec worker_pids(emqx_resource:resource_id()) -> [pid()]. worker_pids(ResId) -> lists:map( fun({_Name, Pid}) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f6a0ebebf..c8be34f87 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -42,19 +42,18 @@ ]). -export([ - set_resource_status_connecting/1, - manager_id_to_resource_id/1 + set_resource_status_connecting/1 ]). % Server --export([start_link/6]). +-export([start_link/5]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record -record(data, { - id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid + id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid }). -type data() :: #data{}. @@ -69,13 +68,6 @@ %% API %%------------------------------------------------------------------------------ -make_manager_id(ResId) -> - emqx_resource:generate_id(ResId). - -manager_id_to_resource_id(MgrId) -> - [ResId, _Index] = string:split(MgrId, ":", trailing), - ResId. - %% @doc Called from emqx_resource when starting a resource instance. 
%% %% Triggers the emqx_resource_manager_sup supervisor to actually create @@ -92,8 +84,7 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - MgrId = set_new_owner(ResId), - create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) + create_and_return_data(ResId, Group, ResourceType, Config, Opts) end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist @@ -103,23 +94,22 @@ recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(ResId, false), - MgrId = set_new_owner(ResId), - create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts); + create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts); {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. -create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> - _ = create(MgrId, ResId, Group, ResourceType, Config, Opts), +create_and_return_data(ResId, Group, ResourceType, Config, Opts) -> + _ = create(ResId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(ResId), {ok, Data}. %% @doc Create a resource_manager and wait until it is running -create(MgrId, ResId, Group, ResourceType, Config, Opts) -> +create(ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts), ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, @@ -164,15 +154,12 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok | {error, Reason :: term()}. 
create_dry_run(ResourceType, Config) -> ResId = make_test_id(), - MgrId = set_new_owner(ResId), Opts = case is_map(Config) of true -> maps:get(resource_opts, Config, #{}); false -> #{} end, - ok = emqx_resource_manager_sup:ensure_child( - MgrId, ResId, <<"dry_run">>, ResourceType, Config, Opts - ), + ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), case wait_for_ready(ResId, 5000) of ok -> remove(ResId); @@ -283,10 +270,9 @@ health_check(ResId) -> %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server -start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> +start_link(ResId, Group, ResourceType, Config, Opts) -> Data = #data{ id = ResId, - manager_id = MgrId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), @@ -320,7 +306,7 @@ terminate({shutdown, removed}, _State, _Data) -> ok; terminate(_Reason, _State, Data) -> _ = maybe_stop_resource(Data), - ok = delete_cache(Data#data.id, Data#data.manager_id), + ok = delete_cache(Data#data.id), ok. 
%% Behavior callback @@ -345,9 +331,6 @@ handle_event({call, From}, start, State, Data) when start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; -% Called when the resource received a `quit` message -handle_event(info, quit, _State, _Data) -> - {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; @@ -429,20 +412,8 @@ log_cache_consistency({_, DataCached}, Data) -> %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ -insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> - case get_owner(ResId) of - not_found -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}); - MgrId -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}); - _ -> - ?SLOG(error, #{ - msg => get_resource_owner_failed, - resource_id => ResId, - action => quit_resource - }), - self() ! quit - end. +insert_cache(ResId, Group, Data = #data{}) -> + ets:insert(?ETS_TABLE, {ResId, Group, Data}). read_cache(ResId) -> case ets:lookup(?ETS_TABLE, ResId) of @@ -450,37 +421,14 @@ read_cache(ResId) -> [] -> not_found end. -delete_cache(ResId, MgrId) -> - case get_owner(ResId) of - MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> - do_delete_cache(ResId); - _ -> - ok - end. - -do_delete_cache(<> = ResId) -> +delete_cache(<> = ResId) -> true = ets:delete(?ETS_TABLE, {owner, ResId}), true = ets:delete(?ETS_TABLE, ResId), ok; -do_delete_cache(ResId) -> +delete_cache(ResId) -> true = ets:delete(?ETS_TABLE, ResId), ok. -set_new_owner(ResId) -> - MgrId = make_manager_id(ResId), - ok = set_owner(ResId, MgrId), - MgrId. - -set_owner(ResId, MgrId) -> - ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}), - ok. 
- -get_owner(ResId) -> - case ets:lookup(?ETS_TABLE, {owner, ResId}) of - [{_, MgrId}] -> MgrId; - [] -> not_found - end. - retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> @@ -494,7 +442,7 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> _ = stop_resource(Data), - ok = delete_cache(Data#data.id, Data#data.manager_id), + ok = delete_cache(Data#data.id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); @@ -504,7 +452,7 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of + case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> UpdatedData = Data#data{status = connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -535,7 +483,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> %% is returned. 
case ResState /= undefined of true -> - emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); + emqx_resource:call_stop(Data#data.id, Data#data.mod, ResState); false -> ok end, @@ -589,7 +537,7 @@ with_health_check(#data{state = undefined} = Data, Func) -> Func(disconnected, Data); with_health_check(#data{error = PrevError} = Data, Func) -> ResId = Data#data.id, - HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), + HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, Status), diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 5b731d6cf..b27c46739 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,14 +17,14 @@ -behaviour(supervisor). --export([ensure_child/6]). +-export([ensure_child/5]). -export([start_link/0]). -export([init/1]). -ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]), +ensure_child(ResId, Group, ResourceType, Config, Opts) -> + _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), ok. start_link() -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index 3def35920..6833b50c3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -502,11 +502,6 @@ resource_id(Config) -> Name = ?config(influxdb_name, Config), emqx_bridge_resource:resource_id(Type, Name). 
-instance_id(Config) -> - ResourceId = resource_id(Config), - [{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}), - InstanceId. - %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -581,14 +576,14 @@ t_start_already_started(Config) -> {ok, _}, create_bridge(Config) ), - InstanceId = instance_id(Config), + ResourceId = resource_id(Config), TypeAtom = binary_to_atom(Type), NameAtom = binary_to_atom(Name), {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( emqx_bridge_schema, InfluxDBConfigString ), ?check_trace( - emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap), + emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap), fun(Result, Trace) -> ?assertMatch({ok, _}, Result), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 2e1730b52..da72590d4 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -267,9 +267,8 @@ apply_template([{Key, _} | _] = Reqs, Templates) -> [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] end. -client_id(InstanceId) -> - Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), - erlang:binary_to_atom(Name, utf8). +client_id(ResourceId) -> + erlang:binary_to_atom(ResourceId, utf8). redact(Msg) -> emqx_utils:redact(Msg, fun is_sensitive_key/1). 
From 457516760709828c97696316ae8771a7d0d93b46 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 19 Apr 2023 12:32:31 +0300 Subject: [PATCH 3/5] feat(resource): drop `manager_id()` type --- .../src/emqx_bridge_gcp_pubsub_connector.erl | 80 +++++++++---------- .../src/emqx_bridge_kafka_impl_consumer.erl | 46 +++++------ .../src/emqx_bridge_pulsar_impl_producer.erl | 16 ++-- apps/emqx_resource/include/emqx_resource.hrl | 1 - apps/emqx_resource/src/emqx_resource.erl | 12 +-- .../src/emqx_ee_connector_sqlserver.erl | 58 +++++++------- 6 files changed, 103 insertions(+), 110 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 98f3e497d..be5e56e85 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -38,7 +38,6 @@ }. -type state() :: #{ connect_timeout := timer:time(), - instance_id := manager_id(), jwt_worker_id := jwt_worker(), max_retries := non_neg_integer(), payload_template := emqx_plugin_libs_rule:tmpl_token(), @@ -61,9 +60,9 @@ is_buffer_supported() -> false. callback_mode() -> async_if_possible. --spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}. +-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. 
on_start( - InstanceId, + ResourceId, #{ connect_timeout := ConnectTimeout, max_retries := MaxRetries, @@ -75,7 +74,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_gcp_pubsub_bridge", - connector => InstanceId, + connector => ResourceId, config => Config }), %% emulating the emulator behavior @@ -100,14 +99,13 @@ on_start( #{ jwt_worker_id := JWTWorkerId, project_id := ProjectId - } = ensure_jwt_worker(InstanceId, Config), + } = ensure_jwt_worker(ResourceId, Config), State = #{ connect_timeout => ConnectTimeout, - instance_id => InstanceId, jwt_worker_id => JWTWorkerId, max_retries => MaxRetries, payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), - pool_name => InstanceId, + pool_name => ResourceId, project_id => ProjectId, pubsub_topic => PubSubTopic, request_timeout => RequestTimeout @@ -115,39 +113,39 @@ on_start( ?tp( gcp_pubsub_on_start_before_starting_pool, #{ - instance_id => InstanceId, - pool_name => InstanceId, + resource_id => ResourceId, + pool_name => ResourceId, pool_opts => PoolOpts } ), - ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}), - case ehttpc_sup:start_pool(InstanceId, PoolOpts) of + ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => ResourceId}), + case ehttpc_sup:start_pool(ResourceId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> - ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}), + ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => ResourceId}), {ok, State}; {error, Reason} -> ?tp(gcp_pubsub_ehttpc_pool_start_failure, #{ - pool_name => InstanceId, + pool_name => ResourceId, reason => Reason }), {error, Reason} end. --spec on_stop(manager_id(), state()) -> ok | {error, term()}. +-spec on_stop(resource_id(), state()) -> ok | {error, term()}. 
on_stop( - InstanceId, - _State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName} + ResourceId, + _State = #{jwt_worker_id := JWTWorkerId} ) -> - ?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}), + ?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_worker_id => JWTWorkerId}), ?SLOG(info, #{ msg => "stopping_gcp_pubsub_bridge", - connector => InstanceId + connector => ResourceId }), emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), - emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId), - ehttpc_sup:stop_pool(PoolName). + emqx_connector_jwt:delete_jwt(?JWT_TABLE, ResourceId), + ehttpc_sup:stop_pool(ResourceId). -spec on_query( resource_id(), @@ -213,9 +211,9 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ), do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). --spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) -> - case do_get_status(InstanceId, PoolName, Timeout) of +-spec on_get_status(resource_id(), state()) -> connected | disconnected. +on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> + case do_get_status(ResourceId, Timeout) of true -> connected; false -> @@ -230,12 +228,12 @@ on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = %% Helper fns %%------------------------------------------------------------------------------------------------- --spec ensure_jwt_worker(manager_id(), config()) -> +-spec ensure_jwt_worker(resource_id(), config()) -> #{ jwt_worker_id := jwt_worker(), project_id := binary() }. 
-ensure_jwt_worker(InstanceId, #{ +ensure_jwt_worker(ResourceId, #{ service_account_json := ServiceAccountJSON }) -> #{ @@ -250,7 +248,7 @@ ensure_jwt_worker(InstanceId, #{ Alg = <<"RS256">>, Config = #{ private_key => PrivateKeyPEM, - resource_id => InstanceId, + resource_id => ResourceId, expiration => ExpirationMS, table => ?JWT_TABLE, iss => ServiceAccountEmail, @@ -260,14 +258,14 @@ ensure_jwt_worker(InstanceId, #{ alg => Alg }, - JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>, + JWTWorkerId = <<"gcp_pubsub_jwt_worker:", ResourceId/binary>>, Worker = case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of {ok, Worker0} -> Worker0; Error -> ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => InstanceId, + connector => ResourceId, reason => Error }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), @@ -281,18 +279,18 @@ ensure_jwt_worker(InstanceId, #{ %% produced by the worker. receive {Ref, token_created} -> - ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}), + ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => ResourceId}), demonitor(MRef, [flush]), ok; {'DOWN', MRef, process, Worker, Reason} -> ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => InstanceId, + connector => ResourceId, reason => Reason }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(failed_to_start_jwt_worker) after 10_000 -> - ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}), + ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => ResourceId}), demonitor(MRef, [flush]), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(timeout_creating_jwt) @@ -325,8 +323,8 @@ publish_path( <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. -spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}]. 
-get_jwt_authorization_header(InstanceId) -> - case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of +get_jwt_authorization_header(ResourceId) -> + case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, ResourceId) of %% Since we synchronize the JWT creation during resource start %% (see `on_start/2'), this will be always be populated. {ok, JWT} -> @@ -345,7 +343,6 @@ get_jwt_authorization_header(InstanceId) -> do_send_requests_sync(State, Requests, ResourceId) -> #{ pool_name := PoolName, - instance_id := InstanceId, max_retries := MaxRetries, request_timeout := RequestTimeout } = State, @@ -353,12 +350,11 @@ do_send_requests_sync(State, Requests, ResourceId) -> gcp_pubsub_bridge_do_send_requests, #{ query_mode => sync, - instance_id => InstanceId, resource_id => ResourceId, requests => Requests } ), - Headers = get_jwt_authorization_header(InstanceId), + Headers = get_jwt_authorization_header(ResourceId), Payloads = lists:map( fun({send_message, Selected}) -> @@ -471,19 +467,17 @@ do_send_requests_sync(State, Requests, ResourceId) -> do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ pool_name := PoolName, - instance_id := InstanceId, request_timeout := RequestTimeout } = State, ?tp( gcp_pubsub_bridge_do_send_requests, #{ query_mode => async, - instance_id => InstanceId, resource_id => ResourceId, requests => Requests } ), - Headers = get_jwt_authorization_header(InstanceId), + Headers = get_jwt_authorization_header(ResourceId), Payloads = lists:map( fun({send_message, Selected}) -> @@ -541,9 +535,9 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) end. --spec do_get_status(manager_id(), binary(), timer:time()) -> boolean(). -do_get_status(InstanceId, PoolName, Timeout) -> - Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], +-spec do_get_status(resource_id(), timer:time()) -> boolean(). 
+do_get_status(ResourceId, Timeout) -> + Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ResourceId)], DoPerWorker = fun(Worker) -> case ehttpc:health_check(Worker, Timeout) of @@ -552,7 +546,7 @@ do_get_status(InstanceId, PoolName, Timeout) -> {error, Reason} -> ?SLOG(error, #{ msg => "ehttpc_health_check_failed", - instance_id => InstanceId, + connector => ResourceId, reason => Reason, worker => Worker }), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c549b3467..f7958af81 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -114,8 +114,8 @@ callback_mode() -> is_buffer_supported() -> true. --spec on_start(manager_id(), config()) -> {ok, state()}. -on_start(InstanceId, Config) -> +-spec on_start(resource_id(), config()) -> {ok, state()}. +on_start(ResourceId, Config) -> #{ authentication := Auth, bootstrap_hosts := BootstrapHosts0, @@ -133,7 +133,7 @@ on_start(InstanceId, Config) -> BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), KafkaType = kafka_consumer, %% Note: this is distinct per node. 
- ClientID = make_client_id(InstanceId, KafkaType, BridgeName), + ClientID = make_client_id(ResourceId, KafkaType, BridgeName), ClientOpts0 = case Auth of none -> []; @@ -144,26 +144,26 @@ on_start(InstanceId, Config) -> ok -> ?tp( kafka_consumer_client_started, - #{client_id => ClientID, instance_id => InstanceId} + #{client_id => ClientID, resource_id => ResourceId} ), ?SLOG(info, #{ msg => "kafka_consumer_client_started", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => BootstrapHosts }); {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer_client", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => BootstrapHosts, reason => emqx_utils:redact(Reason) }), throw(?CLIENT_DOWN_MESSAGE) end, - start_consumer(Config, InstanceId, ClientID). + start_consumer(Config, ResourceId, ClientID). --spec on_stop(manager_id(), state()) -> ok. -on_stop(_InstanceID, State) -> +-spec on_stop(resource_id(), state()) -> ok. +on_stop(_ResourceID, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID @@ -172,8 +172,8 @@ on_stop(_InstanceID, State) -> stop_client(ClientID), ok. --spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(_InstanceID, State) -> +-spec on_get_status(resource_id(), state()) -> connected | disconnected. +on_get_status(_ResourceID, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID, @@ -271,8 +271,8 @@ ensure_consumer_supervisor_started() -> ok end. --spec start_consumer(config(), manager_id(), brod:client_id()) -> {ok, state()}. -start_consumer(Config, InstanceId, ClientID) -> +-spec start_consumer(config(), resource_id(), brod:client_id()) -> {ok, state()}. 
+start_consumer(Config, ResourceId, ClientID) -> #{ bootstrap_hosts := BootstrapHosts0, bridge_name := BridgeName, @@ -292,7 +292,7 @@ start_consumer(Config, InstanceId, ClientID) -> InitialState = #{ key_encoding_mode => KeyEncodingMode, hookpoint => Hookpoint, - resource_id => emqx_bridge_resource:resource_id(kafka_consumer, BridgeName), + resource_id => ResourceId, topic_mapping => TopicMapping, value_encoding_mode => ValueEncodingMode }, @@ -337,7 +337,7 @@ start_consumer(Config, InstanceId, ClientID) -> {ok, _ConsumerPid} -> ?tp( kafka_consumer_subscriber_started, - #{instance_id => InstanceId, subscriber_id => SubscriberId} + #{resource_id => ResourceId, subscriber_id => SubscriberId} ), {ok, #{ subscriber_id => SubscriberId, @@ -347,7 +347,7 @@ start_consumer(Config, InstanceId, ClientID) -> {error, Reason2} -> ?SLOG(error, #{ msg => "failed_to_start_kafka_consumer", - instance_id => InstanceId, + resource_id => ResourceId, kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), reason => emqx_utils:redact(Reason2) }), @@ -471,19 +471,19 @@ consumer_group_id(BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(manager_id()) -> boolean(). -is_dry_run(InstanceId) -> - TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), +-spec is_dry_run(resource_id()) -> boolean(). +is_dry_run(ResourceId) -> + TestIdStart = string:find(ResourceId, ?TEST_ID_PREFIX), case TestIdStart of nomatch -> false; _ -> - string:equal(TestIdStart, InstanceId) + string:equal(TestIdStart, ResourceId) end. --spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom(). -make_client_id(InstanceId, KafkaType, KafkaName) -> - case is_dry_run(InstanceId) of +-spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom(). 
+make_client_id(ResourceId, KafkaType, KafkaName) -> + case is_dry_run(ResourceId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), binary_to_atom(ClientID0); diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 27d50f077..300fe9b2d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -70,7 +70,7 @@ callback_mode() -> async_if_possible. %% workers. is_buffer_supported() -> true. --spec on_start(manager_id(), config()) -> {ok, state()}. +-spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> #{ authentication := _Auth, @@ -106,7 +106,7 @@ on_start(InstanceId, Config) -> end, start_producer(Config, InstanceId, ClientId, ClientOpts). --spec on_stop(manager_id(), state()) -> ok. +-spec on_stop(resource_id(), state()) -> ok. on_stop(_InstanceId, State) -> #{ pulsar_client_id := ClientId, @@ -117,7 +117,7 @@ on_stop(_InstanceId, State) -> ?tp(pulsar_bridge_stopped, #{instance_id => _InstanceId}), ok. --spec on_get_status(manager_id(), state()) -> connected | disconnected. +-spec on_get_status(resource_id(), state()) -> connected | disconnected. on_get_status(_InstanceId, State = #{}) -> #{ pulsar_client_id := ClientId, @@ -144,7 +144,7 @@ on_get_status(_InstanceId, _State) -> %% create the bridge is not quite finished, `State = undefined'. connecting. --spec on_query(manager_id(), {send_message, map()}, state()) -> +-spec on_query(resource_id(), {send_message, map()}, state()) -> {ok, term()} | {error, timeout} | {error, term()}. @@ -163,7 +163,7 @@ on_query(_InstanceId, {send_message, Message}, State) -> end. 
-spec on_query_async( - manager_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() + resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. on_query_async(_InstanceId, {send_message, Message}, AsyncReplyFn, State) -> @@ -203,7 +203,7 @@ format_servers(Servers0) -> Servers1 ). --spec make_client_id(manager_id(), atom() | binary()) -> pulsar_client_id(). +-spec make_client_id(resource_id(), atom() | binary()) -> pulsar_client_id(). make_client_id(InstanceId, BridgeName) -> case is_dry_run(InstanceId) of true -> @@ -218,7 +218,7 @@ make_client_id(InstanceId, BridgeName) -> binary_to_atom(ClientIdBin) end. --spec is_dry_run(manager_id()) -> boolean(). +-spec is_dry_run(resource_id()) -> boolean(). is_dry_run(InstanceId) -> TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), case TestIdStart of @@ -255,7 +255,7 @@ producer_name(ClientId) -> ]) ). --spec start_producer(config(), manager_id(), pulsar_client_id(), map()) -> {ok, state()}. +-spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}. start_producer(Config, InstanceId, ClientId, ClientOpts) -> #{ conn_opts := ConnOpts, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 91572eac3..e6f86fb59 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- -type resource_type() :: module(). -type resource_id() :: binary(). --type manager_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()]. -type resource_config() :: term(). 
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 6f72f8a16..7c48e8ee4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -367,9 +367,9 @@ is_buffer_supported(Module) -> -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. -call_start(MgrId, Mod, Config) -> +call_start(ResId, Mod, Config) -> try - Mod:on_start(MgrId, Config) + Mod:on_start(ResId, Config) catch throw:Error -> {error, Error}; @@ -382,12 +382,12 @@ call_start(MgrId, Mod, Config) -> | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()} | {error, term()}. -call_health_check(MgrId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). +call_health_check(ResId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)). -spec call_stop(resource_id(), module(), resource_state()) -> term(). -call_stop(MgrId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). +call_stop(ResId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_stop(ResId, ResourceState)). -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index 90d90cb36..61e7b24c8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -174,7 +174,7 @@ callback_mode() -> async_if_possible. is_buffer_supported() -> false. 
on_start( - InstanceId = PoolName, + ResourceId = PoolName, #{ server := Server, username := Username, @@ -187,7 +187,7 @@ on_start( ) -> ?SLOG(info, #{ msg => "starting_sqlserver_connector", - connector => InstanceId, + connector => ResourceId, config => emqx_utils:redact(Config) }), @@ -212,7 +212,7 @@ on_start( ], State = #{ - %% also InstanceId + %% also ResourceId pool_name => PoolName, sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts @@ -228,15 +228,15 @@ on_start( {error, Reason} end. -on_stop(InstanceId, #{pool_name := PoolName} = _State) -> +on_stop(ResourceId, _State) -> ?SLOG(info, #{ msg => "stopping_sqlserver_connector", - connector => InstanceId + connector => ResourceId }), - emqx_resource_pool:stop(PoolName). + emqx_resource_pool:stop(ResourceId). -spec on_query( - manager_id(), + resource_id(), {?ACTION_SEND_MESSAGE, map()}, state() ) -> @@ -244,16 +244,16 @@ on_stop(InstanceId, #{pool_name := PoolName} = _State) -> | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> +on_query(ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> ?TRACE( "SINGLE_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} + #{requests => Query, connector => ResourceId, state => State} ), - do_query(InstanceId, Query, ?SYNC_QUERY_MODE, State). + do_query(ResourceId, Query, ?SYNC_QUERY_MODE, State). -spec on_query_async( - manager_id(), + resource_id(), {?ACTION_SEND_MESSAGE, map()}, {ReplyFun :: function(), Args :: list()}, state() @@ -261,7 +261,7 @@ on_query(InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, State) -> {ok, any()} | {error, term()}. 
on_query_async( - InstanceId, + ResourceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, ReplyFunAndArgs, State @@ -269,12 +269,12 @@ on_query_async( ?TRACE( "SINGLE_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Query, connector => InstanceId, state => State} + #{requests => Query, connector => ResourceId, state => State} ), - do_query(InstanceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(ResourceId, Query, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -spec on_batch_query( - manager_id(), + resource_id(), [{?ACTION_SEND_MESSAGE, map()}], state() ) -> @@ -282,29 +282,29 @@ on_query_async( | {ok, list()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(InstanceId, BatchRequests, State) -> +on_batch_query(ResourceId, BatchRequests, State) -> ?TRACE( "BATCH_QUERY_SYNC", "bridge_sqlserver_received", - #{requests => BatchRequests, connector => InstanceId, state => State} + #{requests => BatchRequests, connector => ResourceId, state => State} ), - do_query(InstanceId, BatchRequests, ?SYNC_QUERY_MODE, State). + do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State). -spec on_batch_query_async( - manager_id(), + resource_id(), [{?ACTION_SEND_MESSAGE, map()}], {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, any()}. -on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> +on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "BATCH_QUERY_ASYNC", "bridge_sqlserver_received", - #{requests => Requests, connector => InstanceId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). + do_query(ResourceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). 
-on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> +on_get_status(_ResourceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, {?MODULE, do_get_status, []} @@ -366,7 +366,7 @@ conn_str([{_, _} | Opts], Acc) -> %% Sync & Async query with singe & batch sql statement -spec do_query( - manager_id(), + resource_id(), Query :: {?ACTION_SEND_MESSAGE, map()} | [{?ACTION_SEND_MESSAGE, map()}], ApplyMode :: handover @@ -377,7 +377,7 @@ conn_str([{_, _} | Opts], Acc) -> | {error, {recoverable_error, term()}} | {error, term()}. do_query( - InstanceId, + ResourceId, Query, ApplyMode, #{pool_name := PoolName, sql_templates := Templates} = State @@ -385,7 +385,7 @@ do_query( ?TRACE( "SINGLE_QUERY_SYNC", "sqlserver_connector_received", - #{query => Query, connector => InstanceId, state => State} + #{query => Query, connector => ResourceId, state => State} ), %% only insert sql statement for single query and batch query @@ -409,7 +409,7 @@ do_query( ), ?SLOG(error, #{ msg => "sqlserver_connector_do_query_failed", - connector => InstanceId, + connector => ResourceId, query => Query, reason => Reason }), @@ -423,9 +423,9 @@ do_query( end. worker_do_insert( - Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State + Conn, SQL, #{resource_opts := ResourceOpts, pool_name := ResourceId} = State ) -> - LogMeta = #{connector => InstanceId, state => State}, + LogMeta = #{connector => ResourceId, state => State}, try case execute(Conn, SQL, ?REQUEST_TIMEOUT(ResourceOpts)) of {selected, Rows, _} -> From 670709f746a7820dd0d981dc170c5ae337a6777d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 19 Apr 2023 14:32:03 +0300 Subject: [PATCH 4/5] feat(resource): ensure uniqueness through `gproc` Also use it instead of a custom ETS table for simplicity and better consistency. This has drawbacks though: expect slightly increased load on gproc gen_server due to how `gproc:set_value/2` works. 
--- .../src/emqx_resource_manager.erl | 79 +++++++++---------- .../src/emqx_resource_manager_sup.erl | 4 - .../test/emqx_resource_SUITE.erl | 32 ++++---- 3 files changed, 53 insertions(+), 62 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c8be34f87..f42d3c1b5 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -57,7 +57,9 @@ }). -type data() :: #data{}. --define(ETS_TABLE, ?MODULE). +-define(NAME(ResId), {n, l, {?MODULE, ResId}}). +-define(REF(ResId), {via, gproc, ?NAME(ResId)}). + -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). -define(T_LOOKUP, 1000). @@ -229,10 +231,11 @@ lookup(ResId) -> %% @doc Lookup the group and data of a resource from the cache -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup_cached(ResId) -> - case read_cache(ResId) of - {Group, Data} -> - {ok, Group, data_record_to_external_map(Data)}; - not_found -> + try read_cache(ResId) of + Data = #data{group = Group} -> + {ok, Group, data_record_to_external_map(Data)} + catch + error:badarg -> {error, not_found} end. @@ -248,20 +251,16 @@ reset_metrics(ResId) -> %% @doc Returns the data for all resources -spec list_all() -> [resource_data()]. list_all() -> - try - [ - data_record_to_external_map(Data) - || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) - ] - catch - error:badarg -> [] - end. + lists:map( + fun data_record_to_external_map/1, + gproc:select({local, names}, [{{?NAME('_'), '_', '$1'}, [], ['$1']}]) + ). %% @doc Returns a list of ids for all the resources in a group -spec list_group(resource_group()) -> [resource_id()]. list_group(Group) -> - List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), - lists:flatten(List). + Guard = {'==', {element, #data.group, '$1'}, Group}, + gproc:select({local, names}, [{{?NAME('$2'), '_', '$1'}, [Guard], ['$2']}]). 
-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. health_check(ResId) -> @@ -286,7 +285,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - gen_statem:start_link(?MODULE, {Data, Opts}, []). + gen_statem:start_link(?REF(ResId), ?MODULE, {Data, Opts}, []). init({DataIn, Opts}) -> process_flag(trap_exit, true), @@ -306,7 +305,7 @@ terminate({shutdown, removed}, _State, _Data) -> ok; terminate(_Reason, _State, Data) -> _ = maybe_stop_resource(Data), - ok = delete_cache(Data#data.id), + _ = erase_cache(Data), ok. %% Behavior callback @@ -401,9 +400,9 @@ log_state_consistency(State, Data) -> data => Data }). -log_cache_consistency({_, Data}, Data) -> +log_cache_consistency(Data, Data) -> ok; -log_cache_consistency({_, DataCached}, Data) -> +log_cache_consistency(DataCached, Data) -> ?tp(warning, "inconsistent_cache", #{ cache => DataCached, data => Data @@ -412,22 +411,21 @@ log_cache_consistency({_, DataCached}, Data) -> %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ -insert_cache(ResId, Group, Data = #data{}) -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}). +insert_cache(ResId, Data = #data{}) -> + gproc:set_value(?NAME(ResId), Data). read_cache(ResId) -> - case ets:lookup(?ETS_TABLE, ResId) of - [{_Id, Group, Data}] -> {Group, Data}; - [] -> not_found - end. + gproc:lookup_value(?NAME(ResId)). -delete_cache(<> = ResId) -> - true = ets:delete(?ETS_TABLE, {owner, ResId}), - true = ets:delete(?ETS_TABLE, ResId), - ok; -delete_cache(ResId) -> - true = ets:delete(?ETS_TABLE, ResId), - ok. +erase_cache(_Data = #data{id = ResId}) -> + gproc:unreg(?NAME(ResId)). + +try_read_cache(ResId) -> + try + read_cache(ResId) + catch + error:badarg -> not_found + end. 
retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of @@ -442,12 +440,12 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> _ = stop_resource(Data), - ok = delete_cache(Data#data.id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, + _ = erase_cache(Data), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> @@ -552,7 +550,7 @@ update_state(Data) -> update_state(DataWas, DataWas) -> DataWas; update_state(Data, _DataWas) -> - _ = insert_cache(Data#data.id, Data#data.group, Data), + _ = insert_cache(Data#data.id, Data), Data. health_check_interval(Opts) -> @@ -642,10 +640,10 @@ wait_for_ready(ResId, WaitTime) -> do_wait_for_ready(_ResId, 0) -> timeout; do_wait_for_ready(ResId, Retry) -> - case read_cache(ResId) of - {_Group, #data{status = connected}} -> + case try_read_cache(ResId) of + #data{status = connected} -> ok; - {_Group, #data{status = disconnected, error = Err}} -> + #data{status = disconnected, error = Err} -> {error, external_error(Err)}; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), @@ -654,12 +652,7 @@ do_wait_for_ready(ResId, Retry) -> safe_call(ResId, Message, Timeout) -> try - case read_cache(ResId) of - not_found -> - {error, not_found}; - {_, #data{pid = ManagerPid}} -> - gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout}) - end + gen_statem:call(?REF(ResId), Message, {clean_timeout, Timeout}) catch error:badarg -> {error, not_found}; diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index b27c46739..2f442cd56 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -31,9 +31,6 @@ start_link() -> supervisor:start_link({local, ?MODULE}, 
?MODULE, []). init([]) -> - TabOpts = [named_table, set, public, {read_concurrency, true}], - _ = ets:new(emqx_resource_manager, TabOpts), - ChildSpecs = [ #{ id => emqx_resource_manager, @@ -44,6 +41,5 @@ init([]) -> modules => [emqx_resource_manager] } ], - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 34781df6c..6fd5a552e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1055,28 +1055,22 @@ t_list_filter(_) -> ). t_create_dry_run_local(_) -> - ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}), lists:foreach( fun(_) -> create_dry_run_local_succ() end, lists:seq(1, 10) ), - case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of - false -> - %% Sleep to remove flakyness in test case. It take some time for - %% the ETS table to be cleared. - timer:sleep(2000), - [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}); - true -> - ok - end. + ?retry( + 100, + 5, + ?assertEqual( + [], + emqx_resource:list_instances_verbose() + ) + ). create_dry_run_local_succ() -> - case whereis(test_resource) of - undefined -> ok; - Pid -> exit(Pid, kill) - end, ?assertEqual( ok, emqx_resource:create_dry_run_local( @@ -1107,7 +1101,15 @@ t_create_dry_run_local_failed(_) -> ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), - ?assertEqual(ok, Res3). + ?assertEqual(ok, Res3), + ?retry( + 100, + 5, + ?assertEqual( + [], + emqx_resource:list_instances_verbose() + ) + ). 
t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])), From 90cf1ade74380f2827671752c24333375da9efe3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 2 May 2023 14:39:20 +0300 Subject: [PATCH 5/5] chore: bump application versions * emqx_connector 0.1.22 * emqx_bridge_gcp_pubsub 0.1.1 * emqx_bridge_kafka 0.1.2 * emqx_bridge_pulsar 0.1.1 * emqx_ee_connector 0.1.12 --- apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src | 2 +- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src | 2 +- apps/emqx_connector/src/emqx_connector.app.src | 2 +- lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 86627eb2a..2b3d359d3 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index e5680cfc4..f01a011d1 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index c0a19824c..db55c7032 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang 
-*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.21"}, + {vsn, "0.1.22"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 82f556bdb..baf54eff1 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {applications, [ kernel,