chore(gw): adjust the configuration format

This commit is contained in:
JianBo He 2021-08-23 11:48:08 +08:00 committed by turtleDeng
parent f333a0b888
commit 914c375d9e
6 changed files with 66 additions and 41 deletions

View File

@ -6,6 +6,7 @@
## In the final version, it will be commented out. ## In the final version, it will be commented out.
gateway.stomp { gateway.stomp {
frame { frame {
max_headers = 10 max_headers = 10
max_headers_length = 1024 max_headers_length = 1024
@ -18,14 +19,13 @@ gateway.stomp {
} }
authenticator { authenticator {
#enable = true
name = "authenticator1" name = "authenticator1"
mechanism = password-based mechanism = password-based
server_type = built-in-database server_type = built-in-database
user_id_type = clientid user_id_type = clientid
} }
listener.tcp.1 { listeners.tcp.default {
bind = 61613 bind = 61613
acceptors = 16 acceptors = 16
max_connections = 1024000 max_connections = 1024000
@ -49,7 +49,7 @@ gateway.coap {
notify_type = qos notify_type = qos
subscribe_qos = qos0 subscribe_qos = qos0
publish_qos = qos1 publish_qos = qos1
listener.udp.1 { listeners.udp.default {
bind = 5683 bind = 5683
} }
} }
@ -90,7 +90,7 @@ gateway.mqttsn {
password = "abc" password = "abc"
} }
listener.udp.1 { listeners.udp.default {
bind = 1884 bind = 1884
max_connections = 10240000 max_connections = 10240000
max_conn_rate = 1000 max_conn_rate = 1000
@ -113,16 +113,16 @@ gateway.exproto {
#ssl.cacertfile: #ssl.cacertfile:
} }
listener.tcp.1 { listeners.tcp.default {
bind = 7993 bind = 7993
acceptors = 8 acceptors = 8
max_connections = 10240 max_connections = 10240
max_conn_rate = 1000 max_conn_rate = 1000
} }
#listener.ssl.1: {} #listeners.ssl.default: {}
#listener.udp.1: {} #listeners.udp.default: {}
#listener.dtls.1: {} #listeners.dtls.default: {}
} }
gateway.lwm2m { gateway.lwm2m {
@ -147,7 +147,7 @@ gateway.lwm2m {
update = "up/resp" update = "up/resp"
} }
listener.udp.1 { listeners.udp.default {
bind = 5783 bind = 5783
} }
} }

View File

@ -19,8 +19,15 @@
-type gateway_name() :: atom(). -type gateway_name() :: atom().
-type listener() :: #{}.
%% The RawConf got from emqx:get_config/1 %% The RawConf got from emqx:get_config/1
-type rawconf() :: map(). -type rawconf() ::
#{ clientinfo_override => map()
, authenticator => map()
, listeners => listener()
, atom() => any()
}.
%% @doc The Gateway defination %% @doc The Gateway defination
-type gateway() :: -type gateway() ::

View File

@ -106,7 +106,7 @@ init(ConnInfo = #{peername := {PeerHost, _},
#{ctx := Ctx} = Config) -> #{ctx := Ctx} = Config) ->
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Config, undefined), Mountpoint = maps:get(mountpoint, Config, undefined),
EnableAuth = maps:get(enable, maps:get(authentication, Config)), EnableAuth = is_authenticator_enabled(Config),
ClientInfo = set_peercert_infos( ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{ zone => default #{ zone => default
@ -134,6 +134,13 @@ init(ConnInfo = #{peername := {PeerHost, _},
, keepalive = emqx_keepalive:init(maps:get(heartbeat, Config)) , keepalive = emqx_keepalive:init(maps:get(heartbeat, Config))
}. }.
is_authenticator_enabled(Cfg) ->
case maps:get(authenticator, Cfg, #{enable => false}) of
AuthCfg when is_map(AuthCfg) ->
maps:get(enable, AuthCfg, true);
_ -> false
end.
validator(Type, Topic, #exec_ctx{ctx = Ctx, validator(Type, Topic, #exec_ctx{ctx = Ctx,
clientinfo = ClientInfo}) -> clientinfo = ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
@ -290,7 +297,7 @@ handle_result(_, _, _, Channel) ->
{ok, Channel}. {ok, Channel}.
check_auth_state(Msg, #channel{config = Cfg} = Channel) -> check_auth_state(Msg, #channel{config = Cfg} = Channel) ->
#{authentication := #{enable := Enable}} = Cfg, Enable = is_authenticator_enabled(Cfg),
check_token(Enable, Msg, Channel). check_token(Enable, Msg, Channel).
check_token(true, check_token(true,

View File

@ -3,7 +3,7 @@
{vsn, "0.1.0"}, {vsn, "0.1.0"},
{registered, []}, {registered, []},
{mod, {emqx_gateway_app, []}}, {mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, lwm2m_coap, emqx]}, {applications, [kernel, stdlib, grpc, lwm2m_coap, emqx, emqx_authn]},
{env, []}, {env, []},
{modules, []}, {modules, []},
{licenses, ["Apache 2.0"]}, {licenses, ["Apache 2.0"]},

View File

@ -1,5 +1,23 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_schema). -module(emqx_gateway_schema).
-behaviour(hocon_schema).
-dialyzer(no_return). -dialyzer(no_return).
-dialyzer(no_match). -dialyzer(no_match).
-dialyzer(no_contracts). -dialyzer(no_contracts).
@ -8,17 +26,16 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-type ip_port() :: tuple().
-type duration() :: integer(). -type duration() :: integer().
-type bytesize() :: integer(). -type bytesize() :: integer().
-type comma_separated_list() :: list(). -type comma_separated_list() :: list().
-type ip_port() :: tuple().
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}). -typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). -typerefl_from_string({comma_separated_list/0, emqx_schema,
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). to_comma_separated_list}).
-behaviour(hocon_schema).
-reflect_type([ duration/0 -reflect_type([ duration/0
, bytesize/0 , bytesize/0
@ -27,11 +44,15 @@
]). ]).
-export([structs/0 , fields/1]). -export([structs/0 , fields/1]).
-export([t/1, t/3, t/4, ref/1]). -export([t/1, t/3, t/4, ref/1]).
structs() -> ["gateway"]. %%--------------------------------------------------------------------
%% Structs
fields("gateway") -> structs() -> [gateway].
fields(gateway) ->
[{stomp, t(ref(stomp_structs))}, [{stomp, t(ref(stomp_structs))},
{mqttsn, t(ref(mqttsn_structs))}, {mqttsn, t(ref(mqttsn_structs))},
{coap, t(ref(coap_structs))}, {coap, t(ref(coap_structs))},
@ -43,7 +64,7 @@ fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))} [ {frame, t(ref(stomp_frame))}
, {clientinfo_override, t(ref(clientinfo_override))} , {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(authenticator(), undefined, undefined)} , {authenticator, t(authenticator(), undefined, undefined)}
, {listener, t(ref(tcp_listener_group))} , {listeners, t(ref(tcp_listener_group))}
]; ];
fields(stomp_frame) -> fields(stomp_frame) ->
@ -61,11 +82,10 @@ fields(mqttsn_structs) ->
, {predefined, hoconsc:array(ref(mqttsn_predefined))} , {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {clientinfo_override, t(ref(clientinfo_override))} , {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(authenticator(), undefined, undefined)} , {authenticator, t(authenticator(), undefined, undefined)}
, {listener, t(ref(udp_listener_group))} , {listeners, t(ref(udp_listener_group))}
]; ];
fields(mqttsn_predefined) -> fields(mqttsn_predefined) ->
%% FIXME: How to check the $id is a integer ???
[ {id, t(integer())} [ {id, t(integer())}
, {topic, t(string())} , {topic, t(string())}
]; ];
@ -80,18 +100,18 @@ fields(lwm2m_structs) ->
, {update_msg_publish_condition, t(union([always, contains_object_list]))} , {update_msg_publish_condition, t(union([always, contains_object_list]))}
, {translators, t(ref(translators))} , {translators, t(ref(translators))}
, {authenticator, t(authenticator(), undefined, undefined)} , {authenticator, t(authenticator(), undefined, undefined)}
, {listener, t(ref(udp_listener_group))} , {listeners, t(ref(udp_listener_group))}
]; ];
fields(exproto_structs) -> fields(exproto_structs) ->
[ {server, t(ref(exproto_grpc_server))} [ {server, t(ref(exproto_grpc_server))}
, {handler, t(ref(exproto_grpc_handler))} , {handler, t(ref(exproto_grpc_handler))}
, {authenticator, t(authenticator(), undefined, undefined)} , {authenticator, t(authenticator(), undefined, undefined)}
, {listener, t(ref(udp_tcp_listener_group))} , {listeners, t(ref(udp_tcp_listener_group))}
]; ];
fields(exproto_grpc_server) -> fields(exproto_grpc_server) ->
[ {bind, t(integer())} [ {bind, t(union(ip_port(), integer()))}
%% TODO: ssl options %% TODO: ssl options
]; ];
@ -139,9 +159,7 @@ fields(dtls_listener) ->
[ {"$name", t(ref(dtls_listener_settings))}]; [ {"$name", t(ref(dtls_listener_settings))}];
fields(listener_settings) -> fields(listener_settings) ->
% FIXME: [ {bind, t(union(ip_port(), integer()))}
%[ {"bind", t(union(ip_port(), integer()))}
[ {bind, t(integer())}
, {acceptors, t(integer(), undefined, 8)} , {acceptors, t(integer(), undefined, 8)}
, {max_connections, t(integer(), undefined, 1024)} , {max_connections, t(integer(), undefined, 1024)}
, {max_conn_rate, t(integer())} , {max_conn_rate, t(integer())}
@ -203,7 +221,7 @@ fields(coap_structs) ->
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {authenticator, t(authenticator(), undefined, undefined)} , {authenticator, t(authenticator(), undefined, undefined)}
, {listener, t(ref(udp_listener_group))} , {listeners, t(ref(udp_listener_group))}
]; ];
fields(ExtraField) -> fields(ExtraField) ->

View File

@ -17,6 +17,8 @@
%% @doc Utils funcs for emqx-gateway %% @doc Utils funcs for emqx-gateway
-module(emqx_gateway_utils). -module(emqx_gateway_utils).
-include("emqx_gateway.hrl").
-export([ childspec/2 -export([ childspec/2
, childspec/3 , childspec/3
, childspec/4 , childspec/4
@ -105,15 +107,6 @@ format_listenon({Addr, Port}) when is_list(Addr) ->
format_listenon({Addr, Port}) when is_tuple(Addr) -> format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
-type listener() :: #{}.
-type rawconf() ::
#{ clientinfo_override => #{}
, authenticators := list()
, listeners => listener()
, atom() => any()
}.
-spec normalize_rawconf(rawconf()) -spec normalize_rawconf(rawconf())
-> list({ Type :: udp | tcp | ssl | dtls -> list({ Type :: udp | tcp | ssl | dtls
, ListenOn :: esockd:listen_on() , ListenOn :: esockd:listen_on()
@ -121,8 +114,8 @@ format_listenon({Addr, Port}) when is_tuple(Addr) ->
, Cfg :: map() , Cfg :: map()
}). }).
normalize_rawconf(RawConf) -> normalize_rawconf(RawConf) ->
LisMap = maps:get(listener, RawConf, #{}), LisMap = maps:get(listeners, RawConf, #{}),
Cfg0 = maps:without([listener], RawConf), Cfg0 = maps:without([listeners], RawConf),
lists:append(maps:fold(fun(Type, Liss, AccIn1) -> lists:append(maps:fold(fun(Type, Liss, AccIn1) ->
Listeners = Listeners =
maps:fold(fun(_Name, Confs, AccIn2) -> maps:fold(fun(_Name, Confs, AccIn2) ->