From f29877bb6a17dd2b0615b038c9b59b9913aeb54d Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Feb 2022 11:18:21 +0800 Subject: [PATCH] fix(emqx_resource): remove create_opts async_create --- apps/emqx_bridge/src/emqx_bridge.erl | 8 ++--- .../test/emqx_bridge_api_SUITE.erl | 13 -------- apps/emqx_resource/include/emqx_resource.hrl | 7 ++-- apps/emqx_resource/src/emqx_resource.erl | 6 ++-- .../src/emqx_resource_health_check.erl | 2 +- .../src/emqx_resource_instance.erl | 33 +++++++------------ .../test/emqx_resource_SUITE.erl | 11 +++---- 7 files changed, 26 insertions(+), 54 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index e56b694d5..2d40c997b 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -224,9 +224,8 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), - case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, - emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{async_create => true}) of + case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type), + parse_confs(Type, Name, Conf), #{wait_connected => 1000}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -271,8 +270,7 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), - #{async_create => true}). + emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{wait_connected => 1000}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 65a369a3d..47dc55f6d 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -164,7 +164,6 @@ t_http_crud_apis(_) -> BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% send an message to emqx and the message should be forwarded to the HTTP server - wait_for_resource_ready(BridgeID, 5), Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( @@ -214,7 +213,6 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% send an message to emqx again, check the path has been changed - wait_for_resource_ready(BridgeID, 5), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( receive @@ -319,14 +317,3 @@ auth_header_() -> operation_path(Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). - -wait_for_resource_ready(InstId, 0) -> - ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), - ct:fail(wait_resource_timeout); -wait_for_resource_ready(InstId, Retry) -> - case emqx_bridge:lookup(InstId) of - {ok, #{resource_data := #{status := connected}}} -> ok; - _ -> - timer:sleep(100), - wait_for_resource_ready(InstId, Retry-1) - end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 78aae4313..cd10f9fa4 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -30,10 +30,9 @@ }. -type resource_group() :: binary(). -type create_opts() :: #{ - %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, - %% the 'status' of the resource will be 'stopped' in this case. - %% Defaults to 'false' - async_create => boolean() + health_check_interval => integer(), + health_check_timeout => integer(), + wait_connected => integer() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 770ba8fa1..5957222cb 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -60,7 +60,7 @@ -export([ restart/1 %% restart the instance. , restart/2 , health_check/1 %% verify if the resource is working normally - , set_resource_status_disconnected/1 %% set resource status to disconnected + , set_resource_status_connecting/1 %% set resource status to disconnected , stop/1 %% stop the instance , query/2 %% query the instance , query/3 %% query the instance with after_query() @@ -225,8 +225,8 @@ stop(InstId) -> health_check(InstId) -> call_instance(InstId, {health_check, InstId}). -set_resource_status_disconnected(InstId) -> - call_instance(InstId, {set_resource_status_disconnected, InstId}). +set_resource_status_connecting(InstId) -> + call_instance(InstId, {set_resource_status_connecting, InstId}). -spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 801c9d02f..6d4abc9cc 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) -> after Timeout -> emqx_alarm:activate(Name, #{name => Name}, <>), - emqx_resource:set_resource_status_disconnected(Name), + emqx_resource:set_resource_status_connecting(Name), receive health_check_finish -> timer:sleep(SleepTime) end diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index e08c57467..a3631d337 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) -> handle_call({health_check, InstId}, _From, State) -> {reply, do_health_check(InstId), State}; -handle_call({set_resource_status_disconnected, InstId}, _From, State) -> - {reply, do_set_resource_status_disconnected(InstId), State}; +handle_call({set_resource_status_connecting, InstId}, _From, State) -> + {reply, do_set_resource_status_connecting(InstId), State}; handle_call(Req, _From, State) -> logger:error("Received unexpected call: ~p", [Req]), @@ -249,28 +249,17 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> status => connecting, state => undefined}, %% The `emqx_resource:call_start/3` need the instance exist beforehand ets:insert(emqx_resource_instance, {InstId, Group, InitData}), - case maps:get(async_create, Opts, false) of - false -> - start_and_check(InstId, Group, ResourceType, Config, Opts, InitData); - true -> - spawn(fun() -> - start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) - end), - ok - end. + spawn(fun() -> + start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) + end), + ok. start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - Data2 = Data#{state => ResourceState}, + Data2 = Data#{state => ResourceState, status => connected}, ets:insert(emqx_resource_instance, {InstId, Group, Data2}), - case maps:get(async_create, Opts, false) of - false -> case do_health_check(Group, Data2) of - ok -> create_default_checker(InstId, Opts); - {error, Reason} -> {error, Reason} - end; - true -> create_default_checker(InstId, Opts) - end; + create_default_checker(InstId, Opts); {error, Reason} -> ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), {error, Reason} @@ -306,15 +295,15 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da {error, Reason, ResourceState1} -> logger:error("health check for ~p failed: ~p", [InstId, Reason]), ets:insert(emqx_resource_instance, - {InstId, Group, Data#{status => disconnected, state => ResourceState1}}), + {InstId, Group, Data#{status => connecting, state => ResourceState1}}), {error, Reason} end. -do_set_resource_status_disconnected(InstId) -> +do_set_resource_status_connecting(InstId) -> case emqx_resource_instance:lookup(InstId) of {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}); + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}}); Error -> {error, Error} end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index f0a2efcf8..4a40ce55a 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -107,7 +107,7 @@ t_create_remove_local(_) -> ?assert(is_process_alive(Pid)), - emqx_resource:set_resource_status_disconnected(?ID), + emqx_resource:set_resource_status_connecting(?ID), emqx_resource:recreate_local( ?ID, @@ -153,7 +153,7 @@ t_healthy_timeout(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => <<"test_resource">>}, - #{async_create => true, health_check_timeout => 200}), + #{health_check_timeout => 200}), timer:sleep(500), ok = emqx_resource:remove_local(?ID). @@ -163,14 +163,13 @@ t_healthy(_) -> ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, - #{name => <<"test_resource">>}, - #{async_create => true}), + #{name => <<"test_resource">>}), timer:sleep(400), emqx_resource_health_check:create_checker(?ID, 15000, 10000), #{pid := Pid} = emqx_resource:query(?ID, get_state), timer:sleep(300), - emqx_resource:set_resource_status_disconnected(?ID), + emqx_resource:set_resource_status_connecting(?ID), ok = emqx_resource:health_check(?ID), @@ -185,7 +184,7 @@ t_healthy(_) -> emqx_resource:health_check(?ID)), ?assertMatch( - [#{status := disconnected}], + [#{status := connecting}], emqx_resource:list_instances_verbose()), ok = emqx_resource:remove_local(?ID).