diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 6cf13eae6..fb308a772 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -1895,16 +1895,16 @@ QUIC listeners } } -fields_mqtt_quic_listener_enabled { +fields_listener_enabled { desc { en: """ -Enable QUIC listener. +Enable listener. """ - zh: """启用 QUIC 监听器""" + zh: """启停监听器""" } label: { - en: "Enable QUIC listener" - zh: "启用 QUIC 监听器" + en: "Enable listener" + zh: "启停监听器" } } diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2a73efd70..c14447336 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -260,6 +260,12 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> [listener_id(Type, ListenerName), format_addr(Bind)] ), ok; + {error, not_found} -> + ?ELOG( + "Failed to stop listener ~ts on ~ts: ~0p~n", + [listener_id(Type, ListenerName), format_addr(Bind), already_stopped] + ), + ok; {error, Reason} -> ?ELOG( "Failed to stop listener ~ts on ~ts: ~0p~n", @@ -360,6 +366,9 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) -> NewConf = ensure_override_limiter_conf(NewConfT, Request), CertsDir = certs_dir(Type, Name), {ok, convert_certs(CertsDir, NewConf)}; +pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) -> + NewConf = emqx_map_lib:deep_merge(RawConf, Updated), + {ok, NewConf}; pre_config_update(_Path, _Request, RawConf) -> {ok, RawConf}. @@ -378,6 +387,15 @@ post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppE Err -> Err end; +post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> + #{enabled := NewEnabled} = NewConf, + #{enabled := OldEnabled} = OldConf, + case {NewEnabled, OldEnabled} of + {true, true} -> restart_listener(Type, Name, {OldConf, NewConf}); + {true, false} -> start_listener(Type, Name, NewConf); + {false, true} -> stop_listener(Type, Name, OldConf); + {false, false} -> stop_listener(Type, Name, OldConf) + end; post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8bd1da1f9..576ef3b02 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -844,14 +844,6 @@ fields("mqtt_wss_listener") -> ]; fields("mqtt_quic_listener") -> [ - {"enabled", - sc( - boolean(), - #{ - default => true, - desc => ?DESC(fields_mqtt_quic_listener_enabled) - } - )}, %% TODO: ensure cacertfile is configurable {"certfile", sc( @@ -1567,6 +1559,14 @@ mqtt_listener(Bind) -> base_listener(Bind) -> [ + {"enabled", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(fields_listener_enabled) + } + )}, {"bind", sc( hoconsc:union([ip_port(), integer()]), diff --git a/apps/emqx/src/proto/emqx_broker_proto_v1.erl b/apps/emqx/src/proto/emqx_broker_proto_v1.erl index 54ab4f2b8..3e34ba346 100644 --- a/apps/emqx/src/proto/emqx_broker_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_broker_proto_v1.erl @@ -24,11 +24,7 @@ forward/3, forward_async/3, list_client_subscriptions/2, - list_subscriptions_via_topic/2, - - start_listener/2, - stop_listener/2, - restart_listener/2 + list_subscriptions_via_topic/2 ]). -include("bpapi.hrl"). @@ -56,15 +52,3 @@ list_client_subscriptions(Node, ClientId) -> -spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()]. list_subscriptions_via_topic(Node, Topic) -> rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]). - --spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. -start_listener(Node, Id) -> - rpc:call(Node, emqx_listeners, start_listener, [Id]). - --spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. -stop_listener(Node, Id) -> - rpc:call(Node, emqx_listeners, stop_listener, [Id]). - --spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}. -restart_listener(Node, Id) -> - rpc:call(Node, emqx_listeners, restart_listener, [Id]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index c5d7f9f55..d158834ae 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -148,7 +148,7 @@ schema("/listeners/:id/:action") -> ], responses => #{ 200 => <<"Updated">>, - 400 => error_codes(['BAD_REQUEST']) + 400 => error_codes(['BAD_REQUEST', 'BAD_LISTENER_ID']) } } }. @@ -362,66 +362,38 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> end. parse_listener_conf(Conf0) -> - Conf1 = maps:remove(<<"running">>, Conf0), - Conf2 = maps:remove(<<"current_connections">>, Conf1), - {IdBin, Conf3} = maps:take(<<"id">>, Conf2), - {TypeBin, Conf4} = maps:take(<<"type">>, Conf3), + Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0), + {IdBin, Conf2} = maps:take(<<"id">>, Conf1), + {TypeBin, Conf3} = maps:take(<<"type">>, Conf2), {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), TypeAtom = binary_to_existing_atom(TypeBin), case Type =:= TypeAtom of - true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4}; + true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; false -> {error, listener_type_inconsistent} end. action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> - Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], - case - lists:filter( - fun - ({_, {200}}) -> false; - (_) -> true - end, - Results - ) - of - [] -> {200}; - Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}} + {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), + Path = [listeners, Type, Name], + case emqx_conf:get_raw(Path, undefined) of + undefined -> + {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + _PrevConf -> + case action(Path, Action, enabled(Action)) of + {ok, #{raw_config := _RawConf}} -> + {200}; + {error, not_found} -> + {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; + {error, Reason} -> + {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} + end end. %%%============================================================================================== -action_listeners(Node, Id, Action) -> - {Node, do_action_listeners(Action, Node, Id)}. - -do_action_listeners(start, Node, Id) -> - case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of - ok -> {200}; - {error, {already_started, _}} -> {200}; - {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end; -do_action_listeners(stop, Node, Id) -> - case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of - ok -> {200}; - {error, not_found} -> {200}; - {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end; -do_action_listeners(restart, Node, Id) -> - case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of - ok -> {200}; - {error, not_found} -> do_action_listeners(start, Node, Id); - {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} - end. - -action_listeners_err(Errors) -> - list_to_binary( - lists:foldl( - fun({Node, Err}, Str) -> - err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str - end, - "", - Errors - ) - ). +enabled(start) -> #{<<"enabled">> => true}; +enabled(stop) -> #{<<"enabled">> => false}; +enabled(restart) -> #{<<"enabled">> => true}. err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom); err_msg(Reason) -> list_to_binary(err_msg_str(Reason)). @@ -574,6 +546,9 @@ max_conn(Int1, Int2) -> Int1 + Int2. update(Path, Conf) -> wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))). +action(Path, Action, Conf) -> + wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))). + create(Path, Conf) -> wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).