fix(authn): add timeout option for mysql connector

This commit is contained in:
zhouzb 2021-09-23 16:40:33 +08:00
parent e737f18548
commit e31840d943
8 changed files with 82 additions and 106 deletions

View File

@ -303,8 +303,8 @@ authenticate(#{listener := Listener, protocol := Protocol} = Credential, _AuthRe
do_authenticate([], _) -> do_authenticate([], _) ->
{stop, {error, not_authorized}}; {stop, {error, not_authorized}};
do_authenticate([#authenticator{provider = Provider, state = State} | More], Credential) -> do_authenticate([#authenticator{provider = Provider, state = #{'_unique' := Unique} = State} | More], Credential) ->
case Provider:authenticate(Credential, State) of try Provider:authenticate(Credential, State) of
ignore -> ignore ->
do_authenticate(More, Credential); do_authenticate(More, Credential);
Result -> Result ->
@ -314,6 +314,10 @@ do_authenticate([#authenticator{provider = Provider, state = State} | More], Cre
%% {continue, AuthData, AuthCache} %% {continue, AuthData, AuthCache}
%% {error, Reason} %% {error, Reason}
{stop, Result} {stop, Result}
catch
error:Reason:Stacktrace ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, {Reason, Stacktrace}]),
do_authenticate(More, Credential)
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -156,7 +156,6 @@ authenticate(#{auth_method := _}, _) ->
authenticate(Credential, #{'_unique' := Unique, authenticate(Credential, #{'_unique' := Unique,
method := Method, method := Method,
request_timeout := RequestTimeout} = State) -> request_timeout := RequestTimeout} = State) ->
try
Request = generate_request(Credential, State), Request = generate_request(Credential, State),
case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of case emqx_resource:query(Unique, {Method, Request, RequestTimeout}) of
{ok, 204, _Headers} -> {ok, #{is_superuser => false}}; {ok, 204, _Headers} -> {ok, #{is_superuser => false}};
@ -172,11 +171,6 @@ authenticate(Credential, #{'_unique' := Unique,
end; end;
{error, _Reason} -> {error, _Reason} ->
ignore ignore
end
catch
error:Reason ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Reason]),
ignore
end. end.
destroy(#{'_unique' := Unique}) -> destroy(#{'_unique' := Unique}) ->

View File

@ -141,7 +141,6 @@ authenticate(#{password := Password} = Credential,
, selector := Selector0 , selector := Selector0
, '_unique' := Unique , '_unique' := Unique
} = State) -> } = State) ->
try
Selector1 = replace_placeholders(Selector0, Credential), Selector1 = replace_placeholders(Selector0, Credential),
Selector2 = normalize_selector(Selector1), Selector2 = normalize_selector(Selector1),
case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of case emqx_resource:query(Unique, {find_one, Collection, Selector2, #{}}) of
@ -159,11 +158,6 @@ authenticate(#{password := Password} = Credential,
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end end
end
catch
error:Error ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
ignore
end. end.
destroy(#{'_unique' := Unique}) -> destroy(#{'_unique' := Unique}) ->

View File

@ -114,7 +114,6 @@ authenticate(#{password := Password} = Credential,
query := Query, query := Query,
query_timeout := Timeout, query_timeout := Timeout,
'_unique' := Unique} = State) -> '_unique' := Unique} = State) ->
try
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of case emqx_resource:query(Unique, {sql, Query, Params, Timeout}) of
{ok, _Columns, []} -> ignore; {ok, _Columns, []} -> ignore;
@ -128,11 +127,6 @@ authenticate(#{password := Password} = Credential,
end; end;
{error, _Reason} -> {error, _Reason} ->
ignore ignore
end
catch
error:Error ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
ignore
end. end.
destroy(#{'_unique' := Unique}) -> destroy(#{'_unique' := Unique}) ->

View File

@ -103,7 +103,6 @@ authenticate(#{password := Password} = Credential,
#{query := Query, #{query := Query,
placeholders := PlaceHolders, placeholders := PlaceHolders,
'_unique' := Unique} = State) -> '_unique' := Unique} = State) ->
try
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential), Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
case emqx_resource:query(Unique, {sql, Query, Params}) of case emqx_resource:query(Unique, {sql, Query, Params}) of
{ok, _Columns, []} -> ignore; {ok, _Columns, []} -> ignore;
@ -118,11 +117,6 @@ authenticate(#{password := Password} = Credential,
end; end;
{error, _Reason} -> {error, _Reason} ->
ignore ignore
end
catch
error:Error ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, Error]),
ignore
end. end.
destroy(#{'_unique' := Unique}) -> destroy(#{'_unique' := Unique}) ->

View File

@ -127,7 +127,6 @@ authenticate(#{password := Password} = Credential,
#{ query := {Command, Key, Fields} #{ query := {Command, Key, Fields}
, '_unique' := Unique , '_unique' := Unique
} = State) -> } = State) ->
try
NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))), NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
case emqx_resource:query(Unique, {cmd, [Command, NKey | Fields]}) of case emqx_resource:query(Unique, {cmd, [Command, NKey | Fields]}) of
{ok, Values} -> {ok, Values} ->
@ -141,11 +140,6 @@ authenticate(#{password := Password} = Credential,
{error, Reason} -> {error, Reason} ->
?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]), ?LOG(error, "['~s'] Query failed: ~p", [Unique, Reason]),
ignore ignore
end
catch
error:{cannot_get_variable, Placeholder} ->
?LOG(warning, "The following error occurred in '~s' during authentication: ~p", [Unique, {cannot_get_variable, Placeholder}]),
ignore
end. end.
destroy(#{'_unique' := Unique}) -> destroy(#{'_unique' := Unique}) ->

View File

@ -76,11 +76,13 @@ on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping mysql connector: ~p", [InstId]), logger:info("stopping mysql connector: ~p", [InstId]),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, [], default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, Params, default_timeout}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params, Timeout}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), logger:debug("mysql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {mysql, query, [SQL, Params, Timeout]}, no_handover) of
{error, Reason} -> {error, Reason} ->
logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]), logger:debug("mysql connector ~p do sql query failed, sql: ~p, reason: ~p", [InstId, SQL, Reason]),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);

View File

@ -76,8 +76,8 @@ on_stop(InstId, #{poolname := PoolName}) ->
logger:info("stopping postgresql connector: ~p", [InstId]), logger:info("stopping postgresql connector: ~p", [InstId]),
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL}, AfterQuery, #{poolname := _PoolName} = State) ->
on_query(InstId, {sql, SQL, []}, AfterQuery, #{poolname := PoolName} = State); on_query(InstId, {sql, SQL, []}, AfterQuery, State);
on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) -> on_query(InstId, {sql, SQL, Params}, AfterQuery, #{poolname := PoolName} = State) ->
logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]), logger:debug("postgresql connector ~p received sql query: ~p, at state: ~p", [InstId, SQL, State]),
case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of case Result = ecpool:pick_and_do(PoolName, {?MODULE, query, [SQL, Params]}, no_handover) of