Add option to disconnect client in case acl deny (#2059)

* Add option to disconnect client in case acl deny
This commit is contained in:
Gilbert 2018-12-19 10:34:06 +08:00 committed by turtleDeng
parent 95ad67b47c
commit 7d9e350bbe
7 changed files with 194 additions and 58 deletions

View File

@ -423,6 +423,12 @@ acl_cache_max_size = 32
## Default: 1 minute
acl_cache_ttl = 1m
## The action when acl check reject current operation
##
## Value: ignore | disconnect
## Default: ignore
acl_deny_action = ignore
##--------------------------------------------------------------------
## MQTT Protocol
##--------------------------------------------------------------------
@ -512,6 +518,12 @@ zone.external.enable_ban = on
## Value: on | off
zone.external.enable_stats = on
## The action when acl check reject current operation
##
## Value: ignore | disconnect
## Default: ignore
zone.external.acl_deny_action = ignore
## Force MQTT connection/session process GC after this number of
## messages | bytes passed through.
##
@ -670,6 +682,12 @@ zone.internal.enable_stats = on
## Value: Flag
zone.internal.enable_acl = off
## The action when acl check reject current operation
##
## Value: ignore | disconnect
## Default: ignore
zone.internal.acl_deny_action = ignore
## See zone.$name.wildcard_subscription.
##
## Value: boolean

View File

@ -548,6 +548,12 @@ end}.
{validators, ["range:gt_0"]}
]}.
%% @doc Action when acl check reject current operation
{mapping, "acl_deny_action", "emqx.acl_deny_action", [
{default, ignore},
{datatype, {enum, [ignore, disconnect]}}
]}.
{validator, "range:gt_0", "must greater than 0",
fun(X) -> X > 0 end
}.
@ -640,6 +646,12 @@ end}.
{datatype, flag}
]}.
%% @doc Action when acl check reject current operation
{mapping, "zone.$name.acl_deny_action", "emqx.zones", [
{default, ignore},
{datatype, {enum, [ignore, disconnect]}}
]}.
%% @doc Enable Ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [
{default, off},

View File

@ -60,6 +60,7 @@
is_bridge,
enable_ban,
enable_acl,
acl_deny_action,
recv_stats,
send_stats,
connected,
@ -84,28 +85,29 @@
-spec(init(map(), list()) -> state()).
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone,
sendfun = SendFun,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.
#pstate{zone = Zone,
sendfun = SendFun,
peername = Peername,
peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
is_assigned = false,
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl),
acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore),
recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0},
connected = false,
ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}.
init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of
@ -341,13 +343,10 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
{error, ?RC_TOPIC_ALIAS_INVALID} ->
?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]),
{error, ?RC_TOPIC_ALIAS_INVALID, PState};
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos0 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
{error, ReasonCode, PState}
do_acl_deny_action(Packet, ReasonCode, PState)
end;
process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) ->
@ -357,7 +356,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PSta
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
deliver({puback, PacketId, ReasonCode}, PState)
case deliver({puback, PacketId, ReasonCode}, PState) of
{ok, _PState} ->
do_acl_deny_action(Packet, ReasonCode, PState);
Error ->
Error
end
end;
process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) ->
@ -367,7 +371,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PSta
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
deliver({pubrec, PacketId, ReasonCode}, PState)
case deliver({pubrec, PacketId, ?RC_NOT_AUTHORIZED}, PState) of
{ok, _PState} ->
do_acl_deny_action(Packet, ReasonCode, PState);
Error ->
Error
end
end;
process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
@ -392,7 +401,7 @@ process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session =
process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint,
proto_ver = ProtoVer, is_bridge = IsBridge,
ignore_loop = IgnoreLoop}) ->
@ -419,15 +428,17 @@ process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
deliver({suback, PacketId, ReasonCodes}, PState)
end;
{error, TopicFilters} ->
{SubTopics, ReasonCodes} =
lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
{ReverseSubTopics, ReverseReasonCodes} =
lists:foldl(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) ->
{[Topic|Topics], [?RC_IMPLEMENTATION_SPECIFIC_ERROR | Codes]};
({Topic, #{rc := Code}}, {Topics, Codes}) ->
{[Topic|Topics], [Code|Codes]}
end, {[], []}, TopicFilters),
{SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)},
?LOG(warning, "Cannot subscribe ~p for ~p",
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
deliver({suback, PacketId, ReasonCodes}, PState)
deliver({suback, PacketId, ReasonCodes}, PState),
do_acl_deny_action(Packet, ReasonCodes, PState)
end;
process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
@ -674,7 +685,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c
false ->
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end
end, SessAttrs);
end, SessAttrs);
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) ->
maps:put(topic_alias_maximum, if
ProtoVer =:= ?MQTT_PROTO_V5 ->
@ -781,7 +792,6 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
#pstate{zone = Zone}) ->
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl})
when IsSuper orelse (not EnableAcl) ->
ok;
@ -887,3 +897,32 @@ sp(false) -> 0.
flag(false) -> 0;
flag(true) -> 1.
%%------------------------------------------------------------------------------
%% Execute actions in case acl deny
do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
{error, ?RC_NOT_AUTHORIZED, PState};
do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload),
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
{error, ?RC_NOT_AUTHORIZED, PState};
do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload),
?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) ->
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
{error, ?RC_NOT_AUTHORIZED, PState};
do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters),
ReasonCodes, PState = #pstate{acl_deny_action = disconnect}) ->
case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of
true ->
deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState),
{error, ?RC_NOT_AUTHORIZED, PState};
false ->
{ok, PState}
end;
do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) ->
{ok, PState}.

View File

@ -179,15 +179,15 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
{error, Error} ->
?LOG(error, "Protocol error - ~p", [Error]),
stop(Error, State);
shutdown(Error, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Error, ProtoState1} ->
stop(Error, State#state{proto_state = ProtoState1})
shutdown(Error, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
?LOG(error, "Frame error: ~p", [Error]),
stop(Error, State);
shutdown(Error, State);
{'EXIT', Reason} ->
?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]),
shutdown(parse_error, State)
@ -299,8 +299,5 @@ ensure_stats_timer(State) ->
shutdown(Reason, State) ->
{stop, State#state{shutdown = Reason}}.
stop(Error, State) ->
{stop, State#state{shutdown = Error}}.
wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS].

View File

@ -13,4 +13,3 @@
{deny, all, subscribe, ["$SYS/#", "#"]}.
{deny, all}.

View File

@ -0,0 +1,4 @@
{deny, {user, "emqx"}, pubsub, ["acl_deny_action"]}.
{allow, all}.

View File

@ -36,29 +36,33 @@
all() ->
[
{group, mqttv4},
{group, mqttv5}].
{group, mqttv5},
{group, acl}
].
groups() ->
[{mqttv4,
[sequence],
[
connect_v4,
subscribe_v4
]},
[connect_v4,
subscribe_v4]},
{mqttv5,
[sequence],
[
connect_v5,
subscribe_v5
]
}].
[connect_v5,
subscribe_v5]},
{acl,
[sequence],
[acl_deny_action]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, deps_path(emqx, "priv/emqx.schema"),
deps_path(emqx, "etc/emqx.conf")}]],
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
application:stop(emqx).
batch_connect(NumberOfConnections) ->
batch_connect([], NumberOfConnections).
@ -67,7 +71,7 @@ batch_connect(Socks, 0) ->
Socks;
batch_connect(Socks, NumberOfConnections) ->
{ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw}, {active, false}],
[binary, {packet, raw}, {active, false}],
3000),
batch_connect([Sock | Socks], NumberOfConnections - 1).
@ -77,7 +81,7 @@ with_connection(DoFun, NumberOfConnections) ->
DoFun(Socks)
after
lists:foreach(fun(Sock) ->
emqx_client_sock:close(Sock)
emqx_client_sock:close(Sock)
end, Socks)
end.
@ -154,7 +158,7 @@ connect_v5(_) ->
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
% test clean start
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
@ -267,7 +271,7 @@ connect_v5(_) ->
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
)
),
{ok, WillData} = gen_tcp:recv(Sock2, 0),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
@ -324,7 +328,7 @@ connect_v5(_) ->
{ok, SubData1} = gen_tcp:recv(Sock1, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5)
end, 2),
end, 2),
ok.
@ -422,3 +426,66 @@ raw_send_serialize(Packet, Opts) ->
raw_recv_parse(P, ProtoVersion) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ProtoVersion}}).
acl_deny_action(_) ->
emqx_zone:set_env(external, acl_deny_action, disconnect),
process_flag(trap_exit, true),
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
[acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
emqx_zone:set_env(external, acl_deny_action, ignore),
ok.
acl_deny_do_disconnect(publish, QoS, Topic) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
emqx_client:publish(Client, Topic, <<"test">>, QoS),
receive
{'EXIT', Client, _Reason} ->
false = is_process_alive(Client)
end;
acl_deny_do_disconnect(subscribe, QoS, Topic) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
try emqx_client:subscribe(Client, Topic, QoS) of
_ ->
ok
catch
exit : _Reason ->
false = is_process_alive(Client)
end.
start_apps(App, SchemaFile, ConfigFile) ->
read_schema_configs(App, SchemaFile, ConfigFile),
set_special_configs(App),
application:ensure_all_started(App).
read_schema_configs(App, SchemaFile, ConfigFile) ->
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = conf_parse:file(ConfigFile),
NewConfig = cuttlefish_generator:map(Schema, Conf),
Vals = proplists:get_value(App, NewConfig, []),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals].
set_special_configs(emqx) ->
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, plugins_loaded_file,
deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")),
application:set_env(emqx, acl_deny_action, disconnect),
application:set_env(emqx, acl_file,
deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"));
set_special_configs(_App) ->
ok.
deps_path(App, RelativePath) ->
%% Note: not lib_dir because etc dir is not sym-link-ed to _build dir
%% but priv dir is
Path0 = code:priv_dir(App),
Path = case file:read_link(Path0) of
{ok, Resolved} -> Resolved;
{error, _} -> Path0
end,
filename:join([Path, "..", RelativePath]).
local_path(RelativePath) ->
deps_path(emqx_auth_username, RelativePath).