From 0faf1240f30b8e7899f6688f20f0bbeefa73a429 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 19 Apr 2022 16:24:46 +0800 Subject: [PATCH 01/10] fix: mysql support prepare sql --- .../src/emqx_connector_mysql.erl | 79 ++++++++++++++++--- 1 file changed, 69 insertions(+), 10 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index e82999ba7..6b1a37974 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -29,7 +29,10 @@ , on_health_check/2 ]). --export([connect/1]). +%% ecpool connect & reconnect +-export([connect/1, prepare_sql_to_conn/1]). + +-export([prepare_sql/2]). -export([roots/0, fields/1]). @@ -91,25 +94,35 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) -> - on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State); -on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> - ?TRACE("QUERY", "mysql_connector_received", #{connector => InstId, sql => SQL, state => State}), +on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State); +on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) -> + on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State); +on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> + LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, + ?TRACE("QUERY", "mysql_connector_received", LogMeta), case Result = ecpool:pick_and_do( PoolName, - {mysql, query, [SQL, Params, Timeout]}, + {mysql, mysql_function(Type), [SQLOrKey, Params, Timeout]}, no_handover) of + {error, disconnected} -> + ?SLOG(error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}), + %% kill the poll worker to trigger reconnection + _ = exit(Conn, restart), + emqx_resource:query_failed(AfterQuery); {error, Reason} -> - ?SLOG(error, #{msg => "mysql_connector_do_sql_query_failed", - connector => InstId, sql => SQL, reason => Reason}), + ?SLOG(error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), emqx_resource:query_failed(AfterQuery); _ -> emqx_resource:query_success(AfterQuery) end, Result. +mysql_function(sql) -> query; +mysql_function(prepared_query) -> execute. + on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). @@ -127,3 +140,49 @@ connect(Options) -> -> {inet:ip_address() | inet:hostname(), pos_integer()}. to_server(Str) -> emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS). + +prepare_sql(Prepares, PoolName) -> + case do_prepare_sql(Prepares, PoolName) of + ok -> + %% prepare for reconnect + ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}), + ok; + {error, R} -> + {error, R} + end. + +do_prepare_sql([{PrepareSqlKey, PrepareStatement} | Prepares], PoolName) -> + Workers = + [begin + {ok, Conn} = ecpool_worker:client(Worker), + Conn + end || Worker <- ecpool:workers(PoolName)], + prepare_sql_to_conn_list(Workers, [{PrepareSqlKey, PrepareStatement}]). + +prepare_sql_to_conn_list([], PrepareList) -> ok; +prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> + case prepare_sql_to_conn(Conn, PrepareList) of + ok -> + prepare_sql_to_conn(ConnList, PrepareList); + {error, R} -> + %% rollback + [unprepare_sql_to_conn(Conn, Key) || {Key, _} <- PrepareList], + {error, R} + end. + +prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; +prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) -> + LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey}, + ?SLOG(info, LogMeta#{prepare => PrepareStatement}), + _ = mysql:unprepare(Conn, PrepareSqlKey), + case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of + {ok, Name} -> + ?SLOG(info, LogMeta#{result => success}), + prepare_sql_to_conn(Conn, PrepareList); + {error, Reason} -> + ?SLOG(error, LogMeta#{result => failed, reason => Reason}), + {error, Reason}; + end. + +unprepare_sql_to_conn(Conn, PrepareSqlKey) -> + mysql:unprepare(Conn, PrepareSqlKey). From d2c4b862b6729da726fe33f810a490654fcf8467 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 19 Apr 2022 16:55:45 +0800 Subject: [PATCH 02/10] fix: bad code & xref --- .../src/emqx_connector_mysql.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6b1a37974..f662bbe5b 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -30,7 +30,7 @@ ]). %% ecpool connect & reconnect --export([connect/1, prepare_sql_to_conn/1]). +-export([connect/1, prepare_sql_to_conn/2]). -export([prepare_sql/2]). @@ -101,10 +101,9 @@ on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - case Result = ecpool:pick_and_do( - PoolName, - {mysql, mysql_function(Type), [SQLOrKey, Params, Timeout]}, - no_handover) of + Conn = ecpool:get_client(PoolName), + Result = erlang:apply(mysql, mysql_function(Type), [Conn, SQLOrKey, Params, Timeout]), + case Result of {error, disconnected} -> ?SLOG(error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}), @@ -151,15 +150,15 @@ prepare_sql(Prepares, PoolName) -> {error, R} end. -do_prepare_sql([{PrepareSqlKey, PrepareStatement} | Prepares], PoolName) -> +do_prepare_sql(Prepares, PoolName) -> Workers = [begin {ok, Conn} = ecpool_worker:client(Worker), Conn end || Worker <- ecpool:workers(PoolName)], - prepare_sql_to_conn_list(Workers, [{PrepareSqlKey, PrepareStatement}]). + prepare_sql_to_conn_list(Workers, Prepares). -prepare_sql_to_conn_list([], PrepareList) -> ok; +prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> case prepare_sql_to_conn(Conn, PrepareList) of ok -> @@ -176,12 +175,12 @@ prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) whe ?SLOG(info, LogMeta#{prepare => PrepareStatement}), _ = mysql:unprepare(Conn, PrepareSqlKey), case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of - {ok, Name} -> + {ok, _Name} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); {error, Reason} -> ?SLOG(error, LogMeta#{result => failed, reason => Reason}), - {error, Reason}; + {error, Reason} end. unprepare_sql_to_conn(Conn, PrepareSqlKey) -> From 039619ee23d34863f67f8f534d45cc7d3e6ebfcc Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 20 Apr 2022 10:50:38 +0800 Subject: [PATCH 03/10] feat: authz & authn mysql resource support prepare sql query --- .../src/simple_authn/emqx_authn_mysql.erl | 16 +++++++----- apps/emqx_authz/src/emqx_authz_mysql.erl | 26 ++++++++----------- .../src/emqx_connector_mysql.erl | 3 +++ 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 4152bd888..11e8ede76 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -89,12 +89,12 @@ create( } = Config ) -> ok = emqx_authn_password_hashing:init(Algorithm), - {Query, PlaceHolders} = emqx_authn_utils:parse_sql(Query0, '?'), + {PrepareSqlKey, PrepareStatement} = emqx_authn_utils:parse_sql(Query0, '?'), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), State = #{ password_hash_algorithm => Algorithm, - query => Query, - placeholders => PlaceHolders, + prepare_sql_key => PrepareSqlKey, + prepare_sql_statement => PrepareStatement, query_timeout => QueryTimeout, resource_id => ResourceId }, @@ -107,10 +107,14 @@ create( #{} ) of - {ok, already_created} -> - {ok, State}; {ok, _} -> - {ok, State}; + case emqx_resource:query(ResourceId, + {prepare_sql, [{PrepareSqlKey, PrepareStatement}]}) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason} + end; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index d88273e14..4da81c90b 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -52,17 +52,13 @@ init(#{query := SQL} = Source) -> {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> - Source#{ - annotations => - #{ - id => Id, - query => emqx_authz_utils:parse_sql( - SQL, - '?', - ?PLACEHOLDERS - ) - } - } + {PrepareKey, PrepareStatement} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), + case emqx_resource:query(Id, {prepare_sql, [{PrepareKey, PrepareStatement}]}) of + ok -> + Source#{annotations => #{id => Id, prepare => {PrepareKey, PrepareStatement}}}; + {error, Reason} -> + error({load_config_error, Reason}) + end end. destroy(#{annotations := #{id := Id}}) -> @@ -75,12 +71,12 @@ authorize( #{ annotations := #{ id := ResourceID, - query := {Query, Params} + prepare := {PrepareKey, PrepareStatement} } } ) -> - RenderParams = emqx_authz_utils:render_sql_params(Params, Client), - case emqx_resource:query(ResourceID, {sql, Query, RenderParams}) of + RenderParams = emqx_authz_utils:render_sql_params(PrepareStatement, Client), + case emqx_resource:query(ResourceID, {sql, PrepareKey, RenderParams}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> @@ -89,7 +85,7 @@ authorize( ?SLOG(error, #{ msg => "query_mysql_error", reason => Reason, - query => Query, + prepare => {PrepareKey, PrepareStatement}, params => RenderParams, resource_id => ResourceID }), diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index f662bbe5b..ad8c882e7 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -94,6 +94,9 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). +on_query(_InstId, {prepare_sql, Prepares}, _AfterQuery, #{poolname := PoolName}) -> + prepare_sql(Prepares, PoolName); + on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) -> on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State); on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) -> From 94795098c994a7aafd35b6c0911034fafc42ed52 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 20 Apr 2022 11:00:01 +0800 Subject: [PATCH 04/10] fix: bad function name & dialyzer --- apps/emqx_connector/src/emqx_connector_mysql.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index ad8c882e7..21e90c929 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -165,10 +165,11 @@ prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> case prepare_sql_to_conn(Conn, PrepareList) of ok -> - prepare_sql_to_conn(ConnList, PrepareList); + prepare_sql_to_conn_list(ConnList, PrepareList); {error, R} -> %% rollback - [unprepare_sql_to_conn(Conn, Key) || {Key, _} <- PrepareList], + Fun = fun({Key, _}) -> _ = unprepare_sql_to_conn(Conn, Key), ok end, + lists:foreach(Fun, PrepareList), {error, R} end. From 365ca670789dc4f656b58db6e0ce1c564272b5cc Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 20 Apr 2022 21:47:36 +0800 Subject: [PATCH 05/10] fix: auth mysql prepare query --- .../src/simple_authn/emqx_authn_mysql.erl | 19 +++++++-------- .../src/emqx_connector_mysql.erl | 24 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 11e8ede76..37043961f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -23,6 +23,8 @@ -behaviour(hocon_schema). -behaviour(emqx_authentication). +-define(PREPARE_KEY, ?MODULE). + -export([ namespace/0, roots/0, @@ -89,12 +91,11 @@ create( } = Config ) -> ok = emqx_authn_password_hashing:init(Algorithm), - {PrepareSqlKey, PrepareStatement} = emqx_authn_utils:parse_sql(Query0, '?'), + {PrepareSql, TmplToken} = emqx_authn_utils:parse_sql(Query0, '?'), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), State = #{ password_hash_algorithm => Algorithm, - prepare_sql_key => PrepareSqlKey, - prepare_sql_statement => PrepareStatement, + tmpl_token => TmplToken, query_timeout => QueryTimeout, resource_id => ResourceId }, @@ -108,8 +109,7 @@ create( ) of {ok, _} -> - case emqx_resource:query(ResourceId, - {prepare_sql, [{PrepareSqlKey, PrepareStatement}]}) of + case emqx_resource:query(ResourceId, {prepare_sql, [{?PREPARE_KEY, PrepareSql}]}) of ok -> {ok, State}; {error, Reason} -> @@ -133,15 +133,14 @@ authenticate(#{auth_method := _}, _) -> authenticate( #{password := Password} = Credential, #{ - placeholders := PlaceHolders, - query := Query, + tmpl_token := TmplToken, query_timeout := Timeout, resource_id := ResourceId, password_hash_algorithm := Algorithm } ) -> - Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential), - case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of + Params = emqx_authn_utils:render_sql_params(TmplToken, Credential), + case emqx_resource:query(ResourceId, {prepared_query, ?PREPARE_KEY, Params, Timeout}) of {ok, _Columns, []} -> ignore; {ok, Columns, [Row | _]} -> @@ -160,7 +159,7 @@ authenticate( ?SLOG(error, #{ msg => "mysql_query_failed", resource => ResourceId, - query => Query, + tmpl_token => TmplToken, params => Params, timeout => Timeout, reason => Reason diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 21e90c929..37534ccff 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -104,8 +104,10 @@ on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), - Conn = ecpool:get_client(PoolName), - Result = erlang:apply(mysql, mysql_function(Type), [Conn, SQLOrKey, Params, Timeout]), + Worker = ecpool:get_client(PoolName), + {ok, Conn} = ecpool_worker:client(Worker), + MySqlFunction = mysql_function(Type), + Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey, Params, Timeout]), case Result of {error, disconnected} -> ?SLOG(error, @@ -154,12 +156,12 @@ prepare_sql(Prepares, PoolName) -> end. do_prepare_sql(Prepares, PoolName) -> - Workers = + Conns = [begin {ok, Conn} = ecpool_worker:client(Worker), Conn - end || Worker <- ecpool:workers(PoolName)], - prepare_sql_to_conn_list(Workers, Prepares). + end || {_Name, Worker} <- ecpool:workers(PoolName)], + prepare_sql_to_conn_list(Conns, Prepares). prepare_sql_to_conn_list([], _PrepareList) -> ok; prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> @@ -174,12 +176,12 @@ prepare_sql_to_conn_list([Conn | ConnList], PrepareList) -> end. prepare_sql_to_conn(Conn, []) when is_pid(Conn) -> ok; -prepare_sql_to_conn(Conn, [{PrepareSqlKey, PrepareStatement} | PrepareList]) when is_pid(Conn) -> - LogMeta = #{msg => "MySQL Prepare Statement", name => PrepareSqlKey}, - ?SLOG(info, LogMeta#{prepare => PrepareStatement}), - _ = mysql:unprepare(Conn, PrepareSqlKey), - case mysql:prepare(Conn, PrepareSqlKey, PrepareStatement) of - {ok, _Name} -> +prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList]) when is_pid(Conn) -> + LogMeta = #{msg => "MySQL Prepare Statement", name => Key, prepare_sql => SQL}, + ?SLOG(info, LogMeta), + _ = unprepare_sql_to_conn(Conn, Key), + case mysql:prepare(Conn, Key, SQL) of + {ok, _Key} -> ?SLOG(info, LogMeta#{result => success}), prepare_sql_to_conn(Conn, PrepareList); {error, Reason} -> From 2aedd38a4309d2950990a0824a067f1692b8a60d Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Wed, 20 Apr 2022 22:15:05 +0800 Subject: [PATCH 06/10] fix: authz mysql prepare query --- apps/emqx_authz/src/emqx_authz_mysql.erl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index 4da81c90b..32cec3a71 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -23,6 +23,8 @@ -behaviour(emqx_authz). +-define(PREPARE_KEY, ?MODULE). + %% AuthZ Callbacks -export([ description/0, @@ -52,10 +54,11 @@ init(#{query := SQL} = Source) -> {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> - {PrepareKey, PrepareStatement} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), - case emqx_resource:query(Id, {prepare_sql, [{PrepareKey, PrepareStatement}]}) of + {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), + case emqx_resource:query(Id, {prepare_sql, [{?MODULE, PrepareSQL}]}) of ok -> - Source#{annotations => #{id => Id, prepare => {PrepareKey, PrepareStatement}}}; + Source#{annotations => #{ + id => Id, tmpl_oken => TmplToken}}; {error, Reason} -> error({load_config_error, Reason}) end @@ -71,12 +74,12 @@ authorize( #{ annotations := #{ id := ResourceID, - prepare := {PrepareKey, PrepareStatement} + tmpl_oken := TmplToken } } ) -> - RenderParams = emqx_authz_utils:render_sql_params(PrepareStatement, Client), - case emqx_resource:query(ResourceID, {sql, PrepareKey, RenderParams}) of + RenderParams = emqx_authz_utils:render_sql_params(TmplToken, Client), + case emqx_resource:query(ResourceID, {prepared_query, ?MODULE, RenderParams}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> @@ -85,7 +88,7 @@ authorize( ?SLOG(error, #{ msg => "query_mysql_error", reason => Reason, - prepare => {PrepareKey, PrepareStatement}, + tmpl_oken => TmplToken, params => RenderParams, resource_id => ResourceID }), From 7417e5070d50af9c931b73054e77336803a62b67 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 21 Apr 2022 15:59:42 +0800 Subject: [PATCH 07/10] fix: perpare sql when connector doing init; add prepare statement field; bad log path --- .../src/simple_authn/emqx_authn_mysql.erl | 11 +-- .../src/simple_authn/emqx_authn_pgsql.erl | 4 +- .../test/emqx_authn_mysql_SUITE.erl | 7 +- apps/emqx_authz/src/emqx_authz_api_schema.erl | 4 +- apps/emqx_authz/src/emqx_authz_mysql.erl | 13 ++-- apps/emqx_authz/src/emqx_authz_postgresql.erl | 2 +- .../i18n/emqx_connector_pgsql.conf | 11 --- .../i18n/emqx_connector_schema_lib.conf | 11 +++ .../src/emqx_connector_mysql.erl | 69 ++++++++++++++++--- .../src/emqx_connector_pgsql.erl | 17 ++--- .../src/emqx_connector_schema_lib.erl | 10 +++ .../src/emqx_mgmt_api_nodes.erl | 5 +- 12 files changed, 105 insertions(+), 59 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 37043961f..9cbe0c174 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -56,7 +56,7 @@ fields(?CONF_NS) -> {query, fun query/1}, {query_timeout, fun query_timeout/1} ] ++ emqx_authn_schema:common_fields() ++ - emqx_connector_mysql:fields(config). + proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)). desc(?CONF_NS) -> "Configuration for authentication using MySQL database."; @@ -104,17 +104,12 @@ create( ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, - Config, + Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}}, #{} ) of {ok, _} -> - case emqx_resource:query(ResourceId, {prepare_sql, [{?PREPARE_KEY, PrepareSql}]}) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end; + {ok, State}; {error, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 3a9f390dd..cb8340a4a 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -60,7 +60,7 @@ fields(?CONF_NS) -> {query, fun query/1} ] ++ emqx_authn_schema:common_fields() ++ - proplists:delete(named_queries, emqx_connector_pgsql:fields(config)). + proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config)). desc(?CONF_NS) -> "Configuration for PostgreSQL authentication backend."; @@ -101,7 +101,7 @@ create( ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, - Config#{named_queries => #{ResourceId => Query}}, + Config#{prepare_statement => #{ResourceId => Query}}, #{} ) of diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 3d2dba895..93da8ba49 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -198,10 +198,11 @@ t_update(_Config) -> >> }, - {ok, _} = emqx:update_config( + %% Code 1146, table not exist + {error, {post_config_update,emqx_authentication, {1146, _, _}}} = + emqx:update_config( ?PATH, - {create_authenticator, ?GLOBAL, IncorrectConfig} - ), + {create_authenticator, ?GLOBAL, IncorrectConfig}), {error, not_authorized} = emqx_access_control:authenticate( #{ diff --git a/apps/emqx_authz/src/emqx_authz_api_schema.erl b/apps/emqx_authz/src/emqx_authz_api_schema.erl index b4ea40ccc..19c7e5cfd 100644 --- a/apps/emqx_authz/src/emqx_authz_api_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_api_schema.erl @@ -56,11 +56,11 @@ fields(mongo_sharded) -> fields(mysql) -> authz_common_fields(mysql) ++ [{query, mk(binary(), #{required => true})}] ++ - emqx_connector_mysql:fields(config); + proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)); fields(postgresql) -> authz_common_fields(postgresql) ++ [{query, mk(binary(), #{required => true})}] ++ - proplists:delete(named_queries, emqx_connector_pgsql:fields(config)); + proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config)); fields(redis_single) -> authz_redis_common_fields() ++ emqx_connector_redis:fields(single); diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index 32cec3a71..1aac2e533 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -49,19 +49,14 @@ description() -> "AuthZ with Mysql". -init(#{query := SQL} = Source) -> +init(#{query := SQL} = Source0) -> + {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), + Source = Source0#{prepare_statement := #{?MODULE => PrepareSQL}}, case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> - {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), - case emqx_resource:query(Id, {prepare_sql, [{?MODULE, PrepareSQL}]}) of - ok -> - Source#{annotations => #{ - id => Id, tmpl_oken => TmplToken}}; - {error, Reason} -> - error({load_config_error, Reason}) - end + Source#{annotations => #{id => Id, tmpl_oken => TmplToken}} end. destroy(#{annotations := #{id := Id}}) -> diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index 4305addb7..1d7542655 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -59,7 +59,7 @@ init(#{query := SQL0} = Source) -> ResourceID, ?RESOURCE_GROUP, emqx_connector_pgsql, - Source#{named_queries => #{ResourceID => SQL}}, + Source#{prepare_statement => #{ResourceID => SQL}}, #{} ) of diff --git a/apps/emqx_connector/i18n/emqx_connector_pgsql.conf b/apps/emqx_connector/i18n/emqx_connector_pgsql.conf index adac024ef..6aa792070 100644 --- a/apps/emqx_connector/i18n/emqx_connector_pgsql.conf +++ b/apps/emqx_connector/i18n/emqx_connector_pgsql.conf @@ -19,15 +19,4 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified. } } - name_queries_desc { - desc { - en: "Key-value list of SQL prepared statements." - zh: "SQL 预处理语句列表。" - } - label: { - en: "SQL Prepared Statements List" - zh: "SQL 预处理语句列表" - } - } - } diff --git a/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf b/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf index 6a2bbc6c2..f5caf29c4 100644 --- a/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf +++ b/apps/emqx_connector/i18n/emqx_connector_schema_lib.conf @@ -11,6 +11,17 @@ emqx_connector_schema_lib { } } + prepare_statement { + desc { + en: "Key-value list of SQL prepared statements." + zh: "SQL 预处理语句列表。" + } + label: { + en: "SQL Prepared Statements List" + zh: "SQL 预处理语句列表" + } + } + database_desc { desc { en: "Database name." diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 37534ccff..b8ca93104 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -51,7 +51,8 @@ fields(config) -> [ {server, fun server/1} ] ++ emqx_connector_schema_lib:relational_db_fields() ++ - emqx_connector_schema_lib:ssl_fields(). + emqx_connector_schema_lib:ssl_fields() ++ + emqx_connector_schema_lib:prepare_statement_fields(). server(type) -> emqx_schema:ip_port(); server(required) -> true; @@ -84,8 +85,10 @@ on_start(InstId, #{server := {Host, Port}, {auto_reconnect, reconn_interval(AutoReconn)}, {pool_size, PoolSize}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + Prepares = maps:get(prepare_statement, Config, #{}), + State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + ok -> {ok, State}; {error, Reason} -> {error, Reason} end. @@ -94,14 +97,12 @@ on_stop(InstId, #{poolname := PoolName}) -> connector => InstId}), emqx_plugin_libs_pool:stop_pool(PoolName). -on_query(_InstId, {prepare_sql, Prepares}, _AfterQuery, #{poolname := PoolName}) -> - prepare_sql(Prepares, PoolName); - -on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) -> +on_query(InstId, {Type, SQLOrKey}, AfterQuery, State) -> on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State); -on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) -> +on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, State) -> on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State); -on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) -> +on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, + #{poolname := PoolName, prepare_statement := Prepares} = State) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), Worker = ecpool:get_client(PoolName), @@ -115,6 +116,17 @@ on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := Po %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), emqx_resource:query_failed(AfterQuery); + {error, not_prepared} -> + ?SLOG(warning, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => not_prepared}), + case prepare_sql(Prepares, PoolName) of + ok -> + on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State); + {error, Reason} -> + ?SLOG(error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), + emqx_resource:query_failed(AfterQuery) + end; {error, Reason} -> ?SLOG(error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), @@ -128,11 +140,35 @@ mysql_function(sql) -> query; mysql_function(prepared_query) -> execute. on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). + case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of + {ok, State} -> + case do_health_check_prepares(State) of + ok-> + {ok, State}; + {ok, NState} -> + {ok, NState}; + {error, _Reason} -> + {error, health_check_failed, State} + end; + {error, health_check_failed, State} -> + {error, health_check_failed, State} + end. do_health_check(Conn) -> ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). +do_health_check_prepares(#{prepare_statement := Prepares})when is_map(Prepares) -> + ok; +do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> + %% retry to prepare + case prepare_sql(Prepares, PoolName) of + ok -> + %% remove the error + {ok, State#{prepare_statement => Prepares}}; + {error, Reason} -> + {error, Reason} + end. + %% =================================================================== reconn_interval(true) -> 15; reconn_interval(false) -> false. @@ -145,6 +181,21 @@ connect(Options) -> to_server(Str) -> emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS). +init_prepare(State = #{prepare_statement := #{}}) -> + State; +init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> + case prepare_sql(Prepares, PoolName) of + ok -> + State; + {error, Reason} -> + LogMeta = #{msg => <<"MySQL init prepare statement failed">>, reason => Reason}, + ?SLOG(error, LogMeta), + %% mark the prepare_statement as failed + State#{prepare_statement => {error, Prepares}} + end. + +prepare_sql(Prepares, PoolName) when is_map(Prepares) -> + prepare_sql(maps:to_list(Prepares), PoolName); prepare_sql(Prepares, PoolName) -> case do_prepare_sql(Prepares, PoolName) of ok -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 513640e86..d7b6c1b14 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -51,15 +51,10 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. fields(config) -> - [ {named_queries, fun named_queries/1} - , {server, fun server/1}] ++ + [{server, fun server/1}] ++ emqx_connector_schema_lib:relational_db_fields() ++ - emqx_connector_schema_lib:ssl_fields(). - -named_queries(type) -> map(); -named_queries(desc) -> ?DESC("name_queries_desc"); -named_queries(required) -> false; -named_queries(_) -> undefined. + emqx_connector_schema_lib:ssl_fields() ++ + emqx_connector_schema_lib:prepare_statement_fields(). server(type) -> emqx_schema:ip_port(); server(required) -> true; @@ -92,7 +87,7 @@ on_start(InstId, #{server := {Host, Port}, {database, DB}, {auto_reconnect, reconn_interval(AutoReconn)}, {pool_size, PoolSize}, - {named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}], + {prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, #{poolname => PoolName}}; @@ -135,10 +130,10 @@ connect(Opts) -> Host = proplists:get_value(host, Opts), Username = proplists:get_value(username, Opts), Password = proplists:get_value(password, Opts), - NamedQueries = proplists:get_value(named_queries, Opts), + PrepareStatement = proplists:get_value(prepare_statement, Opts), case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of {ok, Conn} -> - case parse(Conn, NamedQueries) of + case parse(Conn, PrepareStatement) of ok -> {ok, Conn}; {error, Reason} -> {error, Reason} end; diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index c20320a36..5398ac9fe 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -21,6 +21,7 @@ -export([ relational_db_fields/0 , ssl_fields/0 + , prepare_statement_fields/0 ]). -export([ ip_port_to_string/1 @@ -67,6 +68,15 @@ relational_db_fields() -> , {auto_reconnect, fun auto_reconnect/1} ]. +prepare_statement_fields() -> + [ {prepare_statement, fun prepare_statement/1} + ]. + +prepare_statement(type) -> map(); +prepare_statement(desc) -> ?DESC("prepare_statement"); +prepare_statement(required) -> false; +prepare_statement(_) -> undefined. + database(type) -> binary(); database(desc) -> ?DESC("database_desc"); database(required) -> true; diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index 5813e6522..bd5b0d85e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -287,9 +287,8 @@ format(_Node, Info = #{memory_total := Total, memory_used := Used}) -> case log_path() of undefined -> <<"not found">>; - Path0 -> - Path = list_to_binary(Path0), - <> + Path -> + filename:join(SysPath, Path) end, Info#{ memory_total := emqx_mgmt_util:kmg(Total), From 75f612a44925a0d5b0704eb8720054a383788588 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 21 Apr 2022 18:30:51 +0800 Subject: [PATCH 08/10] fix: bad SUITE & bad code --- apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl | 12 ++++++------ apps/emqx_authz/src/emqx_authz_mysql.erl | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 93da8ba49..57c5dee16 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -24,7 +24,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(MYSQL_HOST, "mysql"). +-define(MYSQL_HOST, "192.168.1.234"). +% -define(MYSQL_HOST, "mysql"). -define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>). -define(PATH, [authentication]). @@ -198,11 +199,10 @@ t_update(_Config) -> >> }, - %% Code 1146, table not exist - {error, {post_config_update,emqx_authentication, {1146, _, _}}} = - emqx:update_config( - ?PATH, - {create_authenticator, ?GLOBAL, IncorrectConfig}), + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, IncorrectConfig} + ), {error, not_authorized} = emqx_access_control:authenticate( #{ diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index 1aac2e533..db6acb347 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -51,7 +51,7 @@ description() -> init(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), - Source = Source0#{prepare_statement := #{?MODULE => PrepareSQL}}, + Source = Source0#{prepare_statement => #{?MODULE => PrepareSQL}}, case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of {error, Reason} -> error({load_config_error, Reason}); From 4b7a5bbf536396c39add1415f3f3f34fbdf702dc Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 21 Apr 2022 21:26:35 +0800 Subject: [PATCH 09/10] fix: bad empty map --- apps/emqx_authz/src/emqx_authz_mysql.erl | 4 ++-- .../src/emqx_connector_mysql.erl | 22 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index db6acb347..1bdf511a5 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -51,7 +51,7 @@ description() -> init(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), - Source = Source0#{prepare_statement => #{?MODULE => PrepareSQL}}, + Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of {error, Reason} -> error({load_config_error, Reason}); @@ -74,7 +74,7 @@ authorize( } ) -> RenderParams = emqx_authz_utils:render_sql_params(TmplToken, Client), - case emqx_resource:query(ResourceID, {prepared_query, ?MODULE, RenderParams}) of + case emqx_resource:query(ResourceID, {prepared_query, ?PREPARE_KEY, RenderParams}) of {ok, _Columns, []} -> nomatch; {ok, Columns, Rows} -> diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b8ca93104..f44e92413 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -86,6 +86,7 @@ on_start(InstId, #{server := {Host, Port}, {pool_size, PoolSize}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = maps:get(prepare_statement, Config, #{}), + io:format("Prepares ~p~n", [Prepares]), State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, State}; @@ -181,17 +182,20 @@ connect(Options) -> to_server(Str) -> emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS). -init_prepare(State = #{prepare_statement := #{}}) -> - State; init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> - case prepare_sql(Prepares, PoolName) of - ok -> + case maps:size(Prepares) of + 0 -> State; - {error, Reason} -> - LogMeta = #{msg => <<"MySQL init prepare statement failed">>, reason => Reason}, - ?SLOG(error, LogMeta), - %% mark the prepare_statement as failed - State#{prepare_statement => {error, Prepares}} + _ -> + case prepare_sql(Prepares, PoolName) of + ok -> + State; + {error, Reason} -> + LogMeta = #{msg => <<"MySQL init prepare statement failed">>, reason => Reason}, + ?SLOG(error, LogMeta), + %% mark the prepare_statement as failed + State#{prepare_statement => {error, Prepares}} + end end. prepare_sql(Prepares, PoolName) when is_map(Prepares) -> From 9088752afab1fdddea47cb6e70e5448bed91a8f9 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 21 Apr 2022 23:43:27 +0800 Subject: [PATCH 10/10] fix: bad return in retry prepare --- .../test/emqx_authn_mysql_SUITE.erl | 3 +-- .../src/emqx_connector_mysql.erl | 21 +++++++++++-------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 57c5dee16..b6ae6d514 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -24,8 +24,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(MYSQL_HOST, "192.168.1.234"). -% -define(MYSQL_HOST, "mysql"). +-define(MYSQL_HOST, "mysql"). -define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>). -define(PATH, [authentication]). diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index f44e92413..7a5ff9130 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -86,7 +86,6 @@ on_start(InstId, #{server := {Host, Port}, {pool_size, PoolSize}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = maps:get(prepare_statement, Config, #{}), - io:format("Prepares ~p~n", [Prepares]), State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of ok -> {ok, State}; @@ -116,26 +115,30 @@ on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}), %% kill the poll worker to trigger reconnection _ = exit(Conn, restart), - emqx_resource:query_failed(AfterQuery); + emqx_resource:query_failed(AfterQuery), + Result; {error, not_prepared} -> ?SLOG(warning, - LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => not_prepared}), + LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}), case prepare_sql(Prepares, PoolName) of ok -> + %% not return result, next loop will try again on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State); {error, Reason} -> ?SLOG(error, - LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), - emqx_resource:query_failed(AfterQuery) + LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}), + emqx_resource:query_failed(AfterQuery), + {error, Reason} end; {error, Reason} -> ?SLOG(error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}), - emqx_resource:query_failed(AfterQuery); + emqx_resource:query_failed(AfterQuery), + Result; _ -> - emqx_resource:query_success(AfterQuery) - end, - Result. + emqx_resource:query_success(AfterQuery), + Result + end. mysql_function(sql) -> query; mysql_function(prepared_query) -> execute.