fix: dialyzer warning.

This commit is contained in:
Zhongwen Deng 2022-04-07 17:18:54 +08:00
parent 63d6682a7d
commit 1664438b4f
4 changed files with 161 additions and 103 deletions

View File

@ -61,7 +61,7 @@
id_example() -> 'tcp:default'. id_example() -> 'tcp:default'.
%% @doc List configured listeners. %% @doc List configured listeners.
-spec list_raw() -> [{ListenerId :: atom(), Type :: atom(), ListenerConf :: map()}]. -spec list_raw() -> [{ListenerId :: atom(), Type :: binary(), ListenerConf :: map()}].
list_raw() -> list_raw() ->
[{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()]. [{listener_id(Type, LName), Type, LConf} || {Type, LName, LConf} <- do_list_raw()].
@ -76,7 +76,7 @@ format_list(Listener) ->
Running = is_running(Type, listener_id(Type, LName), LConf), Running = is_running(Type, listener_id(Type, LName), LConf),
{Type, LName, maps:put(running, Running, LConf)} {Type, LName, maps:put(running, Running, LConf)}
end end
|| {LName, LConf} <- maps:to_list(Conf), is_map(LConf) || {LName, LConf} <- maps:to_list(Conf), is_map(LConf)
]. ].
do_list_raw() -> do_list_raw() ->
@ -94,13 +94,20 @@ format_raw_listeners({Type, Conf}) ->
LConf1 = maps:remove(<<"authentication">>, LConf0), LConf1 = maps:remove(<<"authentication">>, LConf0),
LConf2 = maps:put(<<"running">>, Running, LConf1), LConf2 = maps:put(<<"running">>, Running, LConf1),
{Type, LName, LConf2} {Type, LName, LConf2}
end, maps:to_list(Conf)). end,
maps:to_list(Conf)
).
-spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
is_running(ListenerId) -> is_running(ListenerId) ->
{Type, Name} = parse_listener_id(ListenerId), {Type, Name} = parse_listener_id(ListenerId),
case [ Running || {Type0, Name0, #{running := Running}} <- list(), case
Type0 =:= Type, Name0 =:= Name] [
Running
|| {Type0, Name0, #{running := Running}} <- list(),
Type0 =:= Type,
Name0 =:= Name
]
of of
[] -> {error, not_found}; [] -> {error, not_found};
[IsRunning] -> IsRunning [IsRunning] -> IsRunning
@ -109,7 +116,8 @@ is_running(ListenerId) ->
is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
ListenOn = ListenOn =
case Conf of case Conf of
#{bind := Bind} -> Bind; #{bind := Bind} ->
Bind;
#{<<"bind">> := Bind} -> #{<<"bind">> := Bind} ->
case emqx_schema:to_ip_port(binary_to_list(Bind)) of case emqx_schema:to_ip_port(binary_to_list(Bind)) of
{ok, L} -> L; {ok, L} -> L;
@ -340,10 +348,15 @@ post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
perform_listener_changes(fun delete_authentication/3, Removed), perform_listener_changes(fun delete_authentication/3, Removed),
perform_listener_changes(fun start_listener/3, Added), perform_listener_changes(fun start_listener/3, Added),
perform_listener_changes(fun restart_listener/3, Updated) perform_listener_changes(fun restart_listener/3, Updated)
catch error : {failed_to_start, ListenerId, Bind, Reason} -> catch
Error = lists:flatten(io_lib:format("~ts(~ts) failed with ~ts", error:{failed_to_start, ListenerId, Bind, Reason} ->
[ListenerId, Bind, element(1, Reason)])), Error = lists:flatten(
{error, Error} io_lib:format(
"~ts(~ts) failed with ~ts",
[ListenerId, Bind, element(1, Reason)]
)
),
{error, Error}
end. end.
perform_listener_changes(Action, MapConfs) -> perform_listener_changes(Action, MapConfs) ->

View File

@ -74,10 +74,11 @@ global_chain_config() ->
listener_chain_configs() -> listener_chain_configs() ->
lists:map( lists:map(
fun({ListenerID, _, _}) -> fun({ListenerID, _, _}) ->
{ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])} {ListenerID, emqx:get_raw_config(auth_config_path(ListenerID), [])}
end, end,
emqx_listeners:list()). emqx_listeners:list()
).
auth_config_path(ListenerID) -> auth_config_path(ListenerID) ->
[<<"listeners">>] ++ [<<"listeners">>] ++

View File

@ -21,18 +21,20 @@
-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). -import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
-export([ list_listeners/2 -export([
, crud_listeners_by_id/2 list_listeners/2,
, list_listeners_on_node/2 crud_listeners_by_id/2,
, crud_listener_by_id_on_node/2 list_listeners_on_node/2,
, action_listeners/2 crud_listener_by_id_on_node/2,
]). action_listeners/2
]).
%% for rpc call %% for rpc call
-export([ do_list_listeners/0 -export([
, do_update_listener/2 do_list_listeners/0,
, do_remove_listener/1 do_update_listener/2,
]). do_remove_listener/1
]).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -108,7 +110,8 @@ schema("/listeners/:id/:action") ->
desc => <<"Start/stop/restart listeners on all nodes.">>, desc => <<"Start/stop/restart listeners on all nodes.">>,
parameters => [ parameters => [
?R_REF(listener_id), ?R_REF(listener_id),
?R_REF(action)], ?R_REF(action)
],
responses => #{ responses => #{
200 => <<"Updated">>, 200 => <<"Updated">>,
400 => error_codes(['BAD_REQUEST']) 400 => error_codes(['BAD_REQUEST'])
@ -136,7 +139,8 @@ schema("/nodes/:node/listeners/:id") ->
desc => <<"Get the specified listener on the specified node.">>, desc => <<"Get the specified listener on the specified node.">>,
parameters => [ parameters => [
?R_REF(listener_id), ?R_REF(listener_id),
?R_REF(node)], ?R_REF(node)
],
responses => #{ responses => #{
200 => ?HOCON(listener_schema()), 200 => ?HOCON(listener_schema()),
400 => error_codes(['BAD_REQUEST']), 400 => error_codes(['BAD_REQUEST']),
@ -148,21 +152,25 @@ schema("/nodes/:node/listeners/:id") ->
desc => <<"Create or update the specified listener on the specified node.">>, desc => <<"Create or update the specified listener on the specified node.">>,
parameters => [ parameters => [
?R_REF(listener_id), ?R_REF(listener_id),
?R_REF(node)], ?R_REF(node)
],
'requestBody' => ?HOCON(listener_schema()), 'requestBody' => ?HOCON(listener_schema()),
responses => #{ responses => #{
200 => ?HOCON(listener_schema()), 200 => ?HOCON(listener_schema()),
400 => error_codes(['BAD_REQUEST']) 400 => error_codes(['BAD_REQUEST'])
}}, }
},
delete => #{ delete => #{
tags => [<<"listeners">>], tags => [<<"listeners">>],
desc => <<"Delete the specified listener on the specified node.">>, desc => <<"Delete the specified listener on the specified node.">>,
parameters => [ parameters => [
?R_REF(listener_id), ?R_REF(listener_id),
?R_REF(node)], ?R_REF(node)
],
responses => #{ responses => #{
204 => <<"Listener deleted">>, 204 => <<"Listener deleted">>,
400 => error_codes(['BAD_REQUEST'])} 400 => error_codes(['BAD_REQUEST'])
}
} }
}; };
schema("/nodes/:node/listeners/:id/:action") -> schema("/nodes/:node/listeners/:id/:action") ->
@ -174,46 +182,52 @@ schema("/nodes/:node/listeners/:id/:action") ->
parameters => [ parameters => [
?R_REF(node), ?R_REF(node),
?R_REF(listener_id), ?R_REF(listener_id),
?R_REF(action)], ?R_REF(action)
],
responses => #{ responses => #{
200 => <<"Updated">>, 200 => <<"Updated">>,
400 => error_codes(['BAD_REQUEST'])} 400 => error_codes(['BAD_REQUEST'])
}
} }
}. }.
fields(listeners) -> fields(listeners) ->
[ [
{"node", ?HOCON(atom(), #{ {"node",
desc => "Node name", ?HOCON(atom(), #{
example => "emqx@127.0.0.1", desc => "Node name",
required => true}) example => "emqx@127.0.0.1",
}, required => true
})},
{"listeners", ?ARRAY(listener_schema())} {"listeners", ?ARRAY(listener_schema())}
]; ];
fields(listener_id) -> fields(listener_id) ->
[ [
{id, ?HOCON(atom(), #{ {id,
desc => "Listener id", ?HOCON(atom(), #{
example => 'tcp:default', desc => "Listener id",
validator => fun validate_id/1, example => 'tcp:default',
in => path}) validator => fun validate_id/1,
} in => path
})}
]; ];
fields(action) -> fields(action) ->
[ [
{action, ?HOCON(?ENUM([start, stop, restart]), #{ {action,
desc => "listener action", ?HOCON(?ENUM([start, stop, restart]), #{
example => start, desc => "listener action",
in => path}) example => start,
} in => path
})}
]; ];
fields(node) -> fields(node) ->
[ [
{"node", ?HOCON(atom(), #{ {"node",
desc => "Node name", ?HOCON(atom(), #{
example => "emqx@127.0.0.1", desc => "Node name",
in => path}) example => "emqx@127.0.0.1",
} in => path
})}
]; ];
fields(Type) -> fields(Type) ->
Listeners = listeners_info(), Listeners = listeners_info(),
@ -225,19 +239,28 @@ listener_schema() ->
listeners_info() -> listeners_info() ->
Listeners = hocon_schema:fields(emqx_schema, "listeners"), Listeners = hocon_schema:fields(emqx_schema, "listeners"),
lists:map(fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) -> lists:map(
Fields0 = hocon_schema:fields(Mod, Field), fun({Type, #{type := ?MAP(_Name, ?R_REF(Mod, Field))}}) ->
Fields1 = lists:keydelete("authentication", 1, Fields0), Fields0 = hocon_schema:fields(Mod, Field),
TypeAtom = list_to_existing_atom(Type), Fields1 = lists:keydelete("authentication", 1, Fields0),
#{ref => ?R_REF(TypeAtom), TypeAtom = list_to_existing_atom(Type),
schema => [ #{
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})}, ref => ?R_REF(TypeAtom),
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})}, schema => [
{id, ?HOCON(atom(), #{desc => "Listener id", required => true, {type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
validator => fun validate_id/1})} {running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
| Fields1] {id,
} ?HOCON(atom(), #{
end, Listeners). desc => "Listener id",
required => true,
validator => fun validate_id/1
})}
| Fields1
]
}
end,
Listeners
).
validate_id(Id) -> validate_id(Id) ->
case emqx_listeners:parse_listener_id(Id) of case emqx_listeners:parse_listener_id(Id) of
@ -304,19 +327,22 @@ crud_listener_by_id_on_node(get, #{bindings := #{id := Id, node := Node}}) ->
end; end;
crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) -> crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body := Body}) ->
case parse_listener_conf(Body) of case parse_listener_conf(Body) of
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
{Id, Type, _Name, Conf} -> {Id, Type, _Name, Conf} ->
case update_listener(Node, Id, Conf) of case update_listener(Node, Id, Conf) of
{error, nodedown} -> {error, nodedown} ->
{400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}}; {400, #{code => 'BAD_REQUEST', message => ?NODE_NOT_FOUND_OR_DOWN}};
{error, {eaddrinuse, _}} -> %% TODO %% TODO
{error, {eaddrinuse, _}} ->
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
{error, Reason} -> {error, Reason} ->
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}; {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
{ok, Listener} -> {ok, Listener} ->
{200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}} {200, Listener#{<<"id">> => Id, <<"type">> => Type, <<"running">> => true}}
end; end;
_ -> {400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}} _ ->
{400, #{code => 'BAD_REQUEST', message => ?LISTENER_ID_INCONSISTENT}}
end; end;
crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) -> crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
case remove_listener(Node, Id) of case remove_listener(Node, Id) of
@ -327,10 +353,17 @@ crud_listener_by_id_on_node(delete, #{bindings := #{id := Id, node := Node}}) ->
action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) -> action_listeners(post, #{bindings := #{id := Id, action := Action, node := Node}}) ->
{_, Result} = action_listeners(Node, Id, Action), {_, Result} = action_listeners(Node, Id, Action),
Result; Result;
action_listeners(post, #{bindings := #{id := Id, action := Action}}) -> action_listeners(post, #{bindings := #{id := Id, action := Action}}) ->
Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()], Results = [action_listeners(Node, Id, Action) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of case
lists:filter(
fun
({_, {200}}) -> false;
(_) -> true
end,
Results
)
of
[] -> {200}; [] -> {200};
Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}} Errors -> {400, #{code => 'BAD_REQUEST', message => action_listeners_err(Errors)}}
end. end.
@ -344,28 +377,31 @@ do_action_listeners(start, Node, Id) ->
case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of case wrap_rpc(emqx_broker_proto_v1:start_listener(Node, Id)) of
ok -> {200}; ok -> {200};
{error, {already_started, _}} -> {200}; {error, {already_started, _}} -> {200};
{error, Reason} -> {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end; end;
do_action_listeners(stop, Node, Id) -> do_action_listeners(stop, Node, Id) ->
case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of case wrap_rpc(emqx_broker_proto_v1:stop_listener(Node, Id)) of
ok -> {200}; ok -> {200};
{error, not_found} -> {200}; {error, not_found} -> {200};
{error, Reason} -> {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end; end;
do_action_listeners(restart, Node, Id) -> do_action_listeners(restart, Node, Id) ->
case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of case wrap_rpc(emqx_broker_proto_v1:restart_listener(Node, Id)) of
ok -> {200}; ok -> {200};
{error, not_found} -> do_action_listeners(start, Node, Id); {error, not_found} -> do_action_listeners(start, Node, Id);
{error, Reason} -> {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end. end.
action_listeners_err(Errors) -> action_listeners_err(Errors) ->
list_to_binary(lists:foldl(fun({Node, Err}, Str) -> list_to_binary(
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str lists:foldl(
end, "", Errors)). fun({Node, Err}, Str) ->
err_msg_str(#{node => Node, error => Err}) ++ "; " ++ Str
end,
"",
Errors
)
).
err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom); err_msg(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
err_msg(Reason) -> list_to_binary(err_msg_str(Reason)). err_msg(Reason) -> list_to_binary(err_msg_str(Reason)).
@ -389,10 +425,15 @@ get_listener(Node, Id) ->
end. end.
listener_id_filter(Id, Listeners) -> listener_id_filter(Id, Listeners) ->
lists:map(fun(Conf = #{<<"listeners">> := Listeners0}) -> lists:map(
Conf#{<<"listeners">> => fun(Conf = #{<<"listeners">> := Listeners0}) ->
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]} Conf#{
end, Listeners). <<"listeners">> =>
[C || C = #{<<"id">> := Id0} <- Listeners0, Id =:= Id0]
}
end,
Listeners
).
update_listener(Node, Id, Config) -> update_listener(Node, Id, Config) ->
wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)). wrap_rpc(emqx_management_proto_v1:update_listener(Node, Id, Config)).
@ -402,8 +443,10 @@ remove_listener(Node, Id) ->
-spec do_list_listeners() -> map(). -spec do_list_listeners() -> map().
do_list_listeners() -> do_list_listeners() ->
Listeners = [Conf#{<<"id">> => Id, <<"type">> => Type} Listeners = [
|| {Id, Type, Conf} <- emqx_listeners:list_raw()], Conf#{<<"id">> => Id, <<"type">> => Type}
|| {Id, Type, Conf} <- emqx_listeners:list_raw()
],
#{ #{
<<"node">> => node(), <<"node">> => node(),
<<"listeners">> => Listeners <<"listeners">> => Listeners

View File

@ -18,23 +18,24 @@
-behaviour(emqx_bpapi). -behaviour(emqx_bpapi).
-export([ introduced_in/0 -export([
introduced_in/0,
, node_info/1 node_info/1,
, broker_info/1 broker_info/1,
, list_subscriptions/1 list_subscriptions/1,
, list_listeners/1 list_listeners/1,
, remove_listener/2 remove_listener/2,
, update_listener/3 update_listener/3,
, subscribe/3 subscribe/3,
, unsubscribe/3 unsubscribe/3,
, call_client/3 call_client/3,
, get_full_config/1 get_full_config/1
]). ]).
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
@ -53,7 +54,7 @@ broker_info(Node) ->
list_subscriptions(Node) -> list_subscriptions(Node) ->
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []). rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
-spec list_listeners(node()) -> [map()] | {badrpc, _}. -spec list_listeners(node()) -> map() | {badrpc, _}.
list_listeners(Node) -> list_listeners(Node) ->
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []). rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
@ -61,18 +62,18 @@ list_listeners(Node) ->
remove_listener(Node, Id) -> remove_listener(Node, Id) ->
rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]). rpc:call(Node, emqx_mgmt_api_listeners, do_remove_listener, [Id]).
-spec update_listener(node(), string(), emqx_config:update_request()) -> -spec update_listener(node(), atom(), emqx_config:update_request()) ->
map() | {error, _} | {badrpc, _}. {ok, map()} | {error, _} | {badrpc, _}.
update_listener(Node, Id, Config) -> update_listener(Node, Id, Config) ->
rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]). rpc:call(Node, emqx_mgmt_api_listeners, do_update_listener, [Id, Config]).
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) -> -spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
{subscribe, _} | {error, atom()} | {badrpc, _}. {subscribe, _} | {error, atom()} | {badrpc, _}.
subscribe(Node, ClientId, TopicTables) -> subscribe(Node, ClientId, TopicTables) ->
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]). rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) -> -spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, _} | {badrpc, _}. {unsubscribe, _} | {error, _} | {badrpc, _}.
unsubscribe(Node, ClientId, Topic) -> unsubscribe(Node, ClientId, Topic) ->
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]). rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).