diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md index 04f4ecb2d..0258866dd 100644 --- a/.github/ISSUE_TEMPLATE/bug-report.md +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -2,26 +2,25 @@ name: Bug Report about: Create a report to help us improve title: '' -labels: BUG +labels: Support assignees: tigercl --- -**What happened**: - -**What you expected to happen**: - -**How to reproduce it (as minimally and precisely as possible)**: - -**Anything else we need to know?**: + **Environment**: - EMQ X version (e.g. `emqx_ctl status`): - - Hardware configuration (e.g. `lscpu`): - OS (e.g. `cat /etc/os-release`): - Kernel (e.g. `uname -a`): -- Erlang/OTP version : +- Erlang/OTP version (in case you build emqx from source code): - Others: + +**What happened and what you expected to happen**: + +**How to reproduce it (as minimally and precisely as possible)**: + +**Anything else we need to know?**: diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 9a3339c65..68b7d5422 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1138,11 +1138,10 @@ do_enhanced_auth(undefined, _AuthData, Channel) -> do_enhanced_auth(_AuthMethod, undefined, Channel) -> {error, emqx_reason_codes:connack_error(not_authorized), Channel}; do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) -> - case run_hooks('client.enhanced_authenticate',[AuthMethod, AuthData, Cache]) of - {ok, <<>>} -> {ok, #{}, Channel#channel{auth_cache = #{}}}; - {ok, NAuthData} -> + case run_hooks('client.enhanced_authenticate',[AuthMethod, AuthData], Cache) of + {ok, NAuthData, NCache} -> NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, - {ok, NProperties, Channel#channel{auth_cache = #{}}}; + {ok, NProperties, Channel#channel{auth_cache = NCache}}; {continue, NAuthData, NCache} -> NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, {continue, NProperties, Channel#channel{auth_cache = NCache}}; diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 8224d85c9..6a1b3cfc1 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -58,6 +58,8 @@ , lookup_channels/2 ]). +-export([all_channels/0]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -327,6 +329,11 @@ with_channel(ClientId, Fun) -> Pids -> Fun(lists:last(Pids)) end. +%% @doc Get all channels registed. +all_channels() -> + Pat = [{{'_', '$1'}, [], ['$1']}], + ets:select(?CHAN_TAB, Pat). + %% @doc Lookup channels. -spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())). lookup_channels(ClientId) -> diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index 2256ef015..36323d90f 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -109,7 +109,7 @@ init([]) -> {read_concurrency, true}, {write_concurrency, true} ]), - {ok, #{}, hibernate}. + {ok, ensure_timer(#{}), hibernate}. handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), @@ -142,6 +142,12 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. +handle_info({timeout, TRef, expired_detecting}, State = #{expired_timer := TRef}) -> + Timestamp = erlang:system_time(millisecond) - maps:get(duration, get_policy()), + MatchSpec = [{{'_', '_', '_', '$1', '_'},[{'<', '$1', Timestamp}], [true]}], + ets:select_delete(?FLAPPING_TAB, MatchSpec), + {noreply, ensure_timer(State), hibernate}; + handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. @@ -151,3 +157,8 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +ensure_timer(State) -> + Timeout = maps:get(duration, get_policy()), + TRef = emqx_misc:start_timer(Timeout, expired_detecting), + State#{expired_timer => TRef}. \ No newline at end of file diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 7b5b5c2a7..0a4037a31 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -51,7 +51,10 @@ unload(_Env) -> emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). reload(_Env) -> - emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(), + emqx_acl_cache:is_enabled() andalso ( + lists:foreach( + fun(Pid) -> erlang:send(Pid, clean_acl_cache) end, + emqx_cm:all_channels())), unload([]), load([]). description() -> diff --git a/test/emqx_acl_cache_SUITE.erl b/test/emqx_acl_cache_SUITE.erl index 53e1a4a15..bd04961c6 100644 --- a/test/emqx_acl_cache_SUITE.erl +++ b/test/emqx_acl_cache_SUITE.erl @@ -31,13 +31,11 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). -init_per_testcase(_TestCase, Config) -> - Config. +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- -end_per_testcase(_TestCase, Config) -> - Config. - -t_clean_acl_cache(_Config) -> +t_clean_acl_cache(_) -> {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), @@ -57,6 +55,58 @@ t_clean_acl_cache(_Config) -> ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))), emqtt:stop(Client). +% optimize?? +t_reload_aclfile_and_cleanall(Config) -> + + RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end, + disconnected => fun(_) -> ok end, + publish => fun(_) -> ok end } end, + + {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, {msg_handler, RasieMsg()}]), + {ok, _} = emqtt:connect(Client), + + {ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1), + + %% Success publish to broker + receive + {puback, #{packet_id := PktId, reason_code := Rc}} -> + ?assertEqual(16#10, Rc); + _ -> + ?assert(false) + end, + + %% Check acl cache list + [ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>), + ?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0), + + %% Update acl file and reload mod_acl_internal + Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]), + ok = file:write_file(Path, <<"{deny, all}.">>), + OldPath = emqx:get_env(acl_file), + application:set_env(emqx, acl_file, Path), + + emqx_mod_acl_internal:reload([]), + + ?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0), + {ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1), + + receive + {puback, #{packet_id := PktId2, reason_code := Rc2}} -> + %% Not authorized + ?assertEqual(16#87, Rc2); + _ -> + ?assert(false) + end, + application:set_env(emqx, acl_file, OldPath), + file:delete(Path), + emqx_mod_acl_internal:reload([]), + emqtt:stop(Client). + +%% @private +testdir(DataPath) -> + Ls = filename:split(DataPath), + filename:join(lists:sublist(Ls, 1, length(Ls) - 1)). + % t_cache_k(_) -> % error('TODO'). diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 183dd5f30..1c201d0aa 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -133,6 +133,9 @@ t_kick_session(_) -> ok = emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). +t_all_channels(_) -> + ?assertEqual(true, is_list(emqx_cm:all_channels())). + t_lock_clientid(_) -> {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), {true, _Nodes} = emqx_cm_locker:lock(<<"clientid">>), diff --git a/test/emqx_cm_registry_SUITE.erl b/test/emqx_cm_registry_SUITE.erl index edc5ff9c4..c1e78bf3a 100644 --- a/test/emqx_cm_registry_SUITE.erl +++ b/test/emqx_cm_registry_SUITE.erl @@ -75,3 +75,4 @@ t_cleanup_channels(_) -> ct:sleep(100), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId)), ?assertEqual([], emqx_cm_registry:lookup_channels(ClientId2)). + diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index 223fd5d7e..dcd552385 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -19,6 +19,8 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("eunit/include/eunit.hrl"). + all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> @@ -50,7 +52,7 @@ t_detect_check(_) -> false = emqx_flapping:detect(ClientInfo), false = emqx_banned:check(ClientInfo), true = emqx_flapping:detect(ClientInfo), - timer:sleep(100), + timer:sleep(50), true = emqx_banned:check(ClientInfo), timer:sleep(3000), false = emqx_banned:check(ClientInfo), @@ -61,3 +63,13 @@ t_detect_check(_) -> Pid ! test, ok = emqx_flapping:stop(). +t_expired_detecting(_) -> + ClientInfo = #{zone => external, + clientid => <<"clientid">>, + peerhost => {127,0,0,1}}, + false = emqx_flapping:detect(ClientInfo), + ?assertEqual(true, lists:any(fun({flapping, <<"clientid">>, _, _, _}) -> true; + (_) -> false end, ets:tab2list(emqx_flapping))), + timer:sleep(200), + ?assertEqual(true, lists:all(fun({flapping, <<"clientid">>, _, _, _}) -> false; + (_) -> true end, ets:tab2list(emqx_flapping))). \ No newline at end of file diff --git a/test/emqx_modules_SUITE.erl b/test/emqx_modules_SUITE.erl index eb161d02b..3fe7864c3 100644 --- a/test/emqx_modules_SUITE.erl +++ b/test/emqx_modules_SUITE.erl @@ -24,11 +24,9 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - - emqx_ct_helpers:boot_modules([]), emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1), Config. - + set_sepecial_cfg(_) -> application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), ok.