fix(emqx_resource): remove create_opts async_create

This commit is contained in:
EMQ-YangM 2022-02-25 11:18:21 +08:00
parent bf57bf717c
commit f29877bb6a
7 changed files with 26 additions and 54 deletions

View File

@ -224,9 +224,8 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{async_create => true}) of
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{wait_connected => 1000}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
@ -271,8 +270,7 @@ recreate(Type, Name) ->
recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf),
#{async_create => true}).
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{wait_connected => 1000}).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

View File

@ -164,7 +164,6 @@ t_http_crud_apis(_) ->
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
%% send an message to emqx and the message should be forwarded to the HTTP server
wait_for_resource_ready(BridgeID, 5),
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
@ -214,7 +213,6 @@ t_http_crud_apis(_) ->
}, jsx:decode(Bridge3Str)),
%% send an message to emqx again, check the path has been changed
wait_for_resource_ready(BridgeID, 5),
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
receive
@ -319,14 +317,3 @@ auth_header_() ->
operation_path(Oper, BridgeID) ->
uri(["bridges", BridgeID, "operation", Oper]).
wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
ct:fail(wait_resource_timeout);
wait_for_resource_ready(InstId, Retry) ->
case emqx_bridge:lookup(InstId) of
{ok, #{resource_data := #{status := connected}}} -> ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
end.

View File

@ -30,10 +30,9 @@
}.
-type resource_group() :: binary().
-type create_opts() :: #{
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
%% the 'status' of the resource will be 'stopped' in this case.
%% Defaults to 'false'
async_create => boolean()
health_check_interval => integer(),
health_check_timeout => integer(),
wait_connected => integer()
}.
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
undefined.

View File

@ -60,7 +60,7 @@
-export([ restart/1 %% restart the instance.
, restart/2
, health_check/1 %% verify if the resource is working normally
, set_resource_status_disconnected/1 %% set resource status to disconnected
, set_resource_status_connecting/1 %% set resource status to disconnected
, stop/1 %% stop the instance
, query/2 %% query the instance
, query/3 %% query the instance with after_query()
@ -225,8 +225,8 @@ stop(InstId) ->
health_check(InstId) ->
call_instance(InstId, {health_check, InstId}).
set_resource_status_disconnected(InstId) ->
call_instance(InstId, {set_resource_status_disconnected, InstId}).
set_resource_status_connecting(InstId) ->
call_instance(InstId, {set_resource_status_connecting, InstId}).
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) ->

View File

@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
after Timeout ->
emqx_alarm:activate(Name, #{name => Name},
<<Name/binary, " health check timeout">>),
emqx_resource:set_resource_status_disconnected(Name),
emqx_resource:set_resource_status_connecting(Name),
receive
health_check_finish -> timer:sleep(SleepTime)
end

View File

@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) ->
handle_call({health_check, InstId}, _From, State) ->
{reply, do_health_check(InstId), State};
handle_call({set_resource_status_disconnected, InstId}, _From, State) ->
{reply, do_set_resource_status_disconnected(InstId), State};
handle_call({set_resource_status_connecting, InstId}, _From, State) ->
{reply, do_set_resource_status_connecting(InstId), State};
handle_call(Req, _From, State) ->
logger:error("Received unexpected call: ~p", [Req]),
@ -249,28 +249,17 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
status => connecting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
case maps:get(async_create, Opts, false) of
false ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData);
true ->
spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
ok
end.
spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
ok.
start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
Data2 = Data#{state => ResourceState},
Data2 = Data#{state => ResourceState, status => connected},
ets:insert(emqx_resource_instance, {InstId, Group, Data2}),
case maps:get(async_create, Opts, false) of
false -> case do_health_check(Group, Data2) of
ok -> create_default_checker(InstId, Opts);
{error, Reason} -> {error, Reason}
end;
true -> create_default_checker(InstId, Opts)
end;
create_default_checker(InstId, Opts);
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
{error, Reason}
@ -306,15 +295,15 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da
{error, Reason, ResourceState1} ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => disconnected, state => ResourceState1}}),
{InstId, Group, Data#{status => connecting, state => ResourceState1}}),
{error, Reason}
end.
do_set_resource_status_disconnected(InstId) ->
do_set_resource_status_connecting(InstId) ->
case emqx_resource_instance:lookup(InstId) of
{ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}});
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}});
Error -> {error, Error}
end.

View File

@ -107,7 +107,7 @@ t_create_remove_local(_) ->
?assert(is_process_alive(Pid)),
emqx_resource:set_resource_status_disconnected(?ID),
emqx_resource:set_resource_status_connecting(?ID),
emqx_resource:recreate_local(
?ID,
@ -153,7 +153,7 @@ t_healthy_timeout(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true, health_check_timeout => 200}),
#{health_check_timeout => 200}),
timer:sleep(500),
ok = emqx_resource:remove_local(?ID).
@ -163,14 +163,13 @@ t_healthy(_) ->
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true}),
#{name => <<"test_resource">>}),
timer:sleep(400),
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_disconnected(?ID),
emqx_resource:set_resource_status_connecting(?ID),
ok = emqx_resource:health_check(?ID),
@ -185,7 +184,7 @@ t_healthy(_) ->
emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := disconnected}],
[#{status := connecting}],
emqx_resource:list_instances_verbose()),
ok = emqx_resource:remove_local(?ID).