diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index b2a3d77ef..0962e4be3 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -43,7 +43,7 @@ listeners.tcp.default { ## ## @doc listeners.tcp..access_rules ## ValueType: Array - ## Default: [] + ## Default: ["allow all"] ## Examples: ## access_rules: [ ## "deny 192.168.0.0/24", diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index a745f76fb..f3562572f 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -50,11 +50,11 @@ parse_listener_id/1 ]). --export([post_config_update/5]). +-export([pre_config_update/3, post_config_update/5]). -export([format_addr/1]). --define(CONF_KEY_PATH, [listeners]). +-define(CONF_KEY_PATH, [listeners, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). -spec id_example() -> atom(). @@ -202,7 +202,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> "Failed to start listener ~ts on ~ts: ~0p~n", [ListenerId, BindStr, Reason] ), - error({failed_to_start, ListenerId, BindStr, Reason}) + Msg = lists:flatten( + io_lib:format( + "~ts(~ts) : ~p", + [ListenerId, BindStr, element(1, Reason)] + ) + ), + {error, {failed_to_start, Msg}} end. %% @doc Restart all listeners @@ -214,7 +220,7 @@ restart() -> restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). --spec restart_listener(atom(), atom(), map()) -> ok | {error, term()}. +-spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}. restart_listener(Type, ListenerName, {OldConf, NewConf}) -> restart_listener(Type, ListenerName, OldConf, NewConf); restart_listener(Type, ListenerName, Conf) -> @@ -334,54 +340,38 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> {ok, {skipped, quic_app_missing}} end. -delete_authentication(Type, ListenerName, _Conf) -> - emqx_authentication:delete_chain(listener_id(Type, ListenerName)). - %% Update the listeners at runtime -post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) -> - #{added := Added, removed := Removed, changed := Updated} = - diff_listeners(NewListeners, OldListeners), - try - perform_listener_changes(fun stop_listener/3, Removed), - perform_listener_changes(fun delete_authentication/3, Removed), - perform_listener_changes(fun start_listener/3, Added), - perform_listener_changes(fun restart_listener/3, Updated) - catch - error:{failed_to_start, ListenerId, Bind, Reason} -> - Error = lists:flatten( - io_lib:format( - "~ts(~ts) failed with ~ts", - [ListenerId, Bind, element(1, Reason)] - ) - ), - {error, Error} - end. +pre_config_update([listeners, Type, Name], {create, NewConf}, undefined) -> + CertsDir = certs_dir(Type, Name), + {ok, convert_certs(CertsDir, NewConf)}; +pre_config_update([listeners, _Type, _Name], {create, _NewConf}, _RawConf) -> + {error, already_exist}; +pre_config_update([listeners, _Type, _Name], {update, _Request}, undefined) -> + {error, not_found}; +pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> + NewConf = emqx_map_lib:deep_merge(RawConf, Request), + CertsDir = certs_dir(Type, Name), + {ok, convert_certs(CertsDir, NewConf)}; +pre_config_update(_Path, _Request, RawConf) -> + {ok, RawConf}. -perform_listener_changes(Action, MapConfs) -> - lists:foreach( - fun({Id, Conf}) -> - {ok, #{type := Type, name := Name}} = parse_listener_id(Id), - Action(Type, Name, Conf) - end, - maps:to_list(MapConfs) - ). - -diff_listeners(NewListeners, OldListeners) -> - emqx_map_lib:diff_maps(flatten_listeners(NewListeners), flatten_listeners(OldListeners)). - -flatten_listeners(Conf0) -> - maps:from_list( - lists:append([ - do_flatten_listeners(Type, Conf) - || {Type, Conf} <- maps:to_list(Conf0) - ]) - ). - -do_flatten_listeners(Type, Conf0) -> - [ - {listener_id(Type, Name), maps:remove(authentication, Conf)} - || {Name, Conf} <- maps:to_list(Conf0) - ]. +post_config_update([listeners, Type, Name], {create, _Request}, NewConf, undefined, _AppEnvs) -> + start_listener(Type, Name, NewConf); +post_config_update([listeners, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> + restart_listener(Type, Name, {OldConf, NewConf}); +post_config_update([listeners, _Type, _Name], '$remove', undefined, undefined, _AppEnvs) -> + {error, not_found}; +post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppEnvs) -> + case stop_listener(Type, Name, OldConf) of + ok -> + _ = emqx_authentication:delete_chain(listener_id(Type, Name)), + CertsDir = certs_dir(Type, Name), + clear_certs(CertsDir, OldConf); + Err -> + Err + end; +post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> + ok. esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), @@ -518,7 +508,10 @@ foreach_listeners(Do) -> lists:foreach( fun({Id, LConf}) -> {ok, #{type := Type, name := Name}} = parse_listener_id(Id), - Do(Type, Name, LConf) + case Do(Type, Name, LConf) of + {error, {failed_to_start, _} = Reason} -> error(Reason); + _ -> ok + end end, list() ). @@ -552,3 +545,22 @@ parse_bind(#{<<"bind">> := Bind}) -> {ok, L} -> L; {error, _} -> binary_to_integer(Bind) end. + +%% The relative dir for ssl files. +certs_dir(Type, Name) -> + iolist_to_binary(filename:join(["listeners", Type, Name])). + +convert_certs(CertsDir, Conf) -> + case emqx_tls_lib:ensure_ssl_files(CertsDir, maps:get(<<"ssl">>, Conf, undefined)) of + {ok, undefined} -> + Conf; + {ok, SSL} -> + Conf#{<<"ssl">> => SSL}; + {error, Reason} -> + ?SLOG(error, Reason#{msg => "bad_ssl_config"}), + throw({bad_ssl_config, Reason}) + end. + +clear_certs(CertsDir, Conf) -> + OldSSL = maps:get(<<"ssl">>, Conf, undefined), + emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 04f20cf0d..f5e57014a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1680,7 +1680,8 @@ mqtt_listener() -> #{ desc => "The access control rules for this listener.
" - "See: https://github.com/emqtt/esockd#allowdeny" + "See: https://github.com/emqtt/esockd#allowdeny", + default => [<<"allow all">>] } )}, {"proxy_protocol", @@ -1700,7 +1701,8 @@ mqtt_listener() -> #{ desc => "Timeout for proxy protocol. EMQX will close the TCP connection " - "if proxy protocol packet is not received within the timeout." + "if proxy protocol packet is not received within the timeout.", + default => "3s" } )}, {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index cc4166193..fba0a8998 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -398,36 +398,36 @@ trans_desc(Init, Hocon, Func, Name) -> end. trans_description(Spec, Hocon) -> - case trans_desc(<<"desc">>, Hocon, undefined) of + Desc = + case desc_struct(Hocon) of + undefined -> undefined; + ?DESC(_, _) = Struct -> get_i18n(<<"desc">>, Struct, undefined); + Struct -> to_bin(Struct) + end, + case Desc of undefined -> Spec; - Value -> Spec#{description => Value} + Desc -> Spec#{description => Desc} end. +get_i18n(Key, Struct, Default) -> + {ok, #{cache := Cache, lang := Lang}} = emqx_dashboard:get_i18n(), + Desc = hocon_schema:resolve_schema(Struct, Cache), + emqx_map_lib:deep_get([Key, Lang], Desc, Default). + trans_label(Spec, Hocon, Default) -> - Label = trans_desc(<<"label">>, Hocon, Default), + Label = + case desc_struct(Hocon) of + ?DESC(_, _) = Struct -> get_i18n(<<"label">>, Struct, Default); + _ -> Default + end, Spec#{label => Label}. -trans_desc(Key, Hocon, Default) -> - case resolve_desc(Key, desc_struct(Hocon)) of - undefined -> Default; - Value -> to_bin(Value) - end. - desc_struct(Hocon) -> case hocon_schema:field_schema(Hocon, desc) of undefined -> hocon_schema:field_schema(Hocon, description); Struct -> Struct end. -resolve_desc(_Key, Bin) when is_binary(Bin) -> Bin; -resolve_desc(Key, Struct) -> - {ok, #{cache := Cache, lang := Lang}} = emqx_dashboard:get_i18n(), - Desc = hocon_schema:resolve_schema(Struct, Cache), - case is_map(Desc) of - true -> emqx_map_lib:deep_get([Key, Lang], Desc, undefined); - false -> Desc - end. - request_body(#{content := _} = Content, _Module, _Options) -> {Content, []}; request_body([], _Module, _Options) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 9cdb26ed6..f55f454ed 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -30,9 +30,7 @@ %% for rpc call -export([ - do_list_listeners/0, - do_update_listener/2, - do_remove_listener/1 + do_list_listeners/0 ]). -include_lib("emqx/include/emqx.hrl"). @@ -43,7 +41,6 @@ -define(LISTENER_NOT_FOUND, <<"Listener id not found">>). -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>). -define(ADDR_PORT_INUSE, <<"Addr port in use">>). - -define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}). namespace() -> "listeners". @@ -65,7 +62,13 @@ schema("/listeners_status") -> get => #{ tags => [<<"listeners">>], desc => <<"List all running node's listeners live status. group by listener type">>, - responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))} + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example( + ?ARRAY(?R_REF(listener_type_status)), + listener_type_status_example() + ) + } } }; schema("/listeners") -> @@ -78,10 +81,16 @@ schema("/listeners") -> {type, ?HOCON( ?ENUM(listeners_type()), - #{desc => "Listener type", in => query, required => false} + #{desc => "Listener type", in => query, required => false, example => tcp} )} ], - responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))} + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example( + ?ARRAY(?R_REF(listener_id_status)), + listener_id_status_example() + ) + } } }; schema("/listeners/:id") -> @@ -92,7 +101,7 @@ schema("/listeners/:id") -> desc => <<"List all running node's listeners for the specified id.">>, parameters => [?R_REF(listener_id)], responses => #{ - 200 => ?HOCON(listener_schema(#{bind => true})), + 200 => listener_schema(#{bind => true}), 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) } }, @@ -100,9 +109,9 @@ schema("/listeners/:id") -> tags => [<<"listeners">>], desc => <<"Update the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], - 'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}), + 'requestBody' => listener_schema(#{bind => false}), responses => #{ - 200 => ?HOCON(listener_schema(#{bind => true}), #{}), + 200 => listener_schema(#{bind => true}), 400 => error_codes(['BAD_REQUEST']), 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) } @@ -111,9 +120,9 @@ schema("/listeners/:id") -> tags => [<<"listeners">>], desc => <<"Create the specified listener on all nodes.">>, parameters => [?R_REF(listener_id)], - 'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}), + 'requestBody' => listener_schema(#{bind => true}), responses => #{ - 200 => ?HOCON(listener_schema(#{bind => true}), #{}), + 200 => listener_schema(#{bind => true}), 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) } }, @@ -149,8 +158,9 @@ fields(listener_id) -> {id, ?HOCON(atom(), #{ desc => "Listener id", - example => 'tcp:default', + example => 'tcp:demo', validator => fun validate_id/1, + required => true, in => path })} ]; @@ -184,7 +194,13 @@ fields(listener_id_status) -> fields(listener_id) ++ [ {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, - {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})}, + {number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId counter"})}, + {bind, + ?HOCON( + hoconsc:union([emqx_schema:ip_port(), integer()]), + #{desc => "Listener bind addr", required => true} + )}, + {acceptors, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId acceptors"})}, {status, ?HOCON(?R_REF(status))}, {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} ]; @@ -202,7 +218,10 @@ fields(Type) -> Schema. listener_schema(Opts) -> - ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))). + emqx_dashboard_swagger:schema_with_example( + ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))), + tcp_schema_example() + ). listeners_type() -> lists:map( @@ -266,7 +285,13 @@ validate_id(Id) -> %% api listener_type_status(get, _Request) -> Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})), - List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners), + List = lists:map( + fun({Type, L}) -> + L1 = maps:without([bind, acceptors], L), + L1#{type => Type} + end, + Listeners + ), {200, List}. list_listeners(get, #{query_string := Query}) -> @@ -291,15 +316,17 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> - Key = [listeners, Type, Name], - case emqx_conf:get_raw(Key, undefined) of + Path = [listeners, Type, Name], + case emqx_conf:get_raw(Path, undefined) of undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; PrevConf -> MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf), - case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of + case update(Path, MergeConf) of {ok, #{raw_config := _RawConf}} -> crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, not_found} -> + {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end @@ -312,17 +339,14 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> - Key = [listeners, Type, Name], - case emqx_conf:get(Key, undefined) of - undefined -> - case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of - {ok, #{raw_config := _RawConf}} -> - crud_listeners_by_id(get, #{bindings => #{id => Id}}); - {error, Reason} -> - {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end; - _ -> - {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} + Path = [listeners, Type, Name], + case create(Path, Conf) of + {ok, #{raw_config := _RawConf}} -> + crud_listeners_by_id(get, #{bindings => #{id => Id}}); + {error, already_exist} -> + {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; @@ -331,8 +355,9 @@ crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) -> end; crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), - case emqx_conf:remove([listeners, Type, Name], ?OPTS(cluster)) of + case remove([listeners, Type, Name]) of {ok, _} -> {204}; + {error, not_found} -> {204}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. @@ -465,23 +490,6 @@ do_list_listeners() -> <<"listeners">> => Listeners }. --spec do_update_listener(string(), emqx_config:update_request()) -> - {ok, map()} | {error, _}. -do_update_listener(Id, Config) -> - {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), - case emqx:update_config([listeners, Type, Name], Config, ?OPTS(local)) of - {ok, #{raw_config := RawConf}} -> {ok, RawConf}; - {error, Reason} -> {error, Reason} - end. - --spec do_remove_listener(string()) -> ok. -do_remove_listener(Id) -> - {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), - case emqx:remove_config([listeners, Type, Name], ?OPTS(local)) of - {ok, _} -> ok; - {error, Reason} -> error(Reason) - end. - wrap_rpc({badrpc, Reason}) -> {error, Reason}; wrap_rpc(Res) -> @@ -492,7 +500,9 @@ format_status(Key, Node, Listener, Acc) -> <<"id">> := Id, <<"running">> := Running, <<"max_connections">> := MaxConnections, - <<"current_connections">> := CurrentConnections + <<"current_connections">> := CurrentConnections, + <<"acceptors">> := Acceptors, + <<"bind">> := Bind } = Listener, GroupKey = maps:get(Key, Listener), case maps:find(GroupKey, Acc) of @@ -501,6 +511,8 @@ format_status(Key, Node, Listener, Acc) -> GroupKey => #{ enable => Running, ids => [Id], + acceptors => Acceptors, + bind => Bind, status => #{ max_connections => MaxConnections, current_connections => CurrentConnections @@ -555,6 +567,134 @@ format_status(Key, Node, Listener, Acc) -> } end. -max_conn(_Int1, infinity) -> infinity; -max_conn(infinity, _Int) -> infinity; +max_conn(_Int1, <<"infinity">>) -> <<"infinity">>; +max_conn(<<"infinity">>, _Int) -> <<"infinity">>; max_conn(Int1, Int2) -> Int1 + Int2. + +update(Path, Conf) -> + wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))). + +create(Path, Conf) -> + wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))). + +remove(Path) -> + wrap(emqx_conf:remove(Path, ?OPTS(cluster))). + +wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; +wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; +wrap({error, Reason}) -> {error, Reason}; +wrap(Ok) -> Ok. + +listener_type_status_example() -> + [ + #{ + enable => false, + ids => ["tcp:demo"], + node_status => #{ + 'emqx@127.0.0.1' => #{ + current_connections => 11, + max_connections => 1024000 + }, + 'emqx@127.0.0.2' => #{ + current_connections => 10, + max_connections => 1024000 + } + }, + status => #{ + current_connections => 21, + max_connections => 2048000 + }, + type => tcp + }, + #{ + enable => false, + ids => ["ssl:default"], + node_status => #{ + 'emqx@127.0.0.1' => #{ + current_connections => 31, + max_connections => infinity + }, + 'emqx@127.0.0.2' => #{ + current_connections => 40, + max_connections => infinity + } + }, + status => #{ + current_connections => 71, + max_connections => infinity + }, + type => ssl + } + ]. + +listener_id_status_example() -> + [ + #{ + acceptors => 16, + bind => <<"0.0.0.0:1884">>, + enable => true, + id => <<"tcp:demo">>, + node_status => #{ + 'emqx@127.0.0.1' => #{ + current_connections => 100, + max_connections => 1024000 + }, + 'emqx@127.0.0.2' => #{ + current_connections => 101, + max_connections => 1024000 + } + }, + number => 2, + status => #{ + current_connections => 201, + max_connections => 2048000 + } + }, + #{ + acceptors => 32, + bind => <<"0.0.0.0:1883">>, + enable => true, + id => <<"tcp:default">>, + node_status => #{ + 'emqx@127.0.0.1' => #{ + current_connections => 300, + max_connections => infinity + }, + 'emqx@127.0.0.2' => #{ + current_connections => 201, + max_connections => infinity + } + }, + number => 2, + status => #{ + current_connections => 501, + max_connections => infinity + } + } + ]. + +tcp_schema_example() -> + #{ + acceptors => 16, + access_rules => ["allow all"], + bind => <<"0.0.0.0:1884">>, + current_connections => 10240, + id => <<"tcp:demo">>, + max_connections => 204800, + mountpoint => <<"/">>, + proxy_protocol => false, + proxy_protocol_timeout => <<"3s">>, + running => true, + tcp => #{ + active_n => 100, + backlog => 1024, + buffer => <<"4KB">>, + high_watermark => <<"1MB">>, + nodelay => false, + reuseaddr => true, + send_timeout => <<"15s">>, + send_timeout_close => true + }, + type => tcp, + zone => default + }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_sys.erl b/apps/emqx_management/src/emqx_mgmt_api_sys.erl index 220ce2563..0209e2d0e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_sys.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_sys.erl @@ -22,14 +22,14 @@ -include_lib("typerefl/include/types.hrl"). %% API --export([ api_spec/0 - , paths/0 - , schema/1 - , namespace/0 - ]). +-export([ + api_spec/0, + paths/0, + schema/1, + namespace/0 +]). --export([ sys/2 - ]). +-export([sys/2]). -define(TAGS, [<<"sys">>]). @@ -61,8 +61,8 @@ schema("/mqtt/sys_topics") -> responses => #{ 200 => schema_sys_topics() - } - }, + } + }, put => #{ tags => ?TAGS, @@ -71,20 +71,24 @@ schema("/mqtt/sys_topics") -> responses => #{ 200 => schema_sys_topics() - } - } - }. + } + } + }. schema_sys_topics() -> emqx_dashboard_swagger:schema_with_example( - hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()). + hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics() + ). example_sys_topics() -> - #{<<"sys_event_messages">> => - #{<<"client_connected">> => true, - <<"client_disconnected">> => true, - <<"client_subscribed">> => false, - <<"client_unsubscribed">> => false}, - <<"sys_heartbeat_interval">> => <<"30s">>, - <<"sys_msg_interval">> => <<"1m">> - }. + #{ + <<"sys_event_messages">> => + #{ + <<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false + }, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">> + }. diff --git a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl index d3099ca39..c9456058e 100644 --- a/apps/emqx_management/src/proto/emqx_management_proto_v1.erl +++ b/apps/emqx_management/src/proto/emqx_management_proto_v1.erl @@ -26,9 +26,6 @@ list_subscriptions/1, list_listeners/1, - remove_listener/2, - - update_listener/3, subscribe/3, unsubscribe/3, @@ -58,15 +55,6 @@ list_subscriptions(Node) -> list_listeners(Node) -> rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). --spec remove_listener(node(), string()) -> ok | {badrpc, _}. -remove_listener(Node, Id) -> - rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]). - --spec update_listener(node(), atom(), emqx_config:update_request()) -> - {ok, map()} | {error, _} | {badrpc, _}. -update_listener(Node, Id, Config) -> - rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]). - -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> {subscribe, _} | {error, atom()} | {badrpc, _}. subscribe(Node, ClientId, TopicTables) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl index e0c2586ec..169a272e3 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl @@ -39,14 +39,44 @@ t_list_listeners(_) -> ?assertEqual(length(Expect), length(Res)), ok. -t_crud_listeners_by_id(_) -> - TcpListenerId = <<"tcp:default">>, +t_tcp_crud_listeners_by_id(_) -> + ListenerId = <<"tcp:default">>, NewListenerId = <<"tcp:new">>, - TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]), + MinListenerId = <<"tcp:min">>, + BadId = <<"tcp:bad">>, + Type = <<"tcp">>, + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + +t_ssl_crud_listeners_by_id(_) -> + ListenerId = <<"ssl:default">>, + NewListenerId = <<"ssl:new">>, + MinListenerId = <<"ssl:min">>, + BadId = <<"ssl:bad">>, + Type = <<"ssl">>, + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + +t_ws_crud_listeners_by_id(_) -> + ListenerId = <<"ws:default">>, + NewListenerId = <<"ws:new">>, + MinListenerId = <<"ws:min">>, + BadId = <<"ws:bad">>, + Type = <<"ws">>, + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + +t_wss_crud_listeners_by_id(_) -> + ListenerId = <<"wss:default">>, + NewListenerId = <<"wss:new">>, + MinListenerId = <<"wss:min">>, + BadId = <<"wss:bad">>, + Type = <<"wss">>, + crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type). + +crud_listeners_by_id(ListenerId, NewListenerId, MinListenerId, BadId, Type) -> + TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), TcpListener = request(get, TcpPath, [], []), - %% create + %% create with full options ?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), NewConf = TcpListener#{ @@ -59,8 +89,22 @@ t_crud_listeners_by_id(_) -> ?assertMatch(Create, Get1), ?assert(is_running(NewListenerId)), + %% create with required options + MinPath = emqx_mgmt_api_test_util:api_path(["listeners", MinListenerId]), + ?assertEqual({error, not_found}, is_running(MinListenerId)), + ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, MinPath, [], [])), + MinConf = #{ + <<"id">> => MinListenerId, + <<"bind">> => <<"0.0.0.0:3883">>, + <<"type">> => Type + }, + MinCreate = request(post, MinPath, [], MinConf), + ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(MinCreate))), + MinGet = request(get, MinPath, [], []), + ?assertMatch(MinCreate, MinGet), + ?assert(is_running(MinListenerId)), + %% bad create(same port) - BadId = <<"tcp:bad">>, BadPath = emqx_mgmt_api_test_util:api_path(["listeners", BadId]), BadConf = TcpListener#{ <<"id">> => BadId, @@ -79,6 +123,7 @@ t_crud_listeners_by_id(_) -> %% delete ?assertEqual([], delete(NewPath)), + ?assertEqual([], delete(MinPath)), ?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])), ?assertEqual([], delete(NewPath)), diff --git a/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl index 725eacb34..d577798ce 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl @@ -33,22 +33,27 @@ end_per_suite(_) -> t_get_put(_) -> {ok, Default} = get_sys_topics_config(), ?assertEqual( - #{<<"sys_event_messages">> => - #{<<"client_connected">> => true, - <<"client_disconnected">> => true, - <<"client_subscribed">> => false, - <<"client_unsubscribed">> => false - }, - <<"sys_heartbeat_interval">> => <<"30s">>, - <<"sys_msg_interval">> => <<"1m">>}, Default), + #{ + <<"sys_event_messages">> => + #{ + <<"client_connected">> => true, + <<"client_disconnected">> => true, + <<"client_subscribed">> => false, + <<"client_unsubscribed">> => false + }, + <<"sys_heartbeat_interval">> => <<"30s">>, + <<"sys_msg_interval">> => <<"1m">> + }, + Default + ), - NConfig = Default#{ - <<"sys_msg_interval">> => <<"4m">>, - <<"sys_event_messages">> => #{<<"client_subscribed">> => false} - }, - {ok, ConfigResp} = put_sys_topics_config(NConfig), - ?assertEqual(NConfig, ConfigResp), - {ok, Default} = put_sys_topics_config(Default). + NConfig = Default#{ + <<"sys_msg_interval">> => <<"4m">>, + <<"sys_event_messages">> => #{<<"client_subscribed">> => false} + }, + {ok, ConfigResp} = put_sys_topics_config(NConfig), + ?assertEqual(NConfig, ConfigResp), + {ok, Default} = put_sys_topics_config(Default). get_sys_topics_config() -> Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]), diff --git a/mix.exs b/mix.exs index 95b74c11c..bd4430e74 100644 --- a/mix.exs +++ b/mix.exs @@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "0.18.0", override: true}, {:hocon, github: "emqx/hocon", tag: "0.26.7", override: true}, - {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.4.1", override: true}, + {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, # in conflict by ehttpc and emqtt diff --git a/rebar.config b/rebar.config index 75d156e01..1d5560405 100644 --- a/rebar.config +++ b/rebar.config @@ -67,7 +67,7 @@ , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.18.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.26.7"}}} - , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.1"}}} + , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} ]}. diff --git a/rebar.config.erl b/rebar.config.erl index 8e022b44f..834f7988e 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -95,7 +95,7 @@ project_app_dirs(Edition) -> plugins() -> [ {relup_helper,{git,"https://github.com/emqx/relup_helper", {tag, "2.0.0"}}} - , {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.4"}}} + , {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}} %% emqx main project does not require port-compiler %% pin at root level for deterministic , {pc, {git, "https://github.com/emqx/port_compiler.git", {tag, "v1.11.1"}}}