emqx/apps/emqx_gateway/src/emqx_gateway_api.erl

946 lines
31 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%
-module(emqx_gateway_api).
-include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-behaviour(minirest_api).
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(
emqx_gateway_http,
[
return_http_error/2,
with_gateway/2
]
).
%% minirest/dashboard_swagger behaviour callbacks
-export([
api_spec/0,
paths/0,
schema/1
]).
-export([
roots/0,
fields/1,
listener_schema/0
]).
%% http handlers
-export([
gateways/2,
gateway/2,
gateway_enable/2
]).
-define(KNOWN_GATEWAY_STATUSES, [<<"running">>, <<"stopped">>, <<"unloaded">>]).
-define(TAGS, [<<"Gateways">>]).
%%--------------------------------------------------------------------
%% minirest behaviour callbacks
%%--------------------------------------------------------------------
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/gateways",
"/gateways/:name",
"/gateways/:name/enable/:enable"
].
%%--------------------------------------------------------------------
%% http handlers
gateways(get, Request) ->
Params = maps:get(query_string, Request, #{}),
Status = maps:get(<<"status">>, Params, <<"all">>),
case lists:member(Status, [<<"all">> | ?KNOWN_GATEWAY_STATUSES]) of
true ->
{200, emqx_gateway_http:gateways(binary_to_existing_atom(Status, utf8))};
false ->
return_http_error(
400,
[
"Unknown gateway status in query: ",
Status,
"\n",
"Values allowed: ",
lists:join(", ", ?KNOWN_GATEWAY_STATUSES)
]
)
end.
gateway(get, #{bindings := #{name := Name}}) ->
try
case emqx_gateway:lookup(Name) of
undefined ->
{200, #{name => Name, status => unloaded}};
Gateway ->
GwConf = emqx_gateway_conf:gateway(Name),
GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
[created_at, started_at, stopped_at],
Gateway
),
GwInfo1 = maps:with(
[
name,
status,
created_at,
started_at,
stopped_at
],
GwInfo0
),
{200, maps:merge(GwConf, GwInfo1)}
end
catch
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end;
gateway(put, #{
body := GwConf0,
bindings := #{name := Name}
}) ->
GwConf = maps:without([<<"name">>], GwConf0),
try
LoadOrUpdateF =
case emqx_gateway:lookup(Name) of
undefined ->
fun emqx_gateway_conf:load_gateway/2;
_ ->
fun emqx_gateway_conf:update_gateway/2
end,
case LoadOrUpdateF(Name, GwConf) of
{ok, _} ->
{204};
{error, Reason} ->
emqx_gateway_http:reason2resp(Reason)
end
catch
error:{badconf, _} = Reason1 ->
emqx_gateway_http:reason2resp(Reason1);
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end.
gateway_enable(put, #{bindings := #{name := Name, enable := Enable}}) ->
try
case emqx_gateway:lookup(Name) of
undefined ->
return_http_error(404, <<"NOT FOUND">>);
_Gateway ->
{ok, _} = emqx_gateway_conf:update_gateway(Name, #{<<"enable">> => Enable}),
{204}
end
catch
throw:not_found ->
return_http_error(404, <<"NOT FOUND">>)
end.
%%--------------------------------------------------------------------
%% Swagger defines
%%--------------------------------------------------------------------
schema("/gateways") ->
#{
'operationId' => gateways,
get =>
#{
tags => ?TAGS,
desc => ?DESC(list_gateway),
summary => <<"List all gateways">>,
parameters => params_gateway_status_in_qs(),
responses =>
#{
200 => emqx_dashboard_swagger:schema_with_example(
hoconsc:array(ref(gateway_overview)),
examples_gateway_overview()
),
400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>
)
}
}
};
schema("/gateways/:name") ->
#{
'operationId' => gateway,
get =>
#{
tags => ?TAGS,
desc => ?DESC(get_gateway),
summary => <<"Get gateway">>,
parameters => params_gateway_name_in_path(),
responses =>
#{
200 => schema_gateways_conf(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND, ?RESOURCE_NOT_FOUND], <<"Not Found">>
)
}
},
put =>
#{
tags => ?TAGS,
desc => ?DESC(update_gateway),
% [FIXME] add proper desc
summary => <<"Load or update the gateway confs">>,
parameters => params_gateway_name_in_path(),
'requestBody' => schema_load_or_update_gateways_conf(),
responses =>
?STANDARD_RESP(#{204 => <<"Gateway configuration updated">>})
}
};
schema("/gateways/:name/enable/:enable") ->
#{
'operationId' => gateway_enable,
put =>
#{
tags => ?TAGS,
desc => ?DESC(update_gateway),
summary => <<"Enable or disable gateway">>,
parameters => params_gateway_name_in_path() ++ params_gateway_enable_in_path(),
responses =>
#{
204 => <<"Gateway configuration updated">>,
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND, ?RESOURCE_NOT_FOUND], <<"Not Found">>
)
}
}
}.
%%--------------------------------------------------------------------
%% params defines
params_gateway_name_in_path() ->
[
{name,
mk(
hoconsc:enum(emqx_gateway_schema:gateway_names()),
#{
in => path,
desc => ?DESC(gateway_name_in_qs),
example => <<"stomp">>
}
)}
].
params_gateway_status_in_qs() ->
[
{status,
mk(
binary(),
#{
in => query,
enum => ?KNOWN_GATEWAY_STATUSES,
required => false,
desc => ?DESC(gateway_status_in_qs),
example => <<"running">>
}
)}
].
params_gateway_enable_in_path() ->
[
{enable,
mk(
boolean(),
#{
in => path,
desc => ?DESC(gateway_enable_in_path),
example => true
}
)}
].
%%--------------------------------------------------------------------
%% schemas
roots() ->
[
gateway_overview,
gateway_stats
].
fields(gateway_overview) ->
[
{name,
mk(
binary(),
#{desc => ?DESC(gateway_name)}
)},
{status,
mk(
hoconsc:enum([running, stopped, unloaded]),
#{desc => ?DESC(gateway_status)}
)},
{created_at,
mk(
binary(),
#{desc => ?DESC(gateway_created_at)}
)},
{started_at,
mk(
binary(),
#{
required => false,
desc => ?DESC(gateway_started_at)
}
)},
{stopped_at,
mk(
binary(),
#{
required => false,
desc => ?DESC(gateway_stopped_at)
}
)},
{max_connections,
mk(
pos_integer(),
#{desc => ?DESC(gateway_max_connections)}
)},
{current_connections,
mk(
non_neg_integer(),
#{desc => ?DESC(gateway_current_connections)}
)},
{listeners,
mk(
hoconsc:array(ref(gateway_listener_overview)),
#{
required => {false, recursively},
desc => ?DESC(gateway_listeners)
}
)},
{node_status,
mk(hoconsc:array(ref(gateway_node_status)), #{desc => ?DESC(gateway_node_status)})}
];
fields(gateway_listener_overview) ->
[
{id,
mk(
binary(),
#{desc => ?DESC(gateway_listener_id)}
)},
{running,
mk(
boolean(),
#{desc => ?DESC(gateway_listener_running)}
)},
{type,
mk(
hoconsc:enum([tcp, ssl, udp, dtls]),
#{desc => ?DESC(gateway_listener_type)}
)}
];
fields(gateway_node_status) ->
[
{node, mk(node(), #{desc => ?DESC(node)})},
{status,
mk(
hoconsc:enum([running, stopped, unloaded]),
#{desc => ?DESC(gateway_status)}
)},
{max_connections,
mk(
pos_integer(),
#{desc => ?DESC(gateway_max_connections)}
)},
{current_connections,
mk(
non_neg_integer(),
#{desc => ?DESC(gateway_current_connections)}
)}
];
fields(Gw) when
Gw == stomp;
Gw == mqttsn;
Gw == coap;
Gw == lwm2m;
Gw == exproto;
Gw == gbt32960;
Gw == ocpp;
Gw == jt808
->
[{name, mk(Gw, #{desc => ?DESC(gateway_name)})}] ++
convert_listener_struct(emqx_gateway_schema:gateway_schema(Gw));
fields(Gw) when
Gw == update_stomp;
Gw == update_mqttsn;
Gw == update_coap;
Gw == update_lwm2m;
Gw == update_exproto;
Gw == update_gbt32960;
Gw == update_ocpp;
Gw == update_jt808
->
"update_" ++ GwStr = atom_to_list(Gw),
Gw1 = list_to_existing_atom(GwStr),
remove_listener_and_authn(emqx_gateway_schema:gateway_schema(Gw1));
fields(Listener) when
Listener == tcp_listener;
Listener == ssl_listener;
Listener == udp_listener;
Listener == dtls_listener;
Listener == ws_listener;
Listener == wss_listener
->
Type =
case Listener of
tcp_listener -> tcp;
ssl_listener -> ssl;
udp_listener -> udp;
dtls_listener -> dtls;
ws_listener -> ws;
wss_listener -> wss
end,
[
{id,
mk(
binary(),
#{
desc => ?DESC(gateway_listener_id)
}
)},
{type,
mk(
Type,
#{desc => ?DESC(gateway_listener_type)}
)},
{name,
mk(
binary(),
#{desc => ?DESC(gateway_listener_name)}
)},
{running,
mk(
boolean(),
#{
desc => ?DESC(gateway_listener_running)
}
)}
] ++ emqx_gateway_schema:fields(Listener);
fields(gateway_stats) ->
[{key, mk(binary(), #{})}].
schema_load_or_update_gateways_conf() ->
Names = emqx_gateway_schema:gateway_names(),
emqx_dashboard_swagger:schema_with_examples(
hoconsc:union(
[
ref(?MODULE, Name)
|| Name <-
Names ++
[
erlang:list_to_existing_atom("update_" ++ erlang:atom_to_list(Name))
|| Name <- Names
]
]
),
examples_update_gateway_confs()
).
schema_gateways_conf() ->
emqx_dashboard_swagger:schema_with_examples(
hoconsc:union(
[
ref(?MODULE, Name)
|| Name <- emqx_gateway_schema:gateway_names()
]
),
examples_gateway_confs()
).
convert_listener_struct(Schema) ->
{value, {listeners, #{type := Type}}, Schema1} = lists:keytake(listeners, 1, Schema),
ListenerSchema = hoconsc:mk(
listeners_schema(Type),
#{required => {false, recursively}}
),
lists:keystore(listeners, 1, Schema1, {listeners, ListenerSchema}).
remove_listener_and_authn(Schema) ->
lists:keydelete(
authentication,
1,
lists:keydelete(listeners, 1, Schema)
).
listeners_schema(?R_REF(_Mod, tcp_listeners)) ->
hoconsc:array(hoconsc:union([ref(tcp_listener), ref(ssl_listener)]));
listeners_schema(?R_REF(_Mod, udp_listeners)) ->
hoconsc:array(hoconsc:union([ref(udp_listener), ref(dtls_listener)]));
listeners_schema(?R_REF(_Mod, tcp_udp_listeners)) ->
hoconsc:array(
hoconsc:union([
ref(tcp_listener),
ref(ssl_listener),
ref(udp_listener),
ref(dtls_listener)
])
);
listeners_schema(?R_REF(_Mod, ws_listeners)) ->
hoconsc:array(hoconsc:union([ref(ws_listener), ref(wss_listener)])).
listener_schema() ->
hoconsc:union([
ref(?MODULE, tcp_listener),
ref(?MODULE, ssl_listener),
ref(?MODULE, udp_listener),
ref(?MODULE, dtls_listener),
ref(?MODULE, ws_listener),
ref(?MODULE, wss_listener)
]).
%%--------------------------------------------------------------------
%% examples
examples_gateway_overview() ->
[
#{
name => <<"coap">>,
status => <<"unloaded">>
},
#{
name => <<"exproto">>,
status => <<"unloaded">>
},
#{
name => <<"lwm2m">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000,
listeners =>
[
#{
id => <<"lwm2m:udp:default">>,
type => <<"udp">>,
name => <<"default">>,
running => true
}
],
created_at => <<"2021-12-08T14:41:26.171+08:00">>,
started_at => <<"2021-12-08T14:41:26.202+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
},
#{
name => <<"mqttsn">>,
status => <<"stopped">>,
current_connections => 0,
max_connections => 1024000,
listeners =>
[
#{
id => <<"mqttsn:udp:default">>,
name => <<"default">>,
running => false,
type => <<"udp">>
}
],
created_at => <<"2021-12-08T14:41:45.071+08:00">>,
stopped_at => <<"2021-12-08T14:56:35.576+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
},
#{
name => <<"stomp">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000,
listeners =>
[
#{
id => <<"stomp:tcp:default">>,
name => <<"default">>,
running => true,
type => <<"tcp">>
}
],
created_at => <<"2021-12-08T14:42:15.272+08:00">>,
started_at => <<"2021-12-08T14:42:15.274+08:00">>,
node_status => [
#{
node => <<"node@127.0.0.1">>,
status => <<"running">>,
current_connections => 0,
max_connections => 1024000
}
]
}
].
examples_gateway_confs() ->
#{
stomp_gateway =>
#{
summary => <<"A simple STOMP gateway config">>,
value =>
#{
enable => true,
name => <<"stomp">>,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"stomp/">>,
frame =>
#{
max_headers => 10,
max_headers_length => 1024,
max_body_length => 65535
},
listeners =>
[
#{
type => <<"tcp">>,
name => <<"default">>,
bind => <<"61613">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
mqttsn_gateway =>
#{
summary => <<"A simple MQTT-SN gateway config">>,
value =>
#{
enable => true,
name => <<"mqttsn">>,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"mqttsn/">>,
gateway_id => 1,
broadcast => true,
enable_qos3 => true,
predefined =>
[
#{
id => <<"1001">>,
topic => <<"pred/1001">>
},
#{
id => <<"1002">>,
topic => <<"pred/1002">>
}
],
listeners =>
[
#{
type => <<"udp">>,
name => <<"default">>,
bind => <<"1884">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
coap_gateway =>
#{
summary => <<"A simple CoAP gateway config">>,
value =>
#{
enable => true,
name => <<"coap">>,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"coap/">>,
heartbeat => <<"30s">>,
connection_required => false,
notify_type => <<"qos">>,
subscribe_qos => <<"coap">>,
publish_qos => <<"coap">>,
listeners =>
[
#{
type => <<"udp">>,
name => <<"default">>,
bind => <<"5683">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
lwm2m_gateway =>
#{
summary => <<"A simple LwM2M gateway config">>,
value =>
#{
enable => true,
name => <<"lwm2m">>,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"lwm2m/">>,
xml_dir => <<"/etc/emqx/lwm2m_xml">>,
lifetime_min => <<"1s">>,
lifetime_max => <<"86400s">>,
qmode_time_window => <<"22s">>,
auto_observe => false,
update_msg_publish_condition => <<"always">>,
translators =>
#{
command => #{topic => <<"dn/#">>},
response => #{topic => <<"up/resp">>},
notify => #{topic => <<"up/notify">>},
register => #{topic => <<"up/resp">>},
update => #{topic => <<"up/resp">>}
},
listeners =>
[
#{
type => <<"udp">>,
name => <<"default">>,
bind => <<"5783">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
exproto_gateway =>
#{
summary => <<"A simple ExProto gateway config">>,
value =>
#{
enable => true,
name => <<"exproto">>,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"exproto/">>,
server =>
#{bind => <<"9100">>},
handler =>
#{address => <<"http://127.0.0.1:9001">>},
listeners =>
[
#{
type => <<"tcp">>,
name => <<"default">>,
bind => <<"7993">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
gbt32960_gateway =>
#{
summary => <<"A simple GBT32960 gateway config">>,
value =>
#{
enable => true,
name => <<"gbt32960">>,
enable_stats => true,
mountpoint => <<"gbt32960/${clientid}">>,
retry_interval => <<"8s">>,
max_retry_times => 3,
message_queue_len => 10,
listeners =>
[
#{
type => <<"tcp">>,
name => <<"default">>,
bind => <<"7325">>,
max_connections => 1024000,
max_conn_rate => 1000
}
]
}
},
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
value =>
#{
enable => true,
name => <<"ocpp">>,
enable_stats => true,
mountpoint => <<"ocpp/">>,
default_heartbeat_interval => <<"60s">>,
upstream =>
#{
topic => <<"cp/${cid}">>,
reply_topic => <<"cp/${cid}/reply">>,
error_topic => <<"cp/${cid}/error">>
},
dnstream => #{topic => <<"cp/${cid}">>},
message_format_checking => disable,
listeners =>
[
#{
type => <<"ws">>,
name => <<"default">>,
bind => <<"33033">>,
max_connections => 1024000
}
]
}
}
}.
examples_update_gateway_confs() ->
#{
stomp_gateway =>
#{
summary => <<"A simple STOMP gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"stomp2/">>,
frame =>
#{
max_headers => 100,
max_headers_length => 10240,
max_body_length => 655350
}
}
},
mqttsn_gateway =>
#{
summary => <<"A simple MQTT-SN gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"mqttsn2/">>,
gateway_id => 1,
broadcast => true,
enable_qos3 => false,
predefined =>
[
#{
id => <<"1003">>,
topic => <<"pred/1003">>
}
]
}
},
coap_gateway =>
#{
summary => <<"A simple CoAP gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"coap2/">>,
heartbeat => <<"30s">>,
connection_required => false,
notify_type => <<"qos">>,
subscribe_qos => <<"coap">>,
publish_qos => <<"coap">>
}
},
lwm2m_gateway =>
#{
summary => <<"A simple LwM2M gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"lwm2m2/">>,
xml_dir => <<"/etc/emqx/lwm2m_xml">>,
lifetime_min => <<"1s">>,
lifetime_max => <<"86400s">>,
qmode_time_window => <<"22s">>,
auto_observe => false,
update_msg_publish_condition => <<"always">>,
translators =>
#{
command => #{topic => <<"dn/#">>},
response => #{topic => <<"up/resp">>},
notify => #{topic => <<"up/notify">>},
register => #{topic => <<"up/resp">>},
update => #{topic => <<"up/resp">>}
}
}
},
exproto_gateway =>
#{
summary => <<"A simple ExProto gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
idle_timeout => <<"30s">>,
mountpoint => <<"exproto2/">>,
server =>
#{bind => <<"9100">>},
handler =>
#{address => <<"http://127.0.0.1:9001">>}
}
},
gbt32960_gateway =>
#{
summary => <<"A simple GBT32960 gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
mountpoint => <<"gbt32960/${clientid}">>,
retry_interval => <<"8s">>,
max_retry_times => 3,
message_queue_len => 10
}
},
ocpp_gateway =>
#{
summary => <<"A simple OCPP gateway config">>,
value =>
#{
enable => true,
enable_stats => true,
mountpoint => <<"ocpp/">>,
default_heartbeat_interval => <<"60s">>,
upstream =>
#{
topic => <<"cp/${cid}">>,
reply_topic => <<"cp/${cid}/reply">>,
error_topic => <<"cp/${cid}/error">>
},
dnstream => #{topic => <<"cp/${cid}">>},
message_format_checking => disable
}
}
}.