emqx/apps/emqx_gateway/src/emqx_gateway_http.erl

585 lines
17 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Gateway Interface Module for HTTP-APIs
-module(emqx_gateway_http).
-include("include/emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_authentication.hrl").
-define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
-import(emqx_gateway_utils, [listener_id/3]).
%% Mgmt APIs - gateway
-export([gateways/1]).
%% Mgmt APIs
-export([
add_listener/2,
remove_listener/1,
update_listener/2
]).
-export([
authn/1,
authn/2,
add_authn/2,
add_authn/3,
update_authn/2,
update_authn/3,
remove_authn/1,
remove_authn/2
]).
%% Mgmt APIs - clients
-export([
lookup_client/3,
kickout_client/2,
list_client_subscriptions/2,
client_subscribe/4,
client_unsubscribe/3
]).
%% Utils for http, swagger, etc.
-export([
return_http_error/2,
with_gateway/2,
with_authn/2,
with_listener_authn/3,
checks/2,
reason2resp/1,
reason2msg/1,
sum_cluster_connections/1
]).
%% RPC
-export([gateway_status/1, cluster_gateway_status/1]).
%% Summary of a single gateway as produced by gateways/1.
%% `name' and `status' are always present; the optional keys are only
%% filled in for gateways that emqx_gateway:lookup/1 knows about.
-type gateway_summary() ::
    #{
        name := binary(),
        status := running | stopped | unloaded,
        created_at => binary(),
        started_at => binary(),
        stopped_at => binary(),
        max_connections => integer(),
        current_connections => integer(),
        %% One status map per configured listener (built by
        %% get_listeners_status/2); previously typed as the empty list only.
        listeners => [map()],
        %% Result of cluster_gateway_status/1; presumably one status map
        %% per running node -- TODO confirm against the RPC proto module.
        node_status => list()
    }.
%% Disable the listed elvis_style rules for this module.
-elvis([
{elvis_style, god_modules, disable},
{elvis_style, no_nested_try_catch, disable},
{elvis_style, invalid_dynamic_call, disable}
]).
%% Timeout handed to emqx_gateway_cm:call/4 by client_call/3
%% (presumably milliseconds -- TODO confirm against emqx_gateway_cm).
-define(DEFAULT_CALL_TIMEOUT, 15000).
%%--------------------------------------------------------------------
%% Mgmt APIs - gateway
%%--------------------------------------------------------------------
%% @doc List a summary for every registered gateway.
%% `Status' is either `all' (no filtering) or the status value a gateway
%% must have to be included in the result.
-spec gateways(Status :: all | running | stopped | unloaded) ->
    [gateway_summary()].
gateways(Status) ->
    Summaries = [
        summarize_gateway(Entry)
     || Entry <- emqx_gateway_registry:list()
    ],
    case Status of
        all ->
            Summaries;
        _ ->
            lists:filter(
                fun
                    (#{status := S}) -> S == Status;
                    (_) -> false
                end,
                Summaries
            )
    end.

%% @private
%% Build the summary map for one registry entry. Gateways unknown to
%% emqx_gateway:lookup/1 are reported as `unloaded'.
summarize_gateway({GwName, _}) ->
    case emqx_gateway:lookup(GwName) of
        undefined ->
            #{name => GwName, status => unloaded};
        #{config := Config} = GwInfo ->
            TimeKeys = [created_at, started_at, stopped_at],
            Formatted = emqx_gateway_utils:unix_ts_to_rfc3339(TimeKeys, GwInfo),
            Base = maps:with([name, status | TimeKeys], Formatted),
            NodeStatus = cluster_gateway_status(GwName),
            {MaxCons, CurrCons} = sum_cluster_connections(NodeStatus),
            ListenerStatus = get_listeners_status(GwName, Config),
            Base#{
                max_connections => MaxCons,
                current_connections => CurrCons,
                listeners => ListenerStatus,
                node_status => NodeStatus
            }
    end.
%% @doc Report this node's view of a gateway: its status and, when the
%% gateway is known locally, its maximum and current connection counts.
gateway_status(GwName) ->
    Local = #{node => node()},
    case emqx_gateway:lookup(GwName) of
        #{status := Status, config := Config} ->
            Local#{
                status => Status,
                max_connections => max_connections_count(Config),
                current_connections => current_connections_count(GwName)
            };
        undefined ->
            Local#{status => unloaded}
    end.
%% @doc Collect the gateway status from every running node.
%% Raises `error(badrpc)' if any node is reported as bad by the RPC layer.
cluster_gateway_status(GwName) ->
    Reply = emqx_gateway_http_proto_v1:get_cluster_status(
        mria_mnesia:running_nodes(),
        GwName
    ),
    case Reply of
        {NodeStatuses, []} ->
            NodeStatuses;
        {_Partial, _FailedNodes} ->
            error(badrpc)
    end.
%% @private
%% Sum the `max_connections' socket option over all listeners of the
%% normalized gateway config; listeners without the option count as 0.
max_connections_count(Config) ->
    PerListener = lists:map(
        fun({_Type, _LisName, _ListenOn, SocketOpts, _}) ->
            proplists:get_value(max_connections, SocketOpts, 0)
        end,
        emqx_gateway_utils:normalize_config(Config)
    ),
    lists:sum(PerListener).
%% @private
%% Number of entries in the gateway's channel-info ETS table, or 0 when
%% the count cannot be determined.
current_connections_count(GwName) ->
    try ets:info(emqx_gateway_cm:tabname(info, GwName), size) of
        Size when is_integer(Size) ->
            Size;
        _ ->
            %% ets:info/2 returns `undefined' (it does not raise) when the
            %% table does not exist; report 0 so callers can keep summing.
            0
    catch
        _:_ ->
            0
    end.
%% @private
%% Describe every listener in the normalized gateway config as a map with
%% its id, type, name and whether esockd currently has a listener process
%% for it.
get_listeners_status(GwName, Config) ->
    lists:map(
        fun({Type, LisName, ListenOn, _, _}) ->
            Id = listener_id(GwName, Type, LisName),
            #{
                id => Id,
                type => Type,
                name => LisName,
                running => is_listener_running({Id, ListenOn})
            }
        end,
        emqx_gateway_utils:normalize_config(Config)
    ).

%% @private
%% True only when esockd:listener/1 returns a pid. Any exception is
%% treated as "not running"; `try' is used instead of the old-style
%% `catch' so that only a normal return value is inspected.
is_listener_running(ListenerRef) ->
    try esockd:listener(ListenerRef) of
        Result -> is_pid(Result)
    catch
        _:_ -> false
    end.
%%--------------------------------------------------------------------
%% Mgmt APIs - listeners
%%--------------------------------------------------------------------
%% @doc Add a listener identified by `ListenerId'.
%% The `id', `name', `type' and `running' keys are dropped from the
%% submitted config before it is handed to emqx_gateway_conf.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec add_listener(atom() | binary(), map()) -> {ok, map()}.
add_listener(ListenerId, NewConf0) ->
    {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
    DroppedKeys = [<<"id">>, <<"name">>, <<"type">>, <<"running">>],
    Conf = maps:without(DroppedKeys, NewConf0),
    Result = emqx_gateway_conf:add_listener(GwName, {Type, Name}, Conf),
    confexp(Result).
%% @doc Update the listener identified by `ListenerId'.
%% The `id', `name', `type' and `running' keys are dropped from the
%% submitted config before it is handed to emqx_gateway_conf.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec update_listener(atom() | binary(), map()) -> {ok, map()}.
update_listener(ListenerId, NewConf0) ->
    {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
    DroppedKeys = [<<"id">>, <<"name">>, <<"type">>, <<"running">>],
    Conf = maps:without(DroppedKeys, NewConf0),
    Result = emqx_gateway_conf:update_listener(GwName, {Type, Name}, Conf),
    confexp(Result).
%% @doc Remove the listener identified by `ListenerId'.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec remove_listener(binary()) -> ok.
remove_listener(ListenerId) ->
    {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
    Result = emqx_gateway_conf:remove_listener(GwName, {Type, Name}),
    confexp(Result).
%% @doc Return the gateway-level authentication config (read from the raw
%% config), annotated with chain name and authenticator id when the
%% gateway's global chain has an authenticator.
-spec authn(gateway_name()) -> map().
authn(GwName) ->
    %% XXX: should chain-name / authenticator-id always be appended?
    ConfPath = [gateway, GwName, ?AUTHN],
    Chain = emqx_gateway_utils:global_chain(GwName),
    RawConf = emqx:get_raw_config(ConfPath),
    wrap_chain_name(Chain, emqx_map_lib:jsonable_map(RawConf)).
%% @doc Return the authentication config of one listener (read from the
%% raw config), annotated with chain name and authenticator id when the
%% listener's chain has an authenticator.
-spec authn(gateway_name(), binary()) -> map().
authn(GwName, ListenerId) ->
    {_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
    ConfPath = [gateway, GwName, listeners, Type, Name, ?AUTHN],
    Chain = emqx_gateway_utils:listener_chain(GwName, Type, Name),
    RawConf = emqx:get_raw_config(ConfPath),
    wrap_chain_name(Chain, emqx_map_lib:jsonable_map(RawConf)).
%% @private
%% Add `chain_name' and the id of the chain's first authenticator to
%% `Conf'. `Conf' is returned untouched when the chain lookup yields
%% anything other than a non-empty authenticator list.
wrap_chain_name(ChainName, Conf) ->
    Authenticators = emqx_authentication:list_authenticators(ChainName),
    annotate_with_chain(Authenticators, ChainName, Conf).

%% @private
annotate_with_chain({ok, [#{id := Id} | _]}, ChainName, Conf) ->
    Conf#{chain_name => ChainName, id => Id};
annotate_with_chain(_, _ChainName, Conf) ->
    Conf.
%% @doc Add gateway-level authentication.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec add_authn(gateway_name(), map()) -> {ok, map()}.
add_authn(GwName, AuthConf) ->
    Result = emqx_gateway_conf:add_authn(GwName, AuthConf),
    confexp(Result).
%% @doc Add authentication to a single listener of the gateway.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec add_authn(gateway_name(), binary(), map()) -> {ok, map()}.
add_authn(GwName, ListenerId, AuthConf) ->
    {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
    ListenerRef = {LType, LName},
    Result = emqx_gateway_conf:add_authn(GwName, ListenerRef, AuthConf),
    confexp(Result).
%% @doc Update gateway-level authentication.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec update_authn(gateway_name(), map()) -> {ok, map()}.
update_authn(GwName, AuthConf) ->
    Result = emqx_gateway_conf:update_authn(GwName, AuthConf),
    confexp(Result).
%% @doc Update the authentication of a single listener of the gateway.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec update_authn(gateway_name(), binary(), map()) -> {ok, map()}.
update_authn(GwName, ListenerId, AuthConf) ->
    {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
    ListenerRef = {LType, LName},
    Result = emqx_gateway_conf:update_authn(GwName, ListenerRef, AuthConf),
    confexp(Result).
%% @doc Remove gateway-level authentication.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec remove_authn(gateway_name()) -> ok.
remove_authn(GwName) ->
    Result = emqx_gateway_conf:remove_authn(GwName),
    confexp(Result).
%% @doc Remove the authentication of a single listener of the gateway.
%% Raises `error(Reason)' when the config layer returns `{error, Reason}'.
-spec remove_authn(gateway_name(), binary()) -> ok.
remove_authn(GwName, ListenerId) ->
    {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
    Result = emqx_gateway_conf:remove_authn(GwName, {LType, LName}),
    confexp(Result).
%% @private
%% Unwrap a config-layer result: success values pass through unchanged,
%% `{error, Reason}' is turned into an `error(Reason)' exception.
confexp({error, Reason}) ->
    error(Reason);
confexp({ok, _} = Ok) ->
    Ok;
confexp(ok) ->
    ok.
%%--------------------------------------------------------------------
%% Mgmt APIs - clients
%%--------------------------------------------------------------------
%% @doc Look up every channel registered under `ClientId' in the gateway
%% and format each one with the caller-supplied `M:F/1', which receives
%% `{{ClientId, Pid}, Info, Stats}'.
-spec lookup_client(
    gateway_name(),
    emqx_types:clientid(),
    {module(), atom()}
) -> list().
lookup_client(GwName, ClientId, {M, F}) ->
    Format = fun(ChanPid) ->
        ChanInfo = emqx_gateway_cm:get_chan_info(GwName, ClientId, ChanPid),
        ChanStats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, ChanPid),
        M:F({{ClientId, ChanPid}, ChanInfo, ChanStats})
    end,
    [
        Format(ChanPid)
     || ChanPid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)
    ].
%% @doc Kick every channel registered under `ClientId'.
%% Returns `ok' if at least one kick succeeded, `{error, not_found}' if
%% the client has no channels, otherwise the result of the last kick.
-spec kickout_client(gateway_name(), emqx_types:clientid()) ->
    {error, any()}
    | ok.
kickout_client(GwName, ClientId) ->
    ChanPids = emqx_gateway_cm:lookup_by_clientid(GwName, ClientId),
    Outcomes = [
        emqx_gateway_cm:kick_session(GwName, ClientId, ChanPid)
     || ChanPid <- ChanPids
    ],
    case Outcomes of
        [] ->
            {error, not_found};
        _ ->
            case lists:member(ok, Outcomes) of
                true -> ok;
                false -> lists:last(Outcomes)
            end
    end.
%% @doc Ask the client's channel for its subscriptions and return them as
%% a list of subscription-option maps, each carrying its `topic'.
-spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) ->
    {error, any()}
    | {ok, list()}.
list_client_subscriptions(GwName, ClientId) ->
    AttachTopic = fun({Topic, SubOpts}) -> SubOpts#{topic => Topic} end,
    case client_call(GwName, ClientId, subscriptions) of
        {ok, Subs} ->
            {ok, lists:map(AttachTopic, Subs)};
        {error, Reason} ->
            {error, Reason}
    end.
%% @doc Ask the client's channel to subscribe to `Topic' with `SubOpts'.
-spec client_subscribe(
    gateway_name(),
    emqx_types:clientid(),
    emqx_types:topic(),
    emqx_types:subopts()
) ->
    {error, any()}
    | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
client_subscribe(GwName, ClientId, Topic, SubOpts) ->
    Request = {subscribe, Topic, SubOpts},
    client_call(GwName, ClientId, Request).
%% @doc Ask the client's channel to unsubscribe from `Topic'.
-spec client_unsubscribe(
    gateway_name(),
    emqx_types:clientid(),
    emqx_types:topic()
) ->
    {error, any()}
    | ok.
client_unsubscribe(GwName, ClientId, Topic) ->
    Request = {unsubscribe, Topic},
    client_call(GwName, ClientId, Request).
%% @private
%% Send `Req' to the client's channel through emqx_gateway_cm:call/4.
%% An `undefined' reply and a thrown `noproc' both become
%% `{error, not_found}'; a thrown `{badrpc, Reason}' becomes an error
%% tuple. Any other reply is returned as is.
client_call(GwName, ClientId, Req) ->
    Timeout = ?DEFAULT_CALL_TIMEOUT,
    try emqx_gateway_cm:call(GwName, ClientId, Req, Timeout) of
        undefined -> {error, not_found};
        Reply -> Reply
    catch
        throw:{badrpc, Why} ->
            {error, {badrpc, Why}};
        throw:noproc ->
            {error, not_found}
    end.
%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------
%% @doc Turn an error reason into an HTTP error tuple: 400 with a readable
%% message when reason2msg/1 recognizes the reason, otherwise 500 carrying
%% the reason itself.
-spec reason2resp({atom(), map()} | any()) -> binary() | any().
reason2resp(Reason) ->
    {Code, Payload} =
        case reason2msg(Reason) of
            error -> {500, Reason};
            Msg -> {400, Msg}
        end,
    return_http_error(Code, Payload).
%% @doc Build the `{Code, CodeName, Message}' error tuple. `Code' must be
%% one of the codes known to codestr/1.
-spec return_http_error(integer(), any()) -> {integer(), atom(), binary()}.
return_http_error(Code, Msg) ->
    CodeName = codestr(Code),
    Text = emqx_gateway_utils:stringfy(Msg),
    {Code, CodeName, Text}.
%% @doc Translate a known `badconf' / `badres' error term into a
%% human-readable string. Returns the atom `error' for anything that is
%% not recognized.
-spec reason2msg({atom(), map()} | any()) -> error | string().
reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
    %% Prefer the JSON rendering of the offending value; fall back to the
    %% generic stringification when it cannot be encoded.
    PrintableValue =
        case emqx_json:safe_encode(Value) of
            {error, _} -> emqx_gateway_utils:stringfy(Value);
            {ok, Encoded} -> Encoded
        end,
    ReasonStr = emqx_gateway_utils:stringfy(Reason),
    fmtstr(
        "Bad config value '~s' for '~s', reason: ~s",
        [PrintableValue, Key, ReasonStr]
    );
reason2msg({badres, #{resource := Resource, reason := Reason} = Detail}) ->
    badres_msg(Resource, Reason, Detail);
reason2msg(_) ->
    error.

%% @private
%% Message for a `badres' error, selected by resource kind and reason.
%% Unknown combinations (or details missing the expected key) yield `error'.
badres_msg(gateway, not_found, #{gateway := GwName}) ->
    fmtstr("The ~s gateway is unloaded", [GwName]);
badres_msg(gateway, already_exist, #{gateway := GwName}) ->
    fmtstr("The ~s gateway already loaded", [GwName]);
badres_msg(listener, not_found, #{listener := {GwName, LType, LName}}) ->
    fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
badres_msg(listener, already_exist, #{listener := {GwName, LType, LName}}) ->
    fmtstr(
        "The listener ~s of ~s already exist",
        [listener_id(GwName, LType, LName), GwName]
    );
badres_msg(authn, not_found, #{gateway := GwName}) ->
    fmtstr("The authentication not found on ~s", [GwName]);
badres_msg(authn, already_exist, #{gateway := GwName}) ->
    fmtstr("The authentication already exist on ~s", [GwName]);
badres_msg(listener_authn, not_found, #{listener := {GwName, LType, LName}}) ->
    fmtstr(
        "The authentication not found on ~s",
        [listener_id(GwName, LType, LName)]
    );
badres_msg(listener_authn, already_exist, #{listener := {GwName, LType, LName}}) ->
    fmtstr(
        "The authentication already exist on ~s",
        [listener_id(GwName, LType, LName)]
    );
badres_msg(_, _, _) ->
    error.
%% @private
%% Map an HTTP status code to the error-code atom placed in the error
%% tuple built by return_http_error/2. Only the codes listed here are
%% supported; any other code fails with function_clause.
codestr(400) -> 'BAD_REQUEST';
codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(405) -> 'METHOD_NOT_ALLOWED';
codestr(409) -> 'NOT_SUPPORT';
codestr(500) -> 'UNKNOW_ERROR';
codestr(501) -> 'NOT_IMPLEMENTED'.
%% @private
%% io_lib:format/2 with the result flattened into a plain string.
fmtstr(Fmt, Args) ->
    Formatted = io_lib:format(Fmt, Args),
    lists:flatten(Formatted).
%% @doc Run `Fun(GwName, Authn)' for a loaded gateway, where `Authn' is
%% the gateway-level authentication config. Name resolution and error
%% mapping are delegated to with_gateway/2.
-spec with_authn(binary(), function()) -> any().
with_authn(GwName0, Fun) ->
    Handler = fun(GwName, _GwConf) ->
        Fun(GwName, emqx_gateway_http:authn(GwName))
    end,
    with_gateway(GwName0, Handler).
%% @doc Run `Fun(GwName, Authn)' for a loaded gateway, where `Authn' is
%% the authentication config of the listener `Id'. Name resolution and
%% error mapping are delegated to with_gateway/2.
-spec with_listener_authn(binary(), binary(), function()) -> any().
with_listener_authn(GwName0, Id, Fun) ->
    Handler = fun(GwName, _GwConf) ->
        Fun(GwName, emqx_gateway_http:authn(GwName, Id))
    end,
    with_gateway(GwName0, Handler).
%% @doc Resolve the binary gateway name `GwName0' to an existing atom,
%% look the gateway up and, if it is loaded, call `Fun(GwName, Gateway)'.
%%
%% Returns whatever `Fun' returns, or an error tuple built by
%% return_http_error/2:
%%   - 404 when the name is not an existing atom or the gateway is not loaded;
%%   - 400/404 for the specific error reasons matched below;
%%   - for any other exception, the result of reason2resp/1 (after logging).
%%
%% `Fun' runs inside the outer `try', so exceptions it raises are mapped
%% by the same catch clauses.
-spec with_gateway(binary(), function()) -> any().
with_gateway(GwName0, Fun) ->
try
GwName =
try
%% Only already-existing atoms are accepted, so request input
%% cannot create new atoms.
binary_to_existing_atom(GwName0)
catch
%% Any conversion failure is normalized to `badname' and
%% handled by the outer catch.
_:_ -> error(badname)
end,
case emqx_gateway:lookup(GwName) of
undefined ->
return_http_error(404, "Gateway not load");
Gateway ->
Fun(GwName, Gateway)
end
catch
error:badname ->
return_http_error(404, "Bad gateway name");
%% Exceptions from: checks/2
error:{miss_param, K} ->
return_http_error(400, [K, " is required"]);
%% Exceptions from emqx_gateway_utils:parse_listener_id/1
error:{invalid_listener_id, Id} ->
return_http_error(400, ["Invalid listener id: ", Id]);
%% Exceptions from emqx:get_config/1
error:{config_not_found, Path0} ->
%% Render the config path as a dot-separated string.
Path = lists:concat(
lists:join(".", lists:map(fun to_list/1, Path0))
),
return_http_error(404, "Resource not found. path: " ++ Path);
%% Exceptions from emqx_gateway_conf:convert_certs/2,3
error:{bad_ssl_config, Reason0} ->
Reason = emqx_gateway_utils:stringfy(Reason0),
return_http_error(400, ["Bad SSL config, reason: ", Reason]);
%% Anything else (any class): log it with its stacktrace, then let
%% reason2resp/1 choose between a 400 and a 500 response.
Class:Reason:Stk ->
?SLOG(error, #{
msg => "uncatched_error",
reason => {Class, Reason},
stacktrace => Stk
}),
reason2resp(Reason)
end.
%% @doc Assert that every key in the list is present in `Map'.
%% Raises `error({miss_param, Key})' for the first missing key.
-spec checks(list(), map()) -> ok.
checks(Keys, Map) ->
    lists:foreach(
        fun(Key) ->
            case maps:is_key(Key, Map) of
                true -> ok;
                false -> error({miss_param, Key})
            end
        end,
        Keys
    ).
%% @private
%% Convert a config path segment (atom or binary) to a string.
to_list(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
to_list(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
%% @doc Sum `max_connections' and `current_connections' over a list of
%% per-node status entries. Entries lacking either key are skipped.
%% Returns `{MaxTotal, CurrentTotal}'.
sum_cluster_connections(List) ->
    sum_cluster_connections(List, 0, 0).

%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

%% Same as sum_cluster_connections/1, starting from the given totals.
sum_cluster_connections(NodeStatuses, MaxAcc0, CurrAcc0) ->
    lists:foldl(
        fun
            (
                #{max_connections := Max, current_connections := Current},
                {MaxAcc, CurrAcc}
            ) ->
                {MaxAcc + Max, Current + CurrAcc};
            (_, Totals) ->
                Totals
        end,
        {MaxAcc0, CurrAcc0},
        NodeStatuses
    ).