Merge pull request #3411 from emqx/master
Auto-pull-request-by-2020-04-26
This commit is contained in:
commit
e270b168bf
|
@ -2,26 +2,25 @@
|
||||||
name: Bug Report
|
name: Bug Report
|
||||||
about: Create a report to help us improve
|
about: Create a report to help us improve
|
||||||
title: ''
|
title: ''
|
||||||
labels: BUG
|
labels: Support
|
||||||
assignees: tigercl
|
assignees: tigercl
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
<!-- Please use this template while reporting a bug and provide as much info as possible. Thanks!-->
|
<!-- Please use this template while reporting a bug and provide as much info as possible. Thanks!-->
|
||||||
**What happened**:
|
<!-- 请使用英文描述问题 -->
|
||||||
|
|
||||||
**What you expected to happen**:
|
|
||||||
|
|
||||||
**How to reproduce it (as minimally and precisely as possible)**:
|
|
||||||
|
|
||||||
**Anything else we need to know?**:
|
|
||||||
|
|
||||||
**Environment**:
|
**Environment**:
|
||||||
|
|
||||||
- EMQ X version (e.g. `emqx_ctl status`):
|
- EMQ X version (e.g. `emqx_ctl status`):
|
||||||
|
|
||||||
- Hardware configuration (e.g. `lscpu`):
|
- Hardware configuration (e.g. `lscpu`):
|
||||||
- OS (e.g. `cat /etc/os-release`):
|
- OS (e.g. `cat /etc/os-release`):
|
||||||
- Kernel (e.g. `uname -a`):
|
- Kernel (e.g. `uname -a`):
|
||||||
- Erlang/OTP version :
|
- Erlang/OTP version (in case you build emqx from source code):
|
||||||
- Others:
|
- Others:
|
||||||
|
|
||||||
|
**What happened and what you expected to happen**:
|
||||||
|
|
||||||
|
**How to reproduce it (as minimally and precisely as possible)**:
|
||||||
|
|
||||||
|
**Anything else we need to know?**:
|
||||||
|
|
|
@ -1138,11 +1138,10 @@ do_enhanced_auth(undefined, _AuthData, Channel) ->
|
||||||
do_enhanced_auth(_AuthMethod, undefined, Channel) ->
|
do_enhanced_auth(_AuthMethod, undefined, Channel) ->
|
||||||
{error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
{error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
||||||
do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
|
do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
|
||||||
case run_hooks('client.enhanced_authenticate',[AuthMethod, AuthData, Cache]) of
|
case run_hooks('client.enhanced_authenticate',[AuthMethod, AuthData], Cache) of
|
||||||
{ok, <<>>} -> {ok, #{}, Channel#channel{auth_cache = #{}}};
|
{ok, NAuthData, NCache} ->
|
||||||
{ok, NAuthData} ->
|
|
||||||
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
||||||
{ok, NProperties, Channel#channel{auth_cache = #{}}};
|
{ok, NProperties, Channel#channel{auth_cache = NCache}};
|
||||||
{continue, NAuthData, NCache} ->
|
{continue, NAuthData, NCache} ->
|
||||||
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
||||||
{continue, NProperties, Channel#channel{auth_cache = NCache}};
|
{continue, NProperties, Channel#channel{auth_cache = NCache}};
|
||||||
|
|
|
@ -58,6 +58,8 @@
|
||||||
, lookup_channels/2
|
, lookup_channels/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([all_channels/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
, handle_call/3
|
, handle_call/3
|
||||||
|
@ -327,6 +329,11 @@ with_channel(ClientId, Fun) ->
|
||||||
Pids -> Fun(lists:last(Pids))
|
Pids -> Fun(lists:last(Pids))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Get all channels registed.
|
||||||
|
all_channels() ->
|
||||||
|
Pat = [{{'_', '$1'}, [], ['$1']}],
|
||||||
|
ets:select(?CHAN_TAB, Pat).
|
||||||
|
|
||||||
%% @doc Lookup channels.
|
%% @doc Lookup channels.
|
||||||
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
|
||||||
lookup_channels(ClientId) ->
|
lookup_channels(ClientId) ->
|
||||||
|
|
|
@ -109,7 +109,7 @@ init([]) ->
|
||||||
{read_concurrency, true},
|
{read_concurrency, true},
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]),
|
]),
|
||||||
{ok, #{}, hibernate}.
|
{ok, ensure_timer(#{}), hibernate}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
|
@ -142,6 +142,12 @@ handle_cast(Msg, State) ->
|
||||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({timeout, TRef, expired_detecting}, State = #{expired_timer := TRef}) ->
|
||||||
|
Timestamp = erlang:system_time(millisecond) - maps:get(duration, get_policy()),
|
||||||
|
MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}],
|
||||||
|
ets:select_delete(?FLAPPING_TAB, MatchSpec),
|
||||||
|
{noreply, ensure_timer(State), hibernate};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "Unexpected info: ~p", [Info]),
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -151,3 +157,8 @@ terminate(_Reason, _State) ->
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
ensure_timer(State) ->
|
||||||
|
Timeout = maps:get(duration, get_policy()),
|
||||||
|
TRef = emqx_misc:start_timer(Timeout, expired_detecting),
|
||||||
|
State#{expired_timer => TRef}.
|
|
@ -51,7 +51,10 @@ unload(_Env) ->
|
||||||
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
|
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
|
||||||
|
|
||||||
reload(_Env) ->
|
reload(_Env) ->
|
||||||
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
|
emqx_acl_cache:is_enabled() andalso (
|
||||||
|
lists:foreach(
|
||||||
|
fun(Pid) -> erlang:send(Pid, clean_acl_cache) end,
|
||||||
|
emqx_cm:all_channels())),
|
||||||
unload([]), load([]).
|
unload([]), load([]).
|
||||||
|
|
||||||
description() ->
|
description() ->
|
||||||
|
|
|
@ -31,13 +31,11 @@ init_per_suite(Config) ->
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
init_per_testcase(_TestCase, Config) ->
|
%%--------------------------------------------------------------------
|
||||||
Config.
|
%% Test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
end_per_testcase(_TestCase, Config) ->
|
t_clean_acl_cache(_) ->
|
||||||
Config.
|
|
||||||
|
|
||||||
t_clean_acl_cache(_Config) ->
|
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
@ -57,6 +55,58 @@ t_clean_acl_cache(_Config) ->
|
||||||
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
|
?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
% optimize??
|
||||||
|
t_reload_aclfile_and_cleanall(Config) ->
|
||||||
|
|
||||||
|
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
|
||||||
|
disconnected => fun(_) -> ok end,
|
||||||
|
publish => fun(_) -> ok end } end,
|
||||||
|
|
||||||
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, {msg_handler, RasieMsg()}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
|
||||||
|
{ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||||
|
|
||||||
|
%% Success publish to broker
|
||||||
|
receive
|
||||||
|
{puback, #{packet_id := PktId, reason_code := Rc}} ->
|
||||||
|
?assertEqual(16#10, Rc);
|
||||||
|
_ ->
|
||||||
|
?assert(false)
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% Check acl cache list
|
||||||
|
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
|
||||||
|
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
||||||
|
|
||||||
|
%% Update acl file and reload mod_acl_internal
|
||||||
|
Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]),
|
||||||
|
ok = file:write_file(Path, <<"{deny, all}.">>),
|
||||||
|
OldPath = emqx:get_env(acl_file),
|
||||||
|
application:set_env(emqx, acl_file, Path),
|
||||||
|
|
||||||
|
emqx_mod_acl_internal:reload([]),
|
||||||
|
|
||||||
|
?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0),
|
||||||
|
{ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||||
|
|
||||||
|
receive
|
||||||
|
{puback, #{packet_id := PktId2, reason_code := Rc2}} ->
|
||||||
|
%% Not authorized
|
||||||
|
?assertEqual(16#87, Rc2);
|
||||||
|
_ ->
|
||||||
|
?assert(false)
|
||||||
|
end,
|
||||||
|
application:set_env(emqx, acl_file, OldPath),
|
||||||
|
file:delete(Path),
|
||||||
|
emqx_mod_acl_internal:reload([]),
|
||||||
|
emqtt:stop(Client).
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
testdir(DataPath) ->
|
||||||
|
Ls = filename:split(DataPath),
|
||||||
|
filename:join(lists:sublist(Ls, 1, length(Ls) - 1)).
|
||||||
|
|
||||||
% t_cache_k(_) ->
|
% t_cache_k(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,9 @@ t_kick_session(_) ->
|
||||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||||
ok = meck:unload(emqx_connection).
|
ok = meck:unload(emqx_connection).
|
||||||
|
|
||||||
|
t_all_channels(_) ->
|
||||||
|
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
||||||
|
|
||||||
t_lock_clientid(_) ->
|
t_lock_clientid(_) ->
|
||||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||||
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
{true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>),
|
||||||
|
|
|
@ -75,3 +75,4 @@ t_cleanup_channels(_) ->
|
||||||
ct:sleep(100),
|
ct:sleep(100),
|
||||||
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
|
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)),
|
||||||
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).
|
?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)).
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -50,7 +52,7 @@ t_detect_check(_) ->
|
||||||
false = emqx_flapping:detect(ClientInfo),
|
false = emqx_flapping:detect(ClientInfo),
|
||||||
false = emqx_banned:check(ClientInfo),
|
false = emqx_banned:check(ClientInfo),
|
||||||
true = emqx_flapping:detect(ClientInfo),
|
true = emqx_flapping:detect(ClientInfo),
|
||||||
timer:sleep(100),
|
timer:sleep(50),
|
||||||
true = emqx_banned:check(ClientInfo),
|
true = emqx_banned:check(ClientInfo),
|
||||||
timer:sleep(3000),
|
timer:sleep(3000),
|
||||||
false = emqx_banned:check(ClientInfo),
|
false = emqx_banned:check(ClientInfo),
|
||||||
|
@ -61,3 +63,13 @@ t_detect_check(_) ->
|
||||||
Pid ! test,
|
Pid ! test,
|
||||||
ok = emqx_flapping:stop().
|
ok = emqx_flapping:stop().
|
||||||
|
|
||||||
|
t_expired_detecting(_) ->
|
||||||
|
ClientInfo = #{zone => external,
|
||||||
|
clientid => <<"clientid">>,
|
||||||
|
peerhost => {127,0,0,1}},
|
||||||
|
false = emqx_flapping:detect(ClientInfo),
|
||||||
|
?assertEqual(true, lists:any(fun({flapping, <<"clientid">>, _, _, _}) -> true;
|
||||||
|
(_) -> false end, ets:tab2list(emqx_flapping))),
|
||||||
|
timer:sleep(200),
|
||||||
|
?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false;
|
||||||
|
(_) -> true end, ets:tab2list(emqx_flapping))).
|
|
@ -24,8 +24,6 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
|
||||||
emqx_ct_helpers:boot_modules([]),
|
|
||||||
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
|
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue