refactor(resource): support async create mode

This commit is contained in:
Shawn 2022-01-02 20:11:25 +08:00
parent 2277b75b2f
commit e1ab331a30
4 changed files with 42 additions and 31 deletions

View File

@ -979,7 +979,7 @@ authenticator_examples() ->
mechanism => <<"password-based">>,
backend => <<"http">>,
method => <<"post">>,
url => <<"http://127.0.0.2:8080">>,
url => <<"http://127.0.0.1:18083">>,
headers => #{
<<"content-type">> => <<"application/json">>
},

View File

@ -218,7 +218,7 @@ create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{force_create => true}) of
parse_confs(Type, Name, Conf), #{async_create => true}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
@ -263,7 +263,8 @@ recreate(Type, Name) ->
recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf),
#{async_create => true}).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

View File

@ -33,7 +33,7 @@
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
%% the 'status' of the resource will be 'stopped' in this case.
%% Defaults to 'false'
force_create => boolean()
async_create => boolean()
}.
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
undefined.

View File

@ -148,14 +148,19 @@ code_change(_OldVsn, State, _Extra) ->
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
case lookup(InstId) of
{ok, #{mod := ResourceType} = Data} ->
{ok, #{mod := ResourceType, status := started} = Data} ->
%% If this resource is in use (status='started'), we should make sure
%% the new config is OK before removing the old one.
case do_create_dry_run(ResourceType, NewConfig) of
ok ->
do_remove(Data, false),
do_create(InstId, ResourceType, NewConfig, Opts#{force_create => true});
do_create(InstId, ResourceType, NewConfig, Opts);
Error ->
Error
end;
{ok, #{mod := ResourceType, status := _} = Data} ->
do_remove(Data, false),
do_create(InstId, ResourceType, NewConfig, Opts);
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type};
{error, not_found} ->
@ -164,21 +169,21 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) ->
do_create(InstId, ResourceType, Config, Opts) ->
case lookup(InstId) of
{ok, _} -> {ok, already_created};
{ok, _} ->
{ok, already_created};
{error, not_found} ->
case do_start(InstId, ResourceType, Config, Opts) of
ok ->
ok = emqx_resource_health_check_sup:create_checker(InstId,
maps:get(health_check_interval, Opts, 15000)),
ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
{ok, force_lookup(InstId)};
Error -> Error
Error ->
Error
end
end.
do_create_dry_run(ResourceType, Config) ->
InstId = make_test_id(),
Opts = #{force_create => false},
Opts = #{async_create => false},
case do_create(InstId, ResourceType, Config, Opts) of
{ok, Data} ->
Return = do_health_check(Data),
@ -200,7 +205,6 @@ do_remove(#{id := InstId} = Data, ClearMetrics) ->
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
false -> ok
end,
_ = emqx_resource_health_check_sup:delete_checker(InstId),
ok.
do_restart(InstId, Opts) ->
@ -213,24 +217,32 @@ do_restart(InstId, Opts) ->
end.
do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
ForceCreate = maps:get(force_create, Opts, false),
Res0 = #{id => InstId, mod => ResourceType, config => Config,
status => starting, state => undefined},
InitData = #{id => InstId, mod => ResourceType, config => Config,
status => starting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Res0}),
ets:insert(emqx_resource_instance, {InstId, InitData}),
case maps:get(async_create, Opts, false) of
false ->
start_and_check(InstId, ResourceType, Config, Opts, InitData);
true ->
spawn(fun() ->
start_and_check(InstId, ResourceType, Config, Opts, InitData)
end),
ok
end.
start_and_check(InstId, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
%% this is the first time we do health check, this will update the
%% status and then do ets:insert/2
_ = do_health_check(Res0#{state => ResourceState}),
ok;
{error, Reason} when ForceCreate == true ->
logger:warning("start ~ts resource ~ts failed: ~p, force_create it",
[ResourceType, InstId, Reason]),
ets:insert(emqx_resource_instance, {InstId, Res0}),
ok;
{error, Reason} when ForceCreate == false ->
ets:delete(emqx_resource_instance, InstId),
Data2 = Data#{state => ResourceState},
ets:insert(emqx_resource_instance, {InstId, Data2}),
case maps:get(async_create, Opts, false) of
false -> do_health_check(Data2);
true -> emqx_resource_health_check_sup:create_checker(InstId,
maps:get(health_check_interval, Opts, 15000))
end;
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
{error, Reason}
end.
@ -240,14 +252,12 @@ do_stop(#{state := undefined}) ->
ok;
do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
ok = emqx_resource_health_check_sup:delete_checker(InstId),
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
ok.
do_health_check(InstId) when is_binary(InstId) ->
case lookup(InstId) of
{ok, Data} -> do_health_check(Data);
Error -> Error
end;
do_with_instance_data(InstId, fun do_health_check/1, []);
do_health_check(#{state := undefined}) ->
{error, resource_not_initialized};
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->