diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 64ce3e6a4..a7e073581 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -979,7 +979,7 @@ authenticator_examples() -> mechanism => <<"password-based">>, backend => <<"http">>, method => <<"post">>, - url => <<"http://127.0.0.2:8080">>, + url => <<"http://127.0.0.1:18083">>, headers => #{ <<"content-type">> => <<"application/json">> }, diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 6e014f2ec..f27603bf9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -35,6 +35,7 @@ ]). -export([ load/0 + , lookup/1 , lookup/2 , lookup/3 , list/0 @@ -191,6 +192,10 @@ list_bridges_by_connector(ConnectorId) -> [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), ConnectorId =:= Id]. +lookup(Id) -> + {Type, Name} = parse_bridge_id(Id), + lookup(Type, Name). + lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). @@ -218,7 +223,7 @@ create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{force_create => true}) of + parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -247,7 +252,7 @@ update(Type, Name, {OldConf, Conf}) -> ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" , type => Type, name => Name, config => Conf}), create(Type, Name, Conf); - {error, Reason} -> {update_bridge_failed, Reason} + {error, Reason} -> {error, {update_bridge_failed, Reason}} end; true -> %% we don't need to recreate the bridge if this config change is only to @@ -263,7 +268,8 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). + emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), + #{async_create => true}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 807ad32f6..16da7395f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -160,6 +160,7 @@ t_http_crud_apis(_) -> } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server + wait_for_resource_ready(BridgeID, 5), Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( @@ -212,6 +213,7 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% send an message to emqx again, check the path has been changed + wait_for_resource_ready(BridgeID, 5), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( receive @@ -320,3 +322,14 @@ auth_header_() -> operation_path(Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 4caf700a9..12a3a8e23 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -241,6 +241,7 @@ t_mqtt_conn_bridge_ingress(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -309,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -370,6 +372,7 @@ t_mqtt_conn_update(_) -> , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -412,6 +415,11 @@ t_mqtt_conn_update2(_) -> , <<"status">> := <<"disconnected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + %% We try to fix the 'server' parameter, to another unavailable server.. + %% The update should success: we don't check the connectivity of the new config + %% if the resource is now disconnected. + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), @@ -444,9 +452,9 @@ t_mqtt_conn_update3(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }), #{ <<"id">> := BridgeIDEgress - , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% delete the connector should fail because it is in use by a bridge {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), @@ -499,6 +507,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic ?assert( @@ -563,6 +572,7 @@ t_egress_mqtt_bridge_with_rules(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic ?assert( @@ -583,6 +593,7 @@ t_egress_mqtt_bridge_with_rules(_) -> RuleTopic = <<"t/1">>, RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, emqx:subscribe(RemoteTopic2), + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(RuleTopic, Payload2)), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId @@ -646,3 +657,13 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index ed1de18cf..363e40a5f 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,7 @@ %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, %% the 'status' of the resource will be 'stopped' in this case. %% Defaults to 'false' - force_create => boolean() + async_create => boolean() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5ec5fd92a..12ae912e8 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -58,6 +58,7 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -export([ restart/1 %% restart the instance. + , restart/2 , health_check/1 %% verify if the resource is working normally , stop/1 %% stop the instance , query/2 %% query the instance @@ -68,7 +69,6 @@ -export([ call_start/3 %% start the instance , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance - , call_config_merge/4 %% merge the config when updating , call_jsonify/2 ]). @@ -86,12 +86,9 @@ -optional_callbacks([ on_query/4 , on_health_check/2 - , on_config_merge/3 , on_jsonify/1 ]). --callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). - -callback on_jsonify(resource_config()) -> jsx:json_term(). %% when calling emqx_resource:start/1 @@ -169,18 +166,17 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = emqx_resource_instance:make_test_id(), - call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). + call_instance(<>, {create_dry_run, ResourceType, Config}). --spec recreate(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Params) -> - cluster_call(recreate_local, [InstId, ResourceType, Config, Params]). +recreate(InstId, ResourceType, Config, Opts) -> + cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]). --spec recreate_local(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config, Params) -> - call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}). +recreate_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -216,7 +212,11 @@ query(InstId, Request, AfterQuery) -> -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> - call_instance(InstId, {restart, InstId}). + restart(InstId, #{}). + +-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(InstId, Opts) -> + call_instance(InstId, {restart, InstId, Opts}). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> @@ -276,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). --spec call_config_merge(module(), resource_config(), resource_config(), term()) -> - resource_config(). -call_config_merge(Mod, OldConfig, NewConfig, Params) -> - case erlang:function_exported(Mod, on_config_merge, 3) of - true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)); - false -> NewConfig - end. - -spec call_jsonify(module(), resource_config()) -> jsx:json_term(). call_jsonify(Mod, Config) -> case erlang:function_exported(Mod, on_jsonify, 1) of @@ -330,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> case check_config(ResourceType, RawConfig) of diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 50b236daa..032ff6999 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -15,29 +15,52 @@ %%-------------------------------------------------------------------- -module(emqx_resource_health_check). --export([child_spec/2]). - --export([start_link/2]). +-export([ start_link/2 + , create_checker/2 + , delete_checker/1 + ]). -export([health_check/2]). -child_spec(Name, Sleep) -> - #{id => {health_check, Name}, +-define(SUP, emqx_resource_health_check_sup). +-define(ID(NAME), {resource_health_check, NAME}). + +child_spec(Name, Sleep) -> + #{id => ?ID(Name), start => {?MODULE, start_link, [Name, Sleep]}, restart => transient, shutdown => 5000, type => worker, modules => [?MODULE]}. -start_link(Name, Sleep) -> +start_link(Name, Sleep) -> Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), {ok, Pid}. -health_check(Name, SleepTime) -> - timer:sleep(SleepTime), - case emqx_resource:health_check(Name) of - ok -> +create_checker(Name, Sleep) -> + create_checker(Name, Sleep, false). + +create_checker(Name, Sleep, Retry) -> + case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of + {ok, _} -> ok; + {error, already_present} -> ok; + {error, {already_started, _}} when Retry == false -> + ok = delete_checker(Name), + create_checker(Name, Sleep, true); + Error -> Error + end. + +delete_checker(Name) -> + case supervisor:terminate_child(?SUP, ?ID(Name)) of + ok -> supervisor:delete_child(?SUP, ?ID(Name)); + Error -> Error + end. + +health_check(Name, SleepTime) -> + case emqx_resource:health_check(Name) of + ok -> emqx_alarm:deactivate(Name); - {error, _} -> - emqx_alarm:activate(Name, #{name => Name}, - <>) + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) end, - health_check(Name, SleepTime). \ No newline at end of file + timer:sleep(SleepTime), + health_check(Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl index 571cd6338..e17186114 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -19,9 +19,7 @@ -export([start_link/0]). --export([init/1, - create_health_check_process/2, - delete_health_check_process/1]). +-export([init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -29,12 +27,3 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, []}}. - -create_health_check_process(Name, Sleep) -> - supervisor:start_child(emqx_resource_health_check_sup, - emqx_resource_health_check:child_spec(Name, Sleep)). - -delete_health_check_process(Name) -> - _ = supervisor:terminate_child(emqx_resource_health_check_sup, {health_check, Name}), - _ = supervisor:delete_child(emqx_resource_health_check_sup, {health_check, Name}), - ok. \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index b2f2d0d27..86318e355 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -103,17 +103,17 @@ init({Pool, Id}) -> handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> {reply, do_create(InstId, ResourceType, Config, Opts), State}; -handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create_dry_run(InstId, ResourceType, Config), State}; +handle_call({create_dry_run, ResourceType, Config}, _From, State) -> + {reply, do_create_dry_run(ResourceType, Config), State}; -handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) -> - {reply, do_recreate(InstId, ResourceType, Config, Params), State}; +handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_recreate(InstId, ResourceType, Config, Opts), State}; handle_call({remove, InstId}, _From, State) -> {reply, do_remove(InstId), State}; -handle_call({restart, InstId}, _From, State) -> - {reply, do_restart(InstId), State}; +handle_call({restart, InstId, Opts}, _From, State) -> + {reply, do_restart(InstId, Opts), State}; handle_call({stop, InstId}, _From, State) -> {reply, do_stop(InstId), State}; @@ -140,25 +140,30 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% suppress the race condition check, as these functions are protected in gproc workers --dialyzer({nowarn_function, [do_recreate/4, - do_create/4, - do_restart/1, - do_stop/1, - do_health_check/1]}). +-dialyzer({nowarn_function, [ do_recreate/4 + , do_create/4 + , do_restart/2 + , do_start/4 + , do_stop/1 + , do_health_check/1 + , start_and_check/5 + ]}). -do_recreate(InstId, ResourceType, NewConfig, Params) -> +do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> - Config = emqx_resource:call_config_merge(ResourceType, OldConfig, - NewConfig, Params), - TestInstId = make_test_id(), - case do_create_dry_run(TestInstId, ResourceType, Config) of + {ok, #{mod := ResourceType, status := started} = Data} -> + %% If this resource is in use (status='started'), we should make sure + %% the new config is OK before removing the old one. + case do_create_dry_run(ResourceType, NewConfig) of ok -> - do_remove(ResourceType, InstId, ResourceState, false), - do_create(InstId, ResourceType, Config, #{force_create => true}); + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); Error -> Error end; + {ok, #{mod := ResourceType, status := _} = Data} -> + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -166,105 +171,96 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> end. do_create(InstId, ResourceType, Config, Opts) -> - ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of - {ok, _} -> {ok, already_created}; - _ -> - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, - %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Res0}), - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState} -> - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), - HealthCheckInterval = maps:get(health_check_interval, Opts, 15000), - emqx_resource_health_check_sup:create_health_check_process(InstId, HealthCheckInterval), + {ok, _} -> + {ok, already_created}; + {error, not_found} -> + case do_start(InstId, ResourceType, Config, Opts) of + ok -> + ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; - {error, Reason} when ForceCreate == true -> - logger:error("start ~ts resource ~ts failed: ~p, " - "force_create it as a stopped resource", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0}; - {error, Reason} when ForceCreate == false -> - ets:delete(emqx_resource_instance, InstId), - {error, Reason} + Error -> + Error end end. -do_create_dry_run(InstId, ResourceType, Config) -> +do_create_dry_run(ResourceType, Config) -> + InstId = make_test_id(), case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState0} -> - Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of - {ok, ResourceState1} -> ok; - {error, Reason, ResourceState1} -> - {error, Reason} - end, - _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1), - Return; + {ok, ResourceState} -> + case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of + {ok, _} -> ok; + {error, Reason, _} -> {error, Reason} + end; {error, Reason} -> {error, Reason} end. -do_remove(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState}} -> - do_remove(Mod, InstId, ResourceState); - Error -> - Error - end. +do_remove(Instance) -> + do_remove(Instance, true). -do_remove(Mod, InstId, ResourceState) -> - do_remove(Mod, InstId, ResourceState, true). - -do_remove(Mod, InstId, ResourceState, ClearMetrics) -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), +do_remove(InstId, ClearMetrics) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]); +do_remove(#{id := InstId} = Data, ClearMetrics) -> + _ = do_stop(Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok end, - _ = emqx_resource_health_check_sup:delete_health_check_process(InstId), ok. -do_restart(InstId) -> +do_restart(InstId, Opts) -> case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = case ResourceState of - undefined -> ok; - _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) - end, - case emqx_resource:call_start(InstId, Mod, Config) of - {ok, NewResourceState} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{state => NewResourceState, status => started}}), - ok; - {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - {error, Reason} - end; + {ok, #{mod := ResourceType, config := Config} = Data} -> + ok = do_stop(Data), + do_start(InstId, ResourceType, Config, Opts); Error -> Error end. -do_stop(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - ok; - Error -> - Error +do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> + InitData = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, InitData}), + case maps:get(async_create, Opts, false) of + false -> + start_and_check(InstId, ResourceType, Config, Opts, InitData); + true -> + spawn(fun() -> + start_and_check(InstId, ResourceType, Config, Opts, InitData) + end), + ok end. +start_and_check(InstId, ResourceType, Config, Opts, Data) -> + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + Data2 = Data#{state => ResourceState}, + ets:insert(emqx_resource_instance, {InstId, Data2}), + case maps:get(async_create, Opts, false) of + false -> do_health_check(Data2); + true -> emqx_resource_health_check:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)) + end; + {error, Reason} -> + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + {error, Reason} + end. + +do_stop(InstId) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_stop/1, []); +do_stop(#{state := undefined}) -> + ok; +do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + _ = emqx_resource_health_check:delete_checker(InstId), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ok. + do_health_check(InstId) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Data} -> do_health_check(Data); - Error -> Error - end; + do_with_instance_data(InstId, fun do_health_check/1, []); do_health_check(#{state := undefined}) -> {error, resource_not_initialized}; do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> @@ -284,6 +280,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> %% internal functions %%------------------------------------------------------------------------------ +do_with_instance_data(InstId, Do, Args) -> + case lookup(InstId) of + {ok, Data} -> erlang:apply(Do, [Data | Args]); + Error -> Error + end. + proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index b5655e301..99d601ec4 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -49,7 +49,7 @@ init([]) -> #{id => emqx_resource_health_check_sup, start => {emqx_resource_health_check_sup, start_link, []}, restart => transient, - shutdown => 5000, type => supervisor, modules => [emqx_resource_health_check_sup]}, + shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]}, {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. %% internal functions