From 132b37813cf5553ee6806277a52e19e15204a264 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 28 Apr 2022 10:11:01 +0800 Subject: [PATCH] refactor: code format emqx_connector emqx_resource --- .../src/emqx_connector_http.erl | 23 +++++---- .../src/emqx_connector_ldap.erl | 13 ++--- .../src/emqx_connector_mongo.erl | 49 ++++++++++++------- .../src/emqx_connector_mqtt.erl | 11 +++-- .../src/emqx_connector_mysql.erl | 13 ++--- .../src/emqx_connector_pgsql.erl | 13 ++--- .../src/emqx_connector_redis.erl | 25 +++++----- .../test/emqx_connector_mysql_SUITE.erl | 8 +-- .../src/emqx_plugin_libs_pool.erl | 3 +- apps/emqx_resource/src/emqx_resource.erl | 47 +++++++++++------- .../src/emqx_resource_instance.erl | 9 ++-- .../emqx_resource/test/emqx_test_resource.erl | 11 +++-- 12 files changed, 132 insertions(+), 93 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index d2edc5fca..956cdb840 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -25,11 +25,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). -type url() :: emqx_http_lib:uri_map(). -reflect_type([url/0]). @@ -307,11 +308,15 @@ on_query( on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> case do_get_status(Host, Port, Timeout) of - ok -> connected; + ok -> + connected; {error, Reason} -> - ?SLOG(error, #{msg => "http_connector_get_status_failed", - reason => Reason, - host => Host, port => Port}), + ?SLOG(error, #{ + msg => "http_connector_get_status_failed", + reason => Reason, + host => Host, + port => Port + }), disconnected end. diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 4214380b3..195aa89a9 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -24,11 +24,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). -export([connect/1]). @@ -87,7 +88,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of - ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index a9d660545..f2ecab8b1 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -24,11 +24,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). %% ecpool callback -export([connect/1]). @@ -225,12 +226,16 @@ on_query( on_get_status(InstId, #{poolname := PoolName} = _State) -> case health_check(PoolName) of true -> - ?tp(debug, emqx_connector_mongo_health_check, #{instance_id => InstId, - status => ok}), + ?tp(debug, emqx_connector_mongo_health_check, #{ + instance_id => InstId, + status => ok + }), connected; false -> - ?tp(warning, emqx_connector_mongo_health_check, #{instance_id => InstId, - status => failed}), + ?tp(warning, emqx_connector_mongo_health_check, #{ + instance_id => InstId, + status => failed + }), disconnected end. @@ -247,24 +252,30 @@ check_worker_health(Worker) -> %% we don't care if this returns something or not, we just to test the connection try do_test_query(Conn) of {error, Reason} -> - ?SLOG(warning, #{msg => "mongo_connection_get_status_error", - worker => Worker, - reason => Reason}), + ?SLOG(warning, #{ + msg => "mongo_connection_get_status_error", + worker => Worker, + reason => Reason + }), false; _ -> true catch Class:Error -> - ?SLOG(warning, #{msg => "mongo_connection_get_status_exception", - worker => Worker, - class => Class, - error => Error}), + ?SLOG(warning, #{ + msg => "mongo_connection_get_status_exception", + worker => Worker, + class => Class, + error => Error + }), false end; _ -> - ?SLOG(warning, #{msg => "mongo_connection_get_status_error", - worker => Worker, - reason => worker_not_found}), + ?SLOG(warning, #{ + msg => "mongo_connection_get_status_error", + worker => Worker, + reason => worker_not_found + }), false end. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index ab95285f4..6182881e2 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -34,11 +34,12 @@ -export([on_message_received/3]). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). -behaviour(hocon_schema). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 8aba5ec00..802fdec5f 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -23,11 +23,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). %% ecpool connect & reconnect -export([connect/1, prepare_sql_to_conn/2]). @@ -99,7 +100,7 @@ on_start( Prepares = maps:get(prepare_statement, Config, #{}), State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn}, case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, init_prepare(State)}; + ok -> {ok, init_prepare(State)}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 72f85955f..85d3d327e 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -26,11 +26,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). -export([connect/1]). @@ -104,7 +105,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index ca7bb12ea..147cdc601 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -26,11 +26,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). -export([do_get_status/1]). @@ -150,13 +151,15 @@ on_start( case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of - {ok, _} -> {ok, State}; - {ok, _, _} -> {ok, State}; + {ok, _} -> {ok, State}; + {ok, _, _} -> {ok, State}; {error, Reason} -> {error, Reason} end; _ -> - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of - ok -> {ok, State}; + case + emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) + of + ok -> {ok, State}; {error, Reason} -> {error, Reason} end end. @@ -220,10 +223,8 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect : false -> disconnect end; - - on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> - emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). + emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). do_get_status(Conn) -> case eredis:q(Conn, ["PING"]) of diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index c576e3de7..f419a08c4 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -77,9 +77,11 @@ perform_lifecycle_check(PoolName, InitialConfig) -> ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, - status := InitialStatus}} - = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{ + state := State, + status := InitialStatus + }} = + emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index f331e028f..e6a7ad366 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -75,7 +75,8 @@ get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) -> || {_WorkerName, Worker} <- ecpool:workers(PoolName) ], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of - true -> connected; + true -> + connected; false -> case AutoReconn of true -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b24213eae..12e02ce0a 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,22 +85,33 @@ ]). %% Direct calls to the callback module --export([ call_start/3 %% start the instance - , call_health_check/3 %% verify if the resource is working normally - , call_stop/3 %% stop the instance - ]). --export([ list_instances/0 %% list all the instances, id only. - , list_instances_verbose/0 %% list all the instances - , get_instance/1 %% return the data of the instance - , list_instances_by_type/1 %% return all the instances of the same resource type - , generate_id/1 - , list_group_instances/1 - ]). +%% start the instance +-export([ + call_start/3, + %% verify if the resource is working normally + call_health_check/3, + %% stop the instance + call_stop/3 +]). --optional_callbacks([ on_query/4 - , on_get_status/2 - ]). +%% list all the instances, id only. +-export([ + list_instances/0, + %% list all the instances + list_instances_verbose/0, + %% return the data of the instance + get_instance/1, + %% return all the instances of the same resource type + list_instances_by_type/1, + generate_id/1, + list_group_instances/1 +]). + +-optional_callbacks([ + on_query/4, + on_get_status/2 +]). %% when calling emqx_resource:start/1 -callback on_start(instance_id(), resource_config()) -> @@ -114,8 +125,8 @@ %% when calling emqx_resource:health_check/2 -callback on_get_status(instance_id(), resource_state()) -> - resource_connection_status() | - {resource_connection_status(), resource_state()}. + resource_connection_status() + | {resource_connection_status(), resource_state()}. -spec list_types() -> [module()]. list_types() -> @@ -304,8 +315,8 @@ call_start(InstId, Mod, Config) -> ?SAFE_CALL(Mod:on_start(InstId, Config)). -spec call_health_check(instance_id(), module(), resource_state()) -> - resource_connection_status() | - {resource_connection_status(), resource_state()}. + resource_connection_status() + | {resource_connection_status(), resource_state()}. call_health_check(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index bf197b88c..04592159c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -307,8 +307,10 @@ do_health_check(InstId) when is_binary(InstId) -> do_health_check(_Group, #{state := undefined}) -> {error, resource_not_initialized}; -do_health_check(Group, - #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data) -> +do_health_check( + Group, + #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data +) -> case emqx_resource:call_health_check(InstId, Mod, ResourceState) of {NewConnStatus, NewResourceState} -> NData = Data#{status => NewConnStatus, state => NewResourceState}, @@ -333,7 +335,8 @@ do_set_resource_status_connecting(InstId) -> {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), update_resource(InstId, Group, Data#{status => connecting}); - Error -> {error, Error} + Error -> + {error, Error} end. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index a1148f94e..c23f87d50 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -21,11 +21,12 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ on_start/2 - , on_stop/2 - , on_query/4 - , on_get_status/2 - ]). +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). %% callbacks for emqx_resource config schema -export([roots/0]).