diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index afe3cec5d..fcd952696 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -489,7 +489,7 @@ zone(Opts) -> maps:get(zone, Opts, undefined). limiter(Opts) -> - maps:get(limiter, Opts). + maps:get(limiter, Opts, #{}). ssl_opts(Opts) -> maps:to_list( diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl index 27373eb7c..4250ff430 100644 --- a/apps/emqx/src/emqx_tls_lib.erl +++ b/apps/emqx/src/emqx_tls_lib.erl @@ -368,6 +368,7 @@ do_ensure_ssl_file(Dir, Key, SSL, MaybePem, DryRun) -> end end. +is_valid_string(Empty) when Empty == <<>>; Empty == "" -> false; is_valid_string(String) when is_list(String) -> io_lib:printable_unicode_list(String); is_valid_string(Binary) when is_binary(Binary) -> diff --git a/apps/emqx/test/emqx_tls_lib_tests.erl b/apps/emqx/test/emqx_tls_lib_tests.erl index 93a105ea3..22c480637 100644 --- a/apps/emqx/test/emqx_tls_lib_tests.erl +++ b/apps/emqx/test/emqx_tls_lib_tests.erl @@ -104,6 +104,17 @@ ssl_files_failure_test_() -> ) end}, {"bad_pem_string", fun() -> + %% empty string + ?assertMatch( + {error, #{ + reason := invalid_file_path_or_pem_string, which_options := [<<"keyfile">>] + }}, + emqx_tls_lib:ensure_ssl_files("/tmp", #{ + <<"keyfile">> => <<>>, + <<"certfile">> => bin(test_key()), + <<"cacertfile">> => bin(test_key()) + }) + ), %% not valid unicode ?assertMatch( {error, #{ diff --git a/apps/emqx_gateway/i18n/emqx_gateway_api_clients_i18n.conf b/apps/emqx_gateway/i18n/emqx_gateway_api_clients_i18n.conf index 8da672586..1e6f575c3 100644 --- a/apps/emqx_gateway/i18n/emqx_gateway_api_clients_i18n.conf +++ b/apps/emqx_gateway/i18n/emqx_gateway_api_clients_i18n.conf @@ -245,6 +245,13 @@ emqx_gateway_api_clients { } } + mountpoint { + desc { + en: """Topic mountpoint""" + zh: """主题固定前缀""" + } + } + proto_name { desc { en: """Client protocol name""" diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 1edc28494..207524448 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -579,7 +579,7 @@ ensure_connected( } ) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, - _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, []]), + _ = run_hooks(Ctx, 'client.connack', [NConnInfo, connection_accepted, #{}]), ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, conn_state = connected}. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index af6393a27..ac0e72c83 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -174,9 +174,7 @@ clients_insta(delete, #{ {204} end). -%% FIXME: -%% List the subscription without mountpoint, but has SubOpts, -%% for example, share group ... +%% List the established subscriptions with mountpoint subscriptions(get, #{ bindings := #{ name := Name0, @@ -189,7 +187,7 @@ subscriptions(get, #{ {error, not_found} -> return_http_error(404, "client process not found"); {error, Reason} -> - return_http_error(500, Reason); + return_http_error(400, Reason); {ok, Subs} -> {200, Subs} end @@ -216,7 +214,7 @@ subscriptions(post, #{ {error, not_found} -> return_http_error(404, "client process not found"); {error, Reason} -> - return_http_error(500, Reason); + return_http_error(400, Reason); {ok, {NTopic, NSubOpts}} -> {201, maps:merge(NSubOpts, #{topic => NTopic})} end @@ -368,6 +366,7 @@ format_channel_info({_, Infos, Stats} = R) -> {node, ClientInfo, Node}, {clientid, ClientInfo}, {username, ClientInfo}, + {mountpoint, ClientInfo}, {proto_name, ConnInfo}, {proto_ver, ConnInfo}, {ip_address, {peername, ConnInfo, fun peer_to_binary_addr/1}}, @@ -813,6 +812,11 @@ common_client_props() -> binary(), #{desc => ?DESC(username)} )}, + {mountpoint, + mk( + binary(), + #{desc => ?DESC(mountpoint)} + )}, {proto_name, mk( binary(), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index f8cf4f96f..f104c6629 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -514,8 +514,9 @@ with_gateway(GwName0, Fun) -> ), return_http_error(404, "Resource not found. path: " ++ Path); %% Exceptions from emqx_gateway_conf:convert_certs/2,3 - error:{bad_ssl_config, #{which_option := Option}} -> - return_http_error(400, ["Bad SSL config, option: ", Option]); + error:{bad_ssl_config, Reason0} -> + Reason = emqx_gateway_utils:stringfy(Reason0), + return_http_error(400, ["Bad SSL config, reason: ", Reason]); Class:Reason:Stk -> ?SLOG(error, #{ msg => "uncatched_error", diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index cf0681c25..9955f75d9 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1782,25 +1782,11 @@ message_to_packet( | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> - %% XXX: Only support short_topic_name - SubProps = maps:get(sub_props, SubOpts, #{}), - case maps:get(subtype, SubProps, short_topic_name) of - short_topic_name -> - case byte_size(Topic) of - 2 -> - case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of - {ok, {_, NTopicName, NSubOpts}, NChannel} -> - reply({ok, {NTopicName, NSubOpts}}, NChannel); - {error, ?SN_RC2_EXCEED_LIMITATION} -> - reply({error, exceed_limitation}, Channel) - end; - _ -> - reply({error, bad_topic_name}, Channel) - end; - predefined_topic_id -> - reply({error, only_support_short_name_topic}, Channel); - _ -> - reply({error, only_support_short_name_topic}, Channel) + case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of + {ok, {_, NTopicName, NSubOpts}, NChannel} -> + reply({ok, {NTopicName, NSubOpts}}, NChannel); + {error, ?SN_RC2_EXCEED_LIMITATION} -> + reply({error, exceed_limitation}, Channel) end; handle_call({unsubscribe, Topic}, _From, Channel) -> TopicFilters = [emqx_topic:parse(Topic)], diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 4ef48d264..6e6a67d0b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -417,10 +417,10 @@ handle_in(Packet = ?PACKET(?CMD_CONNECT), Channel) -> emqx_misc:pipeline( [ fun enrich_conninfo/2, - fun run_conn_hooks/2, fun negotiate_version/2, fun enrich_clientinfo/2, fun assign_clientid_to_conninfo/2, + fun run_conn_hooks/2, fun set_log_meta/2, %% TODO: How to implement the banned in the gateway instance? %, fun check_banned/2 @@ -755,7 +755,7 @@ handle_out( } ) -> %% XXX: connection_accepted is not defined by stomp protocol - _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, []]), + _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), Replies = [ {outgoing, connected_frame(Headers)}, {event, connected} diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 033373655..c7ceb26d5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -492,6 +492,7 @@ fields(client) -> " Maximum number of subscriptions allowed by this client">> })}, {username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})}, + {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}, {will_msg, hoconsc:mk(binary(), #{desc => <<"Client will message">>})}, {zone, hoconsc:mk(binary(), #{ @@ -849,7 +850,6 @@ format_channel_info({_, ClientInfo0, ClientStats}) -> is_superuser, sockport, anonymous, - mountpoint, socktype, active_n, await_rel_timeout,