Merge pull request #8994 from thalesmg/mongo-coverage-43

test: increase mongo integration test coverage (4.3)
This commit is contained in:
Thales Macedo Garitezi 2022-09-23 13:23:22 -03:00 committed by GitHub
commit dbf3a3cee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 847 additions and 55 deletions

View File

@ -1,5 +1,5 @@
EMQX_AUTH__LDAP__SERVERS=ldap_server EMQX_AUTH__LDAP__SERVERS=ldap_server
EMQX_AUTH__MONGO__SERVER=mongo_server:27017 EMQX_AUTH__MONGO__SERVER=toxiproxy:27017
EMQX_AUTH__MYSQL__SERVER=mysql_server:3306 EMQX_AUTH__MYSQL__SERVER=mysql_server:3306
EMQX_AUTH__MYSQL__USERNAME=root EMQX_AUTH__MYSQL__USERNAME=root
EMQX_AUTH__MYSQL__PASSWORD=public EMQX_AUTH__MYSQL__PASSWORD=public

View File

@ -0,0 +1,16 @@
version: '3.9'
services:
toxiproxy:
container_name: toxiproxy
image: ghcr.io/shopify/toxiproxy:2.5.0
restart: always
networks:
- emqx_bridge
volumes:
- "./toxiproxy.json:/config/toxiproxy.json"
ports:
- 8474:8474
command:
- "-host=0.0.0.0"
- "-config=/config/toxiproxy.json"

View File

@ -0,0 +1,8 @@
[
{
"name": "mongo",
"listen": "0.0.0.0:27017",
"upstream": "mongo:27017",
"enabled": true
}
]

View File

@ -82,6 +82,7 @@ jobs:
- name: docker-compose up - name: docker-compose up
run: | run: |
docker-compose \ docker-compose \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-${{ matrix.connect_type }}.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-${{ matrix.connect_type }}.yaml \
-f .ci/docker-compose-file/docker-compose.yaml \ -f .ci/docker-compose-file/docker-compose.yaml \
up -d --build up -d --build

View File

@ -54,6 +54,7 @@ jobs:
run: | run: |
docker-compose \ docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \ -f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
@ -81,6 +82,7 @@ jobs:
run: | run: |
docker-compose \ docker-compose \
-f .ci/docker-compose-file/docker-compose.yaml \ -f .ci/docker-compose-file/docker-compose.yaml \
-f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \
-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-tcp.yaml \
-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \

View File

@ -408,7 +408,7 @@ t_password_hash(_) ->
ok = application:start(emqx_auth_mnesia). ok = application:start(emqx_auth_mnesia).
t_will_message_connection_denied(Config) when is_list(Config) -> t_will_message_connection_denied(Config) when is_list(Config) ->
ClientId = Username = <<"subscriber">>, ClientId = <<"subscriber">>,
Password = <<"p">>, Password = <<"p">>,
application:stop(emqx_auth_mnesia), application:stop(emqx_auth_mnesia),
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]), ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia]),

View File

@ -79,4 +79,3 @@ feedvar(Str, Var, Val) ->
re:replace(Str, Var, Val, [global, {return, binary}]). re:replace(Str, Var, Val, [global, {return, binary}]).
description() -> "ACL with MongoDB". description() -> "ACL with MongoDB".

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mongo, {application, emqx_auth_mongo,
[{description, "EMQ X Authentication/ACL with MongoDB"}, [{description, "EMQ X Authentication/ACL with MongoDB"},
{vsn, "4.3.4"}, % strict semver, bump manually! {vsn, "4.3.5"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_mongo_sup]}, {registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool]}, {applications, [kernel,stdlib,mongodb,ecpool]},

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{<<"4\\.3\\.[1-3]">>, [{"4.3.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-3]">>,
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
@ -9,7 +10,8 @@
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]}, {load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[1-3]">>, [{"4.3.4",[{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-3]">>,
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]}, [{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]}, {load_module,emqx_auth_mongo,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ check/3 -export([ check/3
, description/0 , description/0
@ -38,14 +39,22 @@
, available/3 , available/3
]). ]).
-ifdef(TEST).
-export([ is_superuser/3
, available/4
]).
-endif.
check(ClientInfo = #{password := Password}, AuthResult, check(ClientInfo = #{password := Password}, AuthResult,
Env = #{authquery := AuthQuery, superquery := SuperQuery}) -> Env = #{authquery := AuthQuery, superquery := SuperQuery}) ->
?tp(emqx_auth_mongo_superuser_check_authn_enter, #{}),
#authquery{collection = Collection, field = Fields, #authquery{collection = Collection, field = Fields,
hash = HashType, selector = Selector} = AuthQuery, hash = HashType, selector = Selector} = AuthQuery,
Pool = maps:get(pool, Env, ?APP), Pool = maps:get(pool, Env, ?APP),
case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of case query(Pool, Collection, maps:from_list(replvars(Selector, ClientInfo))) of
undefined -> ok; undefined -> ok;
{error, Reason} -> {error, Reason} ->
?tp(emqx_auth_mongo_check_authn_error, #{error => Reason}),
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
{stop, AuthResult#{auth_result => not_authorized, anonymous => false}}; {stop, AuthResult#{auth_result => not_authorized, anonymous => false}};
UserMap -> UserMap ->
@ -58,6 +67,7 @@ check(ClientInfo = #{password := Password}, AuthResult,
end, end,
case Result of case Result of
ok -> ok ->
?tp(emqx_auth_mongo_superuser_check_authn_ok, #{}),
{stop, AuthResult#{is_superuser => is_superuser(Pool, SuperQuery, ClientInfo), {stop, AuthResult#{is_superuser => is_superuser(Pool, SuperQuery, ClientInfo),
anonymous => false, anonymous => false,
auth_result => success}}; auth_result => success}};
@ -81,17 +91,24 @@ description() -> "Authentication with MongoDB".
is_superuser(_Pool, undefined, _ClientInfo) -> is_superuser(_Pool, undefined, _ClientInfo) ->
false; false;
is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) -> is_superuser(Pool, #superquery{collection = Coll, field = Field, selector = Selector}, ClientInfo) ->
case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of ?tp(emqx_auth_mongo_superuser_query_enter, #{}),
undefined -> false; Res =
{error, Reason} -> case query(Pool, Coll, maps:from_list(replvars(Selector, ClientInfo))) of
?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]), undefined ->
false; %% returned when there are no returned documents
Row -> false;
case maps:get(Field, Row, false) of {error, Reason} ->
true -> true; ?tp(emqx_auth_mongo_superuser_query_error, #{error => Reason}),
_False -> false ?LOG(error, "[MongoDB] Can't connect to MongoDB server: ~0p", [Reason]),
end false;
end. Row ->
case maps:get(Field, Row, false) of
true -> true;
_False -> false
end
end,
?tp(emqx_auth_mongo_superuser_query_result, #{is_superuser => Res}),
Res.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Availability Test %% Availability Test
@ -114,6 +131,7 @@ available(Pool, Collection, Query) ->
available(Pool, Collection, Query, Fun) -> available(Pool, Collection, Query, Fun) ->
try Fun(Pool, Collection, Query) of try Fun(Pool, Collection, Query) of
{error, Reason} -> {error, Reason} ->
?tp(emqx_auth_mongo_available_error, #{error => Reason}),
?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]), ?LOG(error, "[MongoDB] ~p availability test error: ~0p", [Collection, Reason]),
{error, Reason}; {error, Reason};
Error = #{<<"code">> := Code} -> Error = #{<<"code">> := Code} ->
@ -144,7 +162,16 @@ test_client_info() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
replvars(VarList, ClientInfo) -> replvars(VarList, ClientInfo) ->
lists:map(fun(Var) -> replvar(Var, ClientInfo) end, VarList). lists:foldl(
fun(Var, Selector) ->
case replvar(Var, ClientInfo) of
%% assumes that all fields are binaries...
{unmatchable, Field} -> [{Field, []} | Selector];
Interpolated -> [Interpolated | Selector]
end
end,
[],
VarList).
replvar({Field, <<"%u">>}, #{username := Username}) -> replvar({Field, <<"%u">>}, #{username := Username}) ->
{Field, Username}; {Field, Username};
@ -154,8 +181,8 @@ replvar({Field, <<"%C">>}, #{cn := CN}) ->
{Field, CN}; {Field, CN};
replvar({Field, <<"%d">>}, #{dn := DN}) -> replvar({Field, <<"%d">>}, #{dn := DN}) ->
{Field, DN}; {Field, DN};
replvar(Selector, _ClientInfo) -> replvar({Field, _PlaceHolder}, _ClientInfo) ->
Selector. {unmatchable, Field}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MongoDB Connect/Query %% MongoDB Connect/Query
@ -169,19 +196,57 @@ connect(Opts) ->
mongo_api:connect(Type, Hosts, Options, WorkerOptions). mongo_api:connect(Type, Hosts, Options, WorkerOptions).
query(Pool, Collection, Selector) -> query(Pool, Collection, Selector) ->
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end). Timeout = timer:seconds(15),
with_timeout(Timeout, fun() ->
ecpool:with_client(Pool, fun(Conn) -> mongo_api:find_one(Conn, Collection, Selector, #{}) end)
end).
query_multi(Pool, Collection, SelectorList) -> query_multi(Pool, Collection, SelectorList) ->
?tp(emqx_auth_mongo_query_multi_enter, #{}),
Timeout = timer:seconds(45),
lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) -> lists:reverse(lists:flatten(lists:foldl(fun(Selector, Acc1) ->
Batch = ecpool:with_client(Pool, fun(Conn) -> Res =
case mongo_api:find(Conn, Collection, Selector, #{}) of with_timeout(Timeout, fun() ->
{error, Reason} -> ecpool:with_client(Pool, fun(Conn) ->
?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]), ?tp(emqx_auth_mongo_query_multi_find_selector, #{}),
[]; case find(Conn, Collection, Selector) of
[] -> []; {error, Reason} ->
{ok, Cursor} -> ?tp(emqx_auth_mongo_query_multi_error,
mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000) #{error => Reason}),
end ?LOG(error, "[MongoDB] query_multi failed, got error: ~p", [Reason]),
end), [];
[Batch|Acc1] [] ->
?tp(emqx_auth_mongo_query_multi_no_results, #{}),
[];
{ok, Cursor} ->
mc_cursor:foldl(fun(O, Acc2) -> [O | Acc2] end, [], Cursor, 1000)
end
end)
end),
case Res of
{error, timeout} ->
?tp(emqx_auth_mongo_query_multi_error, #{error => timeout}),
?LOG(error, "[MongoDB] query_multi timeout", []),
Acc1;
Batch ->
[Batch | Acc1]
end
end, [], SelectorList))). end, [], SelectorList))).
find(Conn, Collection, Selector) ->
try
mongo_api:find(Conn, Collection, Selector, #{})
catch
K:E:S ->
{error, {K, E, S}}
end.
with_timeout(Timeout, Fun) ->
try
emqx_misc:nolink_apply(Fun, Timeout)
catch
exit:timeout ->
{error, timeout};
K:E:S ->
erlang:raise(K, E, S)
end.

View File

@ -30,6 +30,10 @@
, stop/1 , stop/1
]). ]).
-ifdef(TEST).
-export([with_env/2]).
-endif.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Application callbacks %% Application callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -19,42 +19,98 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_auth_mongo.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(APP, emqx_auth_mongo).
-define(POOL(App), ecpool_worker:client(gproc_pool:pick_worker({ecpool, App}))). -define(POOL(App), ecpool_worker:client(gproc_pool:pick_worker({ecpool, App}))).
-define(MONGO_CL_ACL, <<"mqtt_acl">>). -define(MONGO_CL_ACL, <<"mqtt_acl">>).
-define(MONGO_CL_USER, <<"mqtt_user">>). -define(MONGO_CL_USER, <<"mqtt_user">>).
-define(INIT_ACL, [{<<"username">>, <<"testuser">>, <<"clientid">>, <<"null">>, <<"subscribe">>, [<<"#">>]}, -define(INIT_ACL, [ { <<"username">>, <<"testuser">>
{<<"username">>, <<"dashboard">>, <<"clientid">>, <<"null">>, <<"pubsub">>, [<<"$SYS/#">>]}, , <<"clientid">>, <<"null">>
{<<"username">>, <<"user3">>, <<"clientid">>, <<"null">>, <<"publish">>, [<<"a/b/c">>]}]). , <<"subscribe">>, [<<"#">>]
}
, { <<"username">>, <<"dashboard">>
, <<"clientid">>, <<"null">>
, <<"pubsub">>, [<<"$SYS/#">>]
}
, { <<"username">>, <<"user3">>
, <<"clientid">>, <<"null">>
, <<"publish">>, [<<"a/b/c">>]
}
]).
-define(INIT_AUTH, [{<<"username">>, <<"plain">>, <<"password">>, <<"plain">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, true}, -define(INIT_AUTH, [ { <<"username">>, <<"plain">>
{<<"username">>, <<"md5">>, <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, , <<"password">>, <<"plain">>
{<<"username">>, <<"sha">>, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, , <<"salt">>, <<"salt">>
{<<"username">>, <<"sha256">>, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>, <<"salt">>, <<"salt">>, <<"is_superuser">>, false}, , <<"is_superuser">>, true
{<<"username">>, <<"pbkdf2_password">>, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>, <<"is_superuser">>, false}, }
{<<"username">>, <<"bcrypt_foo">>, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>, <<"is_superuser">>, false} , { <<"username">>, <<"md5">>
]). , <<"password">>, <<"1bc29b36f623ba82aaf6724fd3b16718">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"sha">>
, <<"password">>, <<"d8f4590320e1343a915b6394170650a8f35d6926">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"sha256">>
, <<"password">>, <<"5d5b09f6dcb2d53a5fffc60c4ac0d55fabdf556069d6631545f42aa6e3500f2e">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"pbkdf2_password">>
, <<"password">>, <<"cdedb5281bb2f801565a1122b2563515">>
, <<"salt">>, <<"ATHENA.MIT.EDUraeburn">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"bcrypt_foo">>
, <<"password">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.HbUIOdlQI0iS22Q5rd5z.JVVYH6sfm6">>
, <<"salt">>, <<"$2a$12$sSS8Eg.ovVzaHzi1nUHYK.">>
, <<"is_superuser">>, false
}
, { <<"username">>, <<"user_full">>
, <<"clientid">>, <<"client_full">>
, <<"common_name">>, <<"cn_full">>
, <<"distinguished_name">>, <<"dn_full">>
, <<"password">>, <<"plain">>
, <<"salt">>, <<"salt">>
, <<"is_superuser">>, false
}
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
all() -> all() ->
emqx_ct:all(?MODULE). OtherTCs = emqx_ct:all(?MODULE) -- resilience_tests(),
[ {group, resilience}
| OtherTCs].
init_per_suite(Cfg) -> resilience_tests() ->
[ t_acl_superuser_timeout
, t_available_acl_query_no_connection
, t_available_acl_query_timeout
, t_available_authn_query_timeout
, t_authn_timeout
, t_available
].
groups() ->
[ {resilience, resilience_tests()}
].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1), emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
init_mongo_data(),
%% avoid inter-suite flakiness %% avoid inter-suite flakiness
ok = emqx_mod_acl_internal:unload([]), ok = emqx_mod_acl_internal:unload([]),
Cfg. Config.
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
deinit_mongo_data(), deinit_mongo_data(),
@ -69,6 +125,81 @@ set_special_confs(emqx) ->
set_special_confs(_App) -> set_special_confs(_App) ->
ok. ok.
init_per_group(resilience, Config) ->
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPortStr = os:getenv("PROXY_PORT", "8474"),
ProxyPort = list_to_integer(ProxyPortStr),
reset_proxy(ProxyHost, ProxyPort),
ProxyServer = ProxyHost ++ ":27017",
{ok, OriginalServer} = application:get_env(emqx_auth_mongo, server),
OriginalServerMap = maps:from_list(OriginalServer),
NewServerMap = OriginalServerMap#{hosts => [ProxyServer]},
NewServer = maps:to_list(NewServerMap),
emqx_ct_helpers:stop_apps([emqx_auth_mongo]),
Handler =
fun(App = emqx_auth_mongo) ->
application:set_env(emqx_auth_mongo, server, NewServer),
set_special_confs(App);
(App)->
set_special_confs(App)
end,
emqx_ct_helpers:start_apps([emqx_auth_mongo], Handler),
[ {original_server, OriginalServer}
, {proxy_host, ProxyHost}
, {proxy_port, ProxyPort}
| Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(resilience, Config) ->
OriginalServer = ?config(original_server, Config),
application:set_env(emqx_auth_mongo, server, OriginalServer),
emqx_ct_helpers:stop_apps([emqx_auth_mongo]),
emqx_ct_helpers:start_apps([emqx_auth_mongo], fun set_special_confs/1),
ok;
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_authn_full_selector_variables, Config) ->
{ok, AuthQuery} = application:get_env(emqx_auth_mongo, auth_query),
OriginalSelector = proplists:get_value(selector, AuthQuery),
Selector = [ {<<"username">>, <<"%u">>}
, {<<"clientid">>, <<"%c">>}
, {<<"common_name">>, <<"%C">>}
, {<<"distinguished_name">>, <<"%d">>}
],
reload({auth_query, [{selector, Selector}]}),
init_mongo_data(),
[ {original_selector, OriginalSelector}
, {selector, Selector}
| Config];
init_per_testcase(_TestCase, Config) ->
init_mongo_data(),
Config.
end_per_testcase(t_authn_full_selector_variables, Config) ->
OriginalSelector = ?config(original_selector, Config),
reload({auth_query, [{selector, OriginalSelector}]}),
deinit_mongo_data(),
ok;
end_per_testcase(TestCase, Config)
when TestCase =:= t_available_acl_query_timeout;
TestCase =:= t_acl_superuser_timeout;
TestCase =:= t_authn_no_connection;
TestCase =:= t_available_acl_query_no_connection ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
reset_proxy(ProxyHost, ProxyPort),
%% force restart of clients because CI tends to get stuck...
application:stop(emqx_auth_mongo),
application:start(emqx_auth_mongo),
wait_for_stabilization(#{attempts => 10, interval_ms => 500}),
deinit_mongo_data(),
ok;
end_per_testcase(_TestCase, _Config) ->
deinit_mongo_data(),
ok.
init_mongo_data() -> init_mongo_data() ->
%% Users %% Users
{ok, Connection} = ?POOL(?APP), {ok, Connection} = ?POOL(?APP),
@ -87,6 +218,14 @@ deinit_mongo_data() ->
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% for full coverage ;-)
t_authn_description(_Config) ->
?assert(is_list(emqx_auth_mongo:description())).
%% for full coverage ;-)
t_acl_description(_Config) ->
?assert(is_list(emqx_acl_mongo:description())).
t_check_auth(_) -> t_check_auth(_) ->
Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>}, Plain = #{zone => external, clientid => <<"client1">>, username => <<"plain">>},
Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>}, Plain1 = #{zone => external, clientid => <<"client1">>, username => <<"plain2">>},
@ -116,7 +255,124 @@ t_check_auth(_) ->
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}), {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Pbkdf2#{password => <<"password">>}),
reload({auth_query, [{password_hash, {salt, bcrypt}}]}), reload({auth_query, [{password_hash, {salt, bcrypt}}]}),
{ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"foo">>}), {ok, #{is_superuser := false}} = emqx_access_control:authenticate(Bcrypt#{password => <<"foo">>}),
{error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}). {error, _} = emqx_access_control:authenticate(User1#{password => <<"foo">>}),
%% bad field config
reload({auth_query, [{password_field, [<<"bad_field">>]}]}),
?assertEqual({error, password_error},
emqx_access_control:authenticate(Plain#{password => <<"plain">>})),
%% unknown username
Unknown = #{zone => unknown, clientid => <<"?">>, username => <<"?">>, password => <<"">>},
?assertEqual({error, not_authorized}, emqx_access_control:authenticate(Unknown)),
ok.
t_authn_full_selector_variables(Config) ->
Selector = ?config(selector, Config),
ClientInfo = #{ zone => external
, clientid => <<"client_full">>
, username => <<"user_full">>
, cn => <<"cn_full">>
, dn => <<"dn_full">>
, password => <<"plain">>
},
?assertMatch({ok, _}, emqx_access_control:authenticate(ClientInfo)),
EnvFields = [ clientid
, username
, cn
, dn
],
lists:foreach(
fun(Field) ->
UnauthorizedClientInfo = ClientInfo#{Field => <<"wrong">>},
?assertEqual({error, not_authorized},
emqx_access_control:authenticate(UnauthorizedClientInfo),
#{ field => Field
, client_info => UnauthorizedClientInfo
, selector => Selector
})
end,
EnvFields),
ok.
t_authn_interpolation_no_info(_Config) ->
Valid = #{zone => external, clientid => <<"client1">>,
username => <<"plain">>, password => <<"plain">>},
?assertMatch({ok, _}, emqx_access_control:authenticate(Valid)),
try
%% has values that are equal to placeholders
InterpolationUser = #{ <<"username">> => <<"%u">>
, <<"password">> => <<"plain">>
, <<"salt">> => <<"salt">>
, <<"is_superuser">> => true
},
{ok, Conn} = ?POOL(?APP),
{{true, _}, _} = mongo_api:insert(Conn, ?MONGO_CL_USER, InterpolationUser),
Invalid = maps:without([username], Valid),
?assertMatch({error, not_authorized}, emqx_access_control:authenticate(Invalid))
after
deinit_mongo_data(),
init_mongo_data()
end.
%% authenticates, but superquery returns no documents
t_authn_empty_is_superuser_collection(_Config) ->
{ok, SuperQuery} = application:get_env(emqx_auth_mongo, super_query),
Collection = list_to_binary(proplists:get_value(collection, SuperQuery)),
reload({auth_query, [{password_hash, plain}]}),
Plain = #{zone => external, clientid => <<"client1">>,
username => <<"plain">>, password => <<"plain">>},
ok = snabbkaffe:start_trace(),
?force_ordering(
#{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok},
#{?snk_kind := truncate_coll_enter}),
?force_ordering(
#{?snk_kind := truncate_coll_done},
#{?snk_kind := emqx_auth_mongo_superuser_query_enter}),
try
spawn_link(fun() ->
?tp(truncate_coll_enter, #{}),
{ok, Conn} = ?POOL(?APP),
{true, _} = mongo_api:delete(Conn, Collection, _Selector = #{}),
?tp(truncate_coll_done, #{})
end),
?assertMatch({ok, #{is_superuser := false}}, emqx_access_control:authenticate(Plain)),
ok = snabbkaffe:stop(),
ok
after
init_mongo_data()
end.
t_available(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
Pool = ?APP,
SuperQuery = #superquery{collection = SuperCollection} = superquery(),
%% success;
?assertEqual(ok, emqx_auth_mongo:available(Pool, SuperQuery)),
%% error with code;
EmptySelector = #{},
?assertEqual(
{error, {mongo_error, 2}},
emqx_auth_mongo:available(Pool, SuperCollection, EmptySelector, fun error_code_query/3)),
%% exception (in query)
?assertMatch(
{error, _},
with_failure(down, ProxyHost, ProxyPort,
fun() ->
Collection = <<"mqtt_user">>,
Selector = #{},
emqx_auth_mongo:available(Pool, Collection, Selector)
end)),
%% exception (arbitrary function)
?assertMatch(
{error, _},
with_failure(down, ProxyHost, ProxyPort,
fun() ->
Collection = <<"mqtt_user">>,
Selector = #{},
RaisingFun = fun(_, _, _) -> error(some_error) end,
emqx_auth_mongo:available(Pool, Collection, Selector, RaisingFun)
end)),
ok.
t_check_acl(_) -> t_check_acl(_) ->
{ok, Connection} = ?POOL(?APP), {ok, Connection} = ?POOL(?APP),
@ -132,7 +388,30 @@ t_check_acl(_) ->
allow = emqx_access_control:check_acl(User2, subscribe, <<"$SYS/testuser/1">>), allow = emqx_access_control:check_acl(User2, subscribe, <<"$SYS/testuser/1">>),
allow = emqx_access_control:check_acl(User3, publish, <<"a/b/c">>), allow = emqx_access_control:check_acl(User3, publish, <<"a/b/c">>),
deny = emqx_access_control:check_acl(User3, publish, <<"c">>), deny = emqx_access_control:check_acl(User3, publish, <<"c">>),
deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>). deny = emqx_access_control:check_acl(User4, publish, <<"a/b/c">>),
%% undefined value to interpolate
User1Undef = User1#{clientid => undefined},
allow = emqx_access_control:check_acl(User1Undef, subscribe, <<"users/testuser/1">>),
ok.
t_acl_empty_results(_Config) ->
#aclquery{selector = Selector} = aclquery(),
User1 = #{zone => external, clientid => <<"client1">>, username => <<"testuser">>},
try
reload({acl_query, [{selector, []}]}),
?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
ok
after
reload({acl_query, [{selector, Selector}]})
end,
ok.
t_acl_exception(_Config) ->
%% FIXME: is there a more authentic way to produce an exception in
%% `match'???
User1 = #{zone => external, clientid => not_a_binary, username => <<"testuser">>},
?assertEqual(deny, emqx_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
ok.
t_acl_super(_) -> t_acl_super(_) ->
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}), reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
@ -155,10 +434,175 @@ t_acl_super(_) ->
end, end,
emqtt:disconnect(C). emqtt:disconnect(C).
%% apparently, if the config is undefined in `emqx_auth_mongo_app:r',
%% this is allowed...
t_is_superuser_undefined(_Config) ->
Pool = ClientInfo = unused_in_this_case,
SuperQuery = undefined,
?assertNot(emqx_auth_mongo:is_superuser(Pool, SuperQuery, ClientInfo)),
ok.
t_authn_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
{username, <<"plain">>},
{password, <<"plain">>}]),
unlink(C),
?check_trace(
try
enable_failure(FailureType, ProxyHost, ProxyPort),
{error, {unauthorized_client, _}} = emqtt:connect(C),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
%% fails with `{exit,{{{badmatch,{{error,closed},...'
?assertMatch([_], ?of_kind(emqx_auth_mongo_check_authn_error, Trace)),
ok
end),
ok.
%% tests query timeout failure
t_available_authn_query_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
SuperQuery = superquery(),
?check_trace(
#{timetrap => timer:seconds(60)},
try
enable_failure(FailureType, ProxyHost, ProxyPort),
Pool = ?APP,
%% query_multi returns an empty list even on failures.
?assertEqual({error, timeout}, emqx_auth_mongo:available(Pool, SuperQuery)),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[#{?snk_kind := emqx_auth_mongo_available_error , error := _}],
?of_kind(emqx_auth_mongo_available_error, Trace))
end),
ok.
%% tests query_multi failure
t_available_acl_query_no_connection(Config) ->
test_acl_query_failure(down, Config).
%% ensure query_multi has a timeout
t_available_acl_query_timeout(Config) ->
ct:timetrap(90000),
test_acl_query_failure(timeout, Config).
%% checks that `with_timeout' lets unknown errors pass through
t_query_multi_unknown_exception(_Config) ->
ok = meck:new(ecpool, [no_link, no_history, non_strict, passthrough]),
ok = meck:expect(ecpool, with_client, fun(_, _) -> throw(some_error) end),
Pool = ?APP,
Collection = ?MONGO_CL_ACL,
SelectorList = [#{<<"username">> => <<"user">>}],
try
?assertThrow(some_error, emqx_auth_mongo:query_multi(Pool, Collection, SelectorList))
after
meck:unload(ecpool)
end.
t_acl_superuser_timeout(Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
FailureType = timeout,
reload({auth_query, [{password_hash, plain}, {password_field, [<<"password">>]}]}),
{ok, C} = emqtt:start_link([{clientid, <<"simpleClient">>},
{username, <<"plain">>},
{password, <<"plain">>}]),
unlink(C),
?check_trace(
try
?force_ordering(
#{?snk_kind := emqx_auth_mongo_superuser_check_authn_ok},
#{?snk_kind := connection_will_cut}
),
?force_ordering(
#{?snk_kind := connection_cut},
#{?snk_kind := emqx_auth_mongo_superuser_query_enter}
),
spawn(fun() ->
?tp(connection_will_cut, #{}),
enable_failure(FailureType, ProxyHost, ProxyPort),
?tp(connection_cut, #{})
end),
{ok, _} = emqtt:connect(C),
ok = emqtt:disconnect(C),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[ #{ ?snk_kind := emqx_auth_mongo_superuser_query_error
, error := _
}
, #{ ?snk_kind := emqx_auth_mongo_superuser_query_result
, is_superuser := false
}
],
?of_kind([ emqx_auth_mongo_superuser_query_error
, emqx_auth_mongo_superuser_query_result
], Trace))
end),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
test_acl_query_failure(FailureType, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
ACLQuery = aclquery(),
?check_trace(
#{timetrap => timer:seconds(60)},
try
?force_ordering(
#{?snk_kind := emqx_auth_mongo_query_multi_enter},
#{?snk_kind := connection_will_cut}
),
?force_ordering(
#{?snk_kind := connection_cut},
#{?snk_kind := emqx_auth_mongo_query_multi_find_selector}
),
spawn(fun() ->
?tp(connection_will_cut, #{}),
enable_failure(FailureType, ProxyHost, ProxyPort),
?tp(connection_cut, #{})
end),
Pool = ?APP,
%% query_multi returns an empty list even on failures.
?assertMatch(ok, emqx_auth_mongo:available(Pool, ACLQuery)),
ok
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end,
fun(Trace) ->
?assertMatch(
[#{?snk_kind := emqx_auth_mongo_query_multi_error , error := _}],
?of_kind(emqx_auth_mongo_query_multi_error, Trace))
end),
ok.
reload({Par, Vals}) when is_list(Vals) -> reload({Par, Vals}) when is_list(Vals) ->
application:stop(?APP), application:stop(?APP),
{ok, TupleVals} = application:get_env(?APP, Par), {ok, TupleVals} = application:get_env(?APP, Par),
@ -171,3 +615,105 @@ reload({Par, Vals}) when is_list(Vals) ->
end, TupleVals), end, TupleVals),
application:set_env(?APP, Par, lists:append(NewVals, Vals)), application:set_env(?APP, Par, lists:append(NewVals, Vals)),
application:start(?APP). application:start(?APP).
superquery() ->
emqx_auth_mongo_app:with_env(super_query, fun(SQ) -> SQ end).
aclquery() ->
emqx_auth_mongo_app:with_env(acl_query, fun(SQ) -> SQ end).
%% TODO: any easier way to make mongo return a map with an error code???
error_code_query(Pool, Collection, Selector) ->
%% should be a query; this is to provoke an error return from
%% mongo.
WrongLimit = {},
ecpool:with_client(
Pool,
fun(Conn) ->
mongoc:transaction_query(
Conn,
fun(Conf = #{pool := Worker}) ->
Query = mongoc:count_query(Conf, Collection, Selector, WrongLimit),
{_, Res} = mc_worker_api:command(Worker, Query),
Res
end)
end).
wait_for_stabilization(#{attempts := Attempts, interval_ms := IntervalMS})
when Attempts > 0 ->
try
{ok, Conn} = ?POOL(?APP),
#{} = mongo_api:find_one(Conn, ?MONGO_CL_USER, #{}, #{}),
ok
catch
_:_ ->
ct:pal("mongodb connection still stabilizing... sleeping for ~b ms", [IntervalMS]),
ct:sleep(IntervalMS),
wait_for_stabilization(#{attempts => Attempts - 1, interval_ms => IntervalMS})
end;
wait_for_stabilization(_) ->
error(mongo_connection_did_not_stabilize).
%% TODO: move to ct helpers???
reset_proxy(ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
with_failure(FailureType, ProxyHost, ProxyPort, Fun) ->
enable_failure(FailureType, ProxyHost, ProxyPort),
try
Fun()
after
heal_failure(FailureType, ProxyHost, ProxyPort)
end.
enable_failure(FailureType, ProxyHost, ProxyPort) ->
case FailureType of
down -> switch_proxy(off, ProxyHost, ProxyPort);
timeout -> timeout_proxy(on, ProxyHost, ProxyPort);
latency_up -> latency_up_proxy(on, ProxyHost, ProxyPort)
end.
heal_failure(FailureType, ProxyHost, ProxyPort) ->
case FailureType of
down -> switch_proxy(on, ProxyHost, ProxyPort);
timeout -> timeout_proxy(off, ProxyHost, ProxyPort);
latency_up -> latency_up_proxy(off, ProxyHost, ProxyPort)
end.
switch_proxy(Switch, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo",
Body = case Switch of
off -> <<"{\"enabled\":false}">>;
on -> <<"{\"enabled\":true}">>
end,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
timeout_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Body = <<"{\"name\":\"timeout\",\"type\":\"timeout\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"timeout\":0}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
timeout_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/timeout",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).
latency_up_proxy(on, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics",
Body = <<"{\"name\":\"latency_up\",\"type\":\"latency\","
"\"stream\":\"upstream\",\"toxicity\":1.0,"
"\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>,
{ok, {{_, 200, _}, _, _}} = httpc:request(post, {Url, [], "application/json", Body}, [],
[{body_format, binary}]);
latency_up_proxy(off, ProxyHost, ProxyPort) ->
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/mongo/toxics/latency_up",
Body = <<>>,
{ok, {{_, 204, _}, _, _}} = httpc:request(delete, {Url, [], "application/json", Body}, [],
[{body_format, binary}]).

View File

@ -59,7 +59,7 @@
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"} , {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.1"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.21", [{"4.3.21",
[{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -13,7 +14,8 @@
{load_module,emqx_tracer,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.20", {"4.3.20",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_tracer,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]},
@ -820,7 +822,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.21", [{"4.3.21",
[{load_module,emqx_alarm,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -831,7 +834,8 @@
{load_module,emqx_tracer,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.20", {"4.3.20",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_tracer,brutal_purge,soft_purge,[]}, {load_module,emqx_tracer,brutal_purge,soft_purge,[]},

View File

@ -45,6 +45,8 @@
, index_of/2 , index_of/2
, maybe_parse_ip/1 , maybe_parse_ip/1
, ipv6_probe/1 , ipv6_probe/1
, pmap/2
, pmap/3
]). ]).
-export([ bin2hexstr_A_F/1 -export([ bin2hexstr_A_F/1
@ -55,7 +57,13 @@
-export([ is_sane_id/1 -export([ is_sane_id/1
]). ]).
-export([
nolink_apply/1,
nolink_apply/2
]).
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
-define(DEFAULT_PMAP_TIMEOUT, 5000).
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. -spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
is_sane_id(Str) -> is_sane_id(Str) ->
@ -328,6 +336,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10; hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
%% @doc Like lists:map/2, only the callback function is evaluated
%% concurrently.
-spec pmap(fun((A) -> B), list(A)) -> list(B).
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
pmap(Fun, List, Timeout) when
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
->
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
%% @doc Delegate a function to a worker process.
%% The function may spawn_link other processes but we do not
%% want the caller process to be linked.
%% This is done by isolating the possible link with a not-linked
%% middleman process.
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
%% @doc Same as `nolink_apply/1', with a timeout.
-spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(),
ResRef = make_ref(),
Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)),
receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after Timeout ->
exit(Middleman, kill),
exit(timeout)
end.
-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_middleman_fn(Caller, Fun, ResRef) ->
fun() ->
process_flag(trap_exit, true),
CallerMRef = erlang:monitor(process, Caller),
Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)),
receive
{'DOWN', CallerMRef, process, _, _} ->
%% For whatever reason, if the caller is dead,
%% there is no reason to continue
exit(Worker, kill),
exit(normal);
{'EXIT', Worker, normal} ->
exit(normal);
{'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
exit(normal)
end
end.
-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
make_worker_fn(Caller, Fun, ResRef) ->
fun() ->
Res =
try
{normal, Fun()}
catch
C:E:S ->
{exception, {C, E, S}}
end,
_ = erlang:send(Caller, {ResRef, Res}),
exit(normal)
end.
do_parallel_map(Fun, List) ->
Parent = self(),
PidList = lists:map(
fun(Item) ->
erlang:spawn_link(
fun() ->
Res =
try
{normal, Fun(Item)}
catch
C:E:St ->
{exception, {C, E, St}}
end,
Parent ! {self(), Res}
end
)
end,
List
),
lists:foldr(
fun(Pid, Acc) ->
receive
{Pid, {normal, Result}} ->
[Result | Acc];
{Pid, {exception, {C, E, St}}} ->
erlang:raise(C, E, St)
end
end,
[],
PidList
).
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -146,3 +146,36 @@ t_now_to_secs(_) ->
t_now_to_ms(_) -> t_now_to_ms(_) ->
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_misc:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_misc:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_misc:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).