Merge pull request #7661 from zhongwencool/api-listener
feat: Make api_listener align with gateway and remove unused APIs.
This commit is contained in:
commit
6a38be2b30
|
@ -87,13 +87,22 @@ do_list_raw() ->
|
|||
Listeners = maps:to_list(RawWithDefault),
|
||||
lists:flatmap(fun format_raw_listeners/1, Listeners).
|
||||
|
||||
format_raw_listeners({Type, Conf}) ->
|
||||
format_raw_listeners({Type0, Conf}) ->
|
||||
Type = binary_to_atom(Type0),
|
||||
lists:map(
|
||||
fun({LName, LConf0}) when is_map(LConf0) ->
|
||||
Running = is_running(binary_to_atom(Type), listener_id(Type, LName), LConf0),
|
||||
Bind = parse_bind(LConf0),
|
||||
Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
|
||||
LConf1 = maps:remove(<<"authentication">>, LConf0),
|
||||
LConf2 = maps:put(<<"running">>, Running, LConf1),
|
||||
{Type, LName, LConf2}
|
||||
LConf2 = maps:remove(<<"limiter">>, LConf1),
|
||||
LConf3 = maps:put(<<"running">>, Running, LConf2),
|
||||
CurrConn =
|
||||
case Running of
|
||||
true -> current_conns(Type, LName, Bind);
|
||||
false -> 0
|
||||
end,
|
||||
LConf4 = maps:put(<<"current_connections">>, CurrConn, LConf3),
|
||||
{Type0, LName, LConf4}
|
||||
end,
|
||||
maps:to_list(Conf)
|
||||
).
|
||||
|
@ -112,16 +121,7 @@ is_running(ListenerId) ->
|
|||
end.
|
||||
|
||||
is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
|
||||
ListenOn =
|
||||
case Conf of
|
||||
#{bind := Bind} ->
|
||||
Bind;
|
||||
#{<<"bind">> := Bind} ->
|
||||
case emqx_schema:to_ip_port(binary_to_list(Bind)) of
|
||||
{ok, L} -> L;
|
||||
{error, _} -> binary_to_integer(Bind)
|
||||
end
|
||||
end,
|
||||
#{bind := ListenOn} = Conf,
|
||||
try esockd:listener({ListenerId, ListenOn}) of
|
||||
Pid when is_pid(Pid) ->
|
||||
true
|
||||
|
@ -545,3 +545,10 @@ str(B) when is_binary(B) ->
|
|||
binary_to_list(B);
|
||||
str(S) when is_list(S) ->
|
||||
S.
|
||||
|
||||
parse_bind(#{<<"bind">> := Bind}) when is_integer(Bind) -> Bind;
|
||||
parse_bind(#{<<"bind">> := Bind}) ->
|
||||
case emqx_schema:to_ip_port(binary_to_list(Bind)) of
|
||||
{ok, L} -> L;
|
||||
{error, _} -> binary_to_integer(Bind)
|
||||
end.
|
||||
|
|
|
@ -195,7 +195,7 @@ cors(_) ->
|
|||
undefined.
|
||||
|
||||
i18n_lang(type) -> ?ENUM([en, zh]);
|
||||
i18n_lang(default) -> zh;
|
||||
i18n_lang(default) -> en;
|
||||
i18n_lang('readOnly') -> true;
|
||||
i18n_lang(desc) -> "Internationalization language support.";
|
||||
i18n_lang(_) -> undefined.
|
||||
|
|
|
@ -22,12 +22,10 @@
|
|||
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
|
||||
|
||||
-export([
|
||||
listener_type_status/2,
|
||||
list_listeners/2,
|
||||
crud_listeners_by_id/2,
|
||||
list_listeners_on_node/2,
|
||||
crud_listener_by_id_on_node/2,
|
||||
action_listeners_by_id/2,
|
||||
action_listeners_by_id_on_node/2
|
||||
action_listeners_by_id/2
|
||||
]).
|
||||
|
||||
%% for rpc call
|
||||
|
@ -55,21 +53,35 @@ api_spec() ->
|
|||
|
||||
paths() ->
|
||||
[
|
||||
"/listeners_status",
|
||||
"/listeners",
|
||||
"/listeners/:id",
|
||||
"/listeners/:id/:action",
|
||||
"/nodes/:node/listeners",
|
||||
"/nodes/:node/listeners/:id",
|
||||
"/nodes/:node/listeners/:id/:action"
|
||||
"/listeners/:id/:action"
|
||||
].
|
||||
|
||||
schema("/listeners_status") ->
|
||||
#{
|
||||
'operationId' => listener_type_status,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all running node's listeners live status. group by listener type">>,
|
||||
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_type_status)))}
|
||||
}
|
||||
};
|
||||
schema("/listeners") ->
|
||||
#{
|
||||
'operationId' => list_listeners,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all running node's listeners.">>,
|
||||
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listeners)))}
|
||||
desc => <<"List all running node's listeners for the specified type.">>,
|
||||
parameters => [
|
||||
{type,
|
||||
?HOCON(
|
||||
?ENUM(listeners_type()),
|
||||
#{desc => "Listener type", in => query, required => false}
|
||||
)}
|
||||
],
|
||||
responses => #{200 => ?HOCON(?ARRAY(?R_REF(listener_id_status)))}
|
||||
}
|
||||
};
|
||||
schema("/listeners/:id") ->
|
||||
|
@ -80,17 +92,29 @@ schema("/listeners/:id") ->
|
|||
desc => <<"List all running node's listeners for the specified id.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
responses => #{
|
||||
200 => ?HOCON(?ARRAY(?R_REF(listeners)))
|
||||
200 => ?HOCON(listener_schema(#{bind => true})),
|
||||
404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Create or update the specified listener on all nodes.">>,
|
||||
desc => <<"Update the specified listener on all nodes.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
'requestBody' => ?HOCON(listener_schema(), #{}),
|
||||
'requestBody' => ?HOCON(listener_schema(#{bind => false}), #{}),
|
||||
responses => #{
|
||||
200 => ?HOCON(listener_schema(), #{}),
|
||||
400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
|
||||
200 => ?HOCON(listener_schema(#{bind => true}), #{}),
|
||||
400 => error_codes(['BAD_REQUEST']),
|
||||
404 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'], ?LISTENER_NOT_FOUND)
|
||||
}
|
||||
},
|
||||
post => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Create the specified listener on all nodes.">>,
|
||||
parameters => [?R_REF(listener_id)],
|
||||
'requestBody' => ?HOCON(listener_schema(#{bind => true}), #{}),
|
||||
responses => #{
|
||||
200 => ?HOCON(listener_schema(#{bind => true}), #{}),
|
||||
400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
|
||||
}
|
||||
},
|
||||
delete => #{
|
||||
|
@ -118,90 +142,8 @@ schema("/listeners/:id/:action") ->
|
|||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners") ->
|
||||
#{
|
||||
'operationId' => list_listeners_on_node,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"List all listeners on the specified node.">>,
|
||||
parameters => [?R_REF(node)],
|
||||
responses => #{
|
||||
200 => ?HOCON(?ARRAY(listener_schema())),
|
||||
400 => error_codes(['BAD_NODE', 'BAD_REQUEST'], ?NODE_NOT_FOUND_OR_DOWN)
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners/:id") ->
|
||||
#{
|
||||
'operationId' => crud_listener_by_id_on_node,
|
||||
get => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Get the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)
|
||||
],
|
||||
responses => #{
|
||||
200 => ?HOCON(listener_schema()),
|
||||
400 => error_codes(['BAD_REQUEST']),
|
||||
404 => error_codes(['BAD_LISTEN_ID'], ?NODE_LISTENER_NOT_FOUND)
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Create or update the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)
|
||||
],
|
||||
'requestBody' => ?HOCON(listener_schema()),
|
||||
responses => #{
|
||||
200 => ?HOCON(listener_schema()),
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
},
|
||||
delete => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Delete the specified listener on the specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(listener_id),
|
||||
?R_REF(node)
|
||||
],
|
||||
responses => #{
|
||||
204 => <<"Listener deleted">>,
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/nodes/:node/listeners/:id/:action") ->
|
||||
#{
|
||||
'operationId' => action_listeners_by_id_on_node,
|
||||
post => #{
|
||||
tags => [<<"listeners">>],
|
||||
desc => <<"Start/stop/restart listeners on a specified node.">>,
|
||||
parameters => [
|
||||
?R_REF(node),
|
||||
?R_REF(listener_id),
|
||||
?R_REF(action)
|
||||
],
|
||||
responses => #{
|
||||
200 => <<"Updated">>,
|
||||
400 => error_codes(['BAD_REQUEST'])
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
fields(listeners) ->
|
||||
[
|
||||
{"node",
|
||||
?HOCON(atom(), #{
|
||||
desc => "Node name",
|
||||
example => "emqx@127.0.0.1",
|
||||
required => true
|
||||
})},
|
||||
{"listeners", ?ARRAY(listener_schema())}
|
||||
];
|
||||
fields(listener_id) ->
|
||||
[
|
||||
{id,
|
||||
|
@ -230,23 +172,56 @@ fields(node) ->
|
|||
in => path
|
||||
})}
|
||||
];
|
||||
fields(listener_type_status) ->
|
||||
[
|
||||
{type, ?HOCON(?ENUM(listeners_type()), #{desc => "Listener type", required => true})},
|
||||
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
||||
{ids, ?HOCON(?ARRAY(string()), #{desc => "Listener Ids", required => true})},
|
||||
{status, ?HOCON(?R_REF(status))},
|
||||
{node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
|
||||
];
|
||||
fields(listener_id_status) ->
|
||||
fields(listener_id) ++
|
||||
[
|
||||
{enable, ?HOCON(boolean(), #{desc => "Listener enable", required => true})},
|
||||
{number, ?HOCON(typerefl:pos_integer(), #{desc => "ListenerId number"})},
|
||||
{status, ?HOCON(?R_REF(status))},
|
||||
{node_status, ?HOCON(?ARRAY(?R_REF(node_status)))}
|
||||
];
|
||||
fields(status) ->
|
||||
[
|
||||
{max_connections,
|
||||
?HOCON(hoconsc:union([infinity, integer()]), #{desc => "Max connections"})},
|
||||
{current_connections, ?HOCON(non_neg_integer(), #{desc => "Current connections"})}
|
||||
];
|
||||
fields(node_status) ->
|
||||
fields(node) ++ fields(status);
|
||||
fields(Type) ->
|
||||
Listeners = listeners_info(),
|
||||
Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
|
||||
[Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
|
||||
Schema.
|
||||
|
||||
listener_schema() ->
|
||||
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info())).
|
||||
listener_schema(Opts) ->
|
||||
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))).
|
||||
|
||||
listeners_info() ->
|
||||
listeners_type() ->
|
||||
lists:map(
|
||||
fun({Type, _}) -> list_to_existing_atom(Type) end,
|
||||
hocon_schema:fields(emqx_schema, "listeners")
|
||||
).
|
||||
|
||||
listeners_info(Opts) ->
|
||||
Listeners = hocon_schema:fields(emqx_schema, "listeners"),
|
||||
lists:map(
|
||||
fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
|
||||
Fields0 = hocon_schema:fields(Mod, Field),
|
||||
Fields1 = lists:keydelete("authentication", 1, Fields0),
|
||||
Fields2 = lists:keydelete("limiter", 1, Fields1),
|
||||
Fields3 = required_bind(Fields2, Opts),
|
||||
Ref = listeners_ref(Type, Opts),
|
||||
TypeAtom = list_to_existing_atom(Type),
|
||||
#{
|
||||
ref => ?R_REF(TypeAtom),
|
||||
ref => ?R_REF(Ref),
|
||||
schema => [
|
||||
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
|
||||
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
|
||||
|
@ -255,14 +230,33 @@ listeners_info() ->
|
|||
desc => "Listener id",
|
||||
required => true,
|
||||
validator => fun validate_id/1
|
||||
})}
|
||||
| Fields1
|
||||
})},
|
||||
{current_connections,
|
||||
?HOCON(
|
||||
non_neg_integer(),
|
||||
#{desc => "Current connections", required => false}
|
||||
)}
|
||||
| Fields3
|
||||
]
|
||||
}
|
||||
end,
|
||||
Listeners
|
||||
).
|
||||
|
||||
required_bind(Fields, #{bind := true}) ->
|
||||
Fields;
|
||||
required_bind(Fields, #{bind := false}) ->
|
||||
{value, {_, Hocon}, Fields1} = lists:keytake("bind", 1, Fields),
|
||||
[{"bind", Hocon#{required => false}} | Fields1].
|
||||
|
||||
listeners_ref(Type, #{bind := Bind}) ->
|
||||
Suffix =
|
||||
case Bind of
|
||||
true -> "_required_bind";
|
||||
false -> "_not_required_bind"
|
||||
end,
|
||||
Type ++ Suffix.
|
||||
|
||||
validate_id(Id) ->
|
||||
case emqx_listeners:parse_listener_id(Id) of
|
||||
{error, Reason} -> {error, Reason};
|
||||
|
@ -270,19 +264,65 @@ validate_id(Id) ->
|
|||
end.
|
||||
|
||||
%% api
|
||||
list_listeners(get, _Request) ->
|
||||
{200, list_listeners()}.
|
||||
listener_type_status(get, _Request) ->
|
||||
Listeners = maps:to_list(listener_status_by_type(list_listeners(), #{})),
|
||||
List = lists:map(fun({Type, L}) -> L#{type => Type} end, Listeners),
|
||||
{200, List}.
|
||||
|
||||
crud_listeners_by_id(get, #{bindings := #{id := Id}}) ->
|
||||
{200, list_listeners_by_id(Id)};
|
||||
list_listeners(get, #{query_string := Query}) ->
|
||||
Listeners = list_listeners(),
|
||||
NodeL =
|
||||
case maps:find(<<"type">>, Query) of
|
||||
{ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners);
|
||||
error -> Listeners
|
||||
end,
|
||||
{200, listener_status_by_id(NodeL)}.
|
||||
|
||||
crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
|
||||
Listeners = [
|
||||
Conf#{<<"id">> => Id, <<"type">> => Type}
|
||||
|| {Id, Type, Conf} <- emqx_listeners:list_raw(),
|
||||
Id =:= Id0
|
||||
],
|
||||
case Listeners of
|
||||
[] -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
|
||||
[L] -> {200, L}
|
||||
end;
|
||||
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
|
||||
case parse_listener_conf(Body0) of
|
||||
{Id, Type, Name, Conf} ->
|
||||
case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
|
||||
{ok, #{raw_config := _RawConf}} ->
|
||||
crud_listeners_by_id(get, #{bindings => #{id => Id}});
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
Key = [listeners, Type, Name],
|
||||
case emqx_conf:get_raw(Key, undefined) of
|
||||
undefined ->
|
||||
{404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
|
||||
PrevConf ->
|
||||
MergeConf = emqx_map_lib:deep_merge(PrevConf, Conf),
|
||||
case emqx_conf:update(Key, MergeConf, ?OPTS(cluster)) of
|
||||
{ok, #{raw_config := _RawConf}} ->
|
||||
crud_listeners_by_id(get, #{bindings => #{id => Id}});
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
_ ->
|
||||
{400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
|
||||
end;
|
||||
crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
|
||||
case parse_listener_conf(Body0) of
|
||||
{Id, Type, Name, Conf} ->
|
||||
Key = [listeners, Type, Name],
|
||||
case emqx_conf:get(Key, undefined) of
|
||||
undefined ->
|
||||
case emqx_conf:update([listeners, Type, Name], Conf, ?OPTS(cluster)) of
|
||||
{ok, #{raw_config := _RawConf}} ->
|
||||
crud_listeners_by_id(get, #{bindings => #{id => Id}});
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end;
|
||||
_ ->
|
||||
{400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
|
@ -298,64 +338,16 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
|
|||
|
||||
parse_listener_conf(Conf0) ->
|
||||
Conf1 = maps:remove(<<"running">>, Conf0),
|
||||
{IdBin, Conf2} = maps:take(<<"id">>, Conf1),
|
||||
{TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
|
||||
Conf2 = maps:remove(<<"current_connections">>, Conf1),
|
||||
{IdBin, Conf3} = maps:take(<<"id">>, Conf2),
|
||||
{TypeBin, Conf4} = maps:take(<<"type">>, Conf3),
|
||||
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
|
||||
TypeAtom = binary_to_existing_atom(TypeBin),
|
||||
case Type =:= TypeAtom of
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
|
||||
true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf4};
|
||||
false -> {error, listener_type_inconsistent}
|
||||
end.
|
||||
|
||||
list_listeners_on_node(get, #{bindings := #{node := Node}}) ->
|
||||
case list_listeners(Node) of
|
||||
{error, nodedown} ->
|
||||
{400, #{code => 'BAD_NODE', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
#{<<"listeners">> := Listener} ->
|
||||
{200, Listener}
|
||||
end.
|
||||
|
||||
crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
|
||||
case get_listener(Node, Id) of
|
||||
{error, not_found} ->
|
||||
{404, #{code => 'BAD_LISTEN_ID', message => ?NODE_LISTENER_NOT_FOUND}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
Listener ->
|
||||
{200, Listener}
|
||||
end;
|
||||
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) ->
|
||||
case parse_listener_conf(Body) of
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
{Id, Type, _Name, Conf} ->
|
||||
case update_listener(Node, Id, Conf) of
|
||||
{error, nodedown} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}};
|
||||
%% TODO
|
||||
{error, {eaddrinuse, _}} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
{ok, Listener} ->
|
||||
{200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}}
|
||||
end;
|
||||
_ ->
|
||||
{400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}}
|
||||
end;
|
||||
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
|
||||
case remove_listener(Node, Id) of
|
||||
ok -> {204};
|
||||
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end.
|
||||
|
||||
action_listeners_by_id_on_node(post,
|
||||
#{bindings := #{id := Id, action := Action, node := Node}}) ->
|
||||
{_, Result} = action_listeners(Node, Id, Action),
|
||||
Result.
|
||||
|
||||
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
|
||||
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
|
||||
case
|
||||
|
@ -418,31 +410,49 @@ list_listeners() ->
|
|||
list_listeners(Node) ->
|
||||
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
||||
|
||||
list_listeners_by_id(Id) ->
|
||||
listener_id_filter(Id, list_listeners()).
|
||||
|
||||
get_listener(Node, Id) ->
|
||||
case listener_id_filter(Id, [list_listeners(Node)]) of
|
||||
[#{<<"listeners">> := []}] -> {error, not_found};
|
||||
[#{<<"listeners">> := [Listener]}] -> Listener
|
||||
end.
|
||||
|
||||
listener_id_filter(Id, Listeners) ->
|
||||
listener_status_by_id(NodeL) ->
|
||||
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
||||
lists:map(
|
||||
fun(Conf = #{<<"listeners">> := Listeners0}) ->
|
||||
Conf#{
|
||||
<<"listeners">> =>
|
||||
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]
|
||||
}
|
||||
fun({Id, L}) ->
|
||||
L1 = maps:remove(ids, L),
|
||||
#{node_status := Nodes} = L1,
|
||||
L1#{number => maps:size(Nodes), id => Id}
|
||||
end,
|
||||
Listeners
|
||||
).
|
||||
|
||||
update_listener(Node, Id, Config) ->
|
||||
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
|
||||
listener_status_by_type([], Acc) ->
|
||||
Acc;
|
||||
listener_status_by_type([NodeL | Rest], Acc) ->
|
||||
#{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL,
|
||||
Acc1 = lists:foldl(
|
||||
fun(L, Acc0) -> format_status(<<"type">>, Node, L, Acc0) end,
|
||||
Acc,
|
||||
Listeners
|
||||
),
|
||||
listener_status_by_type(Rest, Acc1).
|
||||
|
||||
remove_listener(Node, Id) ->
|
||||
wrap_rpc(emqx_management_proto_v1:remove_listener(Node, Id)).
|
||||
listener_status_by_id([], Acc) ->
|
||||
Acc;
|
||||
listener_status_by_id([NodeL | Rest], Acc) ->
|
||||
#{<<"node">> := Node, <<"listeners">> := Listeners} = NodeL,
|
||||
Acc1 = lists:foldl(
|
||||
fun(L, Acc0) -> format_status(<<"id">>, Node, L, Acc0) end,
|
||||
Acc,
|
||||
Listeners
|
||||
),
|
||||
listener_status_by_id(Rest, Acc1).
|
||||
|
||||
listener_type_filter(Type0, Listeners) ->
|
||||
lists:map(
|
||||
fun(Conf = #{<<"listeners">> := Listeners0}) ->
|
||||
Conf#{
|
||||
<<"listeners">> =>
|
||||
[C || C = #{<<"type">> := Type} <- Listeners0, Type =:= Type0]
|
||||
}
|
||||
end,
|
||||
Listeners
|
||||
).
|
||||
|
||||
-spec do_list_listeners() -> map().
|
||||
do_list_listeners() ->
|
||||
|
@ -476,3 +486,75 @@ wrap_rpc({badrpc, Reason}) ->
|
|||
{error, Reason};
|
||||
wrap_rpc(Res) ->
|
||||
Res.
|
||||
|
||||
format_status(Key, Node, Listener, Acc) ->
|
||||
#{
|
||||
<<"id">> := Id,
|
||||
<<"running">> := Running,
|
||||
<<"max_connections">> := MaxConnections,
|
||||
<<"current_connections">> := CurrentConnections
|
||||
} = Listener,
|
||||
GroupKey = maps:get(Key, Listener),
|
||||
case maps:find(GroupKey, Acc) of
|
||||
error ->
|
||||
Acc#{
|
||||
GroupKey => #{
|
||||
enable => Running,
|
||||
ids => [Id],
|
||||
status => #{
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
},
|
||||
node_status => #{
|
||||
Node => #{
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
{ok, GroupValue} ->
|
||||
#{
|
||||
ids := Ids,
|
||||
status := #{
|
||||
max_connections := MaxConnections0,
|
||||
current_connections := CurrentConnections0
|
||||
},
|
||||
node_status := NodeStatus0
|
||||
} = GroupValue,
|
||||
NodeStatus =
|
||||
case maps:find(Node, NodeStatus0) of
|
||||
error ->
|
||||
#{
|
||||
Node => #{
|
||||
max_connections => MaxConnections,
|
||||
current_connections => CurrentConnections
|
||||
}
|
||||
};
|
||||
{ok, #{
|
||||
max_connections := PrevMax,
|
||||
current_connections := PrevCurr
|
||||
}} ->
|
||||
NodeStatus0#{
|
||||
Node => #{
|
||||
max_connections => max_conn(MaxConnections, PrevMax),
|
||||
current_connections => CurrentConnections + PrevCurr
|
||||
}
|
||||
}
|
||||
end,
|
||||
Acc#{
|
||||
GroupKey =>
|
||||
GroupValue#{
|
||||
ids => lists:usort([Id | Ids]),
|
||||
status => #{
|
||||
max_connections => max_conn(MaxConnections0, MaxConnections),
|
||||
current_connections => CurrentConnections0 + CurrentConnections
|
||||
},
|
||||
node_status => NodeStatus
|
||||
}
|
||||
}
|
||||
end.
|
||||
|
||||
max_conn(_Int1, infinity) -> infinity;
|
||||
max_conn(infinity, _Int) -> infinity;
|
||||
max_conn(Int1, Int2) -> Int1 + Int2.
|
||||
|
|
|
@ -35,8 +35,8 @@ end_per_suite(_) ->
|
|||
t_list_listeners(_) ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
|
||||
Res = request(get, Path, [], []),
|
||||
Expect = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||
?assertEqual(emqx_json:encode([Expect]), emqx_json:encode(Res)),
|
||||
#{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||
?assertEqual(length(Expect), length(Res)),
|
||||
ok.
|
||||
|
||||
t_crud_listeners_by_id(_) ->
|
||||
|
@ -44,19 +44,18 @@ t_crud_listeners_by_id(_) ->
|
|||
NewListenerId = <<"tcp:new">>,
|
||||
TcpPath = emqx_mgmt_api_test_util:api_path(["listeners", TcpListenerId]),
|
||||
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
|
||||
[#{<<"listeners">> := [TcpListener], <<"node">> := Node}] = request(get, TcpPath, [], []),
|
||||
?assertEqual(atom_to_binary(node()), Node),
|
||||
TcpListener = request(get, TcpPath, [], []),
|
||||
|
||||
%% create
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||
?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
|
||||
NewConf = TcpListener#{
|
||||
<<"id">> => NewListenerId,
|
||||
<<"bind">> => <<"0.0.0.0:2883">>
|
||||
},
|
||||
[#{<<"listeners">> := [Create]}] = request(put, NewPath, [], NewConf),
|
||||
Create = request(post, NewPath, [], NewConf),
|
||||
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||
[#{<<"listeners">> := [Get1]}] = request(get, NewPath, [], []),
|
||||
Get1 = request(get, NewPath, [], []),
|
||||
?assertMatch(Create, Get1),
|
||||
?assert(is_running(NewListenerId)),
|
||||
|
||||
|
@ -67,64 +66,21 @@ t_crud_listeners_by_id(_) ->
|
|||
<<"id">> => BadId,
|
||||
<<"bind">> => <<"0.0.0.0:2883">>
|
||||
},
|
||||
?assertEqual({error, {"HTTP/1.1", 400, "Bad Request"}}, request(put, BadPath, [], BadConf)),
|
||||
?assertMatch({error, {"HTTP/1.1", 400, _}}, request(post, BadPath, [], BadConf)),
|
||||
|
||||
%% update
|
||||
#{<<"acceptors">> := Acceptors} = Create,
|
||||
Acceptors1 = Acceptors + 10,
|
||||
[#{<<"listeners">> := [Update]}] =
|
||||
Update =
|
||||
request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||
[#{<<"listeners">> := [Get2]}] = request(get, NewPath, [], []),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||
|
||||
%% delete
|
||||
?assertEqual([], delete(NewPath)),
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch([#{<<"listeners">> := []}], request(get, NewPath, [], [])),
|
||||
?assertEqual([], delete(NewPath)),
|
||||
ok.
|
||||
|
||||
t_list_listeners_on_node(_) ->
|
||||
Node = atom_to_list(node()),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners"]),
|
||||
Listeners = request(get, Path, [], []),
|
||||
#{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
|
||||
?assertEqual(emqx_json:encode(Expect), emqx_json:encode(Listeners)),
|
||||
ok.
|
||||
|
||||
t_crud_listener_by_id_on_node(_) ->
|
||||
TcpListenerId = <<"tcp:default">>,
|
||||
NewListenerId = <<"tcp:new1">>,
|
||||
Node = atom_to_list(node()),
|
||||
TcpPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", TcpListenerId]),
|
||||
NewPath = emqx_mgmt_api_test_util:api_path(["nodes", Node, "listeners", NewListenerId]),
|
||||
TcpListener = request(get, TcpPath, [], []),
|
||||
|
||||
%% create
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch({error,{"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||
Create = request(put, NewPath, [], TcpListener#{
|
||||
<<"id">> => NewListenerId,
|
||||
<<"bind">> => <<"0.0.0.0:3883">>
|
||||
}),
|
||||
?assertEqual(lists:sort(maps:keys(TcpListener)), lists:sort(maps:keys(Create))),
|
||||
Get1 = request(get, NewPath, [], []),
|
||||
?assertMatch(Create, Get1),
|
||||
?assert(is_running(NewListenerId)),
|
||||
|
||||
%% update
|
||||
#{<<"acceptors">> := Acceptors} = Create,
|
||||
Acceptors1 = Acceptors + 10,
|
||||
Update = request(put, NewPath, [], Create#{<<"acceptors">> => Acceptors1}),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Update),
|
||||
Get2 = request(get, NewPath, [], []),
|
||||
?assertMatch(#{<<"acceptors">> := Acceptors1}, Get2),
|
||||
|
||||
%% delete
|
||||
?assertEqual([], delete(NewPath)),
|
||||
?assertEqual({error, not_found}, is_running(NewListenerId)),
|
||||
?assertMatch({error, {"HTTP/1.1", 404, "Not Found"}}, request(get, NewPath, [], [])),
|
||||
?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
|
||||
?assertEqual([], delete(NewPath)),
|
||||
ok.
|
||||
|
||||
|
@ -139,8 +95,8 @@ action_listener(ID, Action, Running) ->
|
|||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
|
||||
timer:sleep(500),
|
||||
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", ID]),
|
||||
[#{<<"listeners">> := Listeners}] = request(get, GetPath, [], []),
|
||||
[listener_stats(Listener, Running) || Listener <- Listeners].
|
||||
Listener = request(get, GetPath, [], []),
|
||||
listener_stats(Listener, Running).
|
||||
|
||||
request(Method, Url, QueryParams, Body) ->
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
|
|
Loading…
Reference in New Issue