chore(gw): add listeners apis

This commit is contained in:
JianBo He 2021-09-10 17:19:32 +08:00
parent 9fd9e09721
commit 18ed1c9794
5 changed files with 565 additions and 130 deletions

View File

@ -20,6 +20,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%% minirest behaviour callbacks
@ -55,44 +60,34 @@ gateway(get, Request) ->
{200, emqx_gateway_http:gateways(Status)}.
gateway_insta(delete, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:unload(Name) of
ok ->
{204};
{error, not_found} ->
return_http_error(404, <<"Gateway not found">>)
end;
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway:unload(GwName),
{204}
end);
gateway_insta(get, #{bindings := #{name := Name0}}) ->
Name = binary_to_existing_atom(Name0),
case emqx_gateway:lookup(Name) of
#{config := _Config} ->
GwCfs = filled_raw_confs([<<"gateway">>, Name0]),
NGwCfs = GwCfs#{<<"listeners">> =>
emqx_gateway_http:mapping_listener_m2l(
Name0, maps:get(<<"listeners">>, GwCfs, #{})
)
},
{200, NGwCfs};
undefined ->
return_http_error(404, <<"Gateway not found">>)
end;
gateway_insta(put, #{body := RawConfsIn0,
bindings := #{name := Name}
with_gateway(Name0, fun(_, _) ->
GwConf = filled_raw_confs([<<"gateway">>, Name0]),
LisConf = maps:get(<<"listeners">>, GwConf, #{}),
NLisConf = emqx_gateway_http:mapping_listener_m2l(Name0, LisConf),
{200, GwConf#{<<"listeners">> => NLisConf}}
end);
gateway_insta(put, #{body := GwConf0,
bindings := #{name := Name0}
}) ->
RawConfsIn = maps:without([<<"authentication">>,
<<"listeners">>], RawConfsIn0),
%% FIXME: Cluster Consistence ??
case emqx_gateway:update_rawconf(Name, RawConfsIn) of
ok ->
{200};
{error, not_found} ->
return_http_error(404, <<"Gateway not found">>);
{error, Reason} ->
return_http_error(500, Reason)
end.
with_gateway(Name0, fun(_, _) ->
GwConf = maps:without([<<"authentication">>, <<"listeners">>], GwConf0),
case emqx_gateway:update_rawconf(Name0, GwConf) of
ok ->
{200};
{error, not_found} ->
return_http_error(404, "Gateway not found");
{error, Reason} ->
return_http_error(500, Reason)
end
end).
gateway_insta_stats(get, _Req) ->
return_http_error(401, <<"Implement it later (maybe 5.1)">>).
return_http_error(401, "Implement it later (maybe 5.1)").
filled_raw_confs(Path) ->
RawConf = emqx_config:fill_defaults(
@ -131,7 +126,9 @@ swagger("/gateway/:name", get) ->
#{ description => <<"Get the gateway configurations">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_conf()
}
};
@ -139,7 +136,9 @@ swagger("/gateway/:name", delete) ->
#{ description => <<"Delete/Unload the gateway">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@ -148,7 +147,9 @@ swagger("/gateway/:name", put) ->
, parameters => params_gateway_name_in_path()
, requestBody => schema_gateway_conf()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@ -156,7 +157,9 @@ swagger("/gateway/:name/stats", get) ->
#{ description => <<"Get gateway Statistic">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_stats()
}
}.
@ -181,12 +184,6 @@ params_gateway_status_in_qs() ->
%%--------------------------------------------------------------------
%% schemas
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
schema_no_content() ->
#{description => <<"No Content">>}.
schema_gateway_overview_list() ->
emqx_mgmt_util:array_schema(
#{ type => object

View File

@ -36,6 +36,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%%--------------------------------------------------------------------
@ -71,102 +76,103 @@ apis() ->
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
clients(get, #{ bindings := #{name := GwName0}
clients(get, #{ bindings := #{name := Name0}
, query_string := Qs
}) ->
GwName = binary_to_existing_atom(GwName0),
TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Qs, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(
Qs, TabName,
?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Qs),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response}
end.
with_gateway(Name0, fun(GwName, _) ->
TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Qs, undefined) of
undefined ->
Response = emqx_mgmt_api:cluster_query(
Qs, TabName,
?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response};
Node1 ->
Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Qs),
Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun
),
{200, Response}
end
end).
clients_insta(get, #{ bindings := #{name := GwName0,
clients_insta(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case emqx_gateway_http:lookup_client(GwName, ClientId,
{?MODULE, format_channel_info}) of
[ClientInfo] ->
{200, ClientInfo};
[ClientInfo|_More] ->
?LOG(warning, "More than one client info was returned on ~s",
[ClientId]),
{200, ClientInfo};
[] ->
return_http_error(404, <<"Gateway or ClientId not found">>)
end;
clients_insta(delete, #{ bindings := #{name := GwName0,
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:lookup_client(GwName, ClientId,
{?MODULE, format_channel_info}) of
[ClientInfo] ->
{200, ClientInfo};
[ClientInfo|_More] ->
?LOG(warning, "More than one client info was returned on ~s",
[ClientId]),
{200, ClientInfo};
[] ->
return_http_error(404, "Client not found")
end
end);
clients_insta(delete, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200}.
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
{200}
end).
%% FIXME:
%% List the subscription without mountpoint, but has SubOpts,
%% for example, share group ...
subscriptions(get, #{ bindings := #{name := GwName0,
subscriptions(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
{error, Reason} ->
return_http_error(404, Reason);
{ok, Subs} ->
{200, Subs}
end;
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
{error, Reason} ->
return_http_error(500, Reason);
{ok, Subs} ->
{200, Subs}
end
end);
%% Create the subscription without mountpoint
subscriptions(post, #{ bindings := #{name := GwName0,
subscriptions(post, #{ bindings := #{name := Name0,
clientid := ClientId0},
body := Body
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
{undefined, _} ->
%% FIXME: more reasonable error code??
return_http_error(404, <<"Request paramter missed: topic">>);
{Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
{error, Reason} ->
return_http_error(404, Reason);
ok ->
{200}
end
end;
with_gateway(Name0, fun(GwName, _) ->
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
{undefined, _} ->
return_http_error(400, "Miss topic property");
{Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
{error, Reason} ->
return_http_error(404, Reason);
ok ->
{200}
end
end
end);
%% Remove the subscription without mountpoint
subscriptions(delete, #{ bindings := #{name := GwName0,
subscriptions(delete, #{ bindings := #{name := Name0,
clientid := ClientId0,
topic := Topic0
}
}) ->
GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
Topic = emqx_mgmt_util:urldecode(Topic0),
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200}.
with_gateway(Name0, fun(GwName, _) ->
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
{200}
end).
%%--------------------------------------------------------------------
%% Utils
@ -379,7 +385,9 @@ swagger("/gateway/:name/clients", get) ->
#{ description => <<"Get the gateway clients">>
, parameters => params_client_query()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_clients_list()
}
};
@ -387,7 +395,9 @@ swagger("/gateway/:name/clients/:clientid", get) ->
#{ description => <<"Get the gateway client infomation">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_client()
}
};
@ -395,7 +405,9 @@ swagger("/gateway/:name/clients/:clientid", delete) ->
#{ description => <<"Kick out the gateway client">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@ -403,7 +415,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", get) ->
#{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_subscription_list()
}
};
@ -412,7 +426,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
, parameters => params_client_insta()
, requestBody => schema_subscription()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@ -420,7 +436,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
#{ description => <<"Unsubscribe the topic for client">>
, parameters => params_topic_name_in_path() ++ params_client_insta()
, responses =>
#{ <<"404">> => schema_not_found()
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
}.
@ -483,12 +501,6 @@ queries(Ls) ->
%%--------------------------------------------------------------------
%% schemas
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
schema_no_content() ->
#{description => <<"No Content">>}.
schema_clients_list() ->
emqx_mgmt_util:page_schema(
#{ type => object

View File

@ -0,0 +1,316 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_api_listeners).
-behaviour(minirest_api).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, checks/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%% minirest behaviour callbacks
-export([api_spec/0]).
%% http handlers
-export([ listeners/2
, listeners_insta/2
]).
%%--------------------------------------------------------------------
%% minirest behaviour callbacks
%%--------------------------------------------------------------------
api_spec() ->
{metadata(apis()), []}.
apis() ->
[ {"/gateway/:name/listeners", listeners}
, {"/gateway/:name/listeners/:id", listeners_insta}
].
%%--------------------------------------------------------------------
%% http handlers
listeners(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
{200, emqx_gateway_http:listeners(GwName)}
end);
listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
with_gateway(Name0, fun(GwName, Gateway) ->
RunningConf = maps:get(config, Gateway),
%% XXX: check params miss? check badly data tpye??
_ = checks([<<"type">>, <<"name">>, <<"bind">>], LConf),
Type = binary_to_existing_atom(maps:get(<<"type">>, LConf)),
LName = binary_to_atom(maps:get(<<"name">>, LConf)),
Path = [listeners, Type, LName],
case emqx_map_lib:deep_get(Path, RunningConf, undefined) of
undefined ->
ListenerId = emqx_gateway_utils:listener_id(
GwName, Type, LName),
case emqx_gateway_http:update_listener(
ListenerId, LConf) of
ok ->
{204};
{error, Reason} ->
return_http_error(500, Reason)
end;
_ ->
return_http_error(400, "Listener name has occupied")
end
end).
listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:remove_listener(ListenerId) of
ok -> {204};
{error, not_found} -> {204};
{error, Reason} ->
return_http_error(500, Reason)
end
end);
listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:listener(ListenerId) of
{ok, Listener} ->
{200, Listener};
{error, not_found} ->
return_http_error(404, "Listener not found");
{error, Reason} ->
return_http_error(500, Reason)
end
end);
listeners_insta(put, #{body := LConf,
bindings := #{name := Name0, id := ListenerId0}
}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) ->
case emqx_gateway_http:update_listener(ListenerId, LConf) of
ok ->
{204};
{error, Reason} ->
return_http_error(500, Reason)
end
end).
%%--------------------------------------------------------------------
%% Swagger defines
%%--------------------------------------------------------------------
metadata(APIs) ->
metadata(APIs, []).
metadata([], APIAcc) ->
lists:reverse(APIAcc);
metadata([{Path, Fun}|More], APIAcc) ->
Methods = [get, post, put, delete, patch],
Mds = lists:foldl(fun(M, Acc) ->
try
Acc#{M => swagger(Path, M)}
catch
error : function_clause ->
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway/:name/listeners", get) ->
#{ description => <<"Get the gateway listeners">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener_list()
}
};
swagger("/gateway/:name/listeners", post) ->
#{ description => <<"Create the gateway listener">>
, parameters => params_gateway_name_in_path()
, requestBody => schema_listener()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener_list()
}
};
swagger("/gateway/:name/listeners/:id", get) ->
#{ description => <<"Get the gateway listener configurations">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_listener()
}
};
swagger("/gateway/:name/listeners/:id", delete) ->
#{ description => <<"Delete the gateway listener">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/listeners/:id", put) ->
#{ description => <<"Update the gateway listener">>
, parameters => params_gateway_name_in_path()
++ params_listener_id_in_path()
, requestBody => schema_listener()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
}.
%%--------------------------------------------------------------------
%% params defines
params_gateway_name_in_path() ->
[#{ name => name
, in => path
, schema => #{type => string}
, required => true
}].
params_listener_id_in_path() ->
[#{ name => id
, in => path
, schema => #{type => string}
, required => true
}].
%%--------------------------------------------------------------------
%% schemas
schema_listener_list() ->
emqx_mgmt_util:array_schema(
#{ type => object
, properties => properties_listener()
},
<<"Listener list">>
).
schema_listener() ->
emqx_mgmt_util:schema(
#{ type => object
, properties => properties_listener()
}
).
%%--------------------------------------------------------------------
%% properties
properties_listener() ->
emqx_mgmt_util:properties(
raw_properties_common_listener() ++
[ {tcp, object, raw_properties_tcp_opts()}
, {ssl, object, raw_properties_ssl_opts()}
, {udp, object, raw_properties_udp_opts()}
, {dtls, object, raw_properties_dtls_opts()}
]).
raw_properties_tcp_opts() ->
[ {active_n, integer, <<>>}
, {backlog, integer, <<>>}
, {buffer, string, <<>>}
, {recbuf, string, <<>>}
, {sndbuf, string, <<>>}
, {high_watermark, string, <<>>}
, {nodelay, boolean, <<>>}
, {reuseaddr, boolean, <<>>}
, {send_timeout, string, <<>>}
, {send_timeout_close, boolean, <<>>}
].
raw_properties_ssl_opts() ->
[ {cacertfile, string, <<>>}
, {certfile, string, <<>>}
, {keyfile, string, <<>>}
, {verify, string, <<>>}
, {fail_if_no_peer_cert, boolean, <<>>}
, {server_name_indication, boolean, <<>>}
, {depth, integer, <<>>}
, {password, string, <<>>}
, {handshake_timeout, string, <<>>}
, {versions, {array, string}, <<>>}
, {ciphers, {array, string}, <<>>}
, {user_lookup_fun, string, <<>>}
, {reuse_sessions, boolean, <<>>}
, {secure_renegotiate, boolean, <<>>}
, {honor_cipher_order, boolean, <<>>}
, {dhfile, string, <<>>}
].
raw_properties_udp_opts() ->
[ {active_n, integer, <<>>}
, {buffer, string, <<>>}
, {recbuf, string, <<>>}
, {sndbuf, string, <<>>}
, {reuseaddr, boolean, <<>>}
].
raw_properties_dtls_opts() ->
Ls = lists_key_without(
[versions,ciphers,handshake_timeout], 1,
raw_properties_ssl_opts()
),
[ {versions, {array, string}, <<>>}
, {ciphers, {array, string}, <<>>}
| Ls].
lists_key_without([], _N, L) ->
L;
lists_key_without([K|Ks], N, L) ->
lists_key_without(Ks, N, lists:keydelete(K, N, L)).
raw_properties_common_listener() ->
[ {enable, boolean, <<"Whether to enable this listener">>}
, {id, string, <<"Listener Id">>}
, {name, string, <<"Listener name">>}
, {type, string,
<<"Listener type. Enum: tcp, udp, ssl, dtls">>,
[<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]}
, {running, boolean, <<"Listener running status">>}
%% FIXME:
, {bind, string, <<"Listener bind address or port">>}
, {acceptors, integer, <<"Listener acceptors number">>}
, {access_rules, {array, string}, <<"Listener Access rules for client">>}
, {max_conn_rate, integer, <<"Max connection rate for the listener">>}
, {max_connections, integer, <<"Max connections for the listener">>}
, {mountpoint, string,
<<"The Mounpoint for clients of the listener. "
"The gateway-level mountpoint configuration can be overloaded "
"when it is not null or empty string">>}
%% FIXME:
, {authentication, string, <<"NOT-SUPPORTED-NOW">>}
].

View File

@ -26,7 +26,9 @@
%% Mgmt APIs - listeners
-export([ listeners/1
, listener/2
, listener/1
, remove_listener/1
, update_listener/2
, mapping_listener_m2l/2
]).
@ -42,6 +44,12 @@
%% Utils for http, swagger, etc.
-export([ return_http_error/2
, with_gateway/2
, checks/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
-type gateway_summary() ::
@ -108,7 +116,7 @@ get_listeners_status(GwName, Config) ->
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
Name = {Name0, ListenOn},
LisO = #{id => Name0, type => Type},
LisO = #{id => Name0, type => Type, name => LisName},
case catch esockd:listener(Name) of
_Pid when is_pid(_Pid) ->
LisO#{running => true};
@ -121,7 +129,8 @@ get_listeners_status(GwName, Config) ->
%% Mgmt APIs - listeners
%%--------------------------------------------------------------------
listeners(GwName) when is_atom (GwName) ->
-spec listeners(atom() | binary()) -> list().
listeners(GwName) when is_atom(GwName) ->
listeners(atom_to_binary(GwName));
listeners(GwName) ->
RawConf = emqx_config:fill_defaults(
@ -131,8 +140,27 @@ listeners(GwName) ->
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
mapping_listener_m2l(GwName, Listeners).
listener(_GwName, _ListenerId) ->
ok.
-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}.
listener(ListenerId) ->
{GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
RootConf = emqx_config:fill_defaults(
emqx_config:get_root_raw([<<"gateway">>])),
try
Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName],
LConf = emqx_map_lib:deep_get(Path, RootConf),
Running = is_running(binary_to_existing_atom(ListenerId), LConf),
{ok, emqx_map_lib:jsonable_map(
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running})}
catch
error : {config_not_found, _} ->
{error, not_found};
_Class : Reason ->
{error, Reason}
end.
mapping_listener_m2l(GwName, Listeners0) ->
Listeners = maps:to_list(Listeners0),
@ -146,6 +174,7 @@ listener(GwName, Type, Conf) ->
LConf#{
id => ListenerId,
type => Type,
name => LName,
running => Running
}
end || {LName, LConf} <- Conf, is_map(LConf)].
@ -159,6 +188,28 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
false
end.
-spec remove_listener(binary()) -> ok | {error, not_found} | {error, any()}.
remove_listener(ListenerId) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
LConf = emqx:get_raw_config(
[<<"gateway">>, GwName, <<"listeners">>, Type]
),
NLConf = maps:remove(Name, LConf),
emqx_gateway:update_rawconf(
GwName,
#{<<"listeners">> => #{Type => NLConf}}
).
-spec update_listener(binary(), map()) -> ok | {error, any()}.
update_listener(ListenerId, NewConf0) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
NewConf = maps:without([<<"id">>, <<"name">>,
<<"type">>, <<"running">>], NewConf0),
emqx_gateway:update_rawconf(
GwName,
#{<<"listeners">> => #{Type => #{Name => NewConf}}
}).
%%--------------------------------------------------------------------
%% Mgmt APIs - clients
%%--------------------------------------------------------------------
@ -256,10 +307,61 @@ return_http_error(Code, Msg) ->
})
}.
codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(400) -> 'BAD_REQUEST';
codestr(401) -> 'NOT_SUPPORTED_NOW';
codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(500) -> 'UNKNOW_ERROR'.
-spec with_gateway(binary(), function()) -> any().
with_gateway(GwName0, Fun) ->
try
GwName = try
binary_to_existing_atom(GwName0)
catch _ : _ -> error(badname)
end,
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, "Gateway not load");
Gateway ->
Fun(GwName, Gateway)
end
catch
error : badname ->
return_http_error(404, "Bad gateway name");
error : {miss_param, K} ->
return_http_error(400, [K, " is required"]);
error : {invalid_listener_id, Id} ->
return_http_error(400, ["invalid listener id: ", Id]);
Class : Reason : Stk ->
?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p",
[Class, Reason, Stk]),
return_http_error(500, {Class, Reason, Stk})
end.
-spec checks(list(), map()) -> ok.
checks([], _) ->
ok;
checks([K|Ks], Map) ->
case maps:is_key(K, Map) of
true -> checks(Ks, Map);
false ->
error({miss_param, K})
end.
%%--------------------------------------------------------------------
%% common schemas
schema_bad_request() ->
emqx_mgmt_util:error_schema(
<<"Some Params missed">>, ['PARAMETER_MISSED']).
schema_internal_error() ->
emqx_mgmt_util:error_schema(
<<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']).
schema_not_found() ->
emqx_mgmt_util:error_schema(<<"Resource Not Found">>).
schema_no_content() ->
#{description => <<"No Content">>}.
%%--------------------------------------------------------------------
%% Internal funcs

View File

@ -33,6 +33,7 @@
, unix_ts_to_rfc3339/2
, listener_id/3
, parse_listener_id/1
, parse_listener_id2/1
]).
-export([ stringfy/1
@ -136,12 +137,17 @@ listener_id(GwName, Type, LisName) ->
parse_listener_id(Id) ->
try
[GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]),
{binary_to_existing_atom(GwName), binary_to_existing_atom(Type),
binary_to_atom(Name)}
{GwName, Type, Name}
catch
_ : _ -> error({invalid_listener_id, Id})
end.
parse_listener_id2(Id) ->
{GwName, Type, Name} = parse_listener_id(Id),
{binary_to_existing_atom(GwName),
binary_to_existing_atom(Type),
binary_to_atom(Name)}.
bin(A) when is_atom(A) ->
atom_to_binary(A);
bin(L) when is_list(L); is_binary(L) ->
@ -161,6 +167,8 @@ unix_ts_to_rfc3339(Ts) ->
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>).
-spec stringfy(term()) -> binary().
stringfy(T) when is_list(T); is_binary(T) ->
iolist_to_binary(T);
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).