diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 0bedb0927..1f9d19094 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -290,7 +290,7 @@ process_packet(?CONNECT_PACKET( properties = ConnProps, client_id = ClientId, username = Username, - password = Password} = Connect), PState) -> + password = Password} = ConnPkt), PState) -> NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState), @@ -298,7 +298,6 @@ process_packet(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - WillMsg = make_will_msg(Connect), PState1 = set_username(Username, PState#pstate{client_id = NewClientId, @@ -307,16 +306,16 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, conn_props = ConnProps, - will_msg = WillMsg, is_bridge = IsBridge, connected_at = os:timestamp()}), connack( - case check_connect(Connect, PState1) of + case check_connect(ConnPkt, PState1) of {ok, PState2} -> case authenticate(credentials(PState2), Password) of {ok, IsSuper} -> %% Maybe assign a clientId - PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}), + PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper, + will_msg = make_will_msg(ConnPkt)}), emqx_logger:set_metadata_client_id(PState3#pstate.client_id), %% Open session case try_open_session(PState3) of @@ -719,14 +718,16 @@ get_property(Name, Props, Default) -> maps:get(Name, Props, Default). make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, - will_props = WillProps} = Connect) -> - emqx_packet:will_msg(if - ProtoVer =:= ?MQTT_PROTO_V5 -> - WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), - Connect#mqtt_packet_connect{will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; - true -> - Connect - end). + will_props = WillProps} = ConnPkt) -> + emqx_packet:will_msg( + case ProtoVer of + ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + ConnPkt#mqtt_packet_connect{ + will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; + _ -> + ConnPkt + end). %%------------------------------------------------------------------------------ %% Check Packet @@ -735,7 +736,8 @@ make_will_msg(#mqtt_packet_connect{proto_ver = ProtoVer, check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, fun check_client_id/2, - fun check_banned/2], Packet, PState). + fun check_banned/2, + fun check_will_topic/2], Packet, PState). check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> @@ -766,7 +768,7 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. -check_banned(_Connect, #pstate{enable_ban = false}) -> +check_banned(_ConnPkt, #pstate{enable_ban = false}) -> ok; check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, #pstate{peername = Peername}) -> @@ -777,6 +779,26 @@ check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, false -> ok end. +check_will_topic(#mqtt_packet_connect{will_flag = false}, _PState) -> + ok; +check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) -> + try emqx_topic:validate(WillTopic) of + true -> check_will_acl(ConnPkt, PState) + catch error : _Error -> + {error, ?RC_TOPIC_NAME_INVALID} + end. + +check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) + when not EnableAcl -> + ok; +check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> + case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of + allow -> ok; + deny -> + ?LOG(warning, "Cannot publish will message to ~p for acl checking failed", [WillTopic]), + {error, ?RC_UNSPECIFIED_ERROR} + end. + check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState). @@ -902,25 +924,29 @@ flag(true) -> 1. %% Execute actions in case acl deny do_acl_deny_action(?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> - {error, ?RC_NOT_AUTHORIZED, PState}; + ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, + acl_deny_action = disconnect}) -> + {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; do_acl_deny_action(?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> + ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, + acl_deny_action = disconnect}) -> deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, ?RC_NOT_AUTHORIZED, PState}; + {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; do_acl_deny_action(?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, _Payload), - ?RC_NOT_AUTHORIZED, PState = #pstate{acl_deny_action = disconnect}) -> + ?RC_NOT_AUTHORIZED, PState = #pstate{proto_ver = ProtoVer, + acl_deny_action = disconnect}) -> deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, ?RC_NOT_AUTHORIZED, PState}; + {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; do_acl_deny_action(?SUBSCRIBE_PACKET(_PacketId, _Properties, _RawTopicFilters), - ReasonCodes, PState = #pstate{acl_deny_action = disconnect}) -> + ReasonCodes, PState = #pstate{proto_ver = ProtoVer, + acl_deny_action = disconnect}) -> case lists:member(?RC_NOT_AUTHORIZED, ReasonCodes) of true -> deliver({disconnect, ?RC_NOT_AUTHORIZED}, PState), - {error, ?RC_NOT_AUTHORIZED, PState}; + {error, emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ProtoVer), PState}; false -> {ok, PState} end; diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 3dc73a5d5..c6bdd849d 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -133,6 +133,7 @@ compat(connack, 16#89) -> ?CONNACK_SERVER; compat(connack, 16#8A) -> ?CONNACK_AUTH; compat(connack, 16#8B) -> ?CONNACK_SERVER; compat(connack, 16#8C) -> ?CONNACK_AUTH; +compat(connack, 16#90) -> ?CONNACK_SERVER; compat(connack, 16#97) -> ?CONNACK_SERVER; compat(connack, 16#9C) -> ?CONNACK_SERVER; compat(connack, 16#9D) -> ?CONNACK_SERVER; diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 387be3053..38d1001d3 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -35,13 +35,17 @@ all() -> [ + {group, mqtt_common}, {group, mqttv4}, {group, mqttv5}, {group, acl} ]. groups() -> - [{mqttv4, + [{mqtt_common, + [sequence], + [will_check]}, + {mqttv4, [sequence], [connect_v4, subscribe_v4]}, @@ -53,7 +57,6 @@ groups() -> [sequence], [acl_deny_action]}]. - init_per_suite(Config) -> [start_apps(App, SchemaFile, ConfigFile) || {App, SchemaFile, ConfigFile} @@ -436,6 +439,39 @@ acl_deny_action(_) -> emqx_zone:set_env(external, acl_deny_action, ignore), ok. +will_check(_) -> + process_flag(trap_exit, true), + will_topic_check(0), + will_acl_check(0). + +will_topic_check(QoS) -> + {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}, + {will_flag, true}, + {will_topic, <<"">>}, + {will_payload, <<"I have died">>}, + {will_qos, QoS}]), + try emqx_client:connect(Client) of + _ -> + ok + catch + exit : _Reason -> + false = is_process_alive(Client) + end. + +will_acl_check(QoS) -> + {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}, + {will_flag, true}, + {will_topic, <<"acl_deny_action">>}, + {will_payload, <<"I have died">>}, + {will_qos, QoS}]), + try emqx_client:connect(Client) of + _ -> + ok + catch + exit : _Reason -> + false = is_process_alive(Client) + end. + acl_deny_do_disconnect(publish, QoS, Topic) -> {ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]), {ok, _} = emqx_client:connect(Client),