Add banned feature

This commit is contained in:
Feng Lee 2018-08-31 14:04:26 +08:00
parent 237e65a4e0
commit 3045ec10ab
6 changed files with 61 additions and 38 deletions

View File

@ -529,6 +529,11 @@ zone.external.idle_timeout = 15s
## Default: 10 messages per second, and 100 messages burst. ## Default: 10 messages per second, and 100 messages burst.
## zone.external.publish_limit = 10,100 ## zone.external.publish_limit = 10,100
## Enable ban check.
##
## Value: Flag
zone.external.enable_ban = on
## Enable ACL check. ## Enable ACL check.
## ##
## Value: Flag ## Value: Flag

View File

@ -676,6 +676,12 @@ end}.
{datatype, {enum, [allow, deny]}} {datatype, {enum, [allow, deny]}}
]}. ]}.
%% @doc Enable Ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Enable ACL check. %% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [ {mapping, "zone.$name.enable_acl", "emqx.zones", [
{default, off}, {default, off},

View File

@ -153,9 +153,8 @@ init([]) ->
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type), Mods = lookup_mods(Type),
reply(case lists:keyfind(Mod, 1, Mods) of reply(case lists:keymember(Mod, 1, Mods) of
{_, _, _} -> true -> {error, already_existed};
{error, already_existed};
false -> false ->
case catch Mod:init(Opts) of case catch Mod:init(Opts) of
{ok, ModState} -> {ok, ModState} ->

View File

@ -24,27 +24,23 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% API
-export([start_link/0]). -export([start_link/0]).
-export([check/1]). -export([check/1]).
-export([add/1, del/1]). -export([add/1, del/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, {expiry_timer}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = ekka_mnesia:create_table(?TAB, [
{type, ordered_set}, {type, set},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, banned}, {record_name, banned},
{attributes, record_info(fields, banned)}]); {attributes, record_info(fields, banned)}]);
@ -52,11 +48,7 @@ mnesia(boot) ->
mnesia(copy) -> mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB). ok = ekka_mnesia:copy_table(?TAB).
%%-------------------------------------------------------------------- %% @doc Start the banned server.
%% API
%%--------------------------------------------------------------------
%% @doc Start the banned server
-spec(start_link() -> emqx_types:startlink_ret()). -spec(start_link() -> emqx_types:startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@ -67,9 +59,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {username, Username})
orelse ets:member(?TAB, {ipaddr, IPAddr}). orelse ets:member(?TAB, {ipaddr, IPAddr}).
add(Record) when is_record(Record, banned) -> -spec(add(#banned{}) -> ok).
mnesia:dirty_write(?TAB, Record). add(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?TAB, Banned).
-spec(del({client_id, emqx_types:client_id()} |
{username, emqx_types:username()} |
{peername, emqx_types:peername()}) -> ok).
del(Key) -> del(Key) ->
mnesia:dirty_delete(?TAB, Key). mnesia:dirty_delete(?TAB, Key).
@ -78,27 +74,26 @@ del(Key) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqx_time:seed(), {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
{ok, ensure_expiry_timer(#state{})}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[BANNED] Unexpected request: ~p", [Req]), emqx_logger:error("[BANNED] unexpected call: ~p", [Req]),
{reply, ignore, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
emqx_logger:error("[BANNED] Unexpected msg: ~p", [Msg]), emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({timeout, Ref, expire}, State = #state{expiry_timer = Ref}) -> handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]),
{noreply, ensure_expiry_timer(State), hibernate}; {noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
emqx_logger:error("[BANNED] Unexpected info: ~p", [Info]), emqx_logger:error("[BANNED] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{expiry_timer = Timer}) -> terminate(_Reason, #{expiry_timer := TRef}) ->
emqx_misc:cancel_timer(Timer). emqx_misc:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -108,9 +103,7 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
ensure_expiry_timer(State) -> ensure_expiry_timer(State) ->
Interval = emqx_config:get_env(banned_expiry_interval, timer:minutes(5)), State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
State#state{expiry_timer = emqx_misc:start_timer(
Interval + rand:uniform(Interval), expire)}.
expire_banned_items(Now) -> expire_banned_items(Now) ->
expire_banned_item(mnesia:first(?TAB), Now). expire_banned_item(mnesia:first(?TAB), Now).
@ -119,11 +112,11 @@ expire_banned_item('$end_of_table', _Now) ->
ok; ok;
expire_banned_item(Key, Now) -> expire_banned_item(Key, Now) ->
case mnesia:read(?TAB, Key) of case mnesia:read(?TAB, Key) of
[#banned{until = undefined}] -> ok; [#banned{until = undefined}] ->
ok;
[B = #banned{until = Until}] when Until < Now -> [B = #banned{until = Until}] when Until < Now ->
mnesia:delete_object(?TAB, B, sticky_write); mnesia:delete_object(?TAB, B, sticky_write);
[_] -> ok; _ -> ok
[] -> ok
end, end,
expire_banned_item(mnesia:next(?TAB, Key), Now). expire_banned_item(mnesia:next(?TAB, Key), Now).

View File

@ -25,11 +25,17 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
{ok, {{one_for_all, 10, 3600}, Banned = #{id => banned,
[#{id => manager, start => {emqx_banned, start_link, []},
start => {emqx_cm, start_link, []}, restart => permanent,
restart => permanent, shutdown => 5000,
shutdown => 5000, type => worker,
type => worker, modules => [emqx_banned]},
modules => [emqx_cm]}]}}. Manager = #{id => manager,
start => {emqx_cm, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_cm]},
{ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.

View File

@ -56,6 +56,7 @@
mountpoint, mountpoint,
is_super, is_super,
is_bridge, is_bridge,
enable_ban,
enable_acl, enable_acl,
recv_stats, recv_stats,
send_stats, send_stats,
@ -97,6 +98,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
packet_size = emqx_zone:get_env(Zone, max_packet_size), packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint), mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false, is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl), enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0}, recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0},
@ -581,7 +583,8 @@ set_property(Name, Value, Props) ->
check_connect(Packet, PState) -> check_connect(Packet, PState) ->
run_check_steps([fun check_proto_ver/2, run_check_steps([fun check_proto_ver/2,
fun check_client_id/2], Packet, PState). fun check_client_id/2,
fun check_banned/2], Packet, PState).
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _PState) -> proto_name = Name}, _PState) ->
@ -612,6 +615,17 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end. end.
check_banned(_Connect, #pstate{enable_ban = false}) ->
ok;
check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
#pstate{peername = Peername}) ->
case emqx_banned:check(#{client_id => ClientId,
username => Username,
peername => Peername}) of
true -> {error, ?RC_BANNED};
false -> ok
end.
check_publish(Packet, PState) -> check_publish(Packet, PState) ->
run_check_steps([fun check_pub_caps/2, run_check_steps([fun check_pub_caps/2,
fun check_pub_acl/2], Packet, PState). fun check_pub_acl/2], Packet, PState).