diff --git a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl index 5c2342559..44dab934c 100644 --- a/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl +++ b/apps/emqx_auth_mongo/src/emqx_auth_mongo.erl @@ -203,16 +203,51 @@ query(Pool, Collection, Selector) -> end. query_multi(Pool, Collection, SelectorList) -> + ?tp(emqx_auth_mongo_query_multi_enter, #{}), + Timeout = timer:seconds(45), lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> - Batch = ecpool:with_client(Pool, fun(Conn) -> - case mongo_api:find(Conn, Collection, Selector, #{}) of - {error, Reason} -> - ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), - []; - [] -> []; - {ok, Cursor} -> - mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000) - end - end), - [Batch|Acc1] + Res = + with_timeout(Timeout, fun() -> + ecpool:with_client(Pool, fun(Conn) -> + ?tp(emqx_auth_mongo_query_multi_find_selector, #{}), + case find(Conn, Collection, Selector) of + {error, Reason} -> + ?tp(emqx_auth_mongo_query_multi_error, + #{error => Reason}), + ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), + []; + [] -> + ?tp(emqx_auth_mongo_query_multi_no_results, #{}), + []; + {ok, Cursor} -> + mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000) + end + end) + end), + case Res of + {error, timeout} -> + ?tp(emqx_auth_mongo_query_multi_error, #{error => timeout}), + ?LOG(error, "[MongoDB] query_multi timeout", []), + Acc1; + Batch -> + [Batch | Acc1] + end end, [], SelectorList))). + +find(Conn, Collection, Selector) -> + try + mongo_api:find(Conn, Collection, Selector, #{}) + catch + K:E:S -> + {error, {K, E, S}} + end. + +with_timeout(Timeout, Fun) -> + try + emqx_misc:nolink_apply(Fun, Timeout) + catch + exit:timeout -> + {error, timeout}; + K:E:S -> + erlang:raise(K, E, S) + end. diff --git a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl index d27d46971..7a7f98a3d 100644 --- a/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl +++ b/apps/emqx_auth_mongo/test/emqx_auth_mongo_SUITE.erl @@ -95,6 +95,8 @@ all() -> resilience_tests() -> [ t_acl_superuser_no_connection + , t_available_acl_query_no_connection + , t_available_acl_query_timeout , t_authn_no_connection , t_available ]. @@ -105,7 +107,6 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), - init_mongo_data(), %% avoid inter-suite flakiness ok = emqx_mod_acl_internal:unload([]), Config. @@ -167,17 +168,27 @@ init_per_testcase(t_authn_full_selector_variables, Config) -> , {<<"distinguished_name">>, <<"%d">>} ], reload({auth_query, [{selector, Selector}]}), + init_mongo_data(), [ {original_selector, OriginalSelector} , {selector, Selector} | Config]; init_per_testcase(_TestCase, Config) -> + init_mongo_data(), Config. end_per_testcase(t_authn_full_selector_variables, Config) -> OriginalSelector = ?config(original_selector, Config), reload({auth_query, [{selector, OriginalSelector}]}), + deinit_mongo_data(), + ok; +end_per_testcase(t_available_acl_query_timeout, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + reset_proxy(ProxyHost, ProxyPort), + deinit_mongo_data(), ok; end_per_testcase(_TestCase, _Config) -> + deinit_mongo_data(), ok. init_mongo_data() -> @@ -198,6 +209,10 @@ deinit_mongo_data() -> %% Test cases %%-------------------------------------------------------------------- +%% for full coverage ;-) +t_description(_Config) -> + ?assert(is_list(emqx_auth_mongo:description())). + t_check_auth(_) -> Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>}, Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>}, @@ -325,8 +340,6 @@ t_available(Config) -> ?assertEqual( {error, {mongo_error, 2}}, emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)), - %% some error; - todo, %% exception. ?assertMatch( {error, _}, @@ -408,6 +421,15 @@ t_authn_no_connection(Config) -> ok. +%% tests query_multi failure +t_available_acl_query_no_connection(Config) -> + test_acl_query_failure(down, Config). + +%% ensure query_multi has a timeout +t_available_acl_query_timeout(Config) -> + ct:timetrap(90000), + test_acl_query_failure(timeout, Config). + t_acl_superuser_no_connection(Config) -> ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), @@ -460,6 +482,41 @@ t_acl_superuser_no_connection(Config) -> %% Utils %%-------------------------------------------------------------------- +test_acl_query_failure(FailureType, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ACLQuery = aclquery(), + + ?check_trace( + try + ?force_ordering( + #{?snk_kind := emqx_auth_mongo_query_multi_enter}, + #{?snk_kind := connection_will_cut} + ), + ?force_ordering( + #{?snk_kind := connection_cut}, + #{?snk_kind := emqx_auth_mongo_query_multi_find_selector} + ), + spawn(fun() -> + ?tp(connection_will_cut, #{}), + enable_failure(FailureType, ProxyHost, ProxyPort), + ?tp(connection_cut, #{}) + end), + Pool = ?APP, + %% query_multi returns an empty list even on failures. + ?assertMatch(ok, emqx_auth_mongo:available(Pool, ACLQuery)), + ok + after + heal_failure(FailureType, ProxyHost, ProxyPort) + end, + fun(Trace) -> + ?assertMatch( + [#{?snk_kind := emqx_auth_mongo_query_multi_error , error := _}], + ?of_kind(emqx_auth_mongo_query_multi_error, Trace)) + end), + + ok. + reload({Par, Vals}) when is_list(Vals) -> application:stop(?APP), {ok, TupleVals} = application:get_env(?APP, Par), @@ -476,6 +533,9 @@ reload({Par, Vals}) when is_list(Vals) -> superquery() -> emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end). +aclquery() -> + emqx_auth_mongo_app:with_env(acl_query, fun(SQ) -> SQ end). + %% TODO: any easier way to make mongo return a map with an error code??? error_code_query(Pool, Collection, Selector) -> %% should be a query; this is to provoke an error return from