refactor: code format emqx_connector emqx_resource

This commit is contained in:
DDDHuang 2022-04-28 10:11:01 +08:00
parent 667da90e52
commit 132b37813c
12 changed files with 132 additions and 93 deletions

View File

@ -25,11 +25,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
-type url() :: emqx_http_lib:uri_map(). -type url() :: emqx_http_lib:uri_map().
-reflect_type([url/0]). -reflect_type([url/0]).
@ -307,11 +308,15 @@ on_query(
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) ->
case do_get_status(Host, Port, Timeout) of case do_get_status(Host, Port, Timeout) of
ok -> connected; ok ->
connected;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "http_connector_get_status_failed", ?SLOG(error, #{
reason => Reason, msg => "http_connector_get_status_failed",
host => Host, port => Port}), reason => Reason,
host => Host,
port => Port
}),
disconnected disconnected
end. end.

View File

@ -24,11 +24,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
-export([connect/1]). -export([connect/1]).
@ -87,7 +88,7 @@ on_start(
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.

View File

@ -24,11 +24,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
%% ecpool callback %% ecpool callback
-export([connect/1]). -export([connect/1]).
@ -225,12 +226,16 @@ on_query(
on_get_status(InstId, #{poolname := PoolName} = _State) -> on_get_status(InstId, #{poolname := PoolName} = _State) ->
case health_check(PoolName) of case health_check(PoolName) of
true -> true ->
?tp(debug, emqx_connector_mongo_health_check, #{instance_id => InstId, ?tp(debug, emqx_connector_mongo_health_check, #{
status => ok}), instance_id => InstId,
status => ok
}),
connected; connected;
false -> false ->
?tp(warning, emqx_connector_mongo_health_check, #{instance_id => InstId, ?tp(warning, emqx_connector_mongo_health_check, #{
status => failed}), instance_id => InstId,
status => failed
}),
disconnected disconnected
end. end.
@ -247,24 +252,30 @@ check_worker_health(Worker) ->
%% we don't care if this returns something or not, we just to test the connection %% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of try do_test_query(Conn) of
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{msg => "mongo_connection_get_status_error", ?SLOG(warning, #{
worker => Worker, msg => "mongo_connection_get_status_error",
reason => Reason}), worker => Worker,
reason => Reason
}),
false; false;
_ -> _ ->
true true
catch catch
Class:Error -> Class:Error ->
?SLOG(warning, #{msg => "mongo_connection_get_status_exception", ?SLOG(warning, #{
worker => Worker, msg => "mongo_connection_get_status_exception",
class => Class, worker => Worker,
error => Error}), class => Class,
error => Error
}),
false false
end; end;
_ -> _ ->
?SLOG(warning, #{msg => "mongo_connection_get_status_error", ?SLOG(warning, #{
worker => Worker, msg => "mongo_connection_get_status_error",
reason => worker_not_found}), worker => Worker,
reason => worker_not_found
}),
false false
end. end.

View File

@ -34,11 +34,12 @@
-export([on_message_received/3]). -export([on_message_received/3]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
-behaviour(hocon_schema). -behaviour(hocon_schema).

View File

@ -23,11 +23,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
%% ecpool connect & reconnect %% ecpool connect & reconnect
-export([connect/1, prepare_sql_to_conn/2]). -export([connect/1, prepare_sql_to_conn/2]).
@ -99,7 +100,7 @@ on_start(
Prepares = maps:get(prepare_statement, Config, #{}), Prepares = maps:get(prepare_statement, Config, #{}),
State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn}, State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn},
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> {ok, init_prepare(State)}; ok -> {ok, init_prepare(State)};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.

View File

@ -26,11 +26,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
-export([connect/1]). -export([connect/1]).
@ -104,7 +105,7 @@ on_start(
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.

View File

@ -26,11 +26,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
-export([do_get_status/1]). -export([do_get_status/1]).
@ -150,13 +151,15 @@ on_start(
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
{ok, _} -> {ok, State}; {ok, _} -> {ok, State};
{ok, _, _} -> {ok, State}; {ok, _, _} -> {ok, State};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end; end;
_ -> _ ->
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of case
ok -> {ok, State}; emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}])
of
ok -> {ok, State};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end end
end. end.
@ -220,10 +223,8 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect :
false -> false ->
disconnect disconnect
end; end;
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
do_get_status(Conn) -> do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of case eredis:q(Conn, ["PING"]) of

View File

@ -77,9 +77,11 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
), ),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, {ok, ?CONNECTOR_RESOURCE_GROUP, #{
status := InitialStatus}} state := State,
= emqx_resource:get_instance(PoolName), status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),

View File

@ -75,7 +75,8 @@ get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) ->
|| {_WorkerName, Worker} <- ecpool:workers(PoolName) || {_WorkerName, Worker} <- ecpool:workers(PoolName)
], ],
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
true -> connected; true ->
connected;
false -> false ->
case AutoReconn of case AutoReconn of
true -> true ->

View File

@ -85,22 +85,33 @@
]). ]).
%% Direct calls to the callback module %% Direct calls to the callback module
-export([ call_start/3 %% start the instance
, call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance
]).
-export([ list_instances/0 %% list all the instances, id only. %% start the instance
, list_instances_verbose/0 %% list all the instances -export([
, get_instance/1 %% return the data of the instance call_start/3,
, list_instances_by_type/1 %% return all the instances of the same resource type %% verify if the resource is working normally
, generate_id/1 call_health_check/3,
, list_group_instances/1 %% stop the instance
]). call_stop/3
]).
-optional_callbacks([ on_query/4 %% list all the instances, id only.
, on_get_status/2 -export([
]). list_instances/0,
%% list all the instances
list_instances_verbose/0,
%% return the data of the instance
get_instance/1,
%% return all the instances of the same resource type
list_instances_by_type/1,
generate_id/1,
list_group_instances/1
]).
-optional_callbacks([
on_query/4,
on_get_status/2
]).
%% when calling emqx_resource:start/1 %% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) -> -callback on_start(instance_id(), resource_config()) ->
@ -114,8 +125,8 @@
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(instance_id(), resource_state()) -> -callback on_get_status(instance_id(), resource_state()) ->
resource_connection_status() | resource_connection_status()
{resource_connection_status(), resource_state()}. | {resource_connection_status(), resource_state()}.
-spec list_types() -> [module()]. -spec list_types() -> [module()].
list_types() -> list_types() ->
@ -304,8 +315,8 @@ call_start(InstId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(InstId, Config)). ?SAFE_CALL(Mod:on_start(InstId, Config)).
-spec call_health_check(instance_id(), module(), resource_state()) -> -spec call_health_check(instance_id(), module(), resource_state()) ->
resource_connection_status() | resource_connection_status()
{resource_connection_status(), resource_state()}. | {resource_connection_status(), resource_state()}.
call_health_check(InstId, Mod, ResourceState) -> call_health_check(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)).

View File

@ -307,8 +307,10 @@ do_health_check(InstId) when is_binary(InstId) ->
do_health_check(_Group, #{state := undefined}) -> do_health_check(_Group, #{state := undefined}) ->
{error, resource_not_initialized}; {error, resource_not_initialized};
do_health_check(Group, do_health_check(
#{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data) -> Group,
#{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data
) ->
case emqx_resource:call_health_check(InstId, Mod, ResourceState) of case emqx_resource:call_health_check(InstId, Mod, ResourceState) of
{NewConnStatus, NewResourceState} -> {NewConnStatus, NewResourceState} ->
NData = Data#{status => NewConnStatus, state => NewResourceState}, NData = Data#{status => NewConnStatus, state => NewResourceState},
@ -333,7 +335,8 @@ do_set_resource_status_connecting(InstId) ->
{ok, Group, #{id := InstId} = Data} -> {ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]), logger:error("health check for ~p failed: timeout", [InstId]),
update_resource(InstId, Group, Data#{status => connecting}); update_resource(InstId, Group, Data#{status => connecting});
Error -> {error, Error} Error ->
{error, Error}
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -21,11 +21,12 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([
, on_stop/2 on_start/2,
, on_query/4 on_stop/2,
, on_get_status/2 on_query/4,
]). on_get_status/2
]).
%% callbacks for emqx_resource config schema %% callbacks for emqx_resource config schema
-export([roots/0]). -export([roots/0]).