From 3045ec10ab7af0e37f361b9df890284e4bae9636 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 14:04:26 +0800 Subject: [PATCH] Add banned feature --- etc/emqx.conf | 5 ++++ priv/emqx.schema | 6 +++++ src/emqx_access_control.erl | 5 ++-- src/emqx_banned.erl | 47 ++++++++++++++++--------------------- src/emqx_cm_sup.erl | 20 ++++++++++------ src/emqx_protocol.erl | 16 ++++++++++++- 6 files changed, 61 insertions(+), 38 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index e8435a3b3..deb702211 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -529,6 +529,11 @@ zone.external.idle_timeout = 15s ## Default: 10 messages per second, and 100 messages burst. ## zone.external.publish_limit = 10,100 +## Enable ban check. +## +## Value: Flag +zone.external.enable_ban = on + ## Enable ACL check. ## ## Value: Flag diff --git a/priv/emqx.schema b/priv/emqx.schema index 1e1892b33..e9f4932c4 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -676,6 +676,12 @@ end}. {datatype, {enum, [allow, deny]}} ]}. +%% @doc Enable Ban. +{mapping, "zone.$name.enable_ban", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + %% @doc Enable ACL check. {mapping, "zone.$name.enable_acl", "emqx.zones", [ {default, off}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index bc4969e54..8301bd8d8 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -153,9 +153,8 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), - reply(case lists:keyfind(Mod, 1, Mods) of - {_, _, _} -> - {error, already_existed}; + reply(case lists:keymember(Mod, 1, Mods) of + true -> {error, already_existed}; false -> case catch Mod:init(Opts) of {ok, ModState} -> diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 4f8d44f44..444f07dad 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -24,27 +24,23 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% API -export([start_link/0]). -export([check/1]). -export([add/1, del/1]). -%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). --record(state, {expiry_timer}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ - {type, ordered_set}, + {type, set}, {disc_copies, [node()]}, {record_name, banned}, {attributes, record_info(fields, banned)}]); @@ -52,11 +48,7 @@ mnesia(boot) -> mnesia(copy) -> ok = ekka_mnesia:copy_table(?TAB). -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Start the banned server +%% @doc Start the banned server. -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -67,9 +59,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). -add(Record) when is_record(Record, banned) -> - mnesia:dirty_write(?TAB, Record). +-spec(add(#banned{}) -> ok). +add(Banned) when is_record(Banned, banned) -> + mnesia:dirty_write(?TAB, Banned). +-spec(del({client_id, emqx_types:client_id()} | + {username, emqx_types:username()} | + {peername, emqx_types:peername()}) -> ok). del(Key) -> mnesia:dirty_delete(?TAB, Key). @@ -78,27 +74,26 @@ del(Key) -> %%-------------------------------------------------------------------- init([]) -> - emqx_time:seed(), - {ok, ensure_expiry_timer(#state{})}. + {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - emqx_logger:error("[BANNED] Unexpected request: ~p", [Req]), - {reply, ignore, State}. + emqx_logger:error("[BANNED] unexpected call: ~p", [Req]), + {reply, ignored, State}. handle_cast(Msg, State) -> - emqx_logger:error("[BANNED] Unexpected msg: ~p", [Msg]), + emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, Ref, expire}, State = #state{expiry_timer = Ref}) -> +handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - emqx_logger:error("[BANNED] Unexpected info: ~p", [Info]), + emqx_logger:error("[BANNED] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{expiry_timer = Timer}) -> - emqx_misc:cancel_timer(Timer). +terminate(_Reason, #{expiry_timer := TRef}) -> + emqx_misc:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -108,9 +103,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- ensure_expiry_timer(State) -> - Interval = emqx_config:get_env(banned_expiry_interval, timer:minutes(5)), - State#state{expiry_timer = emqx_misc:start_timer( - Interval + rand:uniform(Interval), expire)}. + State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. expire_banned_items(Now) -> expire_banned_item(mnesia:first(?TAB), Now). @@ -119,11 +112,11 @@ expire_banned_item('$end_of_table', _Now) -> ok; expire_banned_item(Key, Now) -> case mnesia:read(?TAB, Key) of - [#banned{until = undefined}] -> ok; + [#banned{until = undefined}] -> + ok; [B = #banned{until = Until}] when Until < Now -> mnesia:delete_object(?TAB, B, sticky_write); - [_] -> ok; - [] -> ok + _ -> ok end, expire_banned_item(mnesia:next(?TAB, Key), Now). diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 231822ba5..000e79336 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -25,11 +25,17 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_all, 10, 3600}, - [#{id => manager, - start => {emqx_cm, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_cm]}]}}. + Banned = #{id => banned, + start => {emqx_banned, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_banned]}, + Manager = #{id => manager, + start => {emqx_cm, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_cm]}, + {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c36346673..01fbce313 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -56,6 +56,7 @@ mountpoint, is_super, is_bridge, + enable_ban, enable_acl, recv_stats, send_stats, @@ -97,6 +98,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) packet_size = emqx_zone:get_env(Zone, max_packet_size), mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_acl = emqx_zone:get_env(Zone, enable_acl), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, @@ -581,7 +583,8 @@ set_property(Name, Value, Props) -> check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, - fun check_client_id/2], Packet, PState). + fun check_client_id/2, + fun check_banned/2], Packet, PState). check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> @@ -612,6 +615,17 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +check_banned(_Connect, #pstate{enable_ban = false}) -> + ok; +check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, + #pstate{peername = Peername}) -> + case emqx_banned:check(#{client_id => ClientId, + username => Username, + peername => Peername}) of + true -> {error, ?RC_BANNED}; + false -> ok + end. + check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState).