diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 64ce3e6a4..a7e073581 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -979,7 +979,7 @@ authenticator_examples() -> mechanism => <<"password-based">>, backend => <<"http">>, method => <<"post">>, - url => <<"http://127.0.0.2:8080">>, + url => <<"http://127.0.0.1:18083">>, headers => #{ <<"content-type">> => <<"application/json">> }, diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 6e014f2ec..b7a74c9a4 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -218,7 +218,7 @@ create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{force_create => true}) of + parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -263,7 +263,8 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). + emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), + #{async_create => true}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index ed1de18cf..363e40a5f 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,7 @@ %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, %% the 'status' of the resource will be 'stopped' in this case. %% Defaults to 'false' - force_create => boolean() + async_create => boolean() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 2701b70d6..4ac72ca4d 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -148,14 +148,19 @@ code_change(_OldVsn, State, _Extra) -> do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType} = Data} -> + {ok, #{mod := ResourceType, status := started} = Data} -> + %% If this resource is in use (status='started'), we should make sure + %% the new config is OK before removing the old one. case do_create_dry_run(ResourceType, NewConfig) of ok -> do_remove(Data, false), - do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true}); + do_create(InstId, ResourceType, NewConfig, Opts); Error -> Error end; + {ok, #{mod := ResourceType, status := _} = Data} -> + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -164,21 +169,21 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) -> do_create(InstId, ResourceType, Config, Opts) -> case lookup(InstId) of - {ok, _} -> {ok, already_created}; + {ok, _} -> + {ok, already_created}; {error, not_found} -> case do_start(InstId, ResourceType, Config, Opts) of ok -> - ok = emqx_resource_health_check_sup:create_checker(InstId, - maps:get(health_check_interval, Opts, 15000)), ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; - Error -> Error + Error -> + Error end end. do_create_dry_run(ResourceType, Config) -> InstId = make_test_id(), - Opts = #{force_create => false}, + Opts = #{async_create => false}, case do_create(InstId, ResourceType, Config, Opts) of {ok, Data} -> Return = do_health_check(Data), @@ -200,7 +205,6 @@ do_remove(#{id := InstId} = Data, ClearMetrics) -> true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok end, - _ = emqx_resource_health_check_sup:delete_checker(InstId), ok. do_restart(InstId, Opts) -> @@ -213,24 +217,32 @@ do_restart(InstId, Opts) -> end. do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> - ForceCreate = maps:get(force_create, Opts, false), - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => starting, state => undefined}, + InitData = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Res0}), + ets:insert(emqx_resource_instance, {InstId, InitData}), + case maps:get(async_create, Opts, false) of + false -> + start_and_check(InstId, ResourceType, Config, Opts, InitData); + true -> + spawn(fun() -> + start_and_check(InstId, ResourceType, Config, Opts, InitData) + end), + ok + end. + +start_and_check(InstId, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), - ok; - {error, Reason} when ForceCreate == true -> - logger:warning("start ~ts resource ~ts failed: ~p, force_create it", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - ok; - {error, Reason} when ForceCreate == false -> - ets:delete(emqx_resource_instance, InstId), + Data2 = Data#{state => ResourceState}, + ets:insert(emqx_resource_instance, {InstId, Data2}), + case maps:get(async_create, Opts, false) of + false -> do_health_check(Data2); + true -> emqx_resource_health_check_sup:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)) + end; + {error, Reason} -> + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), {error, Reason} end. @@ -240,14 +252,12 @@ do_stop(#{state := undefined}) -> ok; do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + ok = emqx_resource_health_check_sup:delete_checker(InstId), ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), ok. do_health_check(InstId) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Data} -> do_health_check(Data); - Error -> Error - end; + do_with_instance_data(InstId, fun do_health_check/1, []); do_health_check(#{state := undefined}) -> {error, resource_not_initialized}; do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->