diff --git a/etc/emqx.conf b/etc/emqx.conf index 82ec0dc8c..341056344 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1886,7 +1886,9 @@ plugins.etc_dir = {{ platform_etc_dir }}/plugins/ ## Value: File plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins -## File to store loaded plugin names. +## The directory of extension plugins. +## +## Value: File plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index db4421c91..024e14815 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -997,6 +997,8 @@ end}. end, #{}, string:tokens(Val, ",")), {mqueue_priorities, MqueuePriorities} end; + ("mountpoint", Val) -> + {mountpoint, iolist_to_binary(Val)}; (Opt, Val) -> {list_to_atom(Opt), Val} end, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index cce2e1f97..a60f148f1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -287,7 +287,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{clientinfo = ClientInfo}) -> + Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = parse_topic_filters(TopicFilters), TopicFilters2 = enrich_subid(Properties, TopicFilters1), @@ -296,7 +296,15 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), TopicFilters2 ), {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), - handle_out(suback, {PacketId, ReasonCodes}, NChannel); + case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso + lists:any(fun(ReasonCode) -> + ReasonCode =:= ?RC_NOT_AUTHORIZED + end, ReasonCodes) of + true -> + handle_out(disconnect, ?RC_NOT_AUTHORIZED, NChannel); + false -> + handle_out(suback, {PacketId, ReasonCodes}, NChannel) + end; {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; @@ -373,7 +381,8 @@ process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, %% Process Publish %%-------------------------------------------------------------------- -process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) -> +process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), + Channel = #channel{clientinfo = #{zone := Zone}}) -> case pipeline([fun process_alias/2, fun check_pub_alias/2, fun check_pub_acl/2, @@ -382,6 +391,19 @@ process_publish(Packet = ?PUBLISH_PACKET(_QoS, Topic, PacketId), Channel) -> {ok, NPacket, NChannel} -> Msg = packet_to_message(NPacket, NChannel), do_publish(PacketId, Msg, NChannel); + {error, ReasonCode, NChannel} when ReasonCode =:= ?RC_NOT_AUTHORIZED -> + ?LOG(warning, "Cannot publish message to ~s due to ~s.", + [Topic, emqx_reason_codes:text(ReasonCode)]), + case emqx_zone:get_env(Zone, acl_deny_action, ignore) of + ignore -> + case QoS of + ?QOS_0 -> {ok, NChannel}; + _ -> + handle_out(puback, {PacketId, ReasonCode}, NChannel) + end; + disconnect -> + handle_out(disconnect, ReasonCode, NChannel) + end; {error, ReasonCode, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(ReasonCode)]), @@ -786,6 +808,9 @@ handle_info(Info, Channel) -> handle_timeout(_TRef, {keepalive, _StatVal}, Channel = #channel{keepalive = undefined}) -> {ok, Channel}; +handle_timeout(_TRef, {keepalive, _StatVal}, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, {keepalive, StatVal}, Channel = #channel{keepalive = Keepalive}) -> case emqx_keepalive:check(StatVal, Keepalive) of @@ -796,6 +821,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel) end; +handle_timeout(_TRef, retry_delivery, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, retry_delivery, Channel = #channel{session = Session}) -> case emqx_session:retry(Session) of @@ -809,6 +837,9 @@ handle_timeout(_TRef, retry_delivery, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; +handle_timeout(_TRef, expire_awaiting_rel, + Channel = #channel{conn_state = disconnected}) -> + {ok, Channel}; handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{session = Session}) -> case emqx_session:expire(awaiting_rel, Session) of @@ -981,6 +1012,9 @@ maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Us false -> ok end. +maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId}) + when ClientId /= undefined -> + {ok, ClientInfo}; maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) -> %% Generate a rand clientId {ok, ClientInfo#{clientid => emqx_guid:to_base62(emqx_guid:gen())}}; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index a3d3e41a6..1f595d3fa 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -476,13 +476,18 @@ handle_timeout(_TRef, emit_stats, State = emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; -handle_timeout(TRef, keepalive, State = - #state{transport = Transport, socket = Socket}) -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> - handle_timeout(TRef, {keepalive, RecvOct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) +handle_timeout(TRef, keepalive, State = #state{transport = Transport, + socket = Socket, + channel = Channel})-> + case emqx_channel:info(conn_state, Channel) of + disconnected -> {ok, State}; + _ -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> + handle_timeout(TRef, {keepalive, RecvOct}, State); + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end end; handle_timeout(TRef, Msg, State) -> @@ -617,7 +622,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> {ok, Limiter1} -> State#state{limiter = Limiter1}; {pause, Time, Limiter1} -> - ?LOG(debug, "Pause ~pms due to rate limit", [Time]), + ?LOG(warning, "Pause ~pms due to rate limit", [Time]), TRef = start_timer(Time, limit_timeout), State#state{sockstate = blocked, limiter = Limiter1, diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 354229f99..b229a3e07 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -478,8 +478,9 @@ format_variable(#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}) -> io_lib:format("Topic=~s, PacketId=~p", [TopicName, PacketId]); -format_variable(#mqtt_packet_puback{packet_id = PacketId}) -> - io_lib:format("PacketId=~p", [PacketId]); +format_variable(#mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode}) -> + io_lib:format("PacketId=~p, ReasonCode=~p", [PacketId, ReasonCode]); format_variable(#mqtt_packet_subscribe{packet_id = PacketId, topic_filters = TopicFilters}) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 204dc82ee..97fb5e623 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -414,7 +414,7 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> {ok, Limiter1} -> State#state{limiter = Limiter1}; {pause, Time, Limiter1} -> - ?LOG(debug, "Pause ~pms due to rate limit", [Time]), + ?LOG(warning, "Pause ~pms due to rate limit", [Time]), TRef = start_timer(Time, limit_timeout), NState = State#state{sockstate = blocked, limiter = Limiter1, diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index fc440aeb1..c7b44cbe9 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -45,7 +45,8 @@ all() -> [{group, mqttv3}, {group, mqttv4}, - {group, mqttv5} + {group, mqttv5}, + {group, others} ]. groups() -> @@ -66,17 +67,26 @@ groups() -> ]}, {mqttv5, [non_parallel_tests], [t_basic_with_props_v5 + ]}, + {others, [non_parallel_tests], + [t_username_as_clientid, + t_certcn_as_clientid ]} ]. init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([]), + emqx_ct_helpers:start_apps([], fun set_special_confs/1), Config. end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +set_special_confs(emqx) -> + emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]); +set_special_confs(_) -> + ok. + %%-------------------------------------------------------------------- %% Test cases for MQTT v3 %%-------------------------------------------------------------------- @@ -110,10 +120,7 @@ t_cm_registry(_) -> {_, Pid, _, _} = lists:keyfind(registry, 1, Info), ignored = gen_server:call(Pid, <<"Unexpected call">>), gen_server:cast(Pid, <<"Unexpected cast">>), - Pid ! <<"Unexpected info">>, - ok = application:stop(mnesia), - emqx_ct_helpers:stop_apps([]), - emqx_ct_helpers:start_apps([]). + Pid ! <<"Unexpected info">>. t_will_message(_Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, @@ -263,6 +270,23 @@ t_basic(_Opts) -> ?assertEqual(3, length(recv_msgs(3))), ok = emqtt:disconnect(C). +t_username_as_clientid(_) -> + emqx_zone:set_env(external, use_username_as_clientid, true), + Username = <<"usera">>, + {ok, C} = emqtt:start_link([{username, Username}]), + {ok, _} = emqtt:connect(C), + #{clientinfo := #{clientid := Username}} = emqx_cm:get_chan_info(Username), + emqtt:disconnect(C). + +t_certcn_as_clientid(_) -> + CN = <<"0004.novalocal">>, + emqx_zone:set_env(external, use_username_as_clientid, true), + SslConf = emqx_ct_helpers:client_ssl_twoway(), + {ok, C} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]), + {ok, _} = emqtt:connect(C), + #{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN), + emqtt:disconnect(C). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 49ba497e5..9166701e5 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -74,6 +74,14 @@ receive_disconnect_reasoncode() -> error("no disconnect packet") end. +waiting_client_process_exit(C) -> + receive + {'EXIT', C, Reason} -> Reason; + _Oth -> error({got_another_message, _Oth}) + after + 1000 -> error({waiting_timeout, C}) + end. + clean_retained(Topic) -> {ok, Clean} = emqtt:start_link([{clean_start, true}]), {ok, _} = emqtt:connect(Clean), @@ -102,6 +110,7 @@ t_basic_test(_) -> %%-------------------------------------------------------------------- t_connect_clean_start(_) -> + process_flag(trap_exit, true), {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]), {ok, _} = emqtt:connect(Client1), ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] @@ -109,11 +118,19 @@ t_connect_clean_start(_) -> {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]), {ok, _} = emqtt:connect(Client2), ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] + ?assertEqual(142, receive_disconnect_reasoncode()), + waiting_client_process_exit(Client1), + ok = emqtt:disconnect(Client2), + waiting_client_process_exit(Client2), + {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]), {ok, _} = emqtt:connect(Client3), ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] - ok = emqtt:disconnect(Client3). + ok = emqtt:disconnect(Client3), + waiting_client_process_exit(Client3), + + process_flag(trap_exit, false). t_connect_will_message(_) -> Topic = nth(1, ?TOPICS), @@ -396,7 +413,7 @@ t_connack_max_qos_allowed(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack1} = emqtt:connect(Client1), - ?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9] + ?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9] {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10] {ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10] @@ -404,6 +421,7 @@ t_connack_max_qos_allowed(_) -> {ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1), ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + waiting_client_process_exit(Client1), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, @@ -413,7 +431,8 @@ t_connack_max_qos_allowed(_) -> {will_qos, 2} ]), {error, Connack2} = emqtt:connect(Client2), - ?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12] + ?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12] + waiting_client_process_exit(Client2), %% max_qos_allowed = 1 emqx_zone:set_env(external, max_qos_allowed, 1), @@ -422,7 +441,7 @@ t_connack_max_qos_allowed(_) -> {ok, Client3} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack3} = emqtt:connect(Client3), - ?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9] + ?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9] {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10] {ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10] @@ -430,6 +449,7 @@ t_connack_max_qos_allowed(_) -> {ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2), ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + waiting_client_process_exit(Client3), {ok, Client4} = emqtt:start_link([ {proto_ver, v5}, @@ -439,11 +459,8 @@ t_connack_max_qos_allowed(_) -> {will_qos, 2} ]), {error, Connack4} = emqtt:connect(Client4), - ?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12] - receive - {'EXIT', _, {shutdown,qos_not_supported}} -> ok - after 100 -> error("t_connack_max_qos_allowed") - end, + ?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12] + waiting_client_process_exit(Client4), %% max_qos_allowed = 2 emqx_zone:set_env(external, max_qos_allowed, 2), @@ -452,12 +469,10 @@ t_connack_max_qos_allowed(_) -> {ok, Client5} = emqtt:start_link([{proto_ver, v5}]), {ok, Connack5} = emqtt:connect(Client5), - ?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9] + ?assertEqual(2, maps:get('Maximum-QoS', Connack5)), %% [MQTT-3.2.2-9] ok = emqtt:disconnect(Client5), + waiting_client_process_exit(Client5), - receive {'EXIT', _, _} -> ok - after 100 -> ok - end, process_flag(trap_exit, false). t_connack_assigned_clienid(_) -> @@ -499,10 +514,8 @@ t_publish_wildtopic(_) -> {ok, _} = emqtt:connect(Client1), ok = emqtt:publish(Client1, Topic, <<"error topic">>), ?assertEqual(144, receive_disconnect_reasoncode()), + waiting_client_process_exit(Client1), - receive {'EXIT', _, _} -> ok - after 100 -> ok - end, process_flag(trap_exit, false). t_publish_payload_format_indicator(_) -> @@ -525,6 +538,7 @@ t_publish_topic_alias(_) -> {ok, _} = emqtt:connect(Client1), ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), ?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8] + waiting_client_process_exit(Client1), {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client2), @@ -533,10 +547,8 @@ t_publish_topic_alias(_) -> ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), ?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12] ok = emqtt:disconnect(Client2), + waiting_client_process_exit(Client2), - receive {'EXIT', _, _} -> ok - after 100 -> ok - end, process_flag(trap_exit, false). t_publish_response_topic(_) -> @@ -547,10 +559,8 @@ t_publish_response_topic(_) -> {ok, _} = emqtt:connect(Client1), ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]), ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] + waiting_client_process_exit(Client1), - receive {'EXIT', _, _} -> ok - after 100 -> ok - end, process_flag(trap_exit, false). t_publish_properties(_) ->