refactor(connnector): rename waiting_connect_complete -> wait_for_resource_ready

Rename the option to wait_for_resource_ready and defaults to 5s.
This commit is contained in:
Shawn 2022-03-10 10:46:47 +08:00
parent a38cac0233
commit 1d023b541f
21 changed files with 37 additions and 26 deletions

View File

@ -134,7 +134,7 @@ create(#{method := Method,
emqx_connector_http,
Config#{base_url => maps:remove(query, URIMap),
pool_type => random},
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -116,7 +116,7 @@ create(#{selector := Selector} = Config) ->
?RESOURCE_GROUP,
emqx_connector_mongo,
Config,
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -85,7 +85,7 @@ create(#{password_hash_algorithm := Algorithm,
?RESOURCE_GROUP,
emqx_connector_mysql,
Config,
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -81,7 +81,7 @@ create(#{query := Query0,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql,
Config#{named_queries => #{ResourceId => Query}},
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -93,7 +93,7 @@ create(#{cmd := Cmd,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP,
emqx_connector_redis, Config,
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -63,7 +63,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_mysql}

View File

@ -64,7 +64,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_pgsql}

View File

@ -63,7 +63,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_redis}

View File

@ -56,7 +56,7 @@ init(#{query := SQL0} = Source) ->
?RESOURCE_GROUP,
emqx_connector_pgsql,
Source#{named_queries => #{ResourceID => SQL}},
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, _} ->
Source#{annotations =>
#{id => ResourceID,

View File

@ -38,7 +38,7 @@ create_resource(Module, Config) ->
case emqx_resource:create_local(ResourceID,
?RESOURCE_GROUP,
Module, Config,
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} -> {ok, ResourceID};
{ok, _} -> {ok, ResourceID};
{error, Reason} -> {error, Reason}

View File

@ -45,7 +45,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_mysql}

View File

@ -45,7 +45,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_pgsql}

View File

@ -46,7 +46,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config(),
#{waiting_connect_complete => 5000}),
#{}),
Config;
false ->
{skip, no_redis}

View File

@ -225,7 +225,7 @@ create(Type, Name, Conf) ->
<<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
@ -272,7 +272,7 @@ recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}).
#{}).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

View File

@ -79,8 +79,14 @@ init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
ok.
clear_resources() ->
lists:foreach(fun(#{type := Type, name := Name}) ->
ok = emqx_bridge:remove(Type, Name)
end, emqx_bridge:list()).
%%------------------------------------------------------------------------------
%% HTTP server for testing
%%------------------------------------------------------------------------------

View File

@ -71,7 +71,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?CONNECTOR_RESOURCE_GROUP,
?MYSQL_RESOURCE_MOD,
CheckedConfig,
#{waiting_connect_complete => 5000}
#{}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource

View File

@ -72,7 +72,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?CONNECTOR_RESOURCE_GROUP,
?PGSQL_RESOURCE_MOD,
CheckedConfig,
#{waiting_connect_complete => 5000}
#{}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource

View File

@ -86,7 +86,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
?CONNECTOR_RESOURCE_GROUP,
?REDIS_RESOURCE_MOD,
CheckedConfig,
#{waiting_connect_complete => 5000}
#{}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource

View File

@ -157,6 +157,8 @@ init(Name) ->
persistent_term:put(?CntrRef(Name), #{}),
{ok, #state{}}.
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
{reply, #{}, State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
{reply, case maps:get(Id, Rates, undefined) of
undefined -> #{};

View File

@ -178,14 +178,17 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) ->
{error, not_found}
end.
wait_for_resource_ready(InstId, 0) ->
force_lookup(InstId);
wait_for_resource_ready(InstId, Retry) ->
wait_for_resource_ready(InstId, WaitTime) ->
do_wait_for_resource_ready(InstId, WaitTime div 100).
do_wait_for_resource_ready(_InstId, 0) ->
timeout;
do_wait_for_resource_ready(InstId, Retry) ->
case force_lookup(InstId) of
#{status := connected} = Data -> Data;
#{status := connected} -> ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
do_wait_for_resource_ready(InstId, Retry-1)
end.
do_create(InstId, Group, ResourceType, Config, Opts) ->
@ -197,8 +200,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
ok ->
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
[matched, success, failed, exception], [matched]),
WaitTime = maps:get(waiting_connect_complete , Opts, 0),
{ok, wait_for_resource_ready(InstId, WaitTime div 100)};
{ok, force_lookup(InstId)};
Error ->
Error
end
@ -252,6 +254,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
_ = wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
ok.
start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->

View File

@ -361,7 +361,7 @@ create_resource(Context, #{type := DB} = Config) ->
<<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config,
#{waiting_connect_complete => 5000}) of
#{}) of
{ok, already_created} ->
Context#{resource_id => ResourceID};
{ok, _} ->