Merge pull request #6297 from HJianBo/gw-improve-apis-2

Standardize HTTP-APIs return value and Status/Error Code
This commit is contained in:
JianBo He 2021-12-08 18:02:13 +08:00 committed by GitHub
commit afb2cf19c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 926 additions and 823 deletions

View File

@ -70,9 +70,9 @@ gateway.stomp {
## SSL options ## SSL options
## See ${example_common_ssl_options} for more information ## See ${example_common_ssl_options} for more information
ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
#ssl.verify = verify_none #ssl.verify = verify_none
#ssl.fail_if_no_peer_cert = false #ssl.fail_if_no_peer_cert = false
#ssl.server_name_indication = disable #ssl.server_name_indication = disable

View File

@ -0,0 +1,26 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(INTERNAL_ERROR, 'INTERNAL_SERVER_ERROR').
-define(STANDARD_RESP(R),
R#{ 400 => emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST], <<"Bad request">>)
, 404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Not Found">>)
}).

View File

@ -25,7 +25,7 @@
-export([request/2]). -export([request/2]).
-define(PREFIX, "/gateway/coap/:clientid"). -define(PREFIX, "/gateway/coap/clients/:clientid").
-define(DEF_WAIT_TIME, 10). -define(DEF_WAIT_TIME, 10).
-import(emqx_mgmt_util, [ schema/1 -import(emqx_mgmt_util, [ schema/1

View File

@ -16,22 +16,30 @@
%% %%
-module(emqx_gateway_api). -module(emqx_gateway_api).
-include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx/include/emqx_authentication.hrl"). -include_lib("emqx/include/emqx_authentication.hrl").
-behaviour(minirest_api). -behaviour(minirest_api).
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_gateway_http, -import(emqx_gateway_http,
[ return_http_error/2 [ return_http_error/2
, with_gateway/2 , with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]). ]).
%% minirest behaviour callbacks %% minirest/dashbaord_swagger behaviour callbacks
-export([api_spec/0]). -export([ api_spec/0
, paths/0
, schema/1
]).
-export([ roots/0
, fields/1
]).
%% http handlers %% http handlers
-export([ gateway/2 -export([ gateway/2
@ -44,12 +52,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
api_spec() -> api_spec() ->
{metadata(apis()), []}. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
apis() -> paths() ->
[ {"/gateway", gateway} [ "/gateway"
, {"/gateway/:name", gateway_insta} , "/gateway/:name"
, {"/gateway/:name/stats", gateway_insta_stats} , "/gateway/:name/stats"
]. ].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -137,77 +145,54 @@ gateway_insta_stats(get, _Req) ->
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
metadata(APIs) -> schema("/gateway") ->
metadata(APIs, []). #{ 'operationId' => gateway,
metadata([], APIAcc) -> get =>
lists:reverse(APIAcc);
metadata([{Path, Fun}|More], APIAcc) ->
Methods = [get, post, put, delete, patch],
Mds = lists:foldl(fun(M, Acc) ->
try
Acc#{M => swagger(Path, M)}
catch
error : function_clause ->
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway", get) ->
#{ description => <<"Get gateway list">> #{ description => <<"Get gateway list">>
, parameters => params_gateway_status_in_qs() , parameters => params_gateway_status_in_qs()
, responses => , responses =>
#{ <<"200">> => schema_gateway_overview_list() } ?STANDARD_RESP(#{200 => ref(gateway_overview)})
}; },
swagger("/gateway", post) -> post =>
#{ description => <<"Load a gateway">> #{ description => <<"Load a gateway">>
, requestBody => schema_gateway_conf() , 'requestBody' => schema_gateways_conf()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{201 => schema_gateways_conf()})
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
} }
}; };
swagger("/gateway/:name", get) -> schema("/gateway/:name") ->
#{ 'operationId' => gateway_insta,
get =>
#{ description => <<"Get the gateway configurations">> #{ description => <<"Get the gateway configurations">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{200 => schema_gateways_conf()})
, <<"404">> => schema_not_found() },
, <<"500">> => schema_internal_error() delete =>
, <<"200">> => schema_gateway_conf()
}
};
swagger("/gateway/:name", delete) ->
#{ description => <<"Delete/Unload the gateway">> #{ description => <<"Delete/Unload the gateway">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{204 => <<"Deleted">>})
, <<"404">> => schema_not_found() },
, <<"500">> => schema_internal_error() put =>
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name", put) ->
#{ description => <<"Update the gateway configurations/status">> #{ description => <<"Update the gateway configurations/status">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, requestBody => schema_gateway_conf() , 'requestBody' => schema_gateways_conf()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{200 => schema_gateways_conf()})
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
} }
}; };
swagger("/gateway/:name/stats", get) -> schema("/gateway/:name/stats") ->
#{ 'operationId' => gateway_insta_stats,
get =>
#{ description => <<"Get gateway Statistic">> #{ description => <<"Get gateway Statistic">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(
, <<"404">> => schema_not_found() #{200 => emqx_dashboard_swagger:schema_with_examples(
, <<"500">> => schema_internal_error() ref(gateway_stats),
, <<"200">> => schema_gateway_stats() examples_gateway_stats())
})
} }
}. }.
@ -215,192 +200,154 @@ swagger("/gateway/:name/stats", get) ->
%% params defines %% params defines
params_gateway_name_in_path() -> params_gateway_name_in_path() ->
[#{ name => name [{name,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Gateway Name">>
}]. })}
].
params_gateway_status_in_qs() -> params_gateway_status_in_qs() ->
[#{ name => status [{status,
, in => query mk(binary(),
, schema => #{type => string} #{ in => query
, required => false , nullable => true
}]. , desc => <<"Gateway Status">>
})}
].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% schemas %% schemas
schema_gateway_overview_list() -> roots() ->
emqx_mgmt_util:array_schema( [ gateway_overview
#{ type => object , gateway_stats
, properties => properties_gateway_overview() ].
},
<<"Gateway list">> fields(gateway_overview) ->
[ {name,
mk(string(),
#{ desc => <<"Gateway Name">>})}
, {status,
mk(hoconsc:enum([running, stopped, unloaded]),
#{ desc => <<"The Gateway status">>})}
, {created_at,
mk(string(),
#{desc => <<"The Gateway created datetime">>})}
, {started_at,
mk(string(),
#{ nullable => true
, desc => <<"The Gateway started datetime">>})}
, {stopped_at,
mk(string(),
#{ nullable => true
, desc => <<"The Gateway stopped datetime">>})}
, {max_connections,
mk(integer(),
#{ desc => <<"The Gateway allowed maximum connections/clients">>})}
, {current_connections,
mk(integer(),
#{ desc => <<"The Gateway current connected connections/clients">>
})}
, {listeners,
mk(hoconsc:array(ref(gateway_listener_overview)),
#{ nullable => {true, recursively}
, desc => <<"The Gateway listeners overview">>})}
];
fields(gateway_listener_overview) ->
[ {id,
mk(string(),
#{ desc => <<"Listener ID">>})}
, {running,
mk(boolean(),
#{ desc => <<"Listener Running status">>})}
, {type,
mk(hoconsc:enum([tcp, ssl, udp, dtls]),
#{ desc => <<"Listener Type">>})}
];
fields(Gw) when Gw == stomp; Gw == mqttsn;
Gw == coap; Gw == lwm2m;
Gw == exproto ->
[{name,
mk(string(), #{ desc => <<"Gateway Name">>})}
] ++ convert_listener_struct(emqx_gateway_schema:fields(Gw));
fields(Listener) when Listener == tcp_listener;
Listener == ssl_listener;
Listener == udp_listener;
Listener == dtls_listener ->
[ {id,
mk(string(),
#{ nullable => true
, desc => <<"Listener ID">>})}
, {type,
mk(hoconsc:union([tcp, ssl, udp, dtls]),
#{ desc => <<"Listener type">>})}
, {name,
mk(string(),
#{ desc => <<"Listener Name">>})}
, {running,
mk(boolean(),
#{ nullable => true
, desc => <<"Listener running status">>})}
] ++ emqx_gateway_schema:fields(Listener);
fields(gateway_stats) ->
[{key, mk(string(), #{})}].
schema_gateways_conf() ->
%% XXX: We need convert the emqx_gateway_schema's listener map
%% structure to array
emqx_dashboard_swagger:schema_with_examples(
hoconsc:union([ref(?MODULE, stomp), ref(?MODULE, mqttsn),
ref(?MODULE, coap), ref(?MODULE, lwm2m),
ref(?MODULE, exproto)]),
examples_gateway_confs()
). ).
%% XXX: This is whole confs for all type gateways. It is used to fill the convert_listener_struct(Schema) ->
%% default configurations and generate the swagger-schema {value, {listeners,
%% #{type := Type}}, Schema1} = lists:keytake(listeners, 1, Schema),
%% NOTE: It is a temporary measure to generate swagger-schema ListenerSchema = hoconsc:mk(listeners_schema(Type),
-define(COAP_GATEWAY_CONFS, #{ nullable => {true, recursively}
#{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY => , desc => <<"The gateway listeners">>
#{<<"mechanism">> => <<"password-based">>, }),
<<"name">> => <<"authenticator1">>, lists:keystore(listeners, 1, Schema1, {listeners, ListenerSchema}).
<<"server_type">> => <<"built-in-database">>,
<<"user_id_type">> => <<"clientid">>},
<<"name">> => <<"coap">>,
<<"enable">> => true,
<<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>,
<<"idle_timeout">> => <<"30s">>,
<<"listeners">> => [
#{<<"id">> => <<"coap:udp:default">>,
<<"type">> => <<"udp">>,
<<"running">> => true,
<<"acceptors">> => 8,<<"bind">> => 5683,
<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240}],
<<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>,
<<"publish_qos">> => <<"qos1">>,
<<"subscribe_qos">> => <<"qos0">>}
).
-define(EXPROTO_GATEWAY_CONFS, listeners_schema(?R_REF(_Mod, tcp_listeners)) ->
#{<<"enable">> => true, hoconsc:array(hoconsc:union([ref(tcp_listener), ref(ssl_listener)]));
<<"name">> => <<"exproto">>, listeners_schema(?R_REF(_Mod, udp_listeners)) ->
<<"enable_stats">> => true, hoconsc:array(hoconsc:union([ref(udp_listener), ref(dtls_listener)]));
<<"handler">> => listeners_schema(?R_REF(_Mod, udp_tcp_listeners)) ->
#{<<"address">> => <<"http://127.0.0.1:9001">>}, hoconsc:array(hoconsc:union([ref(tcp_listener), ref(ssl_listener),
<<"idle_timeout">> => <<"30s">>, ref(udp_listener), ref(dtls_listener)])).
<<"listeners">> => [
#{<<"id">> => <<"exproto:tcp:default">>,
<<"type">> => <<"tcp">>,
<<"running">> => true,
<<"acceptors">> => 8,<<"bind">> => 7993,
<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240}],
<<"mountpoint">> => <<>>,
<<"server">> => #{<<"bind">> => 9100}}
).
-define(LWM2M_GATEWAY_CONFS,
#{<<"auto_observe">> => false,
<<"name">> => <<"lwm2m">>,
<<"enable">> => true,
<<"enable_stats">> => true,
<<"idle_timeout">> => <<"30s">>,
<<"lifetime_max">> => <<"86400s">>,
<<"lifetime_min">> => <<"1s">>,
<<"listeners">> => [
#{<<"id">> => <<"lwm2m:udp:default">>,
<<"type">> => <<"udp">>,
<<"running">> => true,
<<"bind">> => 5783}],
<<"mountpoint">> => <<"lwm2m/", ?PH_S_ENDPOINT_NAME, "/">>,
<<"qmode_time_windonw">> => 22,
<<"translators">> =>
#{<<"command">> => <<"dn/#">>,<<"notify">> => <<"up/notify">>,
<<"register">> => <<"up/resp">>,
<<"response">> => <<"up/resp">>,
<<"update">> => <<"up/resp">>},
<<"update_msg_publish_condition">> =>
<<"contains_object_list">>,
<<"xml_dir">> => <<"etc/lwm2m_xml">>}
).
-define(MQTTSN_GATEWAY_CONFS,
#{<<"broadcast">> => true,
<<"clientinfo_override">> =>
#{<<"password">> => <<"abc">>,
<<"username">> => <<"mqtt_sn_user">>},
<<"enable">> => true,
<<"name">> => <<"mqtt-sn">>,
<<"enable_qos3">> => true,<<"enable_stats">> => true,
<<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>,
<<"listeners">> => [
#{<<"id">> => <<"mqttsn:udp:default">>,
<<"type">> => <<"udp">>,
<<"running">> => true,
<<"bind">> => 1884,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240000}],
<<"mountpoint">> => <<>>,
<<"predefined">> =>
[#{<<"id">> => 1,
<<"topic">> => <<"/predefined/topic/name/hello">>},
#{<<"id">> => 2,
<<"topic">> => <<"/predefined/topic/name/nice">>}]}
).
-define(STOMP_GATEWAY_CONFS,
#{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY =>
#{<<"mechanism">> => <<"password-based">>,
<<"name">> => <<"authenticator1">>,
<<"server_type">> => <<"built-in-database">>,
<<"user_id_type">> => <<"clientid">>},
<<"clientinfo_override">> =>
#{<<"password">> => <<"${Packet.headers.passcode}">>,
<<"username">> => <<"${Packet.headers.login}">>},
<<"enable">> => true,
<<"name">> => <<"stomp">>,
<<"enable_stats">> => true,
<<"frame">> =>
#{<<"max_body_length">> => 8192,<<"max_headers">> => 10,
<<"max_headers_length">> => 1024},
<<"idle_timeout">> => <<"30s">>,
<<"listeners">> => [
#{<<"id">> => <<"stomp:tcp:default">>,
<<"type">> => <<"tcp">>,
<<"running">> => true,
<<"acceptors">> => 16,<<"active_n">> => 100,
<<"bind">> => 61613,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 1024000}],
<<"mountpoint">> => <<>>}
).
%% --- END
schema_gateway_conf() ->
emqx_mgmt_util:schema(
#{oneOf =>
[ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS)
, emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS)
, emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS)
, emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS)
, emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS)
]}).
schema_gateway_stats() ->
emqx_mgmt_util:schema(
#{ type => object
, properties =>
#{ a_key => #{type => string}
}}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% properties %% examples
properties_gateway_overview() -> examples_gateway_confs() ->
ListenerProps = #{ stomp_gateway =>
[ {id, string, #{ summary => <<"A simple STOMP gateway configs">>
<<"Listener ID">>} , value =>
, {running, boolean, #{ enable => true
<<"Listener Running status">>} , enable_stats => true
, {type, string, , idle_timeout => <<"30s">>
<<"Listener Type">>, [<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]} , mountpoint => <<"stomp/">>
], , frame =>
emqx_mgmt_util:properties( #{ max_header => 10
[ {name, string, , make_header_length => 1024
<<"Gateway Name">>} , max_body_length => 65535
, {status, string, }
<<"Gateway Status">>, }
[<<"running">>, <<"stopped">>, <<"unloaded">>]} }
, {created_at, string, , mqttsn_gateway =>
<<>>} #{ summary => <<"A simple MQTT-SN gateway configs">>
, {started_at, string, , value =>
<<>>} #{ enable => true
, {stopped_at, string, , enable_stats => true
<<>>} }
, {max_connections, integer, <<>>} }
, {current_connections, integer, <<>>} }.
, {listeners, {array, object}, ListenerProps}
]). examples_gateway_stats() ->
#{}.

View File

@ -13,17 +13,14 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%
-module(emqx_gateway_api_authn). -module(emqx_gateway_api_authn).
-behaviour(minirest_api). -behaviour(minirest_api).
-include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(INTERNAL_ERROR, 'INTERNAL_SERVER_ERROR').
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
-import(emqx_dashboard_swagger, [error_codes/2]). -import(emqx_dashboard_swagger, [error_codes/2]).
@ -82,17 +79,15 @@ authn(get, #{bindings := #{name := Name0}}) ->
authn(put, #{bindings := #{name := Name0}, authn(put, #{bindings := #{name := Name0},
body := Body}) -> body := Body}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
%% TODO: return the authn instances? {ok, Authn} = emqx_gateway_http:update_authn(GwName, Body),
ok = emqx_gateway_http:update_authn(GwName, Body), {200, Authn}
{204}
end); end);
authn(post, #{bindings := #{name := Name0}, authn(post, #{bindings := #{name := Name0},
body := Body}) -> body := Body}) ->
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
%% TODO: return the authn instances? {ok, Authn} = emqx_gateway_http:add_authn(GwName, Body),
ok = emqx_gateway_http:add_authn(GwName, Body), {201, Authn}
{204}
end); end);
authn(delete, #{bindings := #{name := Name0}}) -> authn(delete, #{bindings := #{name := Name0}}) ->
@ -164,48 +159,30 @@ schema("/gateway/:name/authentication") ->
#{ description => <<"Get the gateway authentication">> #{ description => <<"Get the gateway authentication">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => schema_authn()
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => schema_authn()
, 204 => <<"Authentication does not initiated">> , 204 => <<"Authentication does not initiated">>
} })
}, },
put => put =>
#{ description => <<"Update authentication for the gateway">> #{ description => <<"Update authentication for the gateway">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, 'requestBody' => schema_authn() , 'requestBody' => schema_authn()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{200 => schema_authn()})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Updated">> %% XXX: ??? return the updated object
}
}, },
post => post =>
#{ description => <<"Add authentication for the gateway">> #{ description => <<"Add authentication for the gateway">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, 'requestBody' => schema_authn() , 'requestBody' => schema_authn()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{201 => schema_authn()})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Added">>
}
}, },
delete => delete =>
#{ description => <<"Remove the gateway authentication">> #{ description => <<"Remove the gateway authentication">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"Deleted">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Deleted">>
}
} }
}; };
schema("/gateway/:name/authentication/users") -> schema("/gateway/:name/authentication/users") ->
@ -215,14 +192,11 @@ schema("/gateway/:name/authentication/users") ->
, parameters => params_gateway_name_in_path() ++ , parameters => params_gateway_name_in_path() ++
params_paging_in_qs() params_paging_in_qs()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
post => post =>
#{ description => <<"Add user for the authentication">> #{ description => <<"Add user for the authentication">>
@ -231,14 +205,11 @@ schema("/gateway/:name/authentication/users") ->
ref(emqx_authn_api, request_user_create), ref(emqx_authn_api, request_user_create),
emqx_authn_api:request_user_create_examples()) emqx_authn_api:request_user_create_examples())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 201 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 201 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
} }
}; };
schema("/gateway/:name/authentication/users/:uid") -> schema("/gateway/:name/authentication/users/:uid") ->
@ -249,14 +220,11 @@ schema("/gateway/:name/authentication/users/:uid") ->
, parameters => params_gateway_name_in_path() ++ , parameters => params_gateway_name_in_path() ++
params_userid_in_path() params_userid_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
put => put =>
#{ description => <<"Update the user info for the gateway " #{ description => <<"Update the user info for the gateway "
@ -267,14 +235,11 @@ schema("/gateway/:name/authentication/users/:uid") ->
ref(emqx_authn_api, request_user_update), ref(emqx_authn_api, request_user_update),
emqx_authn_api:request_user_update_examples()) emqx_authn_api:request_user_update_examples())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
delete => delete =>
#{ description => <<"Delete the user for the gateway " #{ description => <<"Delete the user for the gateway "
@ -282,12 +247,7 @@ schema("/gateway/:name/authentication/users/:uid") ->
, parameters => params_gateway_name_in_path() ++ , parameters => params_gateway_name_in_path() ++
params_userid_in_path() params_userid_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"User Deleted">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"User Deleted">>
}
} }
}; };
schema("/gateway/:name/authentication/import_users") -> schema("/gateway/:name/authentication/import_users") ->
@ -300,13 +260,7 @@ schema("/gateway/:name/authentication/import_users") ->
emqx_authn_api:request_import_users_examples() emqx_authn_api:request_import_users_examples()
) )
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"Imported">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
%% XXX: Put a hint message into 204 return ?
, 204 => <<"Imported">>
}
} }
}. }.

View File

@ -16,12 +16,30 @@
-module(emqx_gateway_api_clients). -module(emqx_gateway_api_clients).
-behaviour(minirest_api). -include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% minirest behaviour callbacks -behaviour(minirest_api).
-export([api_spec/0]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
]).
%% minirest/dashbaord_swagger behaviour callbacks
-export([ api_spec/0
, paths/0
, schema/1
]).
-export([ roots/0
, fields/1
]).
%% http handlers %% http handlers
-export([ clients/2 -export([ clients/2
@ -34,27 +52,18 @@
, format_channel_info/1 , format_channel_info/1
]). ]).
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
api_spec() -> api_spec() ->
{metadata(apis()), []}. emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
apis() -> paths() ->
[ {"/gateway/:name/clients", clients} [ "/gateway/:name/clients"
, {"/gateway/:name/clients/:clientid", clients_insta} , "/gateway/:name/clients/:clientid"
, {"/gateway/:name/clients/:clientid/subscriptions", subscriptions} , "/gateway/:name/clients/:clientid/subscriptions"
, {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions} , "/gateway/:name/clients/:clientid/subscriptions/:topic"
]. ].
-define(CLIENT_QS_SCHEMA, -define(CLIENT_QS_SCHEMA,
@ -88,13 +97,15 @@ clients(get, #{ bindings := #{name := Name0}
TabName = emqx_gateway_cm:tabname(info, GwName), TabName = emqx_gateway_cm:tabname(info, GwName),
case maps:get(<<"node">>, Params, undefined) of case maps:get(<<"node">>, Params, undefined) of
undefined -> undefined ->
Response = emqx_mgmt_api:cluster_query(Params, TabName, Response = emqx_mgmt_api:cluster_query(
Params, TabName,
?CLIENT_QS_SCHEMA, ?query_fun), ?CLIENT_QS_SCHEMA, ?query_fun),
emqx_mgmt_util:generate_response(Response); emqx_mgmt_util:generate_response(Response);
Node1 -> Node1 ->
Node = binary_to_atom(Node1, utf8), Node = binary_to_atom(Node1, utf8),
ParamsWithoutNode = maps:without([<<"node">>], Params), ParamsWithoutNode = maps:without([<<"node">>], Params),
Response = emqx_mgmt_api:node_query(Node, ParamsWithoutNode, Response = emqx_mgmt_api:node_query(
Node, ParamsWithoutNode,
TabName, ?CLIENT_QS_SCHEMA, ?query_fun), TabName, ?CLIENT_QS_SCHEMA, ?query_fun),
emqx_mgmt_util:generate_response(Response) emqx_mgmt_util:generate_response(Response)
end end
@ -105,7 +116,8 @@ clients_insta(get, #{ bindings := #{name := Name0,
}) -> }) ->
ClientId = emqx_mgmt_util:urldecode(ClientId0), ClientId = emqx_mgmt_util:urldecode(ClientId0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:lookup_client(GwName, ClientId, case emqx_gateway_http:lookup_client(
GwName, ClientId,
{?MODULE, format_channel_info}) of {?MODULE, format_channel_info}) of
[ClientInfo] -> [ClientInfo] ->
{200, ClientInfo}; {200, ClientInfo};
@ -154,7 +166,8 @@ subscriptions(post, #{ bindings := #{name := Name0,
{undefined, _} -> {undefined, _} ->
return_http_error(400, "Miss topic property"); return_http_error(400, "Miss topic property");
{Topic, QoS} -> {Topic, QoS} ->
case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of case emqx_gateway_http:client_subscribe(
GwName, ClientId, Topic, QoS) of
{error, Reason} -> {error, Reason} ->
return_http_error(404, Reason); return_http_error(404, Reason);
ok -> ok ->
@ -204,7 +217,8 @@ query(Tab, {Qs, []}, Continuation, Limit) ->
query(Tab, {Qs, Fuzzy}, Continuation, Limit) -> query(Tab, {Qs, Fuzzy}, Continuation, Limit) ->
Ms = qs2ms(Qs), Ms = qs2ms(Qs),
FuzzyFilterFun = fuzzy_filter_fun(Fuzzy), FuzzyFilterFun = fuzzy_filter_fun(Fuzzy),
emqx_mgmt_api:select_table_with_count(Tab, {Ms, FuzzyFilterFun}, Continuation, Limit, emqx_mgmt_api:select_table_with_count(
Tab, {Ms, FuzzyFilterFun}, Continuation, Limit,
fun format_channel_info/1). fun format_channel_info/1).
qs2ms(Qs) -> qs2ms(Qs) ->
@ -218,8 +232,10 @@ qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
qs2ms(Rest, N, {NMtchHead, Conds}); qs2ms(Rest, N, {NMtchHead, Conds});
qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), Holder = binary_to_atom(
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), iolist_to_binary(["$", integer_to_list(N)]), utf8),
NMtchHead = emqx_mgmt_util:merge_maps(
MtchHead, ms(element(1, Qs), Holder)),
NConds = put_conds(Qs, Holder, Conds), NConds = put_conds(Qs, Holder, Conds),
qs2ms(Rest, N+1, {NMtchHead, NConds}). qs2ms(Rest, N+1, {NMtchHead, NConds}).
@ -271,12 +287,14 @@ escape(B) when is_binary(B) ->
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE} | Fuzzy]) -> run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _},
[{Key, _, RE} | Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of Val = case maps:get(Key, ClientInfo, "") of
undefined -> ""; undefined -> "";
V -> V V -> V
end, end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_filter(E, Fuzzy). re:run(Val, RE, [{capture, none}]) == match
andalso run_fuzzy_filter(E, Fuzzy).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% format funcs %% format funcs
@ -294,15 +312,19 @@ format_channel_info({_, Infos, Stats} = R) ->
, {port, {peername, ConnInfo, fun peer_to_port/1}} , {port, {peername, ConnInfo, fun peer_to_port/1}}
, {is_bridge, ClientInfo, false} , {is_bridge, ClientInfo, false}
, {connected_at, , {connected_at,
{connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {connected_at, ConnInfo,
fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {disconnected_at, , {disconnected_at,
{disconnected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {disconnected_at, ConnInfo,
, {connected, {conn_state, Infos, fun conn_state_to_connected/1}} fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {connected, {conn_state, Infos,
fun conn_state_to_connected/1}}
, {keepalive, ClientInfo, 0} , {keepalive, ClientInfo, 0}
, {clean_start, ConnInfo, true} , {clean_start, ConnInfo, true}
, {expiry_interval, ConnInfo, 0} , {expiry_interval, ConnInfo, 0}
, {created_at, , {created_at,
{created_at, SessInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} {created_at, SessInfo,
fun emqx_gateway_utils:unix_ts_to_rfc3339/1}}
, {subscriptions_cnt, Stats, 0} , {subscriptions_cnt, Stats, 0}
, {subscriptions_max, Stats, infinity} , {subscriptions_max, Stats, infinity}
, {inflight_cnt, Stats, 0} , {inflight_cnt, Stats, 0}
@ -384,275 +406,338 @@ conn_state_to_connected(_) -> false.
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
metadata(APIs) -> schema("/gateway/:name/clients") ->
metadata(APIs, []). #{ 'operationId' => clients
metadata([], APIAcc) -> , get =>
lists:reverse(APIAcc); #{ description => <<"Get the gateway client list">>
metadata([{Path, Fun} | More], APIAcc) ->
Methods = [get, post, put, delete, patch],
Mds = lists:foldl(fun(M, Acc) ->
try
Acc#{M => swagger(Path, M)}
catch
error : function_clause ->
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway/:name/clients", get) ->
#{ description => <<"Get the gateway clients">>
, parameters => params_client_query() , parameters => params_client_query()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(
, <<"404">> => schema_not_found() #{ 200 => emqx_dashboard_swagger:schema_with_examples(
, <<"500">> => schema_internal_error() hoconsc:array(ref(client)),
, <<"200">> => schema_clients_list() examples_client_list())})
} }
}; };
swagger("/gateway/:name/clients/:clientid", get) -> schema("/gateway/:name/clients/:clientid") ->
#{ 'operationId' => clients_insta
, get =>
#{ description => <<"Get the gateway client infomation">> #{ description => <<"Get the gateway client infomation">>
, parameters => params_client_insta() , parameters => params_client_insta()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(
, <<"404">> => schema_not_found() #{ 200 => emqx_dashboard_swagger:schema_with_examples(
, <<"500">> => schema_internal_error() ref(client),
, <<"200">> => schema_client() examples_client())})
} }
}; , delete =>
swagger("/gateway/:name/clients/:clientid", delete) ->
#{ description => <<"Kick out the gateway client">> #{ description => <<"Kick out the gateway client">>
, parameters => params_client_insta() , parameters => params_client_insta()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{204 => <<"Kicked">>})
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
} }
}; };
swagger("/gateway/:name/clients/:clientid/subscriptions", get) -> schema("/gateway/:name/clients/:clientid/subscriptions") ->
#{ 'operationId' => subscriptions
, get =>
#{ description => <<"Get the gateway client subscriptions">> #{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta() , parameters => params_client_insta()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(
, <<"404">> => schema_not_found() #{ 200 => emqx_dashboard_swagger:schema_with_examples(
, <<"500">> => schema_internal_error() hoconsc:array(ref(subscription)),
, <<"200">> => schema_subscription_list() examples_subsctiption_list())})
} }
}; , post =>
swagger("/gateway/:name/clients/:clientid/subscriptions", post) -> #{ description => <<"Create a subscription membership">>
#{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta() , parameters => params_client_insta()
, requestBody => schema_subscription() %% FIXME:
, requestBody => emqx_dashboard_swagger:schema_with_examples(
ref(subscription),
examples_subsctiption())
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(
, <<"404">> => schema_not_found() #{ 201 => emqx_dashboard_swagger:schema_with_examples(
, <<"500">> => schema_internal_error() ref(subscription),
, <<"204">> => schema_no_content() examples_subsctiption())})
} }
}; };
swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> schema("/gateway/:name/clients/:clientid/subscriptions/:topic") ->
#{ description => <<"Unsubscribe the topic for client">> #{ 'operationId' => subscriptions
, delete =>
#{ description => <<"Delete a subscriptions membership">>
, parameters => params_topic_name_in_path() ++ params_client_insta() , parameters => params_topic_name_in_path() ++ params_client_insta()
, responses => , responses =>
#{ <<"400">> => schema_bad_request() ?STANDARD_RESP(#{204 => <<"Unsubscribed">>})
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
} }
}. }.
params_client_query() -> params_client_query() ->
params_gateway_name_in_path() params_gateway_name_in_path()
++ params_client_searching_in_qs() ++ params_client_searching_in_qs()
++ emqx_mgmt_util:page_params(). ++ params_paging().
params_client_insta() -> params_client_insta() ->
params_clientid_in_path() params_clientid_in_path()
++ params_gateway_name_in_path(). ++ params_gateway_name_in_path().
params_client_searching_in_qs() -> params_client_searching_in_qs() ->
queries( M = #{in => query, nullable => true},
[ {node, string} [ {node,
, {clientid, string} mk(binary(),
, {username, string} M#{desc => <<"Match the client's node name">>})}
, {ip_address, string} , {clientid,
, {conn_state, string} mk(binary(),
, {proto_ver, string} M#{desc => <<"Match the client's ID">>})}
, {clean_start, boolean} , {username,
, {like_clientid, string} mk(binary(),
, {like_username, string} M#{desc => <<"Match the client's Username">>})}
, {gte_created_at, string} , {ip_address,
, {lte_created_at, string} mk(binary(),
, {gte_connected_at, string} M#{desc => <<"Match the client's ip address">>})}
, {lte_connected_at, string} , {conn_state,
]). mk(binary(),
M#{desc => <<"Match the client's connection state">>})}
, {proto_ver,
mk(binary(),
M#{desc => <<"Match the client's protocol version">>})}
, {clean_start,
mk(boolean(),
M#{desc => <<"Match the client's clean start flag">>})}
, {like_clientid,
mk(binary(),
M#{desc => <<"Use sub-string to match client's ID">>})}
, {like_username,
mk(binary(),
M#{desc => <<"Use sub-string to match client's username">>})}
, {gte_created_at,
mk(binary(),
M#{desc => <<"Match the session created datetime greater than "
"a certain value">>})}
, {lte_created_at,
mk(binary(),
M#{desc => <<"Match the session created datetime less than "
"a certain value">>})}
, {gte_connected_at,
mk(binary(),
M#{desc => <<"Match the client socket connected datetime greater "
"than a certain value">>})}
, {lte_connected_at,
mk(binary(),
M#{desc => <<"Match the client socket connected datatime less than "
" a certain value">>})}
].
params_paging() ->
[ {page,
mk(integer(),
#{ in => query
, nullable => true
, desc => <<"Page Index">>})}
, {limit,
mk(integer(),
#{ in => query
, desc => <<"Page Limit">>
, nullable => true})}
].
params_gateway_name_in_path() -> params_gateway_name_in_path() ->
[#{ name => name [{name,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Gateway Name">>
}]. })}
].
params_clientid_in_path() -> params_clientid_in_path() ->
[#{ name => clientid [{clientid,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Client ID">>
}]. })}
].
params_topic_name_in_path() -> params_topic_name_in_path() ->
[#{ name => topic [{topic,
, in => path mk(binary(),
, schema => #{type => string} #{ in => path
, required => true , desc => <<"Topic Filter/Name">>
}]. })}
].
queries(Ls) ->
lists:map(fun({K, Type}) ->
#{name => K, in => query,
schema => #{type => Type},
required => false
}
end, Ls).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% schemas %% schemas
schema_clients_list() -> roots() ->
emqx_mgmt_util:page_schema( [ client
#{ type => object , subscription
, properties => properties_client() ].
}
).
schema_client() -> fields(client) ->
emqx_mgmt_util:schema( %% XXX: enum for every protocol's client
#{ type => object [ {node,
, properties => properties_client() mk(string(),
}). #{ desc => <<"Name of the node to which the client is "
"connected">>})}
schema_subscription_list() -> , {clientid,
emqx_mgmt_util:array_schema( mk(string(),
#{ type => object #{ desc => <<"Client identifier">>})}
, properties => properties_subscription() , {username,
}, mk(string(),
<<"Client subscriptions">> #{ desc => <<"Username of client when connecting">>})}
). , {proto_name,
mk(string(),
schema_subscription() -> #{ desc => <<"Client protocol name">>})}
emqx_mgmt_util:schema( , {proto_ver,
#{ type => object mk(string(),
, properties => properties_subscription() #{ desc => <<"Protocol version used by the client">>})}
} , {ip_address,
). mk(string(),
#{ desc => <<"Client's IP address">>})}
%%-------------------------------------------------------------------- , {port,
%% properties defines mk(integer(),
#{ desc => <<"Client's port">>})}
properties_client() -> , {is_bridge,
%% FIXME: enum for every protocol's client mk(boolean(),
emqx_mgmt_util:properties( #{ desc => <<"Indicates whether the client is connected via "
[ {node, string, "bridge">>})}
<<"Name of the node to which the client is connected">>} , {connected_at,
, {clientid, string, mk(string(),
<<"Client identifier">>} #{ desc => <<"Client connection time">>})}
, {username, string, , {disconnected_at,
<<"Username of client when connecting">>} mk(string(),
, {proto_name, string, #{ desc => <<"Client offline time, This field is only valid and "
<<"Client protocol name">>} "returned when connected is false">>})}
, {proto_ver, string, , {connected,
<<"Protocol version used by the client">>} mk(boolean(),
, {ip_address, string, #{ desc => <<"Whether the client is connected">>})}
<<"Client's IP address">>}
, {port, integer,
<<"Client's port">>}
, {is_bridge, boolean,
<<"Indicates whether the client is connectedvia bridge">>}
, {connected_at, string,
<<"Client connection time">>}
, {disconnected_at, string,
<<"Client offline time, This field is only valid and returned "
"when connected is false">>}
, {connected, boolean,
<<"Whether the client is connected">>}
%% FIXME: the will_msg attribute is not a general attribute %% FIXME: the will_msg attribute is not a general attribute
%% for every protocol. But it should be returned to frontend if someone %% for every protocol. But it should be returned to frontend if someone
%% want it %% want it
%% %%
%, {will_msg, string, %, {will_msg,
% <<"Client will message">>} % mk(string(),
%, {zone, string, % #{ desc => <<"Client will message">>})}
% <<"Indicate the configuration group used by the client">>} %, {zone,
, {keepalive, integer, % mk(string(),
<<"keepalive time, with the unit of second">>} % #{ desc => <<"Indicate the configuration group used by the "
, {clean_start, boolean, % "client">>})}
<<"Indicate whether the client is using a brand new session">>} , {keepalive,
, {expiry_interval, integer, mk(integer(),
<<"Session expiration interval, with the unit of second">>} #{ desc => <<"keepalive time, with the unit of second">>})}
, {created_at, string, , {clean_start,
<<"Session creation time">>} mk(boolean(),
, {subscriptions_cnt, integer, #{ desc => <<"Indicate whether the client is using a brand "
<<"Number of subscriptions established by this client">>} "new session">>})}
, {subscriptions_max, integer, , {expiry_interval,
<<"v4 api name [max_subscriptions] Maximum number of " mk(integer(),
"subscriptions allowed by this client">>} #{ desc => <<"Session expiration interval, with the unit of "
, {inflight_cnt, integer, "second">>})}
<<"Current length of inflight">>} , {created_at,
, {inflight_max, integer, mk(string(),
<<"v4 api name [max_inflight]. Maximum length of inflight">>} #{ desc => <<"Session creation time">>})}
, {mqueue_len, integer, , {subscriptions_cnt,
<<"Current length of message queue">>} mk(integer(),
, {mqueue_max, integer, #{ desc => <<"Number of subscriptions established by this "
<<"v4 api name [max_mqueue]. Maximum length of message queue">>} "client">>})}
, {mqueue_dropped, integer, , {subscriptions_max,
<<"Number of messages dropped by the message queue due to " mk(integer(),
"exceeding the length">>} #{ desc => <<"Maximum number of subscriptions allowed by this "
, {awaiting_rel_cnt, integer, "client">>})}
<<"v4 api name [awaiting_rel] Number of awaiting PUBREC packet">>} , {inflight_cnt,
, {awaiting_rel_max, integer, mk(integer(),
<<"v4 api name [max_awaiting_rel]. Maximum allowed number of " #{ desc => <<"Current length of inflight">>})}
"awaiting PUBREC packet">>} , {inflight_max,
, {recv_oct, integer, mk(integer(),
<<"Number of bytes received by EMQ X Broker (the same below)">>} #{ desc => <<"Maximum length of inflight">>})}
, {recv_cnt, integer, , {mqueue_len,
<<"Number of TCP packets received">>} mk(integer(),
, {recv_pkt, integer, #{ desc => <<"Current length of message queue">>})}
<<"Number of MQTT packets received">>} , {mqueue_max,
, {recv_msg, integer, mk(integer(),
<<"Number of PUBLISH packets received">>} #{ desc => <<"Maximum length of message queue">>})}
, {send_oct, integer, , {mqueue_dropped,
<<"Number of bytes sent">>} mk(integer(),
, {send_cnt, integer, #{ desc => <<"Number of messages dropped by the message queue "
<<"Number of TCP packets sent">>} "due to exceeding the length">>})}
, {send_pkt, integer, , {awaiting_rel_cnt,
<<"Number of MQTT packets sent">>} mk(integer(),
, {send_msg, integer, #{ desc => <<"Number of awaiting PUBREC packet">>})}
<<"Number of PUBLISH packets sent">>} , {awaiting_rel_max,
, {mailbox_len, integer, mk(integer(),
<<"Process mailbox size">>} #{ desc => <<"Maximum allowed number of awaiting PUBREC "
, {heap_size, integer, "packet">>})}
<<"Process heap size with the unit of byte">>} , {recv_oct,
, {reductions, integer, mk(integer(),
<<"Erlang reduction">>} #{ desc => <<"Number of bytes received by EMQ X Broker">>})}
]). , {recv_cnt,
mk(integer(),
#{ desc => <<"Number of TCP packets received">>})}
, {recv_pkt,
mk(integer(),
#{ desc => <<"Number of MQTT packets received">>})}
, {recv_msg,
mk(integer(),
#{ desc => <<"Number of PUBLISH packets received">>})}
, {send_oct,
mk(integer(),
#{ desc => <<"Number of bytes sent">>})}
, {send_cnt,
mk(integer(),
#{ desc => <<"Number of TCP packets sent">>})}
, {send_pkt,
mk(integer(),
#{ desc => <<"Number of MQTT packets sent">>})}
, {send_msg,
mk(integer(),
#{ desc => <<"Number of PUBLISH packets sent">>})}
, {mailbox_len,
mk(integer(),
#{ desc => <<"Process mailbox size">>})}
, {heap_size,
mk(integer(),
#{ desc => <<"Process heap size with the unit of byte">>})}
, {reductions,
mk(integer(),
#{ desc => <<"Erlang reduction">>})}
];
fields(subscription) ->
[ {topic,
mk(string(),
#{ desc => <<"Topic Fillter">>})}
, {qos,
mk(integer(),
#{ desc => <<"QoS level, enum: 0, 1, 2">>})}
, {nl,
mk(integer(), %% FIXME: why not boolean?
#{ desc => <<"No Local option, enum: 0, 1">>})}
, {rap,
mk(integer(),
#{ desc => <<"Retain as Published option, enum: 0, 1">>})}
, {rh,
mk(integer(),
#{ desc => <<"Retain Handling option, enum: 0, 1, 2">>})}
, {sub_props,
mk(ref(extra_sub_props),
#{desc => <<"Subscription properties">>})}
];
fields(extra_sub_props) ->
[ {subid,
mk(string(),
#{ desc => <<"Only stomp protocol, an uniquely identity for "
"the subscription. range: 1-65535.">>})}
].
properties_subscription() -> %%--------------------------------------------------------------------
ExtraProps = [ {subid, string, %% examples
<<"Only stomp protocol, an uniquely identity for "
"the subscription. range: 1-65535.">>} examples_client_list() ->
], #{}.
emqx_mgmt_util:properties(
[ {topic, string, examples_client() ->
<<"Topic Fillter">>} #{}.
, {qos, integer,
<<"QoS level, enum: 0, 1, 2">>} examples_subsctiption_list() ->
, {nl, integer, %% FIXME: why not boolean? #{}.
<<"No Local option, enum: 0, 1">>}
, {rap, integer, examples_subsctiption() ->
<<"Retain as Published option, enum: 0, 1">>} #{}.
, {rh, integer,
<<"Retain Handling option, enum: 0, 1, 2">>}
, {sub_props, object, ExtraProps}
]).

View File

@ -18,14 +18,10 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-include("emqx_gateway_http.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
-define(INTERNAL_ERROR, 'INTERNAL_SERVER_ERROR').
-import(hoconsc, [mk/2, ref/1, ref/2]). -import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_dashboard_swagger, [error_codes/2]).
-import(emqx_gateway_http, -import(emqx_gateway_http,
[ return_http_error/2 [ return_http_error/2
@ -93,8 +89,9 @@ listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
undefined -> undefined ->
ListenerId = emqx_gateway_utils:listener_id( ListenerId = emqx_gateway_utils:listener_id(
GwName, Type, LName), GwName, Type, LName),
ok = emqx_gateway_http:add_listener(ListenerId, LConf), {ok, RespConf} = emqx_gateway_http:add_listener(
{204}; ListenerId, LConf),
{201, RespConf};
_ -> _ ->
return_http_error(400, "Listener name has occupied") return_http_error(400, "Listener name has occupied")
end end
@ -123,8 +120,8 @@ listeners_insta(put, #{body := LConf,
}) -> }) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0), ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(_GwName, _) -> with_gateway(Name0, fun(_GwName, _) ->
ok = emqx_gateway_http:update_listener(ListenerId, LConf), {ok, RespConf} = emqx_gateway_http:update_listener(ListenerId, LConf),
{204} {200, RespConf}
end). end).
listeners_insta_authn(get, #{bindings := #{name := Name0, listeners_insta_authn(get, #{bindings := #{name := Name0,
@ -145,16 +142,17 @@ listeners_insta_authn(post, #{body := Conf,
id := ListenerId0}}) -> id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0), ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
ok = emqx_gateway_http:add_authn(GwName, ListenerId, Conf), {ok, Authn} = emqx_gateway_http:add_authn(GwName, ListenerId, Conf),
{204} {201, Authn}
end); end);
listeners_insta_authn(put, #{body := Conf, listeners_insta_authn(put, #{body := Conf,
bindings := #{name := Name0, bindings := #{name := Name0,
id := ListenerId0}}) -> id := ListenerId0}}) ->
ListenerId = emqx_mgmt_util:urldecode(ListenerId0), ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
with_gateway(Name0, fun(GwName, _) -> with_gateway(Name0, fun(GwName, _) ->
ok = emqx_gateway_http:update_authn(GwName, ListenerId, Conf), {ok, Authn} = emqx_gateway_http:update_authn(
{204} GwName, ListenerId, Conf),
{200, Authn}
end); end);
listeners_insta_authn(delete, #{bindings := #{name := Name0, listeners_insta_authn(delete, #{bindings := #{name := Name0,
id := ListenerId0}}) -> id := ListenerId0}}) ->
@ -226,14 +224,11 @@ schema("/gateway/:name/listeners") ->
#{ description => <<"Get the gateway listeners">> #{ description => <<"Get the gateway listeners">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_examples(
hoconsc:array(ref(listener)), hoconsc:array(ref(listener)),
examples_listener_list()) examples_listener_list())
} })
}, },
post => post =>
#{ description => <<"Create the gateway listener">> #{ description => <<"Create the gateway listener">>
@ -242,12 +237,11 @@ schema("/gateway/:name/listeners") ->
ref(listener), ref(listener),
examples_listener()) examples_listener())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 201 => emqx_dashboard_swagger:schema_with_examples(
, 500 => error_codes([?INTERNAL_ERROR], ref(listener),
<<"Ineternal Server Error">>) examples_listener())
, 204 => <<"Created">> })
}
} }
}; };
schema("/gateway/:name/listeners/:id") -> schema("/gateway/:name/listeners/:id") ->
@ -257,26 +251,18 @@ schema("/gateway/:name/listeners/:id") ->
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_examples(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_examples(
ref(listener), ref(listener),
examples_listener()) examples_listener())
} })
}, },
delete => delete =>
#{ description => <<"Delete the gateway listener">> #{ description => <<"Delete the gateway listener">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"Deleted">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Deleted">>
}
}, },
put => put =>
#{ description => <<"Update the gateway listener">> #{ description => <<"Update the gateway listener">>
@ -286,12 +272,11 @@ schema("/gateway/:name/listeners/:id") ->
ref(listener), ref(listener),
examples_listener()) examples_listener())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_examples(
, 500 => error_codes([?INTERNAL_ERROR], ref(listener),
<<"Ineternal Server Error">>) examples_listener())
, 200 => <<"Updated">> })
}
} }
}; };
schema("/gateway/:name/listeners/:id/authentication") -> schema("/gateway/:name/listeners/:id/authentication") ->
@ -301,13 +286,10 @@ schema("/gateway/:name/listeners/:id/authentication") ->
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => schema_authn()
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => schema_authn()
, 204 => <<"Authentication does not initiated">> , 204 => <<"Authentication does not initiated">>
} })
}, },
post => post =>
#{ description => <<"Add authentication for the listener">> #{ description => <<"Add authentication for the listener">>
@ -315,12 +297,7 @@ schema("/gateway/:name/listeners/:id/authentication") ->
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, 'requestBody' => schema_authn() , 'requestBody' => schema_authn()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{201 => schema_authn()})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Added">>
}
}, },
put => put =>
#{ description => <<"Update authentication for the listener">> #{ description => <<"Update authentication for the listener">>
@ -328,24 +305,14 @@ schema("/gateway/:name/listeners/:id/authentication") ->
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, 'requestBody' => schema_authn() , 'requestBody' => schema_authn()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{200 => schema_authn()})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Updated">>
}
}, },
delete => delete =>
#{ description => <<"Remove authentication for the listener">> #{ description => <<"Remove authentication for the listener">>
, parameters => params_gateway_name_in_path() , parameters => params_gateway_name_in_path()
++ params_listener_id_in_path() ++ params_listener_id_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{200 => <<"Deleted">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 204 => <<"Deleted">>
}
} }
}; };
schema("/gateway/:name/listeners/:id/authentication/users") -> schema("/gateway/:name/listeners/:id/authentication/users") ->
@ -356,14 +323,11 @@ schema("/gateway/:name/listeners/:id/authentication/users") ->
params_listener_id_in_path() ++ params_listener_id_in_path() ++
params_paging_in_qs() params_paging_in_qs()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
post => post =>
#{ description => <<"Add user for the authentication">> #{ description => <<"Add user for the authentication">>
@ -373,14 +337,11 @@ schema("/gateway/:name/listeners/:id/authentication/users") ->
ref(emqx_authn_api, request_user_create), ref(emqx_authn_api, request_user_create),
emqx_authn_api:request_user_create_examples()) emqx_authn_api:request_user_create_examples())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 201 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 201 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
} }
}; };
schema("/gateway/:name/listeners/:id/authentication/users/:uid") -> schema("/gateway/:name/listeners/:id/authentication/users/:uid") ->
@ -392,14 +353,11 @@ schema("/gateway/:name/listeners/:id/authentication/users/:uid") ->
params_listener_id_in_path() ++ params_listener_id_in_path() ++
params_userid_in_path() params_userid_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
put => put =>
#{ description => <<"Update the user info for the gateway " #{ description => <<"Update the user info for the gateway "
@ -411,14 +369,11 @@ schema("/gateway/:name/listeners/:id/authentication/users/:uid") ->
ref(emqx_authn_api, request_user_update), ref(emqx_authn_api, request_user_update),
emqx_authn_api:request_user_update_examples()) emqx_authn_api:request_user_update_examples())
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>) #{ 200 => emqx_dashboard_swagger:schema_with_example(
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user), ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples()) emqx_authn_api:response_user_examples())
} })
}, },
delete => delete =>
#{ description => <<"Delete the user for the gateway " #{ description => <<"Delete the user for the gateway "
@ -427,14 +382,7 @@ schema("/gateway/:name/listeners/:id/authentication/users/:uid") ->
params_listener_id_in_path() ++ params_listener_id_in_path() ++
params_userid_in_path() params_userid_in_path()
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"Deleted">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
, 200 => emqx_dashboard_swagger:schema_with_example(
ref(emqx_authn_api, response_user),
emqx_authn_api:response_user_examples())
}
} }
}; };
schema("/gateway/:name/listeners/:id/authentication/import_users") -> schema("/gateway/:name/listeners/:id/authentication/import_users") ->
@ -448,13 +396,7 @@ schema("/gateway/:name/listeners/:id/authentication/import_users") ->
emqx_authn_api:request_import_users_examples() emqx_authn_api:request_import_users_examples()
) )
, responses => , responses =>
#{ 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) ?STANDARD_RESP(#{204 => <<"Imported">>})
, 404 => error_codes([?NOT_FOUND], <<"Not Found">>)
, 500 => error_codes([?INTERNAL_ERROR],
<<"Ineternal Server Error">>)
%% XXX: Put a hint message into 204 return ?
, 204 => <<"Imported">>
}
} }
}. }.
@ -638,7 +580,96 @@ common_listener_opts() ->
%% examples %% examples
examples_listener_list() -> examples_listener_list() ->
#{stomp_listeners => [examples_listener()]}. [Config || #{value := Config} <- maps:values(examples_listener())].
examples_listener() -> examples_listener() ->
#{}. #{ tcp_listener=>
#{ summary => <<"A simple tcp listener example">>
, value =>
#{ bind => <<"61613">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, tcp =>
#{ active_n => 100
, backlog => 1024
, send_timeout => <<"15s">>
, send_timeout_close => true
, recbuf => <<"10KB">>
, sndbuf => <<"10KB">>
, buffer => <<"10KB">>
, high_watermark => <<"1MB">>
, nodelay => false
, reuseaddr => true
}
}
}
, ssl_listener =>
#{ summary => <<"A simple ssl listener example">>
, value =>
#{ bind => <<"61614">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, access_rules => [<<"allow all">>]
, ssl =>
#{ versions => [<<"tlsv1.3">>, <<"tlsv1.2">>,
<<"tlsv1.1">>, <<"tlsv1">>]
, cacertfile => <<"etc/certs/cacert.pem">>
, certfile => <<"etc/certs/cert.pem">>
, keyfile => <<"etc/certs/key.pem">>
, verify => <<"verify_none">>
, fail_if_no_peer_cert => false
, server_name_indication => disable
}
, tcp =>
#{ active_n => 100
, backlog => 1024
}
}
}
, udp_listener =>
#{ summary => <<"A simple udp listener example">>
, value =>
#{ bind => <<"0.0.0.0:1884">>
, udp =>
#{ active_n => 100
, recbuf => <<"10KB">>
, sndbuf => <<"10KB">>
, buffer => <<"10KB">>
, reuseaddr => true
}
}
}
, dtls_listener =>
#{ summary => <<"A simple dtls listener example">>
, value =>
#{ bind => <<"5684">>
, acceptors => 16
, max_connections => 1024000
, max_conn_rate => 1000
, access_rules => [<<"allow all">>]
, ssl =>
#{ versions => [<<"dtlsv1.2">>, <<"dtlsv1">>]
, cacertfile => <<"etc/certs/cacert.pem">>
, certfile => <<"etc/certs/cert.pem">>
, keyfile => <<"etc/certs/key.pem">>
, verify => <<"verify_none">>
, fail_if_no_peer_cert => false
, server_name_indication => disable
}
, tcp =>
#{ active_n => 100
, backlog => 1024
}
}
}
, dtls_listener_with_psk_ciphers =>
#{ summary => <<"todo">>
, value =>
#{}
}
, lisetner_with_authn =>
#{ summary => <<"todo">>
, value => #{}}
}.

View File

@ -59,7 +59,8 @@
-define(AUTHN_BIN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY). -define(AUTHN_BIN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
-type atom_or_bin() :: atom() | binary(). -type atom_or_bin() :: atom() | binary().
-type ok_or_err() :: ok_or_err(). -type ok_or_err() :: ok | {error, term()}.
-type map_or_err() :: {ok, map()} | {error, term()}.
-type listener_ref() :: {ListenerType :: atom_or_bin(), -type listener_ref() :: {ListenerType :: atom_or_bin(),
ListenerName :: atom_or_bin()}. ListenerName :: atom_or_bin()}.
@ -85,7 +86,8 @@ load_gateway(GwName, Conf) ->
{Ls, Conf1} -> {Ls, Conf1} ->
Conf1#{<<"listeners">> => unconvert_listeners(Ls)} Conf1#{<<"listeners">> => unconvert_listeners(Ls)}
end, end,
update({?FUNCTION_NAME, bin(GwName), NConf}). %% TODO:
ret_ok_err(update({?FUNCTION_NAME, bin(GwName), NConf})).
%% @doc convert listener array to map %% @doc convert listener array to map
unconvert_listeners(Ls) when is_list(Ls) -> unconvert_listeners(Ls) when is_list(Ls) ->
@ -111,13 +113,14 @@ update_gateway(GwName, Conf0) ->
Exclude0 = [listeners, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM], Exclude0 = [listeners, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM],
Exclude1 = [atom_to_binary(K, utf8) || K <- Exclude0], Exclude1 = [atom_to_binary(K, utf8) || K <- Exclude0],
Conf = maps:without(Exclude0 ++ Exclude1, Conf0), Conf = maps:without(Exclude0 ++ Exclude1, Conf0),
update({?FUNCTION_NAME, bin(GwName), Conf}).
ret_ok_err(update({?FUNCTION_NAME, bin(GwName), Conf})).
%% FIXME: delete cert files ?? %% FIXME: delete cert files ??
-spec unload_gateway(atom_or_bin()) -> ok_or_err(). -spec unload_gateway(atom_or_bin()) -> ok_or_err().
unload_gateway(GwName) -> unload_gateway(GwName) ->
update({?FUNCTION_NAME, bin(GwName)}). ret_ok_err(update({?FUNCTION_NAME, bin(GwName)})).
%% @doc Get the gateway configurations. %% @doc Get the gateway configurations.
%% Missing fields are filled with default values. This function is typically %% Missing fields are filled with default values. This function is typically
@ -139,18 +142,20 @@ convert_listeners(GwName, Ls) when is_map(Ls) ->
lists:append([do_convert_listener(GwName, Type, maps:to_list(Conf)) lists:append([do_convert_listener(GwName, Type, maps:to_list(Conf))
|| {Type, Conf} <- maps:to_list(Ls)]). || {Type, Conf} <- maps:to_list(Ls)]).
do_convert_listener(GwName, Type, Conf) -> do_convert_listener(GwName, LType, Conf) ->
[begin [ do_convert_listener2(GwName, LType, LName, LConf)
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LName), || {LName, LConf} <- Conf, is_map(LConf)].
do_convert_listener2(GwName, LType, LName, LConf) ->
ListenerId = emqx_gateway_utils:listener_id(GwName, LType, LName),
Running = emqx_gateway_utils:is_running(ListenerId, LConf), Running = emqx_gateway_utils:is_running(ListenerId, LConf),
bind2str( bind2str(
LConf#{ LConf#{
id => ListenerId, id => ListenerId,
type => Type, type => LType,
name => LName, name => LName,
running => Running running => Running
}) }).
end || {LName, LConf} <- Conf, is_map(LConf)].
bind2str(LConf = #{bind := Bind}) when is_integer(Bind) -> bind2str(LConf = #{bind := Bind}) when is_integer(Bind) ->
maps:put(bind, integer_to_binary(Bind), LConf); maps:put(bind, integer_to_binary(Bind), LConf);
@ -194,47 +199,55 @@ listener(ListenerId) ->
{error, Reason} {error, Reason}
end. end.
-spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec add_listener(atom_or_bin(), listener_ref(), map()) -> map_or_err().
add_listener(GwName, ListenerRef, Conf) -> add_listener(GwName, ListenerRef, Conf) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}). ret_listener_or_err(
GwName, ListenerRef,
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf})).
-spec update_listener(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec update_listener(atom_or_bin(), listener_ref(), map()) -> map_or_err().
update_listener(GwName, ListenerRef, Conf) -> update_listener(GwName, ListenerRef, Conf) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}). ret_listener_or_err(
GwName, ListenerRef,
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf})).
-spec remove_listener(atom_or_bin(), listener_ref()) -> ok_or_err(). -spec remove_listener(atom_or_bin(), listener_ref()) -> ok_or_err().
remove_listener(GwName, ListenerRef) -> remove_listener(GwName, ListenerRef) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef)}). ret_ok_err(update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef)})).
-spec add_authn(atom_or_bin(), map()) -> ok_or_err(). -spec add_authn(atom_or_bin(), map()) -> map_or_err().
add_authn(GwName, Conf) -> add_authn(GwName, Conf) ->
update({?FUNCTION_NAME, bin(GwName), Conf}). ret_authn(GwName, update({?FUNCTION_NAME, bin(GwName), Conf})).
-spec add_authn(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec add_authn(atom_or_bin(), listener_ref(), map()) -> map_or_err().
add_authn(GwName, ListenerRef, Conf) -> add_authn(GwName, ListenerRef, Conf) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}). ret_authn(
GwName, ListenerRef,
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf})).
-spec update_authn(atom_or_bin(), map()) -> ok_or_err(). -spec update_authn(atom_or_bin(), map()) -> map_or_err().
update_authn(GwName, Conf) -> update_authn(GwName, Conf) ->
update({?FUNCTION_NAME, bin(GwName), Conf}). ret_authn(GwName, update({?FUNCTION_NAME, bin(GwName), Conf})).
-spec update_authn(atom_or_bin(), listener_ref(), map()) -> ok_or_err(). -spec update_authn(atom_or_bin(), listener_ref(), map()) -> map_or_err().
update_authn(GwName, ListenerRef, Conf) -> update_authn(GwName, ListenerRef, Conf) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf}). ret_authn(
GwName, ListenerRef,
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef), Conf})).
-spec remove_authn(atom_or_bin()) -> ok_or_err(). -spec remove_authn(atom_or_bin()) -> ok_or_err().
remove_authn(GwName) -> remove_authn(GwName) ->
update({?FUNCTION_NAME, bin(GwName)}). ret_ok_err(update({?FUNCTION_NAME, bin(GwName)})).
-spec remove_authn(atom_or_bin(), listener_ref()) -> ok_or_err(). -spec remove_authn(atom_or_bin(), listener_ref()) -> ok_or_err().
remove_authn(GwName, ListenerRef) -> remove_authn(GwName, ListenerRef) ->
update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef)}). ret_ok_err(update({?FUNCTION_NAME, bin(GwName), bin(ListenerRef)})).
%% @private %% @private
update(Req) -> update(Req) ->
res(emqx_conf:update([gateway], Req, #{override_to => cluster})). res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
res({ok, _Result}) -> ok; res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason}; res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}. res({error, Reason}) -> {error, Reason}.
@ -245,6 +258,32 @@ bin(A) when is_atom(A) ->
bin(B) when is_binary(B) -> bin(B) when is_binary(B) ->
B. B.
ret_ok_err({ok, _}) -> ok;
ret_ok_err(Err) -> Err.
ret_authn(GwName, {ok, #{raw_config := GwConf}}) ->
Authn = emqx_map_lib:deep_get(
[bin(GwName), <<"authentication">>],
GwConf),
{ok, Authn};
ret_authn(_GwName, Err) -> Err.
ret_authn(GwName, {LType, LName}, {ok, #{raw_config := GwConf}}) ->
Authn = emqx_map_lib:deep_get(
[bin(GwName), <<"listeners">>, bin(LType),
bin(LName), <<"authentication">>],
GwConf),
{ok, Authn};
ret_authn(_, _, Err) -> Err.
ret_listener_or_err(GwName, {LType, LName}, {ok, #{raw_config := GwConf}}) ->
LConf = emqx_map_lib:deep_get(
[bin(GwName), <<"listeners">>, bin(LType), bin(LName)],
GwConf),
{ok, do_convert_listener2(GwName, LType, LName, LConf)};
ret_listener_or_err(_, _, Err) ->
Err.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Config Handler %% Config Handler
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -146,14 +146,14 @@ get_listeners_status(GwName, Config) ->
%% Mgmt APIs - listeners %% Mgmt APIs - listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec add_listener(atom() | binary(), map()) -> ok. -spec add_listener(atom() | binary(), map()) -> {ok, map()}.
add_listener(ListenerId, NewConf0) -> add_listener(ListenerId, NewConf0) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
NewConf = maps:without([<<"id">>, <<"name">>, NewConf = maps:without([<<"id">>, <<"name">>,
<<"type">>, <<"running">>], NewConf0), <<"type">>, <<"running">>], NewConf0),
confexp(emqx_gateway_conf:add_listener(GwName, {Type, Name}, NewConf)). confexp(emqx_gateway_conf:add_listener(GwName, {Type, Name}, NewConf)).
-spec update_listener(atom() | binary(), map()) -> ok. -spec update_listener(atom() | binary(), map()) -> {ok, map()}.
update_listener(ListenerId, NewConf0) -> update_listener(ListenerId, NewConf0) ->
{GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
@ -194,23 +194,23 @@ wrap_chain_name(ChainName, Conf) ->
Conf Conf
end. end.
-spec add_authn(gateway_name(), map()) -> ok. -spec add_authn(gateway_name(), map()) -> {ok, map()}.
add_authn(GwName, AuthConf) -> add_authn(GwName, AuthConf) ->
confexp(emqx_gateway_conf:add_authn(GwName, AuthConf)). confexp(emqx_gateway_conf:add_authn(GwName, AuthConf)).
-spec add_authn(gateway_name(), binary(), map()) -> ok. -spec add_authn(gateway_name(), binary(), map()) -> {ok, map()}.
add_authn(GwName, ListenerId, AuthConf) -> add_authn(GwName, ListenerId, AuthConf) ->
{_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
confexp(emqx_gateway_conf:add_authn(GwName, {Type, Name}, AuthConf)). confexp(emqx_gateway_conf:add_authn(GwName, {LType, LName}, AuthConf)).
-spec update_authn(gateway_name(), map()) -> ok. -spec update_authn(gateway_name(), map()) -> {ok, map()}.
update_authn(GwName, AuthConf) -> update_authn(GwName, AuthConf) ->
confexp(emqx_gateway_conf:update_authn(GwName, AuthConf)). confexp(emqx_gateway_conf:update_authn(GwName, AuthConf)).
-spec update_authn(gateway_name(), binary(), map()) -> ok. -spec update_authn(gateway_name(), binary(), map()) -> {ok, map()}.
update_authn(GwName, ListenerId, AuthConf) -> update_authn(GwName, ListenerId, AuthConf) ->
{_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
confexp(emqx_gateway_conf:update_authn(GwName, {Type, Name}, AuthConf)). confexp(emqx_gateway_conf:update_authn(GwName, {LType, LName}, AuthConf)).
-spec remove_authn(gateway_name()) -> ok. -spec remove_authn(gateway_name()) -> ok.
remove_authn(GwName) -> remove_authn(GwName) ->
@ -218,10 +218,11 @@ remove_authn(GwName) ->
-spec remove_authn(gateway_name(), binary()) -> ok. -spec remove_authn(gateway_name(), binary()) -> ok.
remove_authn(GwName, ListenerId) -> remove_authn(GwName, ListenerId) ->
{_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId), {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
confexp(emqx_gateway_conf:remove_authn(GwName, {Type, Name})). confexp(emqx_gateway_conf:remove_authn(GwName, {LType, LName})).
confexp(ok) -> ok; confexp(ok) -> ok;
confexp({ok, Res}) -> {ok, Res};
confexp({error, not_found}) -> confexp({error, not_found}) ->
error({update_conf_error, not_found}); error({update_conf_error, not_found});
confexp({error, already_exist}) -> confexp({error, already_exist}) ->

View File

@ -143,9 +143,9 @@ The client just sends its PUBLISH messages to a GW"
sc(hoconsc:array(ref(mqttsn_predefined)), sc(hoconsc:array(ref(mqttsn_predefined)),
#{ default => [] #{ default => []
, desc => , desc =>
"The Pre-defined topic ids and topic names.<br> <<"The Pre-defined topic ids and topic names.<br>
A 'pre-defined' topic id is a topic id whose mapping to a topic name A 'pre-defined' topic id is a topic id whose mapping to a topic name
is known in advance by both the client's application and the gateway" is known in advance by both the clients application and the gateway">>
})} })}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();

View File

@ -23,7 +23,7 @@
-export([lookup_cmd/2, observe/2, read/2, write/2]). -export([lookup_cmd/2, observe/2, read/2, write/2]).
-define(PATH(Suffix), "/gateway/lwm2m/:clientid"Suffix). -define(PATH(Suffix), "/gateway/lwm2m/clients/:clientid"Suffix).
-define(DATA_TYPE, ['Integer', 'Float', 'Time', 'String', 'Boolean', 'Opaque', 'Objlnk']). -define(DATA_TYPE, ['Integer', 'Float', 'Time', 'String', 'Boolean', 'Opaque', 'Objlnk']).
-import(hoconsc, [mk/2, ref/1, ref/2]). -import(hoconsc, [mk/2, ref/1, ref/2]).

View File

@ -40,7 +40,9 @@ gateway.coap {
-define(HOST, "127.0.0.1"). -define(HOST, "127.0.0.1").
-define(PORT, 5683). -define(PORT, 5683).
-define(CONN_URI, "coap://127.0.0.1/mqtt/connection?clientid=client1&username=admin&password=public"). -define(CONN_URI,
"coap://127.0.0.1/mqtt/connection?clientid=client1&"
"username=admin&password=public").
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
@ -67,7 +69,7 @@ end_per_suite(Config) ->
t_send_request_api(_) -> t_send_request_api(_) ->
ClientId = start_client(), ClientId = start_client(),
timer:sleep(200), timer:sleep(200),
Path = emqx_mgmt_api_test_util:api_path(["gateway/coap/client1/request"]), Path = emqx_mgmt_api_test_util:api_path(["gateway/coap/clients/client1/request"]),
Token = <<"atoken">>, Token = <<"atoken">>,
Payload = <<"simple echo this">>, Payload = <<"simple echo this">>,
Req = #{token => Token, Req = #{token => Token,
@ -117,26 +119,40 @@ test_send_coap_request(UdpSock, Method, Content, Options, MsgId) ->
Request = Request0#coap_message{id = MsgId}, Request = Request0#coap_message{id = MsgId},
?LOGT("send_coap_request Request=~p", [Request]), ?LOGT("send_coap_request Request=~p", [Request]),
RequestBinary = emqx_coap_frame:serialize_pkt(Request, undefined), RequestBinary = emqx_coap_frame:serialize_pkt(Request, undefined),
?LOGT("test udp socket send to ~p:~p, data=~p", [IpAddr, Port, RequestBinary]), ?LOGT("test udp socket send to ~p:~p, data=~p",
[IpAddr, Port, RequestBinary]),
ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary); ok = gen_udp:send(UdpSock, IpAddr, Port, RequestBinary);
{SchemeDiff, ChIdDiff, _, _} -> {SchemeDiff, ChIdDiff, _, _} ->
error(lists:flatten(io_lib:format("scheme ~ts or ChId ~ts does not match with socket", [SchemeDiff, ChIdDiff]))) error(
lists:flatten(
io_lib:format(
"scheme ~ts or ChId ~ts does not match with socket",
[SchemeDiff, ChIdDiff])
))
end. end.
test_recv_coap_response(UdpSock) -> test_recv_coap_response(UdpSock) ->
{ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000), {ok, {Address, Port, Packet}} = gen_udp:recv(UdpSock, 0, 2000),
{ok, Response, _, _} = emqx_coap_frame:parse(Packet, undefined), {ok, Response, _, _} = emqx_coap_frame:parse(Packet, undefined),
?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p", [Address, Port, Packet, Response]), ?LOGT("test udp receive from ~p:~p, data1=~p, Response=~p",
#coap_message{type = ack, method = Method, id=Id, token = Token, options = Options, payload = Payload} = Response, [Address, Port, Packet, Response]),
?LOGT("receive coap response Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), #coap_message{
type = ack, method = Method, id = Id,
token = Token, options = Options, payload = Payload} = Response,
?LOGT("receive coap response Method=~p, Id=~p, Token=~p, "
"Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]),
Response. Response.
test_recv_coap_request(UdpSock) -> test_recv_coap_request(UdpSock) ->
case gen_udp:recv(UdpSock, 0) of case gen_udp:recv(UdpSock, 0) of
{ok, {_Address, _Port, Packet}} -> {ok, {_Address, _Port, Packet}} ->
{ok, Request, _, _} = emqx_coap_frame:parse(Packet, undefined), {ok, Request, _, _} = emqx_coap_frame:parse(Packet, undefined),
#coap_message{type = con, method = Method, id=Id, token = Token, payload = Payload, options = Options} = Request, #coap_message{
?LOGT("receive coap request Method=~p, Id=~p, Token=~p, Options=~p, Payload=~p", [Method, Id, Token, Options, Payload]), type = con, method = Method, id = Id,
token = Token, payload = Payload, options = Options} = Request,
?LOGT("receive coap request Method=~p, Id=~p, "
"Token=~p, Options=~p, Payload=~p",
[Method, Id, Token, Options, Payload]),
Request; Request;
{error, Reason} -> {error, Reason} ->
?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]), ?LOGT("test_recv_coap_request failed, Reason=~p", [Reason]),

View File

@ -196,12 +196,12 @@ t_authn(_) ->
backend => <<"built-in-database">>, backend => <<"built-in-database">>,
user_id_type => <<"clientid">> user_id_type => <<"clientid">>
}, },
{204, _} = request(post, "/gateway/stomp/authentication", AuthConf), {201, _} = request(post, "/gateway/stomp/authentication", AuthConf),
{200, ConfResp} = request(get, "/gateway/stomp/authentication"), {200, ConfResp} = request(get, "/gateway/stomp/authentication"),
assert_confs(AuthConf, ConfResp), assert_confs(AuthConf, ConfResp),
AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}), AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}),
{204, _} = request(put, "/gateway/stomp/authentication", AuthConf2), {200, _} = request(put, "/gateway/stomp/authentication", AuthConf2),
{200, ConfResp2} = request(get, "/gateway/stomp/authentication"), {200, ConfResp2} = request(get, "/gateway/stomp/authentication"),
assert_confs(AuthConf2, ConfResp2), assert_confs(AuthConf2, ConfResp2),
@ -219,7 +219,7 @@ t_authn_data_mgmt(_) ->
backend => <<"built-in-database">>, backend => <<"built-in-database">>,
user_id_type => <<"clientid">> user_id_type => <<"clientid">>
}, },
{204, _} = request(post, "/gateway/stomp/authentication", AuthConf), {201, _} = request(post, "/gateway/stomp/authentication", AuthConf),
{200, ConfResp} = request(get, "/gateway/stomp/authentication"), {200, ConfResp} = request(get, "/gateway/stomp/authentication"),
assert_confs(AuthConf, ConfResp), assert_confs(AuthConf, ConfResp),
@ -262,14 +262,14 @@ t_listeners(_) ->
type => <<"tcp">>, type => <<"tcp">>,
bind => <<"61613">> bind => <<"61613">>
}, },
{204, _} = request(post, "/gateway/stomp/listeners", LisConf), {201, _} = request(post, "/gateway/stomp/listeners", LisConf),
{200, ConfResp} = request(get, "/gateway/stomp/listeners"), {200, ConfResp} = request(get, "/gateway/stomp/listeners"),
assert_confs([LisConf], ConfResp), assert_confs([LisConf], ConfResp),
{200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"), {200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"),
assert_confs(LisConf, ConfResp1), assert_confs(LisConf, ConfResp1),
LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}), LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}),
{204, _} = request( {200, _} = request(
put, put,
"/gateway/stomp/listeners/stomp:tcp:def", "/gateway/stomp/listeners/stomp:tcp:def",
LisConf2 LisConf2
@ -298,12 +298,12 @@ t_listeners_authn(_) ->
user_id_type => <<"clientid">> user_id_type => <<"clientid">>
}, },
Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication", Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication",
{204, _} = request(post, Path, AuthConf), {201, _} = request(post, Path, AuthConf),
{200, ConfResp2} = request(get, Path), {200, ConfResp2} = request(get, Path),
assert_confs(AuthConf, ConfResp2), assert_confs(AuthConf, ConfResp2),
AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}), AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}),
{204, _} = request(put, Path, AuthConf2), {200, _} = request(put, Path, AuthConf2),
{200, ConfResp3} = request(get, Path), {200, ConfResp3} = request(get, Path),
assert_confs(AuthConf2, ConfResp3), assert_confs(AuthConf2, ConfResp3),
@ -325,7 +325,7 @@ t_listeners_authn_data_mgmt(_) ->
user_id_type => <<"clientid">> user_id_type => <<"clientid">>
}, },
Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication", Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication",
{204, _} = request(post, Path, AuthConf), {201, _} = request(post, Path, AuthConf),
{200, ConfResp2} = request(get, Path), {200, ConfResp2} = request(get, Path),
assert_confs(AuthConf, ConfResp2), assert_confs(AuthConf, ConfResp2),

View File

@ -268,12 +268,12 @@ t_load_remove_authn(_) ->
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf), ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:add_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_1), {ok, _} = emqx_gateway_conf:add_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_1),
assert_confs( assert_confs(
maps:put(<<"authentication">>, ?CONF_STOMP_AUTHN_1, StompConf), maps:put(<<"authentication">>, ?CONF_STOMP_AUTHN_1, StompConf),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2), {ok, _} = emqx_gateway_conf:update_authn(<<"stomp">>, ?CONF_STOMP_AUTHN_2),
assert_confs( assert_confs(
maps:put(<<"authentication">>, ?CONF_STOMP_AUTHN_2, StompConf), maps:put(<<"authentication">>, ?CONF_STOMP_AUTHN_2, StompConf),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),
@ -295,14 +295,16 @@ t_load_remove_listeners(_) ->
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf), ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:add_listener( {ok, _} = emqx_gateway_conf:add_listener(
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_1), <<"stomp">>, {<<"tcp">>, <<"default">>},
?CONF_STOMP_LISTENER_1),
assert_confs( assert_confs(
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)), maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_1)),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:update_listener( {ok, _} = emqx_gateway_conf:update_listener(
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_LISTENER_2), <<"stomp">>, {<<"tcp">>, <<"default">>},
?CONF_STOMP_LISTENER_2),
assert_confs( assert_confs(
maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_2)), maps:merge(StompConf, listener(?CONF_STOMP_LISTENER_2)),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),
@ -339,11 +341,11 @@ t_load_remove_listener_authn(_) ->
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf), ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:add_authn( {ok, _} = emqx_gateway_conf:add_authn(
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_1), <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_1),
assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf1, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:update_authn( {ok, _} = emqx_gateway_conf:update_authn(
<<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2), <<"stomp">>, {<<"tcp">>, <<"default">>}, ?CONF_STOMP_AUTHN_2),
assert_confs(StompConf2, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf2, emqx:get_raw_config([gateway, stomp])),
@ -403,14 +405,16 @@ t_add_listener_with_certs_content(_) ->
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf), ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])), assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:add_listener( {ok, _} = emqx_gateway_conf:add_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL), <<"stomp">>, {<<"ssl">>, <<"default">>},
?CONF_STOMP_LISTENER_SSL),
assert_confs( assert_confs(
maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL)), maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL)),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:update_listener( {ok, _} = emqx_gateway_conf:update_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2), <<"stomp">>, {<<"ssl">>, <<"default">>},
?CONF_STOMP_LISTENER_SSL_2),
assert_confs( assert_confs(
maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL_2)), maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL_2)),
emqx:get_raw_config([gateway, stomp])), emqx:get_raw_config([gateway, stomp])),

View File

@ -301,7 +301,7 @@ t_observe(Config) ->
%%% Internal Functions %%% Internal Functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
call_lookup_api(ClientId, Path, Action) -> call_lookup_api(ClientId, Path, Action) ->
ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, "lookup_cmd"]), ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m/clients", ClientId, "lookup_cmd"]),
Auth = emqx_mgmt_api_test_util:auth_header_(), Auth = emqx_mgmt_api_test_util:auth_header_(),
Query = io_lib:format("path=~ts&action=~ts", [Path, Action]), Query = io_lib:format("path=~ts&action=~ts", [Path, Action]),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, ApiPath, Query, Auth), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, ApiPath, Query, Auth),
@ -309,7 +309,7 @@ call_lookup_api(ClientId, Path, Action) ->
Response. Response.
call_send_api(ClientId, Cmd, Query) -> call_send_api(ClientId, Cmd, Query) ->
ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m", ClientId, Cmd]), ApiPath = emqx_mgmt_api_test_util:api_path(["gateway/lwm2m/clients", ClientId, Cmd]),
Auth = emqx_mgmt_api_test_util:auth_header_(), Auth = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, ApiPath, Query, Auth), {ok, Response} = emqx_mgmt_api_test_util:request_api(post, ApiPath, Query, Auth),
?LOGT("rest api response:~ts~n", [Response]), ?LOGT("rest api response:~ts~n", [Response]),