feat: Make api_listener align with gateway and remove unused APIs.

This commit is contained in:
Zhongwen Deng 2022-04-19 10:42:08 +08:00
parent 9998b613c8
commit 0de367dc63
4 changed files with 229 additions and 267 deletions

View File

@ -94,14 +94,15 @@ format_raw_listeners({Type0, Conf}) ->
Bind = parse_bind(LConf0), Bind = parse_bind(LConf0),
Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}), Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
LConf1 = maps:remove(<<"authentication">>, LConf0), LConf1 = maps:remove(<<"authentication">>, LConf0),
LConf2 = maps:put(<<"running">>, Running, LConf1), LConf2 = maps:remove(<<"limiter">>, LConf1),
LConf3 = maps:put(<<"running">>, Running, LConf2),
CurrConn = CurrConn =
case Running of case Running of
true -> current_conns(Type, LName, Bind); true -> current_conns(Type, LName, Bind);
false -> 0 false -> 0
end, end,
LConf3 = maps:put(<<"current_connections">>, CurrConn, LConf2), LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3),
{Type0, LName, LConf3} {Type0, LName, LConf4}
end, end,
maps:to_list(Conf) maps:to_list(Conf)
). ).
@ -545,6 +546,7 @@ str(B) when is_binary(B) ->
str(S) when is_list(S) -> str(S) when is_list(S) ->
S. S.
parse_bind(#{<<"bind">> := Bind}) when is_integer(Bind) -> Bind;
parse_bind(#{<<"bind">> := Bind}) -> parse_bind(#{<<"bind">> := Bind}) ->
case emqx_schema:to_ip_port(binary_to_list(Bind)) of case emqx_schema:to_ip_port(binary_to_list(Bind)) of
{ok, L} -> L; {ok, L} -> L;

View File

@ -195,7 +195,7 @@ cors(_) ->
undefined. undefined.
i18n_lang(type) -> ?ENUM([en, zh]); i18n_lang(type) -> ?ENUM([en, zh]);
i18n_lang(default) -> zh; i18n_lang(default) -> en;
i18n_lang('readOnly') -> true; i18n_lang('readOnly') -> true;
i18n_lang(desc) -> "Internationalization language support."; i18n_lang(desc) -> "Internationalization language support.";
i18n_lang(_) -> undefined. i18n_lang(_) -> undefined.

View File

@ -20,17 +20,12 @@
-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
-define(LISTENER_TYPE, [quic, wss, ws, ssl, tcp]).
-define(LISTENER_STATUS, [enable, disable]).
-export([ -export([
listener_status/2, listener_type_status/2,
list_listeners/2, list_listeners/2,
crud_listeners_by_id/2, crud_listeners_by_id/2,
list_listeners_on_node/2, action_listeners_by_id/2
crud_listener_by_id_on_node/2,
action_listeners_by_id/2,
action_listeners_by_id_on_node/2
]). ]).
%% for rpc call %% for rpc call
@ -58,26 +53,19 @@ api_spec() ->
paths() -> paths() ->
[ [
"/listener/status", "/listeners_status",
"/listeners", "/listeners",
"/listeners/:id", "/listeners/:id",
"/listeners/:id/:action", "/listeners/:id/:action"
"/nodes/:node/listeners",
"/nodes/:node/listeners/:id",
"/nodes/:node/listeners/:id/:action"
]. ].
schema("/listeners_status") ->
schema("/listener/status") ->
#{ #{
'operationId' => listener_status, 'operationId' => listener_type_status,
get => #{ get => #{
tags => [<<"listeners">>], tags => [<<"listeners">>],
desc => <<"List all running node's listeners live status.">>, desc => <<"List all running node's listeners live status. group by listener type">>,
%% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))}
%% Current we only support all node's listeners is the same,
%% so we don't return the node information right now.
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_status)))}
} }
}; };
schema("/listeners") -> schema("/listeners") ->
@ -85,11 +73,15 @@ schema("/listeners") ->
'operationId' => list_listeners, 'operationId' => list_listeners,
get => #{ get => #{
tags => [<<"listeners">>], tags => [<<"listeners">>],
desc => <<"List all running node's listeners.">>, desc => <<"List all running node's listeners for the specified type.">>,
%% responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))} parameters => [
%% Current we only support all node's listeners is the same, {type,
%% so we don't return the node information right now. ?HOCON(
responses => #{200 => ?HOCON(?ARRAY(listener_schema()))} ?ENUM(listeners_type()),
#{desc => "Listener type", in => query, required => false}
)}
],
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))}
} }
}; };
schema("/listeners/:id") -> schema("/listeners/:id") ->
@ -100,17 +92,17 @@ schema("/listeners/:id") ->
desc => <<"List all running node's listeners for the specified id.">>, desc => <<"List all running node's listeners for the specified id.">>,
parameters => [?R_REF(listener_id)], parameters => [?R_REF(listener_id)],
responses => #{ responses => #{
%% 200 => ?HOCON(?ARRAY(?R_REF(listeners))) 200 => ?HOCON(listener_schema(#{bind => true})),
200 => ?HOCON(listener_schema()) 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
} }
}, },
put => #{ put => #{
tags => [<<"listeners">>], tags => [<<"listeners">>],
desc => <<"Update the specified listener on all nodes.">>, desc => <<"Update the specified listener on all nodes.">>,
parameters => [?R_REF(listener_id)], parameters => [?R_REF(listener_id)],
'requestBody' => ?HOCON(listener_schema(), #{}), 'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}),
responses => #{ responses => #{
200 => ?HOCON(listener_schema(), #{}), 200 => ?HOCON(listener_schema(#{bind => true}), #{}),
400 => error_codes(['BAD_REQUEST']), 400 => error_codes(['BAD_REQUEST']),
404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND) 404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
} }
@ -119,9 +111,9 @@ schema("/listeners/:id") ->
tags => [<<"listeners">>], tags => [<<"listeners">>],
desc => <<"Create the specified listener on all nodes.">>, desc => <<"Create the specified listener on all nodes.">>,
parameters => [?R_REF(listener_id)], parameters => [?R_REF(listener_id)],
'requestBody' => ?HOCON(listener_schema(), #{}), 'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}),
responses => #{ responses => #{
200 => ?HOCON(listener_schema(), #{}), 200 => ?HOCON(listener_schema(#{bind => true}), #{}),
400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST']) 400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
} }
}, },
@ -150,90 +142,8 @@ schema("/listeners/:id/:action") ->
400 => error_codes(['BAD_REQUEST']) 400 => error_codes(['BAD_REQUEST'])
} }
} }
};
schema("/nodes/:node/listeners") ->
#{
'operationId' => list_listeners_on_node,
get => #{
tags => [<<"listeners">>],
desc => <<"List all listeners on the specified node.">>,
parameters => [?R_REF(node)],
responses => #{
200 => ?HOCON(?ARRAY(listener_schema())),
400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN)
}
}
};
schema("/nodes/:node/listeners/:id") ->
#{
'operationId' => crud_listener_by_id_on_node,
get => #{
tags => [<<"listeners">>],
desc => <<"Get the specified listener on the specified node.">>,
parameters => [
?R_REF(listener_id),
?R_REF(node)
],
responses => #{
200 => ?HOCON(listener_schema()),
400 => error_codes(['BAD_REQUEST']),
404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND)
}
},
put => #{
tags => [<<"listeners">>],
desc => <<"Create or update the specified listener on the specified node.">>,
parameters => [
?R_REF(listener_id),
?R_REF(node)
],
'requestBody' => ?HOCON(listener_schema()),
responses => #{
200 => ?HOCON(listener_schema()),
400 => error_codes(['BAD_REQUEST'])
}
},
delete => #{
tags => [<<"listeners">>],
desc => <<"Delete the specified listener on the specified node.">>,
parameters => [
?R_REF(listener_id),
?R_REF(node)
],
responses => #{
204 => <<"Listener deleted">>,
400 => error_codes(['BAD_REQUEST'])
}
}
};
schema("/nodes/:node/listeners/:id/:action") ->
#{
'operationId' => action_listeners_by_id_on_node,
post => #{
tags => [<<"listeners">>],
desc => <<"Start/stop/restart listeners on a specified node.">>,
parameters => [
?R_REF(node),
?R_REF(listener_id),
?R_REF(action)
],
responses => #{
200 => <<"Updated">>,
400 => error_codes(['BAD_REQUEST'])
}
}
}. }.
fields(listeners) ->
[
{"node",
?HOCON(atom(), #{
desc => "Node name",
example => "emqx@127.0.0.1",
required => true
})},
{"listeners", ?ARRAY(listener_schema())}
];
fields(listener_id) -> fields(listener_id) ->
[ [
{id, {id,
@ -262,11 +172,19 @@ fields(node) ->
in => path in => path
})} })}
]; ];
fields(listener_status) -> fields(listener_type_status) ->
[ [
{type, ?HOCON(?ENUM(?LISTENER_TYPE), #{desc => "Listener type", required => true})}, {type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})},
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})}, {enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
{number, ?HOCON(non_neg_integer(), #{desc => "Listener number", required => true})}, {ids, ?HOCON(?ARRAY(string()), #{desc => "Listener Ids", required => true})},
{status, ?HOCON(?R_REF(status))},
{node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
];
fields(listener_id_status) ->
fields(listener_id) ++
[
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})},
{status, ?HOCON(?R_REF(status))}, {status, ?HOCON(?R_REF(status))},
{node_status, ?HOCON(?ARRAY(?R_REF(node_status)))} {node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
]; ];
@ -274,29 +192,36 @@ fields(status) ->
[ [
{max_connections, {max_connections,
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})}, ?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
{current_connections, {current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
?HOCON(non_neg_integer(), #{desc => "Current connections"})}
]; ];
fields(node_status) -> fields(node_status) ->
fields(node) ++ fields(status); fields(node) ++ fields(status);
fields(Type) -> fields(Type) ->
Listeners = listeners_info(), Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type], [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
Schema. Schema.
listener_schema() -> listener_schema(Opts) ->
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())). ?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))).
listeners_info() -> listeners_type() ->
lists:map(
fun({Type, _}) -> list_to_existing_atom(Type) end,
hocon_schema:fields(emqx_schema, "listeners")
).
listeners_info(Opts) ->
Listeners = hocon_schema:fields(emqx_schema, "listeners"), Listeners = hocon_schema:fields(emqx_schema, "listeners"),
lists:map( lists:map(
fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
Fields0 = hocon_schema:fields(Mod, Field), Fields0 = hocon_schema:fields(Mod, Field),
Fields1 = lists:keydelete("authentication", 1, Fields0), Fields1 = lists:keydelete("authentication", 1, Fields0),
Fields2 = lists:keydelete("limiter", 1, Fields1), Fields2 = lists:keydelete("limiter", 1, Fields1),
Fields3 = required_bind(Fields2, Opts),
Ref = listeners_ref(Type, Opts),
TypeAtom = list_to_existing_atom(Type), TypeAtom = list_to_existing_atom(Type),
#{ #{
ref => ?R_REF(TypeAtom), ref => ?R_REF(Ref),
schema => [ schema => [
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
@ -305,14 +230,33 @@ listeners_info() ->
desc => "Listener id", desc => "Listener id",
required => true, required => true,
validator => fun validate_id/1 validator => fun validate_id/1
})} })},
| Fields2 {current_connections,
?HOCON(
non_neg_integer(),
#{desc => "Current connections", required => false}
)}
| Fields3
] ]
} }
end, end,
Listeners Listeners
). ).
required_bind(Fields, #{bind := true}) ->
Fields;
required_bind(Fields, #{bind := false}) ->
{value, {_, Hocon}, Fields1} = lists:keytake("bind", 1, Fields),
[{"bind", Hocon#{required => false}} | Fields1].
listeners_ref(Type, #{bind := Bind}) ->
Suffix =
case Bind of
true -> "_required_bind";
false -> "_not_required_bind"
end,
Type ++ Suffix.
validate_id(Id) -> validate_id(Id) ->
case emqx_listeners:parse_listener_id(Id) of case emqx_listeners:parse_listener_id(Id) of
{error, Reason} -> {error, Reason}; {error, Reason} -> {error, Reason};
@ -320,23 +264,40 @@ validate_id(Id) ->
end. end.
%% api %% api
listener_status(get, _Request) -> listener_type_status(get, _Request) ->
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners),
{200, List}.
{200, []}. list_listeners(get, #{query_string := Query}) ->
Listeners = list_listeners(),
NodeL =
case maps:find(<<"type">>, Query) of
{ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners);
error -> Listeners
end,
{200, listener_status_by_id(NodeL)}.
list_listeners(get, _Request) -> crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
{200, list_listeners()}. Listeners = [
Conf#{<<"id">> => Id, <<"type">> => Type}
crud_listeners_by_id(get, #{bindings := #{id := Id}}) -> || {Id, Type, Conf} <- emqx_listeners:list_raw(),
{200, list_listeners_by_id(Id)}; Id =:= Id0
],
case Listeners of
[] -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
[L] -> {200, L}
end;
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
case parse_listener_conf(Body0) of case parse_listener_conf(Body0) of
{Id, Type, Name, Conf} -> {Id, Type, Name, Conf} ->
Key = [listeners, Type, Name], Key = [listeners, Type, Name],
case emqx_conf:get(Key, undefined) of case emqx_conf:get_raw(Key, undefined) of
undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; undefined ->
_PrevConf -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
case emqx_conf:update(Key, Conf, ?OPTS(cluster)) of PrevConf ->
MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf),
case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of
{ok, #{raw_config := _RawConf}} -> {ok, #{raw_config := _RawConf}} ->
crud_listeners_by_id(get, #{bindings => #{id => Id}}); crud_listeners_by_id(get, #{bindings => #{id => Id}});
{error, Reason} -> {error, Reason} ->
@ -360,7 +321,8 @@ crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
{error, Reason} -> {error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end; end;
_ -> {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}} _ ->
{400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}
end; end;
{error, Reason} -> {error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
@ -376,64 +338,16 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
parse_listener_conf(Conf0) -> parse_listener_conf(Conf0) ->
Conf1 = maps:remove(<<"running">>, Conf0), Conf1 = maps:remove(<<"running">>, Conf0),
{IdBin, Conf2} = maps:take(<<"id">>, Conf1), Conf2 = maps:remove(<<"current_connections">>, Conf1),
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2), {IdBin, Conf3} = maps:take(<<"id">>, Conf2),
{TypeBin, Conf4} = maps:take(<<"type">>, Conf3),
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin), {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
TypeAtom = binary_to_existing_atom(TypeBin), TypeAtom = binary_to_existing_atom(TypeBin),
case Type =:= TypeAtom of case Type =:= TypeAtom of
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3}; true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4};
false -> {error, listener_type_inconsistent} false -> {error, listener_type_inconsistent}
end. end.
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
case list_listeners(Node) of
{error, nodedown} ->
{400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
#{<<"listeners">> := Listener} ->
{200, Listener}
end.
crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
case get_listener(Node, Id) of
{error, not_found} ->
{404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
Listener ->
{200, Listener}
end;
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) ->
case parse_listener_conf(Body) of
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
{Id, Type, _Name, Conf} ->
case update_listener(Node, Id, Conf) of
{error, nodedown} ->
{400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}};
%% TODO
{error, {eaddrinuse, _}} ->
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
{error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
{ok, Listener} ->
{200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}}
end;
_ ->
{400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}}
end;
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
case remove_listener(Node, Id) of
ok -> {204};
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end.
action_listeners_by_id_on_node(post,
#{bindings := #{id := Id, action := Action, node := Node}}) ->
{_, Result} = action_listeners(Node, Id, Action),
Result.
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
case case
@ -496,31 +410,49 @@ list_listeners() ->
list_listeners(Node) -> list_listeners(Node) ->
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
list_listeners_by_id(Id) -> listener_status_by_id(NodeL) ->
listener_id_filter(Id, list_listeners()). Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
get_listener(Node, Id) ->
case listener_id_filter(Id, [list_listeners(Node)]) of
[#{<<"listeners">> := []}] -> {error, not_found};
[#{<<"listeners">> := [Listener]}] -> Listener
end.
listener_id_filter(Id, Listeners) ->
lists:map( lists:map(
fun(Conf = #{<<"listeners">> := Listeners0}) -> fun({Id, L}) ->
Conf#{ L1 = maps:remove(ids, L),
<<"listeners">> => #{node_status := Nodes} = L1,
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0] L1#{number => maps:size(Nodes), id => Id}
}
end, end,
Listeners Listeners
). ).
update_listener(Node, Id, Config) -> listener_status_by_type([], Acc) ->
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). Acc;
listener_status_by_type([NodeL | Rest], Acc) ->
#{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL,
Acc1 = lists:foldl(
fun(L, Acc0) -> format_status(<<"type">>, Node, L, Acc0) end,
Acc,
Listeners
),
listener_status_by_type(Rest, Acc1).
remove_listener(Node, Id) -> listener_status_by_id([], Acc) ->
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)). Acc;
listener_status_by_id([NodeL | Rest], Acc) ->
#{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL,
Acc1 = lists:foldl(
fun(L, Acc0) -> format_status(<<"id">>, Node, L, Acc0) end,
Acc,
Listeners
),
listener_status_by_id(Rest, Acc1).
listener_type_filter(Type0, Listeners) ->
lists:map(
fun(Conf = #{<<"listeners">> := Listeners0}) ->
Conf#{
<<"listeners">> =>
[C || C = #{<<"type">> := Type} <- Listeners0, Type =:= Type0]
}
end,
Listeners
).
-spec do_list_listeners() -> map(). -spec do_list_listeners() -> map().
do_list_listeners() -> do_list_listeners() ->
@ -554,3 +486,75 @@ wrap_rpc({badrpc, Reason}) ->
{error, Reason}; {error, Reason};
wrap_rpc(Res) -> wrap_rpc(Res) ->
Res. Res.
format_status(Key, Node, Listener, Acc) ->
#{
<<"id">> := Id,
<<"running">> := Running,
<<"max_connections">> := MaxConnections,
<<"current_connections">> := CurrentConnections
} = Listener,
GroupKey = maps:get(Key, Listener),
case maps:find(GroupKey, Acc) of
error ->
Acc#{
GroupKey => #{
enable => Running,
ids => [Id],
status => #{
max_connections => MaxConnections,
current_connections => CurrentConnections
},
node_status => #{
Node => #{
max_connections => MaxConnections,
current_connections => CurrentConnections
}
}
}
};
{ok, GroupValue} ->
#{
ids := Ids,
status := #{
max_connections := MaxConnections0,
current_connections := CurrentConnections0
},
node_status := NodeStatus0
} = GroupValue,
NodeStatus =
case maps:find(Node, NodeStatus0) of
error ->
#{
Node => #{
max_connections => MaxConnections,
current_connections => CurrentConnections
}
};
{ok, #{
max_connections := PrevMax,
current_connections := PrevCurr
}} ->
NodeStatus0#{
Node => #{
max_connections => max_conn(MaxConnections, PrevMax),
current_connections => CurrentConnections + PrevCurr
}
}
end,
Acc#{
GroupKey =>
GroupValue#{
ids => lists:usort([Id | Ids]),
status => #{
max_connections => max_conn(MaxConnections0, MaxConnections),
current_connections => CurrentConnections0 + CurrentConnections
},
node_status => NodeStatus
}
}
end.
max_conn(_Int1, infinity) -> infinity;
max_conn(infinity, _Int) -> infinity;
max_conn(Int1, Int2) -> Int1 + Int2.

View File

@ -35,8 +35,8 @@ end_per_suite(_) ->
t_list_listeners(_) -> t_list_listeners(_) ->
Path = emqx_mgmt_api_test_util:api_path(["listeners"]), Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
Res = request(get, Path, [], []), Res = request(get, Path, [], []),
Expect = emqx_mgmt_api_listeners:do_list_listeners(), #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)), ?assertEqual(length(Expect), length(Res)),
ok. ok.
t_crud_listeners_by_id(_) -> t_crud_listeners_by_id(_) ->
@ -44,19 +44,18 @@ t_crud_listeners_by_id(_) ->
NewListenerId = <<"tcp:new">>, NewListenerId = <<"tcp:new">>,
TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]), TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]),
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]), NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
[#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []), TcpListener = request(get, TcpPath, [], []),
?assertEqual(atom_to_binary(node()), Node),
%% create %% create
?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertEqual({error, not_found}, is_running(NewListenerId)),
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
NewConf = TcpListener#{ NewConf = TcpListener#{
<<"id">> => NewListenerId, <<"id">> => NewListenerId,
<<"bind">> => <<"0.0.0.0:2883">> <<"bind">> => <<"0.0.0.0:2883">>
}, },
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf), Create = request(post, NewPath, [], NewConf),
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))), ?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
[#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []), Get1 = request(get, NewPath, [], []),
?assertMatch(Create, Get1), ?assertMatch(Create, Get1),
?assert(is_running(NewListenerId)), ?assert(is_running(NewListenerId)),
@ -67,64 +66,21 @@ t_crud_listeners_by_id(_) ->
<<"id">> => BadId, <<"id">> => BadId,
<<"bind">> => <<"0.0.0.0:2883">> <<"bind">> => <<"0.0.0.0:2883">>
}, },
?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)), ?assertMatch({error, {"HTTP/1.1", 400, _}}, request(post, BadPath, [], BadConf)),
%% update %% update
#{<<"acceptors">> := Acceptors} = Create, #{<<"acceptors">> := Acceptors} = Create,
Acceptors1 = Acceptors + 10, Acceptors1 = Acceptors + 10,
[#{<<"listeners">> := [Update]}] = Update =
request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}), request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update), ?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
[#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []),
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
%% delete
?assertEqual([], delete(NewPath)),
?assertEqual({error, not_found}, is_running(NewListenerId)),
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
?assertEqual([], delete(NewPath)),
ok.
t_list_listeners_on_node(_) ->
Node = atom_to_list(node()),
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]),
Listeners = request(get, Path, [], []),
#{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)),
ok.
t_crud_listener_by_id_on_node(_) ->
TcpListenerId = <<"tcp:default">>,
NewListenerId = <<"tcp:new1">>,
Node = atom_to_list(node()),
TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]),
NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]),
TcpListener = request(get, TcpPath, [], []),
%% create
?assertEqual({error, not_found}, is_running(NewListenerId)),
?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
Create = request(put, NewPath, [], TcpListener#{
<<"id">> => NewListenerId,
<<"bind">> => <<"0.0.0.0:3883">>
}),
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
Get1 = request(get, NewPath, [], []),
?assertMatch(Create, Get1),
?assert(is_running(NewListenerId)),
%% update
#{<<"acceptors">> := Acceptors} = Create,
Acceptors1 = Acceptors + 10,
Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
Get2 = request(get, NewPath, [], []), Get2 = request(get, NewPath, [], []),
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2), ?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
%% delete %% delete
?assertEqual([], delete(NewPath)), ?assertEqual([], delete(NewPath)),
?assertEqual({error, not_found}, is_running(NewListenerId)), ?assertEqual({error, not_found}, is_running(NewListenerId)),
?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])), ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
?assertEqual([], delete(NewPath)), ?assertEqual([], delete(NewPath)),
ok. ok.
@ -139,8 +95,8 @@ action_listener(ID, Action, Running) ->
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path), {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
timer:sleep(500), timer:sleep(500),
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]), GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]),
[#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []), Listener = request(get, GetPath, [], []),
[listener_stats(Listener, Running) || Listener <- Listeners]. listener_stats(Listener, Running).
request(Method, Url, QueryParams, Body) -> request(Method, Url, QueryParams, Body) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),