diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index f264339a4..9037518c5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -20,6 +20,11 @@ -import(emqx_gateway_http, [ return_http_error/2 + , with_gateway/2 + , schema_bad_request/0 + , schema_not_found/0 + , schema_internal_error/0 + , schema_no_content/0 ]). %% minirest behaviour callbacks @@ -55,44 +60,34 @@ gateway(get, Request) -> {200, emqx_gateway_http:gateways(Status)}. gateway_insta(delete, #{bindings := #{name := Name0}}) -> - Name = binary_to_existing_atom(Name0), - case emqx_gateway:unload(Name) of - ok -> - {204}; - {error, not_found} -> - return_http_error(404, <<"Gateway not found">>) - end; + with_gateway(Name0, fun(GwName, _) -> + _ = emqx_gateway:unload(GwName), + {204} + end); gateway_insta(get, #{bindings := #{name := Name0}}) -> - Name = binary_to_existing_atom(Name0), - case emqx_gateway:lookup(Name) of - #{config := _Config} -> - GwCfs = filled_raw_confs([<<"gateway">>, Name0]), - NGwCfs = GwCfs#{<<"listeners">> => - emqx_gateway_http:mapping_listener_m2l( - Name0, maps:get(<<"listeners">>, GwCfs, #{}) - ) - }, - {200, NGwCfs}; - undefined -> - return_http_error(404, <<"Gateway not found">>) - end; -gateway_insta(put, #{body := RawConfsIn0, - bindings := #{name := Name} + with_gateway(Name0, fun(_, _) -> + GwConf = filled_raw_confs([<<"gateway">>, Name0]), + LisConf = maps:get(<<"listeners">>, GwConf, #{}), + NLisConf = emqx_gateway_http:mapping_listener_m2l(Name0, LisConf), + {200, GwConf#{<<"listeners">> => NLisConf}} + end); +gateway_insta(put, #{body := GwConf0, + bindings := #{name := Name0} }) -> - RawConfsIn = maps:without([<<"authentication">>, - <<"listeners">>], RawConfsIn0), - %% FIXME: Cluster Consistence ?? - case emqx_gateway:update_rawconf(Name, RawConfsIn) of - ok -> - {200}; - {error, not_found} -> - return_http_error(404, <<"Gateway not found">>); - {error, Reason} -> - return_http_error(500, Reason) - end. + with_gateway(Name0, fun(_, _) -> + GwConf = maps:without([<<"authentication">>, <<"listeners">>], GwConf0), + case emqx_gateway:update_rawconf(Name0, GwConf) of + ok -> + {200}; + {error, not_found} -> + return_http_error(404, "Gateway not found"); + {error, Reason} -> + return_http_error(500, Reason) + end + end). gateway_insta_stats(get, _Req) -> - return_http_error(401, <<"Implement it later (maybe 5.1)">>). + return_http_error(401, "Implement it later (maybe 5.1)"). filled_raw_confs(Path) -> RawConf = emqx_config:fill_defaults( @@ -131,7 +126,9 @@ swagger("/gateway/:name", get) -> #{ description => <<"Get the gateway configurations">> , parameters => params_gateway_name_in_path() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_gateway_conf() } }; @@ -139,7 +136,9 @@ swagger("/gateway/:name", delete) -> #{ description => <<"Delete/Unload the gateway">> , parameters => params_gateway_name_in_path() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"204">> => schema_no_content() } }; @@ -148,7 +147,9 @@ swagger("/gateway/:name", put) -> , parameters => params_gateway_name_in_path() , requestBody => schema_gateway_conf() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_no_content() } }; @@ -156,7 +157,9 @@ swagger("/gateway/:name/stats", get) -> #{ description => <<"Get gateway Statistic">> , parameters => params_gateway_name_in_path() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_gateway_stats() } }. @@ -181,12 +184,6 @@ params_gateway_status_in_qs() -> %%-------------------------------------------------------------------- %% schemas -schema_not_found() -> - emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>). - -schema_no_content() -> - #{description => <<"No Content">>}. - schema_gateway_overview_list() -> emqx_mgmt_util:array_schema( #{ type => object diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index fcfea7343..386d6e1ea 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -36,6 +36,11 @@ -import(emqx_gateway_http, [ return_http_error/2 + , with_gateway/2 + , schema_bad_request/0 + , schema_not_found/0 + , schema_internal_error/0 + , schema_no_content/0 ]). %%-------------------------------------------------------------------- @@ -71,102 +76,103 @@ apis() -> -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format_channel_info}). -clients(get, #{ bindings := #{name := GwName0} +clients(get, #{ bindings := #{name := Name0} , query_string := Qs }) -> - GwName = binary_to_existing_atom(GwName0), - TabName = emqx_gateway_cm:tabname(info, GwName), - case maps:get(<<"node">>, Qs, undefined) of - undefined -> - Response = emqx_mgmt_api:cluster_query( - Qs, TabName, - ?CLIENT_QS_SCHEMA, ?query_fun - ), - {200, Response}; - Node1 -> - Node = binary_to_atom(Node1, utf8), - ParamsWithoutNode = maps:without([<<"node">>], Qs), - Response = emqx_mgmt_api:node_query( - Node, ParamsWithoutNode, - TabName, ?CLIENT_QS_SCHEMA, ?query_fun - ), - {200, Response} - end. + with_gateway(Name0, fun(GwName, _) -> + TabName = emqx_gateway_cm:tabname(info, GwName), + case maps:get(<<"node">>, Qs, undefined) of + undefined -> + Response = emqx_mgmt_api:cluster_query( + Qs, TabName, + ?CLIENT_QS_SCHEMA, ?query_fun + ), + {200, Response}; + Node1 -> + Node = binary_to_atom(Node1, utf8), + ParamsWithoutNode = maps:without([<<"node">>], Qs), + Response = emqx_mgmt_api:node_query( + Node, ParamsWithoutNode, + TabName, ?CLIENT_QS_SCHEMA, ?query_fun + ), + {200, Response} + end + end). -clients_insta(get, #{ bindings := #{name := GwName0, +clients_insta(get, #{ bindings := #{name := Name0, clientid := ClientId0} }) -> - GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - - case emqx_gateway_http:lookup_client(GwName, ClientId, - {?MODULE, format_channel_info}) of - [ClientInfo] -> - {200, ClientInfo}; - [ClientInfo|_More] -> - ?LOG(warning, "More than one client info was returned on ~s", - [ClientId]), - {200, ClientInfo}; - [] -> - return_http_error(404, <<"Gateway or ClientId not found">>) - - end; - -clients_insta(delete, #{ bindings := #{name := GwName0, + with_gateway(Name0, fun(GwName, _) -> + case emqx_gateway_http:lookup_client(GwName, ClientId, + {?MODULE, format_channel_info}) of + [ClientInfo] -> + {200, ClientInfo}; + [ClientInfo|_More] -> + ?LOG(warning, "More than one client info was returned on ~s", + [ClientId]), + {200, ClientInfo}; + [] -> + return_http_error(404, "Client not found") + end + end); +clients_insta(delete, #{ bindings := #{name := Name0, clientid := ClientId0} }) -> - GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - _ = emqx_gateway_http:kickout_client(GwName, ClientId), - {200}. + with_gateway(Name0, fun(GwName, _) -> + _ = emqx_gateway_http:kickout_client(GwName, ClientId), + {200} + end). %% FIXME: %% List the subscription without mountpoint, but has SubOpts, %% for example, share group ... -subscriptions(get, #{ bindings := #{name := GwName0, +subscriptions(get, #{ bindings := #{name := Name0, clientid := ClientId0} }) -> - GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of - {error, Reason} -> - return_http_error(404, Reason); - {ok, Subs} -> - {200, Subs} - end; + with_gateway(Name0, fun(GwName, _) -> + case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of + {error, Reason} -> + return_http_error(500, Reason); + {ok, Subs} -> + {200, Subs} + end + end); %% Create the subscription without mountpoint -subscriptions(post, #{ bindings := #{name := GwName0, +subscriptions(post, #{ bindings := #{name := Name0, clientid := ClientId0}, body := Body }) -> - GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - - case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of - {undefined, _} -> - %% FIXME: more reasonable error code?? - return_http_error(404, <<"Request paramter missed: topic">>); - {Topic, QoS} -> - case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of - {error, Reason} -> - return_http_error(404, Reason); - ok -> - {200} - end - end; + with_gateway(Name0, fun(GwName, _) -> + case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of + {undefined, _} -> + return_http_error(400, "Miss topic property"); + {Topic, QoS} -> + case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of + {error, Reason} -> + return_http_error(404, Reason); + ok -> + {200} + end + end + end); %% Remove the subscription without mountpoint -subscriptions(delete, #{ bindings := #{name := GwName0, +subscriptions(delete, #{ bindings := #{name := Name0, clientid := ClientId0, topic := Topic0 } }) -> - GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), Topic = emqx_mgmt_util:urldecode(Topic0), - _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), - {200}. + with_gateway(Name0, fun(GwName, _) -> + _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), + {200} + end). %%-------------------------------------------------------------------- %% Utils @@ -379,7 +385,9 @@ swagger("/gateway/:name/clients", get) -> #{ description => <<"Get the gateway clients">> , parameters => params_client_query() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_clients_list() } }; @@ -387,7 +395,9 @@ swagger("/gateway/:name/clients/:clientid", get) -> #{ description => <<"Get the gateway client infomation">> , parameters => params_client_insta() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_client() } }; @@ -395,7 +405,9 @@ swagger("/gateway/:name/clients/:clientid", delete) -> #{ description => <<"Kick out the gateway client">> , parameters => params_client_insta() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"204">> => schema_no_content() } }; @@ -403,7 +415,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", get) -> #{ description => <<"Get the gateway client subscriptions">> , parameters => params_client_insta() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_subscription_list() } }; @@ -412,7 +426,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) -> , parameters => params_client_insta() , requestBody => schema_subscription() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"200">> => schema_no_content() } }; @@ -420,7 +436,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> #{ description => <<"Unsubscribe the topic for client">> , parameters => params_topic_name_in_path() ++ params_client_insta() , responses => - #{ <<"404">> => schema_not_found() + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() , <<"204">> => schema_no_content() } }. @@ -483,12 +501,6 @@ queries(Ls) -> %%-------------------------------------------------------------------- %% schemas -schema_not_found() -> - emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>). - -schema_no_content() -> - #{description => <<"No Content">>}. - schema_clients_list() -> emqx_mgmt_util:page_schema( #{ type => object diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl new file mode 100644 index 000000000..374f2841d --- /dev/null +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -0,0 +1,316 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_api_listeners). + +-behaviour(minirest_api). + +-import(emqx_gateway_http, + [ return_http_error/2 + , with_gateway/2 + , checks/2 + , schema_bad_request/0 + , schema_not_found/0 + , schema_internal_error/0 + , schema_no_content/0 + ]). + +%% minirest behaviour callbacks +-export([api_spec/0]). + +%% http handlers +-export([ listeners/2 + , listeners_insta/2 + ]). + +%%-------------------------------------------------------------------- +%% minirest behaviour callbacks +%%-------------------------------------------------------------------- + +api_spec() -> + {metadata(apis()), []}. + +apis() -> + [ {"/gateway/:name/listeners", listeners} + , {"/gateway/:name/listeners/:id", listeners_insta} + ]. +%%-------------------------------------------------------------------- +%% http handlers + +listeners(get, #{bindings := #{name := Name0}}) -> + with_gateway(Name0, fun(GwName, _) -> + {200, emqx_gateway_http:listeners(GwName)} + end); + +listeners(post, #{bindings := #{name := Name0}, body := LConf}) -> + with_gateway(Name0, fun(GwName, Gateway) -> + RunningConf = maps:get(config, Gateway), + %% XXX: check params miss? check badly data tpye?? + _ = checks([<<"type">>, <<"name">>, <<"bind">>], LConf), + + Type = binary_to_existing_atom(maps:get(<<"type">>, LConf)), + LName = binary_to_atom(maps:get(<<"name">>, LConf)), + + Path = [listeners, Type, LName], + case emqx_map_lib:deep_get(Path, RunningConf, undefined) of + undefined -> + ListenerId = emqx_gateway_utils:listener_id( + GwName, Type, LName), + case emqx_gateway_http:update_listener( + ListenerId, LConf) of + ok -> + {204}; + {error, Reason} -> + return_http_error(500, Reason) + end; + _ -> + return_http_error(400, "Listener name has occupied") + end + end). + +listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) -> + ListenerId = emqx_mgmt_util:urldecode(ListenerId0), + with_gateway(Name0, fun(_GwName, _) -> + case emqx_gateway_http:remove_listener(ListenerId) of + ok -> {204}; + {error, not_found} -> {204}; + {error, Reason} -> + return_http_error(500, Reason) + end + end); +listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) -> + ListenerId = emqx_mgmt_util:urldecode(ListenerId0), + with_gateway(Name0, fun(_GwName, _) -> + case emqx_gateway_http:listener(ListenerId) of + {ok, Listener} -> + {200, Listener}; + {error, not_found} -> + return_http_error(404, "Listener not found"); + {error, Reason} -> + return_http_error(500, Reason) + end + end); +listeners_insta(put, #{body := LConf, + bindings := #{name := Name0, id := ListenerId0} + }) -> + ListenerId = emqx_mgmt_util:urldecode(ListenerId0), + with_gateway(Name0, fun(_GwName, _) -> + case emqx_gateway_http:update_listener(ListenerId, LConf) of + ok -> + {204}; + {error, Reason} -> + return_http_error(500, Reason) + end + end). + +%%-------------------------------------------------------------------- +%% Swagger defines +%%-------------------------------------------------------------------- + +metadata(APIs) -> + metadata(APIs, []). +metadata([], APIAcc) -> + lists:reverse(APIAcc); +metadata([{Path, Fun}|More], APIAcc) -> + Methods = [get, post, put, delete, patch], + Mds = lists:foldl(fun(M, Acc) -> + try + Acc#{M => swagger(Path, M)} + catch + error : function_clause -> + Acc + end + end, #{}, Methods), + metadata(More, [{Path, Mds, Fun} | APIAcc]). + +swagger("/gateway/:name/listeners", get) -> + #{ description => <<"Get the gateway listeners">> + , parameters => params_gateway_name_in_path() + , responses => + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() + , <<"200">> => schema_listener_list() + } + }; +swagger("/gateway/:name/listeners", post) -> + #{ description => <<"Create the gateway listener">> + , parameters => params_gateway_name_in_path() + , requestBody => schema_listener() + , responses => + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() + , <<"200">> => schema_listener_list() + } + }; +swagger("/gateway/:name/listeners/:id", get) -> + #{ description => <<"Get the gateway listener configurations">> + , parameters => params_gateway_name_in_path() + ++ params_listener_id_in_path() + , responses => + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() + , <<"200">> => schema_listener() + } + }; +swagger("/gateway/:name/listeners/:id", delete) -> + #{ description => <<"Delete the gateway listener">> + , parameters => params_gateway_name_in_path() + ++ params_listener_id_in_path() + , responses => + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() + , <<"204">> => schema_no_content() + } + }; +swagger("/gateway/:name/listeners/:id", put) -> + #{ description => <<"Update the gateway listener">> + , parameters => params_gateway_name_in_path() + ++ params_listener_id_in_path() + , requestBody => schema_listener() + , responses => + #{ <<"400">> => schema_bad_request() + , <<"404">> => schema_not_found() + , <<"500">> => schema_internal_error() + , <<"200">> => schema_no_content() + } + }. + +%%-------------------------------------------------------------------- +%% params defines + +params_gateway_name_in_path() -> + [#{ name => name + , in => path + , schema => #{type => string} + , required => true + }]. + +params_listener_id_in_path() -> + [#{ name => id + , in => path + , schema => #{type => string} + , required => true + }]. + +%%-------------------------------------------------------------------- +%% schemas + +schema_listener_list() -> + emqx_mgmt_util:array_schema( + #{ type => object + , properties => properties_listener() + }, + <<"Listener list">> + ). + +schema_listener() -> + emqx_mgmt_util:schema( + #{ type => object + , properties => properties_listener() + } + ). + +%%-------------------------------------------------------------------- +%% properties + +properties_listener() -> + emqx_mgmt_util:properties( + raw_properties_common_listener() ++ + [ {tcp, object, raw_properties_tcp_opts()} + , {ssl, object, raw_properties_ssl_opts()} + , {udp, object, raw_properties_udp_opts()} + , {dtls, object, raw_properties_dtls_opts()} + ]). + +raw_properties_tcp_opts() -> + [ {active_n, integer, <<>>} + , {backlog, integer, <<>>} + , {buffer, string, <<>>} + , {recbuf, string, <<>>} + , {sndbuf, string, <<>>} + , {high_watermark, string, <<>>} + , {nodelay, boolean, <<>>} + , {reuseaddr, boolean, <<>>} + , {send_timeout, string, <<>>} + , {send_timeout_close, boolean, <<>>} + ]. + +raw_properties_ssl_opts() -> + [ {cacertfile, string, <<>>} + , {certfile, string, <<>>} + , {keyfile, string, <<>>} + , {verify, string, <<>>} + , {fail_if_no_peer_cert, boolean, <<>>} + , {server_name_indication, boolean, <<>>} + , {depth, integer, <<>>} + , {password, string, <<>>} + , {handshake_timeout, string, <<>>} + , {versions, {array, string}, <<>>} + , {ciphers, {array, string}, <<>>} + , {user_lookup_fun, string, <<>>} + , {reuse_sessions, boolean, <<>>} + , {secure_renegotiate, boolean, <<>>} + , {honor_cipher_order, boolean, <<>>} + , {dhfile, string, <<>>} + ]. + +raw_properties_udp_opts() -> + [ {active_n, integer, <<>>} + , {buffer, string, <<>>} + , {recbuf, string, <<>>} + , {sndbuf, string, <<>>} + , {reuseaddr, boolean, <<>>} + ]. + +raw_properties_dtls_opts() -> + Ls = lists_key_without( + [versions,ciphers,handshake_timeout], 1, + raw_properties_ssl_opts() + ), + [ {versions, {array, string}, <<>>} + , {ciphers, {array, string}, <<>>} + | Ls]. + +lists_key_without([], _N, L) -> + L; +lists_key_without([K|Ks], N, L) -> + lists_key_without(Ks, N, lists:keydelete(K, N, L)). + +raw_properties_common_listener() -> + [ {enable, boolean, <<"Whether to enable this listener">>} + , {id, string, <<"Listener Id">>} + , {name, string, <<"Listener name">>} + , {type, string, + <<"Listener type. Enum: tcp, udp, ssl, dtls">>, + [<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]} + , {running, boolean, <<"Listener running status">>} + %% FIXME: + , {bind, string, <<"Listener bind address or port">>} + , {acceptors, integer, <<"Listener acceptors number">>} + , {access_rules, {array, string}, <<"Listener Access rules for client">>} + , {max_conn_rate, integer, <<"Max connection rate for the listener">>} + , {max_connections, integer, <<"Max connections for the listener">>} + , {mountpoint, string, + <<"The Mounpoint for clients of the listener. " + "The gateway-level mountpoint configuration can be overloaded " + "when it is not null or empty string">>} + %% FIXME: + , {authentication, string, <<"NOT-SUPPORTED-NOW">>} + ]. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index f233a6151..51137423e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -26,7 +26,9 @@ %% Mgmt APIs - listeners -export([ listeners/1 - , listener/2 + , listener/1 + , remove_listener/1 + , update_listener/2 , mapping_listener_m2l/2 ]). @@ -42,6 +44,12 @@ %% Utils for http, swagger, etc. -export([ return_http_error/2 + , with_gateway/2 + , checks/2 + , schema_bad_request/0 + , schema_not_found/0 + , schema_internal_error/0 + , schema_no_content/0 ]). -type gateway_summary() :: @@ -108,7 +116,7 @@ get_listeners_status(GwName, Config) -> lists:map(fun({Type, LisName, ListenOn, _, _}) -> Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName), Name = {Name0, ListenOn}, - LisO = #{id => Name0, type => Type}, + LisO = #{id => Name0, type => Type, name => LisName}, case catch esockd:listener(Name) of _Pid when is_pid(_Pid) -> LisO#{running => true}; @@ -121,7 +129,8 @@ get_listeners_status(GwName, Config) -> %% Mgmt APIs - listeners %%-------------------------------------------------------------------- -listeners(GwName) when is_atom (GwName) -> +-spec listeners(atom() | binary()) -> list(). +listeners(GwName) when is_atom(GwName) -> listeners(atom_to_binary(GwName)); listeners(GwName) -> RawConf = emqx_config:fill_defaults( @@ -131,8 +140,27 @@ listeners(GwName) -> [<<"gateway">>, GwName, <<"listeners">>], RawConf)), mapping_listener_m2l(GwName, Listeners). -listener(_GwName, _ListenerId) -> - ok. +-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}. +listener(ListenerId) -> + {GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId), + RootConf = emqx_config:fill_defaults( + emqx_config:get_root_raw([<<"gateway">>])), + try + Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName], + LConf = emqx_map_lib:deep_get(Path, RootConf), + Running = is_running(binary_to_existing_atom(ListenerId), LConf), + {ok, emqx_map_lib:jsonable_map( + LConf#{ + id => ListenerId, + type => Type, + name => LName, + running => Running})} + catch + error : {config_not_found, _} -> + {error, not_found}; + _Class : Reason -> + {error, Reason} + end. mapping_listener_m2l(GwName, Listeners0) -> Listeners = maps:to_list(Listeners0), @@ -146,6 +174,7 @@ listener(GwName, Type, Conf) -> LConf#{ id => ListenerId, type => Type, + name => LName, running => Running } end || {LName, LConf} <- Conf, is_map(LConf)]. @@ -159,6 +188,28 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> false end. +-spec remove_listener(binary()) -> ok | {error, not_found} | {error, any()}. +remove_listener(ListenerId) -> + {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), + LConf = emqx:get_raw_config( + [<<"gateway">>, GwName, <<"listeners">>, Type] + ), + NLConf = maps:remove(Name, LConf), + emqx_gateway:update_rawconf( + GwName, + #{<<"listeners">> => #{Type => NLConf}} + ). + +-spec update_listener(binary(), map()) -> ok | {error, any()}. +update_listener(ListenerId, NewConf0) -> + {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), + NewConf = maps:without([<<"id">>, <<"name">>, + <<"type">>, <<"running">>], NewConf0), + emqx_gateway:update_rawconf( + GwName, + #{<<"listeners">> => #{Type => #{Name => NewConf}} + }). + %%-------------------------------------------------------------------- %% Mgmt APIs - clients %%-------------------------------------------------------------------- @@ -256,10 +307,61 @@ return_http_error(Code, Msg) -> }) }. -codestr(404) -> 'RESOURCE_NOT_FOUND'; +codestr(400) -> 'BAD_REQUEST'; codestr(401) -> 'NOT_SUPPORTED_NOW'; +codestr(404) -> 'RESOURCE_NOT_FOUND'; codestr(500) -> 'UNKNOW_ERROR'. +-spec with_gateway(binary(), function()) -> any(). +with_gateway(GwName0, Fun) -> + try + GwName = try + binary_to_existing_atom(GwName0) + catch _ : _ -> error(badname) + end, + case emqx_gateway:lookup(GwName) of + undefined -> + return_http_error(404, "Gateway not load"); + Gateway -> + Fun(GwName, Gateway) + end + catch + error : badname -> + return_http_error(404, "Bad gateway name"); + error : {miss_param, K} -> + return_http_error(400, [K, " is required"]); + error : {invalid_listener_id, Id} -> + return_http_error(400, ["invalid listener id: ", Id]); + Class : Reason : Stk -> + ?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p", + [Class, Reason, Stk]), + return_http_error(500, {Class, Reason, Stk}) + end. + +-spec checks(list(), map()) -> ok. +checks([], _) -> + ok; +checks([K|Ks], Map) -> + case maps:is_key(K, Map) of + true -> checks(Ks, Map); + false -> + error({miss_param, K}) + end. + +%%-------------------------------------------------------------------- +%% common schemas + +schema_bad_request() -> + emqx_mgmt_util:error_schema( + <<"Some Params missed">>, ['PARAMETER_MISSED']). +schema_internal_error() -> + emqx_mgmt_util:error_schema( + <<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']). +schema_not_found() -> + emqx_mgmt_util:error_schema(<<"Resource Not Found">>). +schema_no_content() -> + #{description => <<"No Content">>}. + %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 4f19db23b..4773ad604 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -33,6 +33,7 @@ , unix_ts_to_rfc3339/2 , listener_id/3 , parse_listener_id/1 + , parse_listener_id2/1 ]). -export([ stringfy/1 @@ -136,12 +137,17 @@ listener_id(GwName, Type, LisName) -> parse_listener_id(Id) -> try [GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]), - {binary_to_existing_atom(GwName), binary_to_existing_atom(Type), - binary_to_atom(Name)} + {GwName, Type, Name} catch _ : _ -> error({invalid_listener_id, Id}) end. +parse_listener_id2(Id) -> + {GwName, Type, Name} = parse_listener_id(Id), + {binary_to_existing_atom(GwName), + binary_to_existing_atom(Type), + binary_to_atom(Name)}. + bin(A) when is_atom(A) -> atom_to_binary(A); bin(L) when is_list(L); is_binary(L) -> @@ -161,6 +167,8 @@ unix_ts_to_rfc3339(Ts) -> emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>). -spec stringfy(term()) -> binary(). +stringfy(T) when is_list(T); is_binary(T) -> + iolist_to_binary(T); stringfy(T) -> iolist_to_binary(io_lib:format("~0p", [T])).