diff --git a/etc/emqx.conf b/etc/emqx.conf index 6370e7214..430c58ce4 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -423,6 +423,12 @@ acl_cache_max_size = 32 ## Default: 1 minute acl_cache_ttl = 1m +## The action when acl check reject current operation +## +## Value: ignore | disconnect +## Default: ignore +acl_deny_action = ignore + ##-------------------------------------------------------------------- ## MQTT Protocol ##-------------------------------------------------------------------- @@ -512,6 +518,12 @@ zone.external.enable_ban = on ## Value: on | off zone.external.enable_stats = on +## The action when acl check reject current operation +## +## Value: ignore | disconnect +## Default: ignore +zone.external.acl_deny_action = ignore + ## Force MQTT connection/session process GC after this number of ## messages | bytes passed through. ## @@ -670,6 +682,12 @@ zone.internal.enable_stats = on ## Value: Flag zone.internal.enable_acl = off +## The action when acl check reject current operation +## +## Value: ignore | disconnect +## Default: ignore +zone.internal.acl_deny_action = ignore + ## See zone.$name.wildcard_subscription. ## ## Value: boolean diff --git a/priv/emqx.schema b/priv/emqx.schema index 1001ab5a8..d59c8af24 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -548,6 +548,12 @@ end}. {validators, ["range:gt_0"]} ]}. +%% @doc Action when acl check reject current operation +{mapping, "acl_deny_action", "emqx.acl_deny_action", [ + {default, ignore}, + {datatype, {enum, [ignore, disconnect]}} +]}. + {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -640,6 +646,12 @@ end}. {datatype, flag} ]}. +%% @doc Action when acl check reject current operation +{mapping, "zone.$name.acl_deny_action", "emqx.zones", [ + {default, ignore}, + {datatype, {enum, [ignore, disconnect]}} +]}. + %% @doc Enable Ban. {mapping, "zone.$name.enable_ban", "emqx.zones", [ {default, off}, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 573b913f7..0bedb0927 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -60,6 +60,7 @@ is_bridge, enable_ban, enable_acl, + acl_deny_action, recv_stats, send_stats, connected, @@ -84,28 +85,29 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - is_super = false, - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false, - ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}. + #pstate{zone = Zone, + sendfun = SendFun, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + is_super = false, + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + mountpoint = emqx_zone:get_env(Zone, mountpoint), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + acl_deny_action = emqx_zone:get_env(Zone, acl_deny_action, ignore), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false, + ignore_loop = emqx_config:get_env(mqtt_ignore_loop_deliver, false)}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -341,13 +343,10 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PSt case check_publish(Packet, PState) of {ok, PState1} -> do_publish(Packet, PState1); - {error, ?RC_TOPIC_ALIAS_INVALID} -> - ?LOG(error, "Protocol error - ~p", [?RC_TOPIC_ALIAS_INVALID]), - {error, ?RC_TOPIC_ALIAS_INVALID, PState}; {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos0 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - {error, ReasonCode, PState} + do_acl_deny_action(Packet, ReasonCode, PState) end; process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PState) -> @@ -357,7 +356,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PSta {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos1 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - deliver({puback, PacketId, ReasonCode}, PState) + case deliver({puback, PacketId, ReasonCode}, PState) of + {ok, _PState} -> + do_acl_deny_action(Packet, ReasonCode, PState); + Error -> + Error + end end; process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PState) -> @@ -367,7 +371,12 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PSta {error, ReasonCode} -> ?LOG(warning, "Cannot publish qos2 message to ~s for ~s", [Topic, emqx_reason_codes:text(ReasonCode)]), - deliver({pubrec, PacketId, ReasonCode}, PState) + case deliver({pubrec, PacketId, ?RC_NOT_AUTHORIZED}, PState) of + {ok, _PState} -> + do_acl_deny_action(Packet, ReasonCode, PState); + Error -> + Error + end end; process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> @@ -392,7 +401,7 @@ process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) -> {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; -process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), +process_packet(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge, ignore_loop = IgnoreLoop}) -> @@ -419,15 +428,17 @@ process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), deliver({suback, PacketId, ReasonCodes}, PState) end; {error, TopicFilters} -> - {SubTopics, ReasonCodes} = - lists:foldr(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) -> + {ReverseSubTopics, ReverseReasonCodes} = + lists:foldl(fun({Topic, #{rc := ?RC_SUCCESS}}, {Topics, Codes}) -> {[Topic|Topics], [?RC_IMPLEMENTATION_SPECIFIC_ERROR | Codes]}; ({Topic, #{rc := Code}}, {Topics, Codes}) -> {[Topic|Topics], [Code|Codes]} end, {[], []}, TopicFilters), + {SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)}, ?LOG(warning, "Cannot subscribe ~p for ~p", [SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]), - deliver({suback, PacketId, ReasonCodes}, PState) + deliver({suback, PacketId, ReasonCodes}, PState), + do_acl_deny_action(Packet, ReasonCodes, PState) end; process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), @@ -674,7 +685,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end - end, SessAttrs); + end, SessAttrs); set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps}}, SessAttrs) -> maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> @@ -781,7 +792,6 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret #pstate{zone = Zone}) -> emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). - check_pub_acl(_Packet, #pstate{is_super = IsSuper, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> ok; @@ -887,3 +897,32 @@ sp(false) -> 0. flag(false) -> 0; flag(true) -> 1. + +%%------------------------------------------------------------------------------ +%% Execute actions in case acl deny + +do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), + ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> + {error, ?RC_NOT_AUTHORIZED, PState}; + +do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload), + ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> + deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), + {error, ?RC_NOT_AUTHORIZED, PState}; + +do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload), + ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> + deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), + {error, ?RC_NOT_AUTHORIZED, PState}; + +do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), + ReasonCodes, PState = #pstate{acl_deny_action = disconnect}) -> + case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of + true -> + deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), + {error, ?RC_NOT_AUTHORIZED, PState}; + false -> + {ok, PState} + end; +do_acl_deny_action(_PubSupPacket, _ReasonCode, PState) -> + {ok, PState}. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 180edc8f0..4751a5bed 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -179,15 +179,15 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); {error, Error} -> ?LOG(error, "Protocol error - ~p", [Error]), - stop(Error, State); + shutdown(Error, State); {error, Reason, ProtoState1} -> shutdown(Reason, State#state{proto_state = ProtoState1}); {stop, Error, ProtoState1} -> - stop(Error, State#state{proto_state = ProtoState1}) + shutdown(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?LOG(error, "Frame error: ~p", [Error]), - stop(Error, State); + shutdown(Error, State); {'EXIT', Reason} -> ?LOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data]), shutdown(parse_error, State) @@ -299,8 +299,5 @@ ensure_stats_timer(State) -> shutdown(Reason, State) -> {stop, State#state{shutdown = Reason}}. -stop(Error, State) -> - {stop, State#state{shutdown = Error}}. - wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. diff --git a/test/emqx_access_SUITE_data/acl.conf b/test/emqx_access_SUITE_data/acl.conf index 03416f002..e5730b4c5 100644 --- a/test/emqx_access_SUITE_data/acl.conf +++ b/test/emqx_access_SUITE_data/acl.conf @@ -13,4 +13,3 @@ {deny, all, subscribe, ["$SYS/#", "#"]}. {deny, all}. - diff --git a/test/emqx_access_SUITE_data/acl_deny_action.conf b/test/emqx_access_SUITE_data/acl_deny_action.conf new file mode 100644 index 000000000..753782605 --- /dev/null +++ b/test/emqx_access_SUITE_data/acl_deny_action.conf @@ -0,0 +1,4 @@ + +{deny, {user, "emqx"}, pubsub, ["acl_deny_action"]}. + +{allow, all}. diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index ae308ea42..387be3053 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -36,29 +36,33 @@ all() -> [ {group, mqttv4}, - {group, mqttv5}]. + {group, mqttv5}, + {group, acl} + ]. groups() -> [{mqttv4, [sequence], - [ - connect_v4, - subscribe_v4 - ]}, + [connect_v4, + subscribe_v4]}, {mqttv5, [sequence], - [ - connect_v5, - subscribe_v5 - ] - }]. + [connect_v5, + subscribe_v5]}, + {acl, + [sequence], + [acl_deny_action]}]. + init_per_suite(Config) -> - emqx_ct_broker_helpers:run_setup_steps(), + [start_apps(App, SchemaFile, ConfigFile) || + {App, SchemaFile, ConfigFile} + <- [{emqx, deps_path(emqx, "priv/emqx.schema"), + deps_path(emqx, "etc/emqx.conf")}]], Config. end_per_suite(_Config) -> - emqx_ct_broker_helpers:run_teardown_steps(). + application:stop(emqx). batch_connect(NumberOfConnections) -> batch_connect([], NumberOfConnections). @@ -67,7 +71,7 @@ batch_connect(Socks, 0) -> Socks; batch_connect(Socks, NumberOfConnections) -> {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, - [binary, {packet, raw}, {active, false}], + [binary, {packet, raw}, {active, false}], 3000), batch_connect([Sock | Socks], NumberOfConnections - 1). @@ -77,7 +81,7 @@ with_connection(DoFun, NumberOfConnections) -> DoFun(Socks) after lists:foreach(fun(Sock) -> - emqx_client_sock:close(Sock) + emqx_client_sock:close(Sock) end, Socks) end. @@ -154,7 +158,7 @@ connect_v5(_) -> #{'Response-Information' := _RespInfo}), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) end), - + % test clean start with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, @@ -267,7 +271,7 @@ connect_v5(_) -> ?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE) ) ), - + {ok, WillData} = gen_tcp:recv(Sock2, 0), {ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5), @@ -324,7 +328,7 @@ connect_v5(_) -> {ok, SubData1} = gen_tcp:recv(Sock1, 0), {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5) - end, 2), + end, 2), ok. @@ -422,3 +426,66 @@ raw_send_serialize(Packet, Opts) -> raw_recv_parse(P, ProtoVersion) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ProtoVersion}}). + + +acl_deny_action(_) -> + emqx_zone:set_env(external, acl_deny_action, disconnect), + process_flag(trap_exit, true), + [acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], + [acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)], + emqx_zone:set_env(external, acl_deny_action, ignore), + ok. + +acl_deny_do_disconnect(publish, QoS, Topic) -> + {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), + {ok, _} = emqx_client:connect(Client), + emqx_client:publish(Client, Topic, <<"test">>, QoS), + receive + {'EXIT', Client, _Reason} -> + false = is_process_alive(Client) + end; +acl_deny_do_disconnect(subscribe, QoS, Topic) -> + {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), + {ok, _} = emqx_client:connect(Client), + try emqx_client:subscribe(Client, Topic, QoS) of + _ -> + ok + catch + exit : _Reason -> + false = is_process_alive(Client) + end. + +start_apps(App, SchemaFile, ConfigFile) -> + read_schema_configs(App, SchemaFile, ConfigFile), + set_special_configs(App), + application:ensure_all_started(App). + +read_schema_configs(App, SchemaFile, ConfigFile) -> + Schema = cuttlefish_schema:files([SchemaFile]), + Conf = conf_parse:file(ConfigFile), + NewConfig = cuttlefish_generator:map(Schema, Conf), + Vals = proplists:get_value(App, NewConfig, []), + [application:set_env(App, Par, Value) || {Par, Value} <- Vals]. + +set_special_configs(emqx) -> + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, plugins_loaded_file, + deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")), + application:set_env(emqx, acl_deny_action, disconnect), + application:set_env(emqx, acl_file, + deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf")); +set_special_configs(_App) -> + ok. + +deps_path(App, RelativePath) -> + %% Note: not lib_dir because etc dir is not sym-link-ed to _build dir + %% but priv dir is + Path0 = code:priv_dir(App), + Path = case file:read_link(Path0) of + {ok, Resolved} -> Resolved; + {error, _} -> Path0 + end, + filename:join([Path, "..", RelativePath]). + +local_path(RelativePath) -> + deps_path(emqx_auth_username, RelativePath).