feat: add enabled for listeners
This commit is contained in:
parent
f1d4bab97d
commit
36af3d066f
|
@ -1895,16 +1895,16 @@ QUIC listeners
|
|||
}
|
||||
}
|
||||
|
||||
fields_mqtt_quic_listener_enabled {
|
||||
fields_listener_enabled {
|
||||
desc {
|
||||
en: """
|
||||
Enable QUIC listener.
|
||||
Enable listener.
|
||||
"""
|
||||
zh: """启用 QUIC 监听器"""
|
||||
zh: """启停监听器"""
|
||||
}
|
||||
label: {
|
||||
en: "Enable QUIC listener"
|
||||
zh: "启用 QUIC 监听器"
|
||||
en: "Enable listener"
|
||||
zh: "启停监听器"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -260,6 +260,12 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
[listener_id(Type, ListenerName), format_addr(Bind)]
|
||||
),
|
||||
ok;
|
||||
{error, not_found} ->
|
||||
?ELOG(
|
||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), already_stopped]
|
||||
),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?ELOG(
|
||||
"Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||
|
@ -360,6 +366,9 @@ pre_config_update([listeners, Type, Name], {update, Request}, RawConf) ->
|
|||
NewConf = ensure_override_limiter_conf(NewConfT, Request),
|
||||
CertsDir = certs_dir(Type, Name),
|
||||
{ok, convert_certs(CertsDir, NewConf)};
|
||||
pre_config_update([listeners, _Type, _Name], {action, _Action, Updated}, RawConf) ->
|
||||
NewConf = emqx_map_lib:deep_merge(RawConf, Updated),
|
||||
{ok, NewConf};
|
||||
pre_config_update(_Path, _Request, RawConf) ->
|
||||
{ok, RawConf}.
|
||||
|
||||
|
@ -378,6 +387,15 @@ post_config_update([listeners, Type, Name], '$remove', undefined, OldConf, _AppE
|
|||
Err ->
|
||||
Err
|
||||
end;
|
||||
post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
|
||||
#{enabled := NewEnabled} = NewConf,
|
||||
#{enabled := OldEnabled} = OldConf,
|
||||
case {NewEnabled, OldEnabled} of
|
||||
{true, true} -> restart_listener(Type, Name, {OldConf, NewConf});
|
||||
{true, false} -> start_listener(Type, Name, NewConf);
|
||||
{false, true} -> stop_listener(Type, Name, OldConf);
|
||||
{false, false} -> stop_listener(Type, Name, OldConf)
|
||||
end;
|
||||
post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
|
||||
ok.
|
||||
|
||||
|
|
|
@ -844,14 +844,6 @@ fields("mqtt_wss_listener") ->
|
|||
];
|
||||
fields("mqtt_quic_listener") ->
|
||||
[
|
||||
{"enabled",
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC(fields_mqtt_quic_listener_enabled)
|
||||
}
|
||||
)},
|
||||
%% TODO: ensure cacertfile is configurable
|
||||
{"certfile",
|
||||
sc(
|
||||
|
@ -1567,6 +1559,14 @@ mqtt_listener(Bind) ->
|
|||
|
||||
base_listener(Bind) ->
|
||||
[
|
||||
{"enabled",
|
||||
sc(
|
||||
boolean(),
|
||||
#{
|
||||
default => true,
|
||||
desc => ?DESC(fields_listener_enabled)
|
||||
}
|
||||
)},
|
||||
{"bind",
|
||||
sc(
|
||||
hoconsc:union([ip_port(), integer()]),
|
||||
|
|
|
@ -24,11 +24,7 @@
|
|||
forward/3,
|
||||
forward_async/3,
|
||||
list_client_subscriptions/2,
|
||||
list_subscriptions_via_topic/2,
|
||||
|
||||
start_listener/2,
|
||||
stop_listener/2,
|
||||
restart_listener/2
|
||||
list_subscriptions_via_topic/2
|
||||
]).
|
||||
|
||||
-include("bpapi.hrl").
|
||||
|
@ -56,15 +52,3 @@ list_client_subscriptions(Node, ClientId) ->
|
|||
-spec list_subscriptions_via_topic(node(), emqx_types:topic()) -> [emqx_types:subopts()].
|
||||
list_subscriptions_via_topic(Node, Topic) ->
|
||||
rpc:call(Node, emqx_broker, subscriptions_via_topic, [Topic]).
|
||||
|
||||
-spec start_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||
start_listener(Node, Id) ->
|
||||
rpc:call(Node, emqx_listeners, start_listener, [Id]).
|
||||
|
||||
-spec stop_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||
stop_listener(Node, Id) ->
|
||||
rpc:call(Node, emqx_listeners, stop_listener, [Id]).
|
||||
|
||||
-spec restart_listener(node(), atom()) -> ok | {error, _} | {badrpc, _}.
|
||||
restart_listener(Node, Id) ->
|
||||
rpc:call(Node, emqx_listeners, restart_listener, [Id]).
|
||||
|
|
|
@ -148,7 +148,7 @@ schema("/listeners/:id/:action") ->
|
|||
],
|
||||
responses => #{
|
||||
200 => <<"Updated">>,
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
400 => error_codes(['BAD_REQUEST', 'BAD_LISTENER_ID'])
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
@ -362,66 +362,38 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
|||
end.
|
||||
|
||||
parse_listener_conf(Conf0) ->
|
||||
Conf1 = maps:remove(<<"running">>, Conf0),
|
||||
Conf2 = maps:remove(<<"current_connections">>, Conf1),
|
||||
{IdBin, Conf3} = maps:take(<<"id">>, Conf2),
|
||||
{TypeBin, Conf4} = maps:take(<<"type">>, Conf3),
|
||||
Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0),
|
||||
{IdBin, Conf2} = maps:take(<<"id">>, Conf1),
|
||||
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
|
||||
TypeAtom = binary_to_existing_atom(TypeBin),
|
||||
case Type =:= TypeAtom of
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4};
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
|
||||
false -> {error, listener_type_inconsistent}
|
||||
end.
|
||||
|
||||
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
|
||||
case
|
||||
lists:filter(
|
||||
fun
|
||||
({_, {200}}) -> false;
|
||||
(_) -> true
|
||||
end,
|
||||
Results
|
||||
)
|
||||
of
|
||||
[] -> {200};
|
||||
Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}}
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
|
||||
Path = [listeners, Type, Name],
|
||||
case emqx_conf:get_raw(Path, undefined) of
|
||||
undefined ->
|
||||
{404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
|
||||
_PrevConf ->
|
||||
case action(Path, Action, enabled(Action)) of
|
||||
{ok, #{raw_config := _RawConf}} ->
|
||||
{200};
|
||||
{error, not_found} ->
|
||||
{404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end
|
||||
end.
|
||||
|
||||
%%%==============================================================================================
|
||||
|
||||
action_listeners(Node, Id, Action) ->
|
||||
{Node, do_action_listeners(Action, Node, Id)}.
|
||||
|
||||
do_action_listeners(start, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, {already_started, _}} -> {200};
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
do_action_listeners(stop, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, not_found} -> {200};
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
do_action_listeners(restart, Node, Id) ->
|
||||
case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of
|
||||
ok -> {200};
|
||||
{error, not_found} -> do_action_listeners(start, Node, Id);
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
action_listeners_err(Errors) ->
|
||||
list_to_binary(
|
||||
lists:foldl(
|
||||
fun({Node, Err}, Str) ->
|
||||
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
|
||||
end,
|
||||
"",
|
||||
Errors
|
||||
)
|
||||
).
|
||||
enabled(start) -> #{<<"enabled">> => true};
|
||||
enabled(stop) -> #{<<"enabled">> => false};
|
||||
enabled(restart) -> #{<<"enabled">> => true}.
|
||||
|
||||
err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
|
||||
err_msg(Reason) -> list_to_binary(err_msg_str(Reason)).
|
||||
|
@ -574,6 +546,9 @@ max_conn(Int1, Int2) -> Int1 + Int2.
|
|||
update(Path, Conf) ->
|
||||
wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
|
||||
|
||||
action(Path, Action, Conf) ->
|
||||
wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))).
|
||||
|
||||
create(Path, Conf) ->
|
||||
wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
|
||||
|
||||
|
|
Loading…
Reference in New Issue