chore(gw): cleanup the massive schema defination

This commit is contained in:
JianBo He 2021-08-25 11:43:33 +08:00 committed by turtleDeng
parent ef372e415d
commit 9c855ba8c2
14 changed files with 338 additions and 321 deletions

View File

@ -7,6 +7,17 @@
gateway.stomp { gateway.stomp {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
frame { frame {
max_headers = 10 max_headers = 10
max_headers_length = 1024 max_headers_length = 1024
@ -18,7 +29,7 @@ gateway.stomp {
password = "${Packet.headers.passcode}" password = "${Packet.headers.passcode}"
} }
authenticator { authentication {
name = "authenticator1" name = "authenticator1"
mechanism = password-based mechanism = password-based
server_type = built-in-database server_type = built-in-database
@ -36,42 +47,57 @@ gateway.stomp {
gateway.coap { gateway.coap {
enable_stats = false ## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
authenticator { ## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
heartbeat = 30s
notify_type = qos
subscribe_qos = qos0
publish_qos = qos1
authentication {
name = "authenticator1" name = "authenticator1"
mechanism = password-based mechanism = password-based
server_type = built-in-database server_type = built-in-database
user_id_type = clientid user_id_type = clientid
} }
heartbeat = 30s
notify_type = qos
subscribe_qos = qos0
publish_qos = qos1
listeners.udp.default { listeners.udp.default {
bind = 5683 bind = 5683
} }
} }
gateway.mqttsn { gateway.mqttsn {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
## The MQTT-SN Gateway ID in ADVERTISE message. ## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id = 1 gateway_id = 1
## Enable broadcast this gateway to WLAN ## Enable broadcast this gateway to WLAN
broadcast = true broadcast = true
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## To control whether accept and process the received ## To control whether accept and process the received
## publish message with qos=-1. ## publish message with qos=-1.
enable_qos3 = true enable_qos3 = true
## Idle timeout for a MQTT-SN channel
idle_timeout = 30s
## The pre-defined topic name corresponding to the pre-defined topic ## The pre-defined topic name corresponding to the pre-defined topic
## id of N. ## id of N.
## Note that the pre-defined topic id of 0 is reserved. ## Note that the pre-defined topic id of 0 is reserved.
@ -97,7 +123,55 @@ gateway.mqttsn {
} }
} }
gateway.lwm2m {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = "lwm2m/%e/"
xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
lifetime_min = 1s
lifetime_max = 86400s
qmode_time_windonw = 22
auto_observe = false
## always | contains_object_list
update_msg_publish_condition = contains_object_list
translators {
command = "dn/#"
response = "up/resp"
notify = "up/notify"
register = "up/resp"
update = "up/resp"
}
listeners.udp.default {
bind = 5783
}
}
gateway.exproto { gateway.exproto {
## How long time the connection will be disconnected if the
## connection is established but no bytes received
idle_timeout = 30s
## To control whether write statistics data into ETS table
## for dashbord to read.
enable_stats = true
## When publishing or subscribing, prefix all topics with a mountpoint string.
mountpoint = ""
## The gRPC server to accept requests ## The gRPC server to accept requests
server { server {
bind = 9100 bind = 9100
@ -119,35 +193,7 @@ gateway.exproto {
max_connections = 10240 max_connections = 10240
max_conn_rate = 1000 max_conn_rate = 1000
} }
#listeners.ssl.default: {} #listeners.ssl.default: {}
#listeners.udp.default: {} #listeners.udp.default: {}
#listeners.dtls.default: {} #listeners.dtls.default: {}
} }
gateway.lwm2m {
xml_dir = "{{ platform_etc_dir }}/lwm2m_xml"
lifetime_min = 1s
lifetime_max = 86400s
qmode_time_windonw = 22
auto_observe = false
mountpoint = "lwm2m/%e/"
## always | contains_object_list
update_msg_publish_condition = contains_object_list
translators {
command = "dn/#"
response = "up/resp"
notify = "up/notify"
register = "up/resp"
update = "up/resp"
}
listeners.udp.default {
bind = 5783
}
}

View File

@ -106,7 +106,7 @@ init(ConnInfo = #{peername := {PeerHost, _},
#{ctx := Ctx} = Config) -> #{ctx := Ctx} = Config) ->
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Config, undefined), Mountpoint = maps:get(mountpoint, Config, undefined),
EnableAuth = is_authenticator_enabled(Config), EnableAuth = is_authentication_enabled(Config),
ClientInfo = set_peercert_infos( ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{ zone => default #{ zone => default
@ -134,8 +134,8 @@ init(ConnInfo = #{peername := {PeerHost, _},
, keepalive = emqx_keepalive:init(maps:get(heartbeat, Config)) , keepalive = emqx_keepalive:init(maps:get(heartbeat, Config))
}. }.
is_authenticator_enabled(Cfg) -> is_authentication_enabled(Cfg) ->
case maps:get(authenticator, Cfg, #{enable => false}) of case maps:get(authentication, Cfg, #{enable => false}) of
AuthCfg when is_map(AuthCfg) -> AuthCfg when is_map(AuthCfg) ->
maps:get(enable, AuthCfg, true); maps:get(enable, AuthCfg, true);
_ -> false _ -> false
@ -297,7 +297,7 @@ handle_result(_, _, _, Channel) ->
{ok, Channel}. {ok, Channel}.
check_auth_state(Msg, #channel{config = Cfg} = Channel) -> check_auth_state(Msg, #channel{config = Cfg} = Channel) ->
Enable = is_authenticator_enabled(Cfg), Enable = is_authentication_enabled(Cfg),
check_token(Enable, Msg, Channel). check_token(Enable, Msg, Channel).
check_token(true, check_token(true,

View File

@ -51,7 +51,7 @@ load(Name, RawConf) ->
}, },
emqx_gateway_sup:load_gateway(Gateway). emqx_gateway_sup:load_gateway(Gateway).
-spec unload(gateway_name()) -> ok | {error, any()}. -spec unload(gateway_name()) -> ok | {error, not_found}.
unload(Name) -> unload(Name) ->
emqx_gateway_sup:unload_gateway(Name). emqx_gateway_sup:unload_gateway(Name).

View File

@ -20,6 +20,9 @@
-compile(nowarn_unused_function). -compile(nowarn_unused_function).
-import(emqx_mgmt_util, [ schema/1
]).
%% minirest behaviour callbacks %% minirest behaviour callbacks
-export([api_spec/0]). -export([api_spec/0]).
@ -41,21 +44,109 @@
} }
]). ]).
-define(EXAMPLE_STOMP_GATEWAY_CONF, #{ %% XXX: This is whole confs for all type gateways. It is used to fill the
frame => #{ %% default configurations and generate the swagger-schema
max_headers => 10, %%
max_headers_length => 1024, %% NOTE: It is a temporary measure to generate swagger-schema
max_body_length => 8192 -define(COAP_GATEWAY_CONFS,
}, #{<<"authentication">> =>
listener => #{ #{<<"mechanism">> => <<"password-based">>,
tcp => #{<<"default-stomp-listener">> => #{ <<"name">> => <<"authenticator1">>,
bind => <<"61613">> <<"server_type">> => <<"built-in-database">>,
}} <<"user_id_type">> => <<"clientid">>},
} <<"enable">> => true,
}). <<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>,
<<"idle_timeout">> => <<"30s">>,
<<"listeners">> =>
#{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5683}}},
<<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>,
<<"publish_qos">> => <<"qos1">>,
<<"subscribe_qos">> => <<"qos0">>}
).
-define(EXAMPLE_MQTTSN_GATEWAY_CONF, #{ -define(EXPROTO_GATEWAY_CONFS,
}). #{<<"enable">> => true,
<<"enable_stats">> => true,
<<"handler">> =>
#{<<"address">> => <<"http://127.0.0.1:9001">>},
<<"idle_timeout">> => <<"30s">>,
<<"listeners">> =>
#{<<"tcp">> =>
#{<<"default">> =>
#{<<"acceptors">> => 8,<<"bind">> => 7993,
<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240}}},
<<"mountpoint">> => <<>>,
<<"server">> => #{<<"bind">> => 9100}}
).
-define(LWM2M_GATEWAY_CONFS,
#{<<"auto_observe">> => false,
<<"enable">> => true,
<<"enable_stats">> => true,
<<"idle_timeout">> => <<"30s">>,
<<"lifetime_max">> => <<"86400s">>,
<<"lifetime_min">> => <<"1s">>,
<<"listeners">> =>
#{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5783}}},
<<"mountpoint">> => <<"lwm2m/%e/">>,
<<"qmode_time_windonw">> => 22,
<<"translators">> =>
#{<<"command">> => <<"dn/#">>,<<"notify">> => <<"up/notify">>,
<<"register">> => <<"up/resp">>,
<<"response">> => <<"up/resp">>,
<<"update">> => <<"up/resp">>},
<<"update_msg_publish_condition">> =>
<<"contains_object_list">>,
<<"xml_dir">> => <<"etc/lwm2m_xml">>}
).
-define(MQTTSN_GATEWAY_CONFS,
#{<<"broadcast">> => true,
<<"clientinfo_override">> =>
#{<<"password">> => <<"abc">>,
<<"username">> => <<"mqtt_sn_user">>},
<<"enable">> => true,
<<"enable_qos3">> => true,<<"enable_stats">> => true,
<<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>,
<<"listeners">> =>
#{<<"udp">> =>
#{<<"default">> =>
#{<<"bind">> => 1884,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 10240000}}},
<<"mountpoint">> => <<>>,
<<"predefined">> =>
[#{<<"id">> => 1,
<<"topic">> => <<"/predefined/topic/name/hello">>},
#{<<"id">> => 2,
<<"topic">> => <<"/predefined/topic/name/nice">>}]}
).
-define(STOMP_GATEWAY_CONFS,
#{<<"authentication">> =>
#{<<"mechanism">> => <<"password-based">>,
<<"name">> => <<"authenticator1">>,
<<"server_type">> => <<"built-in-database">>,
<<"user_id_type">> => <<"clientid">>},
<<"clientinfo_override">> =>
#{<<"password">> => <<"${Packet.headers.passcode}">>,
<<"username">> => <<"${Packet.headers.login}">>},
<<"enable">> => true,
<<"enable_stats">> => true,
<<"frame">> =>
#{<<"max_body_length">> => 8192,<<"max_headers">> => 10,
<<"max_headers_length">> => 1024},
<<"idle_timeout">> => <<"30s">>,
<<"listeners">> =>
#{<<"tcp">> =>
#{<<"default">> =>
#{<<"acceptors">> => 16,<<"active_n">> => 100,
<<"bind">> => 61613,<<"max_conn_rate">> => 1000,
<<"max_connections">> => 1024000}}},
<<"mountpoint">> => <<>>}
).
%% --- END
-define(EXAMPLE_GATEWAY_STATS, #{ -define(EXAMPLE_GATEWAY_STATS, #{
max_connection => 10240000, max_connection => 10240000,
@ -121,7 +212,7 @@ metadata(gateway_insta) ->
summary => <<"Not Found">>, summary => <<"Not Found">>,
value => #{ value => #{
code => <<"NOT_FOUND">>, code => <<"NOT_FOUND">>,
message => <<"gateway xxx not found">> message => <<"The gateway not found">>
} }
} }
} }
@ -140,46 +231,13 @@ metadata(gateway_insta) ->
parameters => [UriNameParamDef], parameters => [UriNameParamDef],
responses => #{ responses => #{
<<"404">> => NameNotFoundRespDef, <<"404">> => NameNotFoundRespDef,
<<"200">> => #{ <<"200">> => schema(schema_for_gateway_conf())
description => <<"OK">>,
content => #{
'application/json' => #{
schema => minirest:ref(<<"gateway_conf">>),
examples => #{
simple1 => #{
summary => <<"Stomp Gateway">>,
value => emqx_json:encode(?EXAMPLE_STOMP_GATEWAY_CONF)
},
simple2 => #{
summary => <<"MQTT-SN Gateway">>,
value => emqx_json:encode(?EXAMPLE_MQTTSN_GATEWAY_CONF)
}
}
}
}
}
} }
}, },
put => #{ put => #{
description => <<"Update the gateway configurations/status">>, description => <<"Update the gateway configurations/status">>,
parameters => [UriNameParamDef], parameters => [UriNameParamDef],
requestBody => #{ requestBody => schema(schema_for_gateway_conf()),
content => #{
'application/json' => #{
schema => minirest:ref(<<"gateway_conf">>),
examples => #{
simple1 => #{
summary => <<"Stom Gateway">>,
value => emqx_json:encode(?EXAMPLE_STOMP_GATEWAY_CONF)
},
simple2 => #{
summary => <<"MQTT-SN Gateway">>,
value => emqx_json:encode(?EXAMPLE_MQTTSN_GATEWAY_CONF)
}
}
}
}
},
responses => #{ responses => #{
<<"404">> => NameNotFoundRespDef, <<"404">> => NameNotFoundRespDef,
<<"204">> => #{description => <<"Created">>} <<"204">> => #{description => <<"Created">>}
@ -210,7 +268,6 @@ metadata(gateway_insta_stats) ->
schemas() -> schemas() ->
[ #{<<"gateway_overrview">> => schema_for_gateway_overrview()} [ #{<<"gateway_overrview">> => schema_for_gateway_overrview()}
, #{<<"gateway_conf">> => schema_for_gateway_conf()}
, #{<<"gateway_stats">> => schema_for_gateway_stats()} , #{<<"gateway_stats">> => schema_for_gateway_stats()}
]. ].
@ -262,109 +319,13 @@ schema_for_gateway_overrview() ->
schema_for_gateway_conf() -> schema_for_gateway_conf() ->
#{oneOf => #{oneOf =>
[ schema_for_gateway_conf_stomp() [ emqx_mgmt_api_configs:gen_schema(?STOMP_GATEWAY_CONFS)
, schema_for_gateway_conf_mqttsn() , emqx_mgmt_api_configs:gen_schema(?MQTTSN_GATEWAY_CONFS)
, schema_for_gateway_conf_coap() , emqx_mgmt_api_configs:gen_schema(?COAP_GATEWAY_CONFS)
, schema_for_gateway_conf_lwm2m() , emqx_mgmt_api_configs:gen_schema(?LWM2M_GATEWAY_CONFS)
, schema_for_gateway_conf_exproto() , emqx_mgmt_api_configs:gen_schema(?EXPROTO_GATEWAY_CONFS)
]}. ]}.
schema_for_clientinfo_override() ->
#{type => object,
properties => #{
clientid => #{type => string},
username => #{type => string},
password => #{type => string}
}}.
schema_for_authenticator() ->
%% TODO.
#{type => object, properties => #{
a_key => #{type => string}
}}.
schema_for_tcp_listener() ->
%% TODO.
#{type => object, properties => #{
a_key => #{type => string}
}}.
schema_for_udp_listener() ->
%% TODO.
#{type => object, properties => #{
a_key => #{type => string}
}}.
%% It should be generated by _schema.erl module
%% and emqx_gateway.conf
schema_for_gateway_conf_stomp() ->
#{type => object,
properties => #{
frame => #{
type => object,
properties => #{
max_headers => #{type => integer},
max_headers_length => #{type => integer},
max_body_length => #{type => integer}
}
},
clientinfo_override => schema_for_clientinfo_override(),
authenticator => schema_for_authenticator(),
listener => schema_for_tcp_listener()
}
}.
schema_for_gateway_conf_mqttsn() ->
#{type => object,
properties => #{
gateway_id => #{type => integer},
broadcast => #{type => boolean},
enable_stats => #{type => boolean},
enable_qos3 => #{type => boolean},
idle_timeout => #{type => integer},
predefined => #{
type => array,
items => #{
type => object,
properties => #{
id => #{type => integer},
topic => #{type => string}
}
}
},
clientinfo_override => schema_for_clientinfo_override(),
authenticator => schema_for_authenticator(),
listener => schema_for_udp_listener()
}}.
schema_for_gateway_conf_coap() ->
#{type => object,
properties => #{
clientinfo_override => schema_for_clientinfo_override(),
authenticator => schema_for_authenticator(),
listener => schema_for_udp_listener()
}}.
schema_for_gateway_conf_lwm2m() ->
#{type => object,
properties => #{
clientinfo_override => schema_for_clientinfo_override(),
authenticator => schema_for_authenticator(),
listener => schema_for_udp_listener()
}}.
schema_for_gateway_conf_exproto() ->
#{type => object,
properties => #{
clientinfo_override => schema_for_clientinfo_override(),
authenticator => schema_for_authenticator(),
listener => #{oneOf => [schema_for_tcp_listener(),
schema_for_udp_listener()
]
}
}}.
schema_for_gateway_stats() -> schema_for_gateway_stats() ->
#{type => object, #{type => object,
properties => #{ properties => #{
@ -382,13 +343,26 @@ gateway(get, Request) ->
end, end,
{200, emqx_gateway_intr:gateways(Status)}. {200, emqx_gateway_intr:gateways(Status)}.
gateway_insta(delete, _Request) -> gateway_insta(delete, Request) ->
{200, ok}; Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
gateway_insta(get, _Request) -> case emqx_gateway:unload(Name) of
{200, ok}; ok ->
gateway_insta(put, _Request) -> {200, ok};
{error, not_found} ->
{404, <<"Not Found">>};
{error, Reason} ->
{500, Reason}
end;
gateway_insta(get, Request) ->
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
case emqx_gateway:lookup(Name) of
#{rawconf := RawConf} ->
{200, RawConf};
undefined ->
{404, <<"Not Found">>}
end;
gateway_insta(post, _Request) ->
{200, ok}. {200, ok}.
gateway_insta_stats(get, _Req) -> gateway_insta_stats(get, _Req) ->
{401, <<"Implement it later (maybe 5.1)">>}. {401, <<"Implement it later (maybe 5.1)">>}.

View File

@ -106,14 +106,14 @@ init([Gateway, Ctx0, _GwDscrptr]) ->
end. end.
do_init_context(GwName, RawConf, Ctx) -> do_init_context(GwName, RawConf, Ctx) ->
Auth = case maps:get(authenticator, RawConf, #{enable => false}) of Auth = case maps:get(authentication, RawConf, #{enable => false}) of
#{enable := false} -> undefined; #{enable := false} -> undefined;
AuthCfg when is_map(AuthCfg) -> AuthCfg when is_map(AuthCfg) ->
case maps:get(enable, AuthCfg, true) of case maps:get(enable, AuthCfg, true) of
false -> false ->
undefined; undefined;
_ -> _ ->
create_authenticator_for_gateway_insta(GwName, AuthCfg) create_authentication_for_gateway_insta(GwName, AuthCfg)
end; end;
_ -> _ ->
undefined undefined
@ -121,7 +121,7 @@ do_init_context(GwName, RawConf, Ctx) ->
Ctx#{auth => Auth}. Ctx#{auth => Auth}.
do_deinit_context(Ctx) -> do_deinit_context(Ctx) ->
cleanup_authenticator_for_gateway_insta(maps:get(auth, Ctx)), cleanup_authentication_for_gateway_insta(maps:get(auth, Ctx)),
ok. ok.
handle_call(info, _From, State = #state{gw = Gateway}) -> handle_call(info, _From, State = #state{gw = Gateway}) ->
@ -227,24 +227,24 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
create_authenticator_for_gateway_insta(GwName, AuthCfg) -> create_authentication_for_gateway_insta(GwName, AuthCfg) ->
ChainId = atom_to_binary(GwName, utf8), ChainId = atom_to_binary(GwName, utf8),
case emqx_authn:create_chain(#{id => ChainId}) of case emqx_authn:create_chain(#{id => ChainId}) of
{ok, _ChainInfo} -> {ok, _ChainInfo} ->
case emqx_authn:create_authenticator(ChainId, AuthCfg) of case emqx_authn:create_authenticator(ChainId, AuthCfg) of
{ok, _} -> ChainId; {ok, _} -> ChainId;
{error, Reason} -> {error, Reason} ->
logger:error("Failed to create authenticator ~p", [Reason]), logger:error("Failed to create authentication ~p", [Reason]),
throw({bad_autheticator, Reason}) throw({bad_authentication, Reason})
end; end;
{error, Reason} -> {error, Reason} ->
logger:error("Failed to create authentication chain: ~p", [Reason]), logger:error("Failed to create authentication chain: ~p", [Reason]),
throw({bad_chain, {ChainId, Reason}}) throw({bad_chain, {ChainId, Reason}})
end. end.
cleanup_authenticator_for_gateway_insta(undefined) -> cleanup_authentication_for_gateway_insta(undefined) ->
ok; ok;
cleanup_authenticator_for_gateway_insta(ChainId) -> cleanup_authentication_for_gateway_insta(ChainId) ->
case emqx_authn:delete_chain(ChainId) of case emqx_authn:delete_chain(ChainId) of
ok -> ok; ok -> ok;
{error, {not_found, _}} -> {error, {not_found, _}} ->

View File

@ -40,7 +40,7 @@ gateways(Status) ->
case emqx_gateway:lookup(GwName) of case emqx_gateway:lookup(GwName) of
undefined -> #{name => GwName, status => unloaded}; undefined -> #{name => GwName, status => unloaded};
GwInfo = #{rawconf := RawConf} -> GwInfo = #{rawconf := RawConf} ->
GwInfo0 = unix_ts_to_rfc3339( GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
[created_at, started_at, stopped_at], [created_at, started_at, stopped_at],
GwInfo), GwInfo),
GwInfo1 = maps:with([name, GwInfo1 = maps:with([name,
@ -76,14 +76,3 @@ get_listeners_status(GwName, RawConf) ->
%% @private %% @private
listener_name(GwName, Type, LisName) -> listener_name(GwName, Type, LisName) ->
list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])).
%% @private
unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) ->
lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys);
unix_ts_to_rfc3339(Key, Map) ->
case maps:get(Key, Map, undefined) of
undefined -> Map;
Ts ->
Map#{Key =>
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)}
end.

View File

@ -62,10 +62,8 @@ fields(gateway) ->
fields(stomp_structs) -> fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))} [ {frame, t(ref(stomp_frame))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(authenticator(), undefined, undefined)}
, {listeners, t(ref(tcp_listener_group))} , {listeners, t(ref(tcp_listener_group))}
]; ] ++ gateway_common_options();
fields(stomp_frame) -> fields(stomp_frame) ->
[ {max_headers, t(integer(), undefined, 10)} [ {max_headers, t(integer(), undefined, 10)}
@ -76,39 +74,40 @@ fields(stomp_frame) ->
fields(mqttsn_structs) -> fields(mqttsn_structs) ->
[ {gateway_id, t(integer())} [ {gateway_id, t(integer())}
, {broadcast, t(boolean())} , {broadcast, t(boolean())}
, {enable_stats, t(boolean())}
, {enable_qos3, t(boolean())} , {enable_qos3, t(boolean())}
, {idle_timeout, t(duration())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))} , {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authenticator, t(authenticator(), undefined, undefined)}
, {listeners, t(ref(udp_listener_group))} , {listeners, t(ref(udp_listener_group))}
]; ] ++ gateway_common_options();
fields(mqttsn_predefined) -> fields(mqttsn_predefined) ->
[ {id, t(integer())} [ {id, t(integer())}
, {topic, t(string())} , {topic, t(binary())}
]; ];
fields(coap_structs) ->
[ {heartbeat, t(duration(), undefined, "30s")}
, {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {listeners, t(ref(udp_listener_group))}
] ++ gateway_common_options();
fields(lwm2m_structs) -> fields(lwm2m_structs) ->
[ {xml_dir, t(string())} [ {xml_dir, t(binary())}
, {lifetime_min, t(duration())} , {lifetime_min, t(duration())}
, {lifetime_max, t(duration())} , {lifetime_max, t(duration())}
, {qmode_time_windonw, t(integer())} , {qmode_time_windonw, t(integer())}
, {auto_observe, t(boolean())} , {auto_observe, t(boolean())}
, {mountpoint, t(string())}
, {update_msg_publish_condition, t(union([always, contains_object_list]))} , {update_msg_publish_condition, t(union([always, contains_object_list]))}
, {translators, t(ref(translators))} , {translators, t(ref(translators))}
, {authenticator, t(authenticator(), undefined, undefined)}
, {listeners, t(ref(udp_listener_group))} , {listeners, t(ref(udp_listener_group))}
]; ] ++ gateway_common_options();
fields(exproto_structs) -> fields(exproto_structs) ->
[ {server, t(ref(exproto_grpc_server))} [ {server, t(ref(exproto_grpc_server))}
, {handler, t(ref(exproto_grpc_handler))} , {handler, t(ref(exproto_grpc_handler))}
, {authenticator, t(authenticator(), undefined, undefined)}
, {listeners, t(ref(udp_tcp_listener_group))} , {listeners, t(ref(udp_tcp_listener_group))}
]; ] ++ gateway_common_options();
fields(exproto_grpc_server) -> fields(exproto_grpc_server) ->
[ {bind, t(union(ip_port(), integer()))} [ {bind, t(union(ip_port(), integer()))}
@ -116,18 +115,18 @@ fields(exproto_grpc_server) ->
]; ];
fields(exproto_grpc_handler) -> fields(exproto_grpc_handler) ->
[ {address, t(string())} [ {address, t(binary())}
%% TODO: ssl %% TODO: ssl
]; ];
fields(clientinfo_override) -> fields(clientinfo_override) ->
[ {username, t(string())} [ {username, t(binary())}
, {password, t(string())} , {password, t(binary())}
, {clientid, t(string())} , {clientid, t(binary())}
]; ];
fields(translators) -> fields(translators) ->
[{"$name", t(string())}]; [{"$name", t(binary())}];
fields(udp_listener_group) -> fields(udp_listener_group) ->
[ {udp, t(ref(udp_listener))} [ {udp, t(ref(udp_listener))}
@ -164,7 +163,6 @@ fields(listener_settings) ->
, {max_connections, t(integer(), undefined, 1024)} , {max_connections, t(integer(), undefined, 1024)}
, {max_conn_rate, t(integer())} , {max_conn_rate, t(integer())}
, {active_n, t(integer(), undefined, 100)} , {active_n, t(integer(), undefined, 100)}
%, {zone, t(string())}
%, {rate_limit, t(comma_separated_list())} %, {rate_limit, t(comma_separated_list())}
, {access, t(ref(access))} , {access, t(ref(access))}
, {proxy_protocol, t(boolean())} , {proxy_protocol, t(boolean())}
@ -208,27 +206,14 @@ fields(dtls_listener_settings) ->
, reuse_sessions => true}) ++ fields(listener_settings); , reuse_sessions => true}) ++ fields(listener_settings);
fields(access) -> fields(access) ->
[ {"$id", #{type => string(), [ {"$id", #{type => binary(),
nullable => true}}]; nullable => true}}];
fields(coap) ->
[{"$id", t(ref(coap_structs))}];
fields(coap_structs) ->
[ {enable_stats, t(boolean(), undefined, true)}
, {heartbeat, t(duration(), undefined, "30s")}
, {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {authenticator, t(authenticator(), undefined, undefined)}
, {listeners, t(ref(udp_listener_group))}
];
fields(ExtraField) -> fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"), Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField). Mod:fields(ExtraField).
authenticator() -> authentication() ->
hoconsc:union( hoconsc:union(
[ undefined [ undefined
, hoconsc:ref(emqx_authn_mnesia, config) , hoconsc:ref(emqx_authn_mnesia, config)
@ -252,6 +237,15 @@ authenticator() ->
% %
%translations(_) -> []. %translations(_) -> [].
gateway_common_options() ->
[ {enable, t(boolean(), undefined, true)}
, {enable_stats, t(boolean(), undefined, true)}
, {idle_timeout, t(duration(), undefined, "30s")}
, {mountpoint, t(binary())}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authentication, t(authentication(), undefined, undefined)}
].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
@ -289,9 +283,9 @@ ssl(Mapping, Defaults) ->
end end, end end,
D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end,
[ {"enable", t(boolean(), M("enable"), D("enable"))} [ {"enable", t(boolean(), M("enable"), D("enable"))}
, {"cacertfile", t(string(), M("cacertfile"), D("cacertfile"))} , {"cacertfile", t(binary(), M("cacertfile"), D("cacertfile"))}
, {"certfile", t(string(), M("certfile"), D("certfile"))} , {"certfile", t(binary(), M("certfile"), D("certfile"))}
, {"keyfile", t(string(), M("keyfile"), D("keyfile"))} , {"keyfile", t(binary(), M("keyfile"), D("keyfile"))}
, {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))} , {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))}
, {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} , {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))}
, {"secure_renegotiate", t(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))} , {"secure_renegotiate", t(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))}
@ -299,12 +293,12 @@ ssl(Mapping, Defaults) ->
, {"honor_cipher_order", t(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))} , {"honor_cipher_order", t(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))}
, {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))} , {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))}
, {"depth", t(integer(), M("depth"), D("depth"))} , {"depth", t(integer(), M("depth"), D("depth"))}
, {"password", hoconsc:t(string(), #{mapping => M("key_password"), , {"password", hoconsc:t(binary(), #{mapping => M("key_password"),
default => D("key_password"), default => D("key_password"),
sensitive => true sensitive => true
})} })}
, {"dhfile", t(string(), M("dhfile"), D("dhfile"))} , {"dhfile", t(binary(), M("dhfile"), D("dhfile"))}
, {"server_name_indication", t(union(disable, string()), M("server_name_indication"), , {"server_name_indication", t(union(disable, binary()), M("server_name_indication"),
D("server_name_indication"))} D("server_name_indication"))}
, {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))} , {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))}
, {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))} , {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}

View File

@ -52,7 +52,10 @@ load_gateway(Gateway = #{name := GwName}) ->
emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr) emqx_gateway_gw_sup:create_insta(GwSup, Gateway, GwDscrptr)
end. end.
-spec unload_gateway(gateway_name()) -> ok | {error, not_found}. -spec unload_gateway(gateway_name())
-> ok
| {error, not_found}
| {error, any()}.
unload_gateway(GwName) -> unload_gateway(GwName) ->
case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of
false -> {error, not_found}; false -> {error, not_found};

View File

@ -28,6 +28,7 @@
-export([ apply/2 -export([ apply/2
, format_listenon/1 , format_listenon/1
, unix_ts_to_rfc3339/2
]). ]).
-export([ normalize_rawconf/1 -export([ normalize_rawconf/1
@ -107,6 +108,16 @@ format_listenon({Addr, Port}) when is_list(Addr) ->
format_listenon({Addr, Port}) when is_tuple(Addr) -> format_listenon({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) ->
lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys);
unix_ts_to_rfc3339(Key, Map) ->
case maps:get(Key, Map, undefined) of
undefined -> Map;
Ts ->
Map#{Key =>
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)}
end.
-spec normalize_rawconf(rawconf()) -spec normalize_rawconf(rawconf())
-> list({ Type :: udp | tcp | ssl | dtls -> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom() , Name :: atom()

View File

@ -49,6 +49,7 @@
%% API Function Definitions %% API Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
-spec start_link(binary() | string()) -> {ok, pid()} | ignore | {error, any()}.
start_link(XmlDir) -> start_link(XmlDir) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [XmlDir], []).
@ -85,10 +86,10 @@ stop() ->
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([XmlDir]) -> init([XmlDir0]) ->
_ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_DEF_TAB, [set, named_table, protected]),
_ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]), _ = ets:new(?LWM2M_OBJECT_NAME_TO_ID_TAB, [set, named_table, protected]),
load(XmlDir), load(to_list(XmlDir0)),
{ok, #state{}}. {ok, #state{}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
@ -140,3 +141,7 @@ load_xml(FileName) ->
[ObjectXml] = xmerl_xpath:string("/LWM2M/Object", Xml), [ObjectXml] = xmerl_xpath:string("/LWM2M/Object", Xml),
ObjectXml. ObjectXml.
to_list(B) when is_binary(B) ->
binary_to_list(B);
to_list(S) when is_list(S) ->
S.

View File

@ -70,7 +70,7 @@ set_special_cfg(emqx_gateway) ->
#{authentication => #{enable => false}, #{authentication => #{enable => false},
server => #{bind => 9100}, server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"}, handler => #{address => "http://127.0.0.1:9001"},
listener => listener_confs(LisType) listeners => listener_confs(LisType)
}); });
set_special_cfg(_App) -> set_special_cfg(_App) ->
ok. ok.

View File

@ -29,26 +29,24 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
gateway: { gateway.lwm2m {
lwm2m: { xml_dir = \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\"
xml_dir: \"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml\" lifetime_min = 1s
lifetime_min: 1s lifetime_max = 86400s
lifetime_max: 86400s qmode_time_windonw = 22
qmode_time_windonw: 22 auto_observe = false
auto_observe: false mountpoint = \"lwm2m/%e/\"
mountpoint: \"lwm2m/%e/\" update_msg_publish_condition = contains_object_list
update_msg_publish_condition: contains_object_list translators {
translators: { command = \"dn/#\"
command: \"dn/#\" response = \"up/resp\"
response: \"up/resp\" notify = \"up/notify\"
notify: \"up/notify\" register = \"up/resp\"
register: \"up/resp\" update = \"up/resp\"
update: \"up/resp\" }
} listeners.udp.default {
listener.udp.1 { bind = 5783
bind: 5783 }
}
}
} }
">>). ">>).

View File

@ -52,26 +52,25 @@
integer_to_list(erlang:system_time())])). integer_to_list(erlang:system_time())])).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
gateway: { gateway.mqttsn {
mqttsn: { gateway_id = 1
gateway_id: 1 broadcast = true
broadcast: true enable_qos3 = true
enable_stats: true predefined = [
enable_qos3: true { id = 1,
predefined: [ topic = \"/predefined/topic/name/hello\"
{id: 1, topic: \"/predefined/topic/name/hello\"}, },
{id: 2, topic: \"/predefined/topic/name/nice\"} { id = 2,
] topic = \"/predefined/topic/name/nice\"
clientinfo_override: {
username: \"user1\"
password: \"pw123\"
}
listener.udp.1: {
bind: 1884
max_connections: 10240000
max_conn_rate: 1000
}
} }
]
clientinfo_override {
username = \"user1\"
password = \"pw123\"
}
listeners.udp.default {
bind = 1884
}
} }
">>). ">>).
@ -98,7 +97,7 @@ end_per_suite(_) ->
%% Connect %% Connect
t_connect(_) -> t_connect(_) ->
SockName = {'mqttsn:udp', 1884}, SockName = {'mqttsn:udp:default', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),

View File

@ -24,16 +24,14 @@
-define(HEARTBEAT, <<$\n>>). -define(HEARTBEAT, <<$\n>>).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
gateway: { gateway.stomp {
stomp: { clientinfo_override {
clientinfo_override: { username = \"${Packet.headers.login}\"
username: \"${Packet.headers.login}\" password = \"${Packet.headers.passcode}\"
password: \"${Packet.headers.passcode}\" }
} listeners.tcp.default {
listener.tcp.1: { bind = 61613
bind: 61613 }
}
}
} }
">>). ">>).