fix: perpare sql when connector doing init; add prepare statement field; bad log path
This commit is contained in:
parent
2aedd38a43
commit
7417e5070d
|
@ -56,7 +56,7 @@ fields(?CONF_NS) ->
|
|||
{query, fun query/1},
|
||||
{query_timeout, fun query_timeout/1}
|
||||
] ++ emqx_authn_schema:common_fields() ++
|
||||
emqx_connector_mysql:fields(config).
|
||||
proplists:delete(prepare_statement, emqx_connector_mysql:fields(config)).
|
||||
|
||||
desc(?CONF_NS) ->
|
||||
"Configuration for authentication using MySQL database.";
|
||||
|
@ -104,17 +104,12 @@ create(
|
|||
ResourceId,
|
||||
?RESOURCE_GROUP,
|
||||
emqx_connector_mysql,
|
||||
Config,
|
||||
Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
|
||||
#{}
|
||||
)
|
||||
of
|
||||
{ok, _} ->
|
||||
case emqx_resource:query(ResourceId, {prepare_sql, [{?PREPARE_KEY, PrepareSql}]}) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end;
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
|
|
@ -60,7 +60,7 @@ fields(?CONF_NS) ->
|
|||
{query, fun query/1}
|
||||
] ++
|
||||
emqx_authn_schema:common_fields() ++
|
||||
proplists:delete(named_queries, emqx_connector_pgsql:fields(config)).
|
||||
proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config)).
|
||||
|
||||
desc(?CONF_NS) ->
|
||||
"Configuration for PostgreSQL authentication backend.";
|
||||
|
@ -101,7 +101,7 @@ create(
|
|||
ResourceId,
|
||||
?RESOURCE_GROUP,
|
||||
emqx_connector_pgsql,
|
||||
Config#{named_queries => #{ResourceId => Query}},
|
||||
Config#{prepare_statement => #{ResourceId => Query}},
|
||||
#{}
|
||||
)
|
||||
of
|
||||
|
|
|
@ -198,10 +198,11 @@ t_update(_Config) ->
|
|||
>>
|
||||
},
|
||||
|
||||
{ok, _} = emqx:update_config(
|
||||
%% Code 1146, table not exist
|
||||
{error, {post_config_update,emqx_authentication, {1146, _, _}}} =
|
||||
emqx:update_config(
|
||||
?PATH,
|
||||
{create_authenticator, ?GLOBAL, IncorrectConfig}
|
||||
),
|
||||
{create_authenticator, ?GLOBAL, IncorrectConfig}),
|
||||
|
||||
{error, not_authorized} = emqx_access_control:authenticate(
|
||||
#{
|
||||
|
|
|
@ -56,11 +56,11 @@ fields(mongo_sharded) ->
|
|||
fields(mysql) ->
|
||||
authz_common_fields(mysql) ++
|
||||
[{query, mk(binary(), #{required => true})}] ++
|
||||
emqx_connector_mysql:fields(config);
|
||||
proplists:delete(prepare_statement, emqx_connector_mysql:fields(config));
|
||||
fields(postgresql) ->
|
||||
authz_common_fields(postgresql) ++
|
||||
[{query, mk(binary(), #{required => true})}] ++
|
||||
proplists:delete(named_queries, emqx_connector_pgsql:fields(config));
|
||||
proplists:delete(prepare_statement, emqx_connector_pgsql:fields(config));
|
||||
fields(redis_single) ->
|
||||
authz_redis_common_fields() ++
|
||||
emqx_connector_redis:fields(single);
|
||||
|
|
|
@ -49,19 +49,14 @@
|
|||
description() ->
|
||||
"AuthZ with Mysql".
|
||||
|
||||
init(#{query := SQL} = Source) ->
|
||||
init(#{query := SQL} = Source0) ->
|
||||
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
|
||||
Source = Source0#{prepare_statement := #{?MODULE => PrepareSQL}},
|
||||
case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of
|
||||
{error, Reason} ->
|
||||
error({load_config_error, Reason});
|
||||
{ok, Id} ->
|
||||
{PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
|
||||
case emqx_resource:query(Id, {prepare_sql, [{?MODULE, PrepareSQL}]}) of
|
||||
ok ->
|
||||
Source#{annotations => #{
|
||||
id => Id, tmpl_oken => TmplToken}};
|
||||
{error, Reason} ->
|
||||
error({load_config_error, Reason})
|
||||
end
|
||||
Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}
|
||||
end.
|
||||
|
||||
destroy(#{annotations := #{id := Id}}) ->
|
||||
|
|
|
@ -59,7 +59,7 @@ init(#{query := SQL0} = Source) ->
|
|||
ResourceID,
|
||||
?RESOURCE_GROUP,
|
||||
emqx_connector_pgsql,
|
||||
Source#{named_queries => #{ResourceID => SQL}},
|
||||
Source#{prepare_statement => #{ResourceID => SQL}},
|
||||
#{}
|
||||
)
|
||||
of
|
||||
|
|
|
@ -19,15 +19,4 @@ The PostgreSQL default port 5432 is used if `[:Port]` is not specified.
|
|||
}
|
||||
}
|
||||
|
||||
name_queries_desc {
|
||||
desc {
|
||||
en: "Key-value list of SQL prepared statements."
|
||||
zh: "SQL 预处理语句列表。"
|
||||
}
|
||||
label: {
|
||||
en: "SQL Prepared Statements List"
|
||||
zh: "SQL 预处理语句列表"
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,6 +11,17 @@ emqx_connector_schema_lib {
|
|||
}
|
||||
}
|
||||
|
||||
prepare_statement {
|
||||
desc {
|
||||
en: "Key-value list of SQL prepared statements."
|
||||
zh: "SQL 预处理语句列表。"
|
||||
}
|
||||
label: {
|
||||
en: "SQL Prepared Statements List"
|
||||
zh: "SQL 预处理语句列表"
|
||||
}
|
||||
}
|
||||
|
||||
database_desc {
|
||||
desc {
|
||||
en: "Database name."
|
||||
|
|
|
@ -51,7 +51,8 @@ fields(config) ->
|
|||
[ {server, fun server/1}
|
||||
] ++
|
||||
emqx_connector_schema_lib:relational_db_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields().
|
||||
emqx_connector_schema_lib:ssl_fields() ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields().
|
||||
|
||||
server(type) -> emqx_schema:ip_port();
|
||||
server(required) -> true;
|
||||
|
@ -84,8 +85,10 @@ on_start(InstId, #{server := {Host, Port},
|
|||
{auto_reconnect, reconn_interval(AutoReconn)},
|
||||
{pool_size, PoolSize}],
|
||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||
Prepares = maps:get(prepare_statement, Config, #{}),
|
||||
State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}),
|
||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
||||
ok -> {ok, #{poolname => PoolName}};
|
||||
ok -> {ok, State};
|
||||
{error, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
|
@ -94,14 +97,12 @@ on_stop(InstId, #{poolname := PoolName}) ->
|
|||
connector => InstId}),
|
||||
emqx_plugin_libs_pool:stop_pool(PoolName).
|
||||
|
||||
on_query(_InstId, {prepare_sql, Prepares}, _AfterQuery, #{poolname := PoolName}) ->
|
||||
prepare_sql(Prepares, PoolName);
|
||||
|
||||
on_query(InstId, {Type, SQLOrKey}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||
on_query(InstId, {Type, SQLOrKey}, AfterQuery, State) ->
|
||||
on_query(InstId, {Type, SQLOrKey, [], default_timeout}, AfterQuery, State);
|
||||
on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
|
||||
on_query(InstId, {Type, SQLOrKey, Params}, AfterQuery, State) ->
|
||||
on_query(InstId, {Type, SQLOrKey, Params, default_timeout}, AfterQuery, State);
|
||||
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
|
||||
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery,
|
||||
#{poolname := PoolName, prepare_statement := Prepares} = State) ->
|
||||
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
|
||||
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
||||
Worker = ecpool:get_client(PoolName),
|
||||
|
@ -115,6 +116,17 @@ on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, #{poolname := Po
|
|||
%% kill the poll worker to trigger reconnection
|
||||
_ = exit(Conn, restart),
|
||||
emqx_resource:query_failed(AfterQuery);
|
||||
{error, not_prepared} ->
|
||||
?SLOG(warning,
|
||||
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => not_prepared}),
|
||||
case prepare_sql(Prepares, PoolName) of
|
||||
ok ->
|
||||
on_query(InstId, {Type, SQLOrKey, Params, Timeout}, AfterQuery, State);
|
||||
{error, Reason} ->
|
||||
?SLOG(error,
|
||||
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
|
||||
emqx_resource:query_failed(AfterQuery)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?SLOG(error,
|
||||
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}),
|
||||
|
@ -128,11 +140,35 @@ mysql_function(sql) -> query;
|
|||
mysql_function(prepared_query) -> execute.
|
||||
|
||||
on_health_check(_InstId, #{poolname := PoolName} = State) ->
|
||||
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
|
||||
case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of
|
||||
{ok, State} ->
|
||||
case do_health_check_prepares(State) of
|
||||
ok->
|
||||
{ok, State};
|
||||
{ok, NState} ->
|
||||
{ok, NState};
|
||||
{error, _Reason} ->
|
||||
{error, health_check_failed, State}
|
||||
end;
|
||||
{error, health_check_failed, State} ->
|
||||
{error, health_check_failed, State}
|
||||
end.
|
||||
|
||||
do_health_check(Conn) ->
|
||||
ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
|
||||
|
||||
do_health_check_prepares(#{prepare_statement := Prepares})when is_map(Prepares) ->
|
||||
ok;
|
||||
do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) ->
|
||||
%% retry to prepare
|
||||
case prepare_sql(Prepares, PoolName) of
|
||||
ok ->
|
||||
%% remove the error
|
||||
{ok, State#{prepare_statement => Prepares}};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% ===================================================================
|
||||
reconn_interval(true) -> 15;
|
||||
reconn_interval(false) -> false.
|
||||
|
@ -145,6 +181,21 @@ connect(Options) ->
|
|||
to_server(Str) ->
|
||||
emqx_connector_schema_lib:parse_server(Str, ?MYSQL_HOST_OPTIONS).
|
||||
|
||||
init_prepare(State = #{prepare_statement := #{}}) ->
|
||||
State;
|
||||
init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
|
||||
case prepare_sql(Prepares, PoolName) of
|
||||
ok ->
|
||||
State;
|
||||
{error, Reason} ->
|
||||
LogMeta = #{msg => <<"MySQL init prepare statement failed">>, reason => Reason},
|
||||
?SLOG(error, LogMeta),
|
||||
%% mark the prepare_statement as failed
|
||||
State#{prepare_statement => {error, Prepares}}
|
||||
end.
|
||||
|
||||
prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
|
||||
prepare_sql(maps:to_list(Prepares), PoolName);
|
||||
prepare_sql(Prepares, PoolName) ->
|
||||
case do_prepare_sql(Prepares, PoolName) of
|
||||
ok ->
|
||||
|
|
|
@ -51,15 +51,10 @@ roots() ->
|
|||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
||||
|
||||
fields(config) ->
|
||||
[ {named_queries, fun named_queries/1}
|
||||
, {server, fun server/1}] ++
|
||||
[{server, fun server/1}] ++
|
||||
emqx_connector_schema_lib:relational_db_fields() ++
|
||||
emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
named_queries(type) -> map();
|
||||
named_queries(desc) -> ?DESC("name_queries_desc");
|
||||
named_queries(required) -> false;
|
||||
named_queries(_) -> undefined.
|
||||
emqx_connector_schema_lib:ssl_fields() ++
|
||||
emqx_connector_schema_lib:prepare_statement_fields().
|
||||
|
||||
server(type) -> emqx_schema:ip_port();
|
||||
server(required) -> true;
|
||||
|
@ -92,7 +87,7 @@ on_start(InstId, #{server := {Host, Port},
|
|||
{database, DB},
|
||||
{auto_reconnect, reconn_interval(AutoReconn)},
|
||||
{pool_size, PoolSize},
|
||||
{named_queries, maps:to_list(maps:get(named_queries, Config, #{}))}],
|
||||
{prepare_statement, maps:to_list(maps:get(prepare_statement, Config, #{}))}],
|
||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
|
||||
ok -> {ok, #{poolname => PoolName}};
|
||||
|
@ -135,10 +130,10 @@ connect(Opts) ->
|
|||
Host = proplists:get_value(host, Opts),
|
||||
Username = proplists:get_value(username, Opts),
|
||||
Password = proplists:get_value(password, Opts),
|
||||
NamedQueries = proplists:get_value(named_queries, Opts),
|
||||
PrepareStatement = proplists:get_value(prepare_statement, Opts),
|
||||
case epgsql:connect(Host, Username, Password, conn_opts(Opts)) of
|
||||
{ok, Conn} ->
|
||||
case parse(Conn, NamedQueries) of
|
||||
case parse(Conn, PrepareStatement) of
|
||||
ok -> {ok, Conn};
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
-export([ relational_db_fields/0
|
||||
, ssl_fields/0
|
||||
, prepare_statement_fields/0
|
||||
]).
|
||||
|
||||
-export([ ip_port_to_string/1
|
||||
|
@ -67,6 +68,15 @@ relational_db_fields() ->
|
|||
, {auto_reconnect, fun auto_reconnect/1}
|
||||
].
|
||||
|
||||
prepare_statement_fields() ->
|
||||
[ {prepare_statement, fun prepare_statement/1}
|
||||
].
|
||||
|
||||
prepare_statement(type) -> map();
|
||||
prepare_statement(desc) -> ?DESC("prepare_statement");
|
||||
prepare_statement(required) -> false;
|
||||
prepare_statement(_) -> undefined.
|
||||
|
||||
database(type) -> binary();
|
||||
database(desc) -> ?DESC("database_desc");
|
||||
database(required) -> true;
|
||||
|
|
|
@ -287,9 +287,8 @@ format(_Node, Info = #{memory_total := Total, memory_used := Used}) ->
|
|||
case log_path() of
|
||||
undefined ->
|
||||
<<"not found">>;
|
||||
Path0 ->
|
||||
Path = list_to_binary(Path0),
|
||||
<<SysPath/binary, Path/binary>>
|
||||
Path ->
|
||||
filename:join(SysPath, Path)
|
||||
end,
|
||||
Info#{
|
||||
memory_total := emqx_mgmt_util:kmg(Total),
|
||||
|
|
Loading…
Reference in New Issue