refactor(gw): refactor config hot-upgrade mechnism

This commit is contained in:
JianBo He 2021-09-17 15:48:18 +08:00
parent a9e32ac106
commit f68dfff0d6
10 changed files with 839 additions and 393 deletions

View File

@ -5,345 +5,345 @@
## TODO: These configuration options are temporary example here.
## In the final version, it will be commented out.
gateway.stomp {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
frame {
max_headers = 10
max_headers_length = 1024
max_body_length = 8192
}
clientinfo_override {
username = "${Packet.headers.login}"
password = "${Packet.headers.passcode}"
}
authentication: {
mechanism = password-based
backend = built-in-database
user_id_type = clientid
}
listeners.tcp.default {
bind = 61613
acceptors = 16
max_connections = 1024000
max_conn_rate = 1000
access_rules = [
"allow all"
]
authentication: {
mechanism = password-based
backend = built-in-database
user_id_type = username
}
## TCP options
## See ${example_common_tcp_options} for more information
tcp.active_n = 100
tcp.backlog = 1024
tcp.buffer = 4KB
}
listeners.ssl.default {
bind = 61614
acceptors = 16
max_connections = 1024000
max_conn_rate = 1000
## TCP options
## See ${example_common_tcp_options} for more information
tcp.active_n = 100
tcp.backlog = 1024
tcp.buffer = 4KB
## SSL options
## See ${example_common_ssl_options} for more information
ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
#ssl.verify = verify_none
#ssl.fail_if_no_peer_cert = false
#ssl.server_name_indication = disable
#ssl.secure_renegotiate = false
#ssl.reuse_sessions = false
#ssl.honor_cipher_order = false
#ssl.handshake_timeout = 15s
#ssl.depth = 10
#ssl.password = foo
#ssl.dhfile = path-to-your-file
}
}
gateway.coap {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
## Enable or disable connection mode
## If true, you need to establish a connection before send any publish/subscribe
## requests
##
## Default: false
#connection_required = false
## The Notification Message Type.
## The notification message will be delivered to the CoAP client if a new
## message received on an observed topic.
## The type of delivered coap message can be set to:
## - non: Non-confirmable
## - con: Confirmable
## - qos: Mapping from QoS type of the recevied message.
## QoS0 -> non, QoS1,2 -> con.
##
## Enum: non | con | qos
## Default: qos
#notify_type = qos
## The *Default QoS Level* indicator for subscribe request.
## This option specifies the QoS level for the CoAP Client when establishing
## a subscription membership, if the subscribe request is not carried `qos`
## option.
## The indicator can be set to:
## - qos0, qos1, qos2: Fixed default QoS level
## - coap: Dynamic QoS level by the message type of subscribe request
## * qos0: If the subscribe request is non-confirmable
## * qos1: If the subscribe request is confirmable
##
## Enum: qos0 | qos1 | qos2 | coap
## Default: coap
#subscribe_qos = coap
## The *Default QoS Level* indicator for publish request.
## This option specifies the QoS level for the CoAP Client when publishing a
## message to EMQ X PUB/SUB system, if the publish request is not carried `qos`
## option.
## The indicator can be set to:
## - qos0, qos1, qos2: Fixed default QoS level
## - coap: Dynamic QoS level by the message type of publish request
## * qos0: If the publish request is non-confirmable
## * qos1: If the publish request is confirmable
##
## Enum: qos0 | qos1 | qos2 | coap
#publish_qos = coap
listeners.udp.default {
bind = 5683
max_connections = 102400
max_conn_rate = 1000
## UDP Options
## See ${example_common_udp_options} for more information
udp.active_n = 100
udp.buffer = 16KB
}
listeners.dtls.default {
bind = 5684
acceptors = 4
max_connections = 102400
max_conn_rate = 1000
## UDP Options
## See ${example_common_udp_options} for more information
udp.active_n = 100
udp.buffer = 16KB
## DTLS Options
## See #{example_common_dtls_options} for more information
dtls.versions = ["dtlsv1.2", "dtlsv1"]
dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
dtls.handshake_timeout = 15s
}
}
gateway.mqttsn {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id = 1
## Enable broadcast this gateway to WLAN
broadcast = true
## To control whether accept and process the received
## publish message with qos=-1.
enable_qos3 = true
## The pre-defined topic name corresponding to the pre-defined topic
## id of N.
## Note that the pre-defined topic id of 0 is reserved.
predefined = [
{ id = 1
topic = "/predefined/topic/name/hello"
},
{ id = 2
topic = "/predefined/topic/name/nice"
}
]
### ClientInfo override
clientinfo_override {
username = "mqtt_sn_user"
password = "abc"
}
listeners.udp.default {
bind = 1884
max_connections = 10240000
max_conn_rate = 1000
}
listeners.dtls.default {
bind = 1885
acceptors = 4
max_connections = 102400
max_conn_rate = 1000
## UDP Options
## See ${example_common_udp_options} for more information
udp.active_n = 100
udp.buffer = 16KB
## DTLS Options
## See #{example_common_dtls_options} for more information
dtls.versions = ["dtlsv1.2", "dtlsv1"]
dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
}
gateway.lwm2m {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = "lwm2m/%u"
xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
##
##
lifetime_min = 1s
lifetime_max = 86400s
qmode_time_window = 22
auto_observe = false
## always | contains_object_list
update_msg_publish_condition = contains_object_list
translators {
command {
topic = "/dn/#"
qos = 0
}
response {
topic = "/up/resp"
qos = 0
}
notify {
topic = "/up/notify"
qos = 0
}
register {
topic = "/up/resp"
qos = 0
}
update {
topic = "/up/resp"
qos = 0
}
}
listeners.udp.default {
bind = 5783
}
}
gateway.exproto {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
## The gRPC server to accept requests
server {
bind = 9100
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
handler {
address = "http://127.0.0.1:9001"
#ssl.keyfile:
#ssl.certfile:
#ssl.cacertfile:
}
listeners.tcp.default {
bind = 7993
acceptors = 8
max_connections = 10240
max_conn_rate = 1000
}
#listeners.ssl.default: {}
#listeners.udp.default: {}
#listeners.dtls.default: {}
}
#gateway.stomp {
#
# ## How long time the connection will be disconnected if the
# ## connection is established but no bytes received
# idle_timeout = 30s
#
# ## To control whether write statistics data into ETS table
# ## for dashbord to read.
# enable_stats = true
#
# ## When publishing or subscribing, prefix all topics with a mountpoint string.
# mountpoint = ""
#
# frame {
# max_headers = 10
# max_headers_length = 1024
# max_body_length = 8192
# }
#
# clientinfo_override {
# username = "${Packet.headers.login}"
# password = "${Packet.headers.passcode}"
# }
#
# authentication: {
# mechanism = password-based
# backend = built-in-database
# user_id_type = clientid
# }
#
# listeners.tcp.default {
# bind = 61613
# acceptors = 16
# max_connections = 1024000
# max_conn_rate = 1000
#
# access_rules = [
# "allow all"
# ]
#
# authentication: {
# mechanism = password-based
# backend = built-in-database
# user_id_type = username
# }
#
# ## TCP options
# ## See ${example_common_tcp_options} for more information
# tcp.active_n = 100
# tcp.backlog = 1024
# tcp.buffer = 4KB
# }
#
# listeners.ssl.default {
# bind = 61614
# acceptors = 16
# max_connections = 1024000
# max_conn_rate = 1000
#
# ## TCP options
# ## See ${example_common_tcp_options} for more information
# tcp.active_n = 100
# tcp.backlog = 1024
# tcp.buffer = 4KB
#
# ## SSL options
# ## See ${example_common_ssl_options} for more information
# ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
# ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# #ssl.verify = verify_none
# #ssl.fail_if_no_peer_cert = false
# #ssl.server_name_indication = disable
# #ssl.secure_renegotiate = false
# #ssl.reuse_sessions = false
# #ssl.honor_cipher_order = false
# #ssl.handshake_timeout = 15s
# #ssl.depth = 10
# #ssl.password = foo
# #ssl.dhfile = path-to-your-file
# }
#}
#
#gateway.coap {
#
# ## How long time the connection will be disconnected if the
# ## connection is established but no bytes received
# idle_timeout = 30s
#
# ## To control whether write statistics data into ETS table
# ## for dashbord to read.
# enable_stats = true
#
# ## When publishing or subscribing, prefix all topics with a mountpoint string.
# mountpoint = ""
#
# ## Enable or disable connection mode
# ## If true, you need to establish a connection before send any publish/subscribe
# ## requests
# ##
# ## Default: false
# #connection_required = false
#
# ## The Notification Message Type.
# ## The notification message will be delivered to the CoAP client if a new
# ## message received on an observed topic.
# ## The type of delivered coap message can be set to:
# ## - non: Non-confirmable
# ## - con: Confirmable
# ## - qos: Mapping from QoS type of the recevied message.
# ## QoS0 -> non, QoS1,2 -> con.
# ##
# ## Enum: non | con | qos
# ## Default: qos
# #notify_type = qos
#
# ## The *Default QoS Level* indicator for subscribe request.
# ## This option specifies the QoS level for the CoAP Client when establishing
# ## a subscription membership, if the subscribe request is not carried `qos`
# ## option.
# ## The indicator can be set to:
# ## - qos0, qos1, qos2: Fixed default QoS level
# ## - coap: Dynamic QoS level by the message type of subscribe request
# ## * qos0: If the subscribe request is non-confirmable
# ## * qos1: If the subscribe request is confirmable
# ##
# ## Enum: qos0 | qos1 | qos2 | coap
# ## Default: coap
# #subscribe_qos = coap
#
# ## The *Default QoS Level* indicator for publish request.
# ## This option specifies the QoS level for the CoAP Client when publishing a
# ## message to EMQ X PUB/SUB system, if the publish request is not carried `qos`
# ## option.
# ## The indicator can be set to:
# ## - qos0, qos1, qos2: Fixed default QoS level
# ## - coap: Dynamic QoS level by the message type of publish request
# ## * qos0: If the publish request is non-confirmable
# ## * qos1: If the publish request is confirmable
# ##
# ## Enum: qos0 | qos1 | qos2 | coap
# #publish_qos = coap
#
# listeners.udp.default {
# bind = 5683
# max_connections = 102400
# max_conn_rate = 1000
#
# ## UDP Options
# ## See ${example_common_udp_options} for more information
# udp.active_n = 100
# udp.buffer = 16KB
# }
# listeners.dtls.default {
# bind = 5684
# acceptors = 4
# max_connections = 102400
# max_conn_rate = 1000
#
# ## UDP Options
# ## See ${example_common_udp_options} for more information
# udp.active_n = 100
# udp.buffer = 16KB
#
# ## DTLS Options
# ## See #{example_common_dtls_options} for more information
# dtls.versions = ["dtlsv1.2", "dtlsv1"]
# dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# dtls.handshake_timeout = 15s
# }
#}
#
#gateway.mqttsn {
#
# ## How long time the connection will be disconnected if the
# ## connection is established but no bytes received
# idle_timeout = 30s
#
# ## To control whether write statistics data into ETS table
# ## for dashbord to read.
# enable_stats = true
#
# ## When publishing or subscribing, prefix all topics with a mountpoint string.
# mountpoint = ""
#
# ## The MQTT-SN Gateway ID in ADVERTISE message.
# gateway_id = 1
#
# ## Enable broadcast this gateway to WLAN
# broadcast = true
#
# ## To control whether accept and process the received
# ## publish message with qos=-1.
# enable_qos3 = true
#
# ## The pre-defined topic name corresponding to the pre-defined topic
# ## id of N.
# ## Note that the pre-defined topic id of 0 is reserved.
# predefined = [
# { id = 1
# topic = "/predefined/topic/name/hello"
# },
# { id = 2
# topic = "/predefined/topic/name/nice"
# }
# ]
#
# ### ClientInfo override
# clientinfo_override {
# username = "mqtt_sn_user"
# password = "abc"
# }
#
# listeners.udp.default {
# bind = 1884
# max_connections = 10240000
# max_conn_rate = 1000
# }
#
# listeners.dtls.default {
# bind = 1885
# acceptors = 4
# max_connections = 102400
# max_conn_rate = 1000
#
# ## UDP Options
# ## See ${example_common_udp_options} for more information
# udp.active_n = 100
# udp.buffer = 16KB
#
# ## DTLS Options
# ## See #{example_common_dtls_options} for more information
# dtls.versions = ["dtlsv1.2", "dtlsv1"]
# dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
# dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
#
#}
#
#gateway.lwm2m {
#
# ## How long time the connection will be disconnected if the
# ## connection is established but no bytes received
# idle_timeout = 30s
#
# ## To control whether write statistics data into ETS table
# ## for dashbord to read.
# enable_stats = true
#
# ## When publishing or subscribing, prefix all topics with a mountpoint string.
# mountpoint = "lwm2m/%u"
#
# xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
#
# ##
# ##
# lifetime_min = 1s
#
# lifetime_max = 86400s
#
# qmode_time_window = 22
#
# auto_observe = false
#
# ## always | contains_object_list
# update_msg_publish_condition = contains_object_list
#
#
# translators {
# command {
# topic = "/dn/#"
# qos = 0
# }
#
# response {
# topic = "/up/resp"
# qos = 0
# }
#
# notify {
# topic = "/up/notify"
# qos = 0
# }
#
# register {
# topic = "/up/resp"
# qos = 0
# }
#
# update {
# topic = "/up/resp"
# qos = 0
# }
# }
#
# listeners.udp.default {
# bind = 5783
# }
#}
#
#gateway.exproto {
#
# ## How long time the connection will be disconnected if the
# ## connection is established but no bytes received
# idle_timeout = 30s
#
# ## To control whether write statistics data into ETS table
# ## for dashbord to read.
# enable_stats = true
#
# ## When publishing or subscribing, prefix all topics with a mountpoint string.
# mountpoint = ""
#
# ## The gRPC server to accept requests
# server {
# bind = 9100
# #ssl.keyfile:
# #ssl.certfile:
# #ssl.cacertfile:
# }
#
# handler {
# address = "http://127.0.0.1:9001"
# #ssl.keyfile:
# #ssl.certfile:
# #ssl.cacertfile:
# }
#
# listeners.tcp.default {
# bind = 7993
# acceptors = 8
# max_connections = 10240
# max_conn_rate = 1000
# }
# #listeners.ssl.default: {}
# #listeners.udp.default: {}
# #listeners.dtls.default: {}
#}

View File

@ -20,11 +20,6 @@
-include("include/emqx_gateway.hrl").
%% callbacks for emqx_config_handler
-export([ pre_config_update/2
, post_config_update/4
]).
%% Gateway APIs
-export([ registered_gateway/0
, load/2
@ -36,8 +31,6 @@
, list/0
]).
-export([update_rawconf/2]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@ -84,37 +77,6 @@ start(Name) ->
stop(Name) ->
emqx_gateway_sup:stop_gateway_insta(Name).
-spec update_rawconf(binary(), emqx_config:raw_config())
-> ok
| {error, any()}.
update_rawconf(RawName, RawConfDiff) ->
case emqx:update_config([gateway], {RawName, RawConfDiff}) of
{ok, _Result} -> ok;
{error, Reason} -> {error, Reason}
end.
%%--------------------------------------------------------------------
%% Config Handler
-spec pre_config_update(emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update({RawName, RawConfDiff}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update({RawName, _}, NewConfig, OldConfig, _AppEnvs) ->
GwName = binary_to_existing_atom(RawName),
SubConf = maps:get(GwName, NewConfig),
case maps:get(GwName, OldConfig, undefined) of
undefined ->
emqx_gateway:load(GwName, SubConf);
_ ->
emqx_gateway:update(GwName, SubConf)
end.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

View File

@ -18,8 +18,157 @@
-behaviour(minirest_api).
-import(emqx_gateway_http,
[ return_http_error/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
, with_gateway/2
, checks/2
]).
%% minirest behaviour callbacks
-export([api_spec/0]).
%% http handlers
-export([authn/2]).
%%--------------------------------------------------------------------
%% minirest behaviour callbacks
%%--------------------------------------------------------------------
api_spec() ->
{[], []}.
{metadata(apis()), []}.
apis() ->
[ {"/gateway/:name/authentication", authn}
].
%%--------------------------------------------------------------------
%% http handlers
authn(get, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:authn(GwName) of
undefined ->
return_http_error(404, "No Authentication");
Auth ->
{200, Auth}
end
end);
authn(put, #{bindings := #{name := Name0},
body := Body}) ->
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:update_authn(GwName, Body) of
ok ->
{204};
{error, Reason} ->
return_http_error(500, Reason)
end
end);
authn(post, #{bindings := #{name := Name0},
body := Body}) ->
with_gateway(Name0, fun(GwName, _) ->
%% Exitence checking?
case emqx_gateway_http:update_authn(GwName, Body) of
ok -> {204};
{error, Reason} ->
return_http_error(500, Reason)
end
end);
authn(delete, #{bindings := #{name := Name0}}) ->
with_gateway(Name0, fun(GwName, _) ->
case emqx_gateway_http:remove_authn(GwName) of
ok -> {204};
{error, Reason} ->
return_http_error(500, Reason)
end
end).
%%--------------------------------------------------------------------
%% Swagger defines
%%--------------------------------------------------------------------
metadata(APIs) ->
metadata(APIs, []).
metadata([], APIAcc) ->
lists:reverse(APIAcc);
metadata([{Path, Fun}|More], APIAcc) ->
Methods = [get, post, put, delete, patch],
Mds = lists:foldl(fun(M, Acc) ->
try
Acc#{M => swagger(Path, M)}
catch
error : function_clause ->
Acc
end
end, #{}, Methods),
metadata(More, [{Path, Mds, Fun} | APIAcc]).
swagger("/gateway/:name/authentication", get) ->
#{ description => <<"Get the gateway authentication">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"200">> => schema_authn()
}
};
swagger("/gateway/:name/authentication", put) ->
#{ description => <<"Create the gateway authentication">>
, parameters => params_gateway_name_in_path()
, requestBody => schema_authn()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/authentication", post) ->
#{ description => <<"Add authentication for the gateway">>
, parameters => params_gateway_name_in_path()
, requestBody => schema_authn()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
swagger("/gateway/:name/authentication", delete) ->
#{ description => <<"Remove the gateway authentication">>
, parameters => params_gateway_name_in_path()
, responses =>
#{ <<"400">> => schema_bad_request()
, <<"404">> => schema_not_found()
, <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
}.
%%--------------------------------------------------------------------
%% params defines
params_gateway_name_in_path() ->
[#{ name => name
, in => path
, schema => #{type => string}
, required => true
}].
%%--------------------------------------------------------------------
%% schemas
schema_authn() ->
#{ description => <<"OK">>
, content => #{
'application/json' => #{
schema => minirest:ref(<<"AuthenticatorInstance">>)
}}
}.

View File

@ -20,12 +20,12 @@
-import(emqx_gateway_http,
[ return_http_error/2
, with_gateway/2
, checks/2
, schema_bad_request/0
, schema_not_found/0
, schema_internal_error/0
, schema_no_content/0
, with_gateway/2
, checks/2
]).
%% minirest behaviour callbacks
@ -47,6 +47,7 @@ apis() ->
[ {"/gateway/:name/listeners", listeners}
, {"/gateway/:name/listeners/:id", listeners_insta}
].
%%--------------------------------------------------------------------
%% http handlers

View File

@ -22,20 +22,15 @@
-export([start/2, stop/1]).
-define(CONF_CALLBACK_MODULE, emqx_gateway).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_gateway_sup:start_link(),
emqx_gateway_cli:load(),
load_default_gateway_applications(),
load_gateway_by_default(),
emqx_config_handler:add_handler([gateway], ?CONF_CALLBACK_MODULE),
{ok, Sup}.
stop(_State) ->
emqx_gateway_cli:unload(),
%% XXX: No api now
%emqx_config_handler:remove_handler([gateway], ?MODULE),
ok.
%%--------------------------------------------------------------------

View File

@ -0,0 +1,260 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc The gateway configuration management module
-module(emqx_gateway_conf).
%% Load/Unload
-export([ load/0
, unload/0
]).
%% APIs
-export([ load_gateway/2
, update_gateway/2
, remove_gateway/1
, add_listener/3
, update_listener/3
, remove_listener/2
, add_authn/2
, add_authn/3
, update_authn/2
, update_authn/3
, remove_authn/1
, remove_authn/2
]).
%% callbacks for emqx_config_handler
-export([ pre_config_update/2
, post_config_update/4
]).
-type atom_or_bin() :: atom() | binary().
-type listener_ref() :: {ListenerType :: atom_or_bin(),
ListenerName :: atom_or_bin()}.
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_config_handler:add_handler([gateway], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_config_handler:remove_handler([gateway]).
%%--------------------------------------------------------------------
%% APIs
-spec load_gateway(atom_or_bin(), map()) -> ok | {error, any()}.
load_gateway(GwName, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), Conf})).
-spec update_gateway(atom_or_bin(), map()) -> ok | {error, any()}.
update_gateway(GwName, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), Conf})).
-spec remove_gateway(atom_or_bin()) -> ok | {error, any()}.
remove_gateway(GwName) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName)})).
-spec add_listener(atom_or_bin(), listener_ref(), map()) -> ok | {error, any()}.
add_listener(GwName, ListenerRef, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef, Conf})).
-spec update_listener(atom_or_bin(), listener_ref(), map()) -> ok | {error, any()}.
update_listener(GwName, ListenerRef, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef, Conf})).
-spec remove_listener(atom_or_bin(), listener_ref()) -> ok | {error, any()}.
remove_listener(GwName, ListenerRef) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef})).
add_authn(GwName, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), Conf})).
add_authn(GwName, ListenerRef, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef, Conf})).
update_authn(GwName, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), Conf})).
update_authn(GwName, ListenerRef, Conf) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef, Conf})).
remove_authn(GwName) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName)})).
remove_authn(GwName, ListenerRef) ->
res(emqx:update_config([gateway],
{?FUNCTION_NAME, bin(GwName), ListenerRef})).
res({ok, _Result}) -> ok;
res({error, Reason}) -> {error, Reason}.
bin(A) when is_atom(A) ->
atom_to_binary(A);
bin(B) when is_binary(B) ->
B.
%%--------------------------------------------------------------------
%% Config Handler
%%--------------------------------------------------------------------
-spec pre_config_update(emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => Conf})};
_ ->
{error, alredy_exist}
end;
pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
{error, not_found};
_ ->
NConf = maps:without([<<"listeners">>,
<<"authentication">>], Conf),
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
end;
pre_config_update({remove_gateway, GwName}, RawConf) ->
{ok, maps:remove(GwName, RawConf)};
pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
NListener = #{LType => #{LName => Conf}},
{ok, emqx_map_lib:deep_merge(
RawConf,
#{GwName => #{<<"listeners">> => NListener}})};
_ ->
{error, alredy_exist}
end;
pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
{error, not_found};
_OldConf ->
NListener = #{LType => #{LName => Conf}},
{ok, emqx_map_lib:deep_merge(
RawConf,
#{GwName => #{<<"listeners">> => NListener}})}
end;
pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
{ok, emqx_map_lib:deep_remove(
[GwName, <<"listeners">>, LType, LName], RawConf)};
pre_config_update({add_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
{ok, emqx_map_lib:deep_merge(
RawConf,
#{GwName => #{<<"authentication">> => Conf}})};
_ ->
{error, alredy_exist}
end;
pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
undefined ->
{error, not_found};
Listener ->
case maps:get(<<"authentication">>, Listener, undefined) of
undefined ->
NListener = maps:put(<<"authentication">>, Conf, Listener),
NGateway = #{GwName =>
#{<<"listeners">> =>
#{LType => #{LName => NListener}}}},
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)};
_ ->
{error, alredy_exist}
end
end;
pre_config_update({update_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
{error, not_found};
_ ->
{ok, emqx_map_lib:deep_merge(
RawConf,
#{GwName => #{<<"authentication">> => Conf}})}
end;
pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
undefined ->
{error, not_found};
Listener ->
case maps:get(<<"authentication">>, Listener, undefined) of
undefined ->
{error, not_found};
Auth ->
NListener = maps:put(
<<"authentication">>,
emqx_map_lib:deep_merge(Auth, Conf),
Listener
),
NGateway = #{GwName =>
#{<<"listeners">> =>
#{LType => #{LName => NListener}}}},
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)}
end
end;
pre_config_update({remove_authn, GwName}, RawConf) ->
{ok, emqx_map_lib:deep_remove(
[GwName, <<"authentication">>], RawConf)};
pre_config_update({remove_authn, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName, <<"authentication">>],
{ok, emqx_map_lib:deep_remove(Path, RawConf)};
pre_config_update(UnknownReq, _RawConf) ->
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
{error, badreq}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(Req, NewConfig, OldConfig, _AppEnvs) ->
[_Tag, GwName0|_] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0),
SubConf = maps:get(GwName, NewConfig),
case maps:get(GwName, OldConfig, undefined) of
undefined ->
emqx_gateway:load(GwName, SubConf);
_ ->
emqx_gateway:update(GwName, SubConf)
end.

View File

@ -32,6 +32,11 @@
, mapping_listener_m2l/2
]).
-export([ authn/1
, update_authn/2
, remove_authn/1
]).
%% Mgmt APIs - clients
-export([ lookup_client/3
, lookup_client/4
@ -220,6 +225,26 @@ update_listener(ListenerId, NewConf0) ->
#{<<"listeners">> => #{Type => #{Name => NewConf}}
}).
-spec authn(gateway_name()) -> map() | undefined.
authn(GwName) ->
case emqx_map_lib:deep_get(
authentication,
emqx:get_config([gateway, GwName]),
undefined) of
undefined -> undefined;
AuthConf -> emqx_map_lib:jsonable_map(AuthConf)
end.
-spec update_authn(gateway_name(), map()) -> ok | {error, any()}.
update_authn(GwName, AuthConf) ->
emqx_gateway:update_rawconf(
atom_to_binary(GwName),
#{authentication => AuthConf}).
-spec remove_authn(gateway_name()) -> ok | {error, any()}.
remove_authn(_GwName) ->
{error, not_supported_now}.
%%--------------------------------------------------------------------
%% Mgmt APIs - clients
%%--------------------------------------------------------------------

View File

@ -95,6 +95,7 @@ init([Gateway, Ctx, _GwDscrptr]) ->
State = #state{
ctx = Ctx,
name = GwName,
authns = [],
config = Config,
child_pids = [],
status = stopped,

View File

@ -50,11 +50,11 @@ namespace() -> gateway.
roots() -> [gateway].
fields(gateway) ->
[{stomp, sc(ref(stomp))},
{mqttsn, sc(ref(mqttsn))},
{coap, sc(ref(coap))},
{lwm2m, sc(ref(lwm2m))},
{exproto, sc(ref(exproto))}
[{stomp, sc_meta(ref(stomp) , #{nullable => {true, recursively}})},
{mqttsn, sc_meta(ref(mqttsn) , #{nullable => {true, recursively}})},
{coap, sc_meta(ref(coap) , #{nullable => {true, recursively}})},
{lwm2m, sc_meta(ref(lwm2m) , #{nullable => {true, recursively}})},
{exproto, sc_meta(ref(exproto), #{nullable => {true, recursively}})}
];
fields(stomp) ->

View File

@ -0,0 +1,53 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_conf_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Conf) ->
emqx_ct_helpers:start_apps([]),
Conf.
end_per_suite(_Conf) ->
emqx_ct_helpers:stop_apps([]).
init_per_testcase(_CaseName, Conf) ->
emqx_gateway_conf:unload(),
emqx_config:put([gateway], #{}),
emqx_gateway_conf:load(),
Conf.
%%--------------------------------------------------------------------
%% Cases
%%--------------------------------------------------------------------
t_load_gateway(_) ->
ok = emqx_gateway_conf:load_gateway(stomp, #{listeners => #{ tcp => #{default => #{bind => 7993}}}}),
A = emqx:get_config([gateway, stomp]),
io:format(standard_error, "-~p~n", [A]),
ok.