Merge pull request #5997 from HJianBo/gw-fixes-part3

Gateway improvement part.3
This commit is contained in:
JianBo He 2021-11-19 09:29:51 +08:00 committed by GitHub
commit 905e58c93a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1968 additions and 1125 deletions

View File

@ -75,6 +75,8 @@
-export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]). -export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]).
-export([sc/2, map/2]). -export([sc/2, map/2]).
-elvis([{elvis_style, god_modules, disable}]).
namespace() -> undefined. namespace() -> undefined.
roots() -> roots() ->
@ -898,7 +900,8 @@ fields("alarm") ->
Currently supports two actions, 'log' and 'publish'. Currently supports two actions, 'log' and 'publish'.
'log' is to write the alarm to log (console or file). 'log' is to write the alarm to log (console or file).
'publish' is to publish the alarm as an MQTT message to the system topics: 'publish' is to publish the alarm as an MQTT message to the system topics:
<code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate</code> and <code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/deactivate</code>""" <code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate</code> and
<code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/deactivate</code>"""
}) })
} }
, {"size_limit", , {"size_limit",
@ -916,8 +919,9 @@ When this limit is exceeded, the oldest deactivated alarms are deleted to cap th
#{ default => "24h", #{ default => "24h",
example => "24h", example => "24h",
desc => desc =>
"""Retention time of deactivated alarms. Alarms are not deleted immediately when deactivated, but after the retention time. """Retention time of deactivated alarms. Alarms are not deleted immediately
""" when deactivated, but after the retention time.
"""
}) })
} }
]. ].
@ -1181,7 +1185,8 @@ default_tls_vsns(dtls_all_available) ->
default_tls_vsns(tls_all_available) -> default_tls_vsns(tls_all_available) ->
emqx_tls_lib:default_versions(). emqx_tls_lib:default_versions().
-spec ciphers_schema(quic | dtls_all_available | tls_all_available | undefined) -> hocon_schema:field_schema(). -spec ciphers_schema(quic | dtls_all_available | tls_all_available | undefined)
-> hocon_schema:field_schema().
ciphers_schema(Default) -> ciphers_schema(Default) ->
sc(hoconsc:array(string()), sc(hoconsc:array(string()),
#{ default => default_ciphers(Default) #{ default => default_ciphers(Default)

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -31,35 +29,37 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz], fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema),
ok. ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, []), {ok, _} = emqx_authz:update(?CMD_REPLACE, []),
Config. Config.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok.
-define(SOURCE1, #{<<"type">> => <<"http">>, -define(SOURCE1, #{<<"type">> => <<"http">>,
<<"enable">> => true, <<"enable">> => true,
<<"url">> => <<"https://fake.com:443/">>, <<"url">> => <<"https://fake.com:443/">>,
@ -153,7 +153,9 @@ t_update_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, []). {ok, _} = emqx_authz:update(?CMD_REPLACE, []).
t_move_source(_) -> t_move_source(_) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]), {ok, _} = emqx_authz:update(?CMD_REPLACE,
[?SOURCE1, ?SOURCE2, ?SOURCE3,
?SOURCE4, ?SOURCE5, ?SOURCE6]),
?assertMatch([ #{type := http} ?assertMatch([ #{type := http}
, #{type := mongodb} , #{type := mongodb}
, #{type := mysql} , #{type := mysql}

View File

@ -22,26 +22,10 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"""
authorization
{sources = [
{ type = \"built-in-database\"
enable = true
}
]}
""">>).
-define(HOST, "http://127.0.0.1:18083/"). -define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
roots() -> ["authorization"].
fields("authorization") ->
emqx_authz_schema:fields("authorization") ++
emqx_schema:fields("authorization").
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -49,13 +33,18 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_authz, emqx_dashboard], ok = emqx_common_test_helpers:start_apps(
fun set_special_configs/1), [emqx_conf, emqx_authz, emqx_dashboard],
fun set_special_configs/1),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_dashboard]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -70,9 +59,10 @@ set_special_configs(emqx_dashboard) ->
emqx_config:put([emqx_dashboard], Config), emqx_config:put([emqx_dashboard], Config),
ok; ok;
set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
ok = emqx_config:init_load(?MODULE, ?CONF_DEFAULT),
{ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny), {ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources],
[#{<<"type">> => <<"built-in-database">>}]),
ok; ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.
@ -95,7 +85,9 @@ t_api(_) ->
, uri(["authorization", "sources", "built-in-database", "username", "user1"]) , uri(["authorization", "sources", "built-in-database", "username", "user1"])
, []), , []),
#{<<"data">> := [#{<<"username">> := <<"user1">>, <<"rules">> := Rules1}], #{<<"data">> := [#{<<"username">> := <<"user1">>, <<"rules">> := Rules1}],
<<"meta">> := #{<<"count">> := 1,<<"limit">> := 100,<<"page">> := 1}} = jsx:decode(Request1), <<"meta">> := #{<<"count">> := 1,
<<"limit">> := 100,
<<"page">> := 1}} = jsx:decode(Request1),
#{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2), #{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2),
?assertEqual(3, length(Rules1)), ?assertEqual(3, length(Rules1)),

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-define(HOST, "http://127.0.0.1:18083/"). -define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
@ -35,14 +33,18 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_authz, emqx_dashboard], fun set_special_configs/1), ok = emqx_common_test_helpers:start_apps(
{ok, _} = emqx:update_config([authorization, cache, enable], false), [emqx_conf, emqx_authz, emqx_dashboard],
{ok, _} = emqx:update_config([authorization, no_match], deny), fun set_special_configs/1),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_resource, emqx_authz, emqx_dashboard]), {ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -56,6 +58,11 @@ set_special_configs(emqx_dashboard) ->
}, },
emqx_config:put([emqx_dashboard], Config), emqx_config:put([emqx_dashboard], Config),
ok; ok;
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
-define(HOST, "http://127.0.0.1:18083/"). -define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5"). -define(API_VERSION, "v5").
-define(BASE_PATH, "api"). -define(BASE_PATH, "api").
@ -83,7 +81,9 @@
}). }).
-define(SOURCE6, #{<<"type">> => <<"file">>, -define(SOURCE6, #{<<"type">> => <<"file">>,
<<"enable">> => true, <<"enable">> => true,
<<"rules">> => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">> <<"rules">> =>
<<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}."
"\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}). }).
all() -> all() ->
@ -94,35 +94,29 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, create_dry_run, fun(emqx_connector_mysql, _) -> {ok, meck_data}; meck:expect(emqx_resource, create_dry_run,
(T, C) -> meck:passthrough([T, C]) fun(emqx_connector_mysql, _) -> {ok, meck_data};
end), (T, C) -> meck:passthrough([T, C])
end),
meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, health_check, fun(_) -> ok end), meck:expect(emqx_resource, health_check, fun(_) -> ok end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz, emqx_dashboard],
ok = emqx_common_test_helpers:start_apps([emqx_authz, emqx_dashboard], fun set_special_configs/1), fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_resource, emqx_authz, emqx_dashboard]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema),
ok. ok.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -137,7 +131,9 @@ set_special_configs(emqx_dashboard) ->
emqx_config:put([emqx_dashboard], Config), emqx_config:put([emqx_dashboard], Config),
ok; ok;
set_special_configs(emqx_authz) -> set_special_configs(emqx_authz) ->
emqx_config:put([authorization], #{sources => []}), {ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok; ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.
@ -147,10 +143,11 @@ init_per_testcase(t_api, Config) ->
meck:expect(emqx_misc, gen_id, fun() -> "fake" end), meck:expect(emqx_misc, gen_id, fun() -> "fake" end),
meck:new(emqx, [non_strict, passthrough, no_history, no_link]), meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx, data_dir, fun() -> meck:expect(emqx, data_dir,
{data_dir, Data} = lists:keyfind(data_dir, 1, Config), fun() ->
Data {data_dir, Data} = lists:keyfind(data_dir, 1, Config),
end), Data
end),
Config; Config;
init_per_testcase(_, Config) -> Config. init_per_testcase(_, Config) -> Config.
@ -168,7 +165,8 @@ t_api(_) ->
{ok, 200, Result1} = request(get, uri(["authorization", "sources"]), []), {ok, 200, Result1} = request(get, uri(["authorization", "sources"]), []),
?assertEqual([], get_sources(Result1)), ?assertEqual([], get_sources(Result1)),
{ok, 204, _} = request(put, uri(["authorization", "sources"]), [?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]), {ok, 204, _} = request(put, uri(["authorization", "sources"]),
[?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6]),
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1), {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
{ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []), {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
@ -182,7 +180,8 @@ t_api(_) ->
], Sources), ], Sources),
?assert(filelib:is_file(emqx_authz:acl_conf_file())), ?assert(filelib:is_file(emqx_authz:acl_conf_file())),
{ok, 204, _} = request(put, uri(["authorization", "sources", "http"]), ?SOURCE1#{<<"enable">> := false}), {ok, 204, _} = request(put, uri(["authorization", "sources", "http"]),
?SOURCE1#{<<"enable">> := false}),
{ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []), {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []),
?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result3)), ?assertMatch(#{<<"type">> := <<"http">>, <<"enable">> := false}, jsx:decode(Result3)),
@ -207,14 +206,28 @@ t_api(_) ->
?assert(filelib:is_file(filename:join([data_dir(), "certs", "cert-fake.pem"]))), ?assert(filelib:is_file(filename:join([data_dir(), "certs", "cert-fake.pem"]))),
?assert(filelib:is_file(filename:join([data_dir(), "certs", "key-fake.pem"]))), ?assert(filelib:is_file(filename:join([data_dir(), "certs", "key-fake.pem"]))),
{ok, 204, _} = request(put, uri(["authorization", "sources", "mysql"]), ?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>}), {ok, 204, _} = request(
put,
uri(["authorization", "sources", "mysql"]),
?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>}),
{ok, 400, _} = request(put, uri(["authorization", "sources", "postgresql"]), ?SOURCE4#{<<"server">> := <<"fake">>}), {ok, 400, _} = request(
{ok, 400, _} = request(put, uri(["authorization", "sources", "redis"]), ?SOURCE5#{<<"servers">> := [<<"192.168.1.100:6379">>, <<"192.168.1.100:6380">>]}), put,
uri(["authorization", "sources", "postgresql"]),
?SOURCE4#{<<"server">> := <<"fake">>}),
{ok, 400, _} = request(
put,
uri(["authorization", "sources", "redis"]),
?SOURCE5#{<<"servers">> := [<<"192.168.1.100:6379">>,
<<"192.168.1.100:6380">>]}),
lists:foreach(fun(#{<<"type">> := Type}) -> lists:foreach(
{ok, 204, _} = request(delete, uri(["authorization", "sources", binary_to_list(Type)]), []) fun(#{<<"type">> := Type}) ->
end, Sources), {ok, 204, _} = request(
delete,
uri(["authorization", "sources", binary_to_list(Type)]),
[])
end, Sources),
{ok, 200, Result5} = request(get, uri(["authorization", "sources"]), []), {ok, 200, Result5} = request(get, uri(["authorization", "sources"]), []),
?assertEqual([], get_sources(Result5)), ?assertEqual([], get_sources(Result5)),
?assertEqual([], emqx:get_config([authorization, sources])), ?assertEqual([], emqx:get_config([authorization, sources])),

View File

@ -21,7 +21,6 @@
-include("emqx_authz.hrl"). -include("emqx_authz.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -30,22 +29,14 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz],
fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"http">>, Rules = [#{<<"type">> => <<"http">>,
<<"url">> => <<"https://fake.com:443/">>, <<"url">> => <<"https://fake.com:443/">>,
<<"headers">> => #{}, <<"headers">> => #{},
@ -57,10 +48,21 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -31,26 +29,28 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), ok = emqx_common_test_helpers:start_apps(
meck:expect(emqx_schema, fields, fun("authorization") -> [emqx_conf, emqx_authz],
meck:passthrough(["authorization"]) ++ fun set_special_configs/1
emqx_authz_schema:fields("authorization"); ),
(F) -> meck:passthrough([F])
end),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_authz]),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"built-in-database">>}],
{ok, _} = emqx_authz:update(replace, Rules),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz]), [authorization],
meck:unload(emqx_schema), #{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources],
[#{<<"type">> => <<"built-in-database">>}]),
ok;
set_special_configs(_App) ->
ok. ok.
init_per_testcase(t_authz, Config) -> init_per_testcase(t_authz, Config) ->
@ -96,13 +96,19 @@ t_authz(_) ->
listener => {tcp, default} listener => {tcp, default}
}, },
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), ClientInfo1, subscribe, <<"#">>)),
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo1, publish, <<"#">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), ?assertEqual(allow, emqx_access_control:authorize(
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"#">>)), ClientInfo2, publish, <<"test/test_username">>)),
?assertEqual(allow, emqx_access_control:authorize(
ClientInfo2, subscribe, <<"#">>)),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo3, publish, <<"test/test_clientid">>)), ?assertEqual(allow, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"#">>)), ClientInfo3, publish, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, subscribe, <<"#">>)),
ok. ok.

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -31,21 +29,15 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz],
{ok, _} = emqx:update_config([authorization, cache, enable], false), fun set_special_configs/1
{ok, _} = emqx:update_config([authorization, no_match], deny), ),
Rules = [#{<<"type">> => <<"mongodb">>, Rules = [#{<<"type">> => <<"mongodb">>,
<<"mongo_type">> => <<"single">>, <<"mongo_type">> => <<"single">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"server">> => <<"127.0.0.1:27017">>,
@ -59,10 +51,21 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
-define(SOURCE1,[#{<<"topics">> => [<<"#">>], -define(SOURCE1,[#{<<"topics">> => [<<"#">>],
@ -115,11 +118,17 @@ t_authz(_) ->
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
meck:expect(emqx_resource, query, fun(_, _) -> ?SOURCE3 ++ ?SOURCE4 end), meck:expect(emqx_resource, query, fun(_, _) -> ?SOURCE3 ++ ?SOURCE4 end),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_clientid">>)), ?assertEqual(allow, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_clientid">>)), ClientInfo2, subscribe, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_username">>)), ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), ClientInfo2, publish, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"test">>)), % nomatch ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, publish, <<"test">>)), % nomatch ClientInfo2, subscribe, <<"test/test_username">>)),
?assertEqual(allow, emqx_access_control:authorize(
ClientInfo2, publish, <<"test/test_username">>)),
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, subscribe, <<"test">>)), % nomatch
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, publish, <<"test">>)), % nomatch
ok. ok.

View File

@ -31,22 +31,14 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz],
fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"mysql">>, Rules = [#{<<"type">> => <<"mysql">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1, <<"pool_size">> => 1,
@ -61,10 +53,21 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
-define(COLUMNS, [ <<"action">> -define(COLUMNS, [ <<"action">>
@ -113,11 +116,17 @@ t_authz(_) ->
?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end), meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_clientid">>)), ?assertEqual(allow, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_clientid">>)), ClientInfo2, subscribe, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_username">>)), ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), ClientInfo2, publish, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"test">>)), % nomatch ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, publish, <<"test">>)), % nomatch ClientInfo2, subscribe, <<"test/test_username">>)),
?assertEqual(allow, emqx_access_control:authorize(
ClientInfo2, publish, <<"test/test_username">>)),
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, subscribe, <<"test">>)), % nomatch
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, publish, <<"test">>)), % nomatch
ok. ok.

View File

@ -22,8 +22,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -31,22 +29,14 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz],
fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"postgresql">>, Rules = [#{<<"type">> => <<"postgresql">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1, <<"pool_size">> => 1,
@ -61,10 +51,21 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), [authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
-define(COLUMNS, [ {column, <<"action">>, meck, meck, meck, meck, meck, meck, meck} -define(COLUMNS, [ {column, <<"action">>, meck, meck, meck, meck, meck, meck, meck}
@ -113,11 +114,17 @@ t_authz(_) ->
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"+">>)), ?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"+">>)),
meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end), meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end),
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_clientid">>)), ?assertEqual(allow, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_clientid">>)), ClientInfo2, subscribe, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"test/test_username">>)), ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(allow, emqx_access_control:authorize(ClientInfo2, publish, <<"test/test_username">>)), ClientInfo2, publish, <<"test/test_clientid">>)),
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, subscribe, <<"test">>)), % nomatch ?assertEqual(deny, emqx_access_control:authorize(
?assertEqual(deny, emqx_access_control:authorize(ClientInfo3, publish, <<"test">>)), % nomatch ClientInfo2, subscribe, <<"test/test_username">>)),
?assertEqual(allow, emqx_access_control:authorize(
ClientInfo2, publish, <<"test/test_username">>)),
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, subscribe, <<"test">>)), % nomatch
?assertEqual(deny, emqx_access_control:authorize(
ClientInfo3, publish, <<"test">>)), % nomatch
ok. ok.

View File

@ -30,22 +30,14 @@ groups() ->
[]. [].
init_per_suite(Config) -> init_per_suite(Config) ->
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("authorization") ->
meck:passthrough(["authorization"]) ++
emqx_authz_schema:fields("authorization");
(F) -> meck:passthrough([F])
end),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ), meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
meck:expect(emqx_resource, remove, fun(_) -> ok end ), meck:expect(emqx_resource, remove, fun(_) -> ok end ),
ok = emqx_config:init_load(emqx_authz_schema, ?CONF_DEFAULT), ok = emqx_common_test_helpers:start_apps(
ok = emqx_common_test_helpers:start_apps([emqx_authz]), [emqx_conf, emqx_authz],
fun set_special_configs/1),
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
Rules = [#{<<"type">> => <<"redis">>, Rules = [#{<<"type">> => <<"redis">>,
<<"server">> => <<"127.0.0.1:27017">>, <<"server">> => <<"127.0.0.1:27017">>,
<<"pool_size">> => 1, <<"pool_size">> => 1,
@ -59,10 +51,21 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
{ok, _} = emqx_authz:update(replace, []), {ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
-define(SOURCE1, [<<"test/%u">>, <<"publish">>]). -define(SOURCE1, [<<"test/%u">>, <<"publish">>]).

View File

@ -26,23 +26,42 @@
-define(SOURCE2, {allow, {ipaddr, "127.0.0.1"}, all, [{eq, "#"}, {eq, "+"}]}). -define(SOURCE2, {allow, {ipaddr, "127.0.0.1"}, all, [{eq, "#"}, {eq, "+"}]}).
-define(SOURCE3, {allow, {ipaddrs, ["127.0.0.1", "192.168.1.0/24"]}, subscribe, ["%c"]}). -define(SOURCE3, {allow, {ipaddrs, ["127.0.0.1", "192.168.1.0/24"]}, subscribe, ["%c"]}).
-define(SOURCE4, {allow, {'and', [{client, "test"}, {user, "test"}]}, publish, ["topic/test"]}). -define(SOURCE4, {allow, {'and', [{client, "test"}, {user, "test"}]}, publish, ["topic/test"]}).
-define(SOURCE5, {allow, {'or', [{username, {re, "^test"}}, {clientid, {re, "test?"}}]}, publish, ["%u", "%c"]}). -define(SOURCE5, {allow, {'or',
[{username, {re, "^test"}},
{clientid, {re, "test?"}}]},
publish, ["%u", "%c"]}).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:start_apps([emqx_authz]), ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_authz]), {ok, _} = emqx:update_config(
[authorization],
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok. ok.
t_compile(_) -> t_compile(_) ->
?assertEqual({deny, all, all, [['#']]}, emqx_authz_rule:compile(?SOURCE1)), ?assertEqual({deny, all, all, [['#']]}, emqx_authz_rule:compile(?SOURCE1)),
?assertEqual({allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, all, [{eq, ['#']}, {eq, ['+']}]}, emqx_authz_rule:compile(?SOURCE2)), ?assertEqual({allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
all, [{eq, ['#']}, {eq, ['+']}]}, emqx_authz_rule:compile(?SOURCE2)),
?assertEqual({allow, ?assertEqual({allow,
{ipaddrs,[{{127,0,0,1},{127,0,0,1},32}, {ipaddrs,[{{127,0,0,1},{127,0,0,1},32},
@ -58,9 +77,9 @@ t_compile(_) ->
}, emqx_authz_rule:compile(?SOURCE4)), }, emqx_authz_rule:compile(?SOURCE4)),
?assertMatch({allow, ?assertMatch({allow,
{'or', [{username, {re_pattern, _, _, _, _}}, {clientid, {re_pattern, _, _, _, _}}]}, {'or', [{username, {re_pattern, _, _, _, _}},
publish, {clientid, {re_pattern, _, _, _, _}}]},
[{pattern, [<<"%u">>]}, {pattern, [<<"%c">>]}] publish, [{pattern, [<<"%u">>]}, {pattern, [<<"%c">>]}]
}, emqx_authz_rule:compile(?SOURCE5)), }, emqx_authz_rule:compile(?SOURCE5)),
ok. ok.
@ -92,47 +111,64 @@ t_match(_) ->
}, },
?assertEqual({matched, deny}, ?assertEqual({matched, deny},
emqx_authz_rule:match(ClientInfo1, subscribe, <<"#">>, emqx_authz_rule:compile(?SOURCE1))), emqx_authz_rule:match(ClientInfo1, subscribe, <<"#">>,
emqx_authz_rule:compile(?SOURCE1))),
?assertEqual({matched, deny}, ?assertEqual({matched, deny},
emqx_authz_rule:match(ClientInfo2, subscribe, <<"+">>, emqx_authz_rule:compile(?SOURCE1))), emqx_authz_rule:match(ClientInfo2, subscribe, <<"+">>,
emqx_authz_rule:compile(?SOURCE1))),
?assertEqual({matched, deny}, ?assertEqual({matched, deny},
emqx_authz_rule:match(ClientInfo3, subscribe, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE1))), emqx_authz_rule:match(ClientInfo3, subscribe, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE1))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo1, subscribe, <<"#">>, emqx_authz_rule:compile(?SOURCE2))), emqx_authz_rule:match(ClientInfo1, subscribe, <<"#">>,
emqx_authz_rule:compile(?SOURCE2))),
?assertEqual(nomatch, ?assertEqual(nomatch,
emqx_authz_rule:match(ClientInfo1, subscribe, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE2))), emqx_authz_rule:match(ClientInfo1, subscribe, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE2))),
?assertEqual(nomatch, ?assertEqual(nomatch,
emqx_authz_rule:match(ClientInfo2, subscribe, <<"#">>, emqx_authz_rule:compile(?SOURCE2))), emqx_authz_rule:match(ClientInfo2, subscribe, <<"#">>,
emqx_authz_rule:compile(?SOURCE2))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo1, subscribe, <<"test">>, emqx_authz_rule:compile(?SOURCE3))), emqx_authz_rule:match(ClientInfo1, subscribe, <<"test">>,
emqx_authz_rule:compile(?SOURCE3))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo2, subscribe, <<"test">>, emqx_authz_rule:compile(?SOURCE3))), emqx_authz_rule:match(ClientInfo2, subscribe, <<"test">>,
emqx_authz_rule:compile(?SOURCE3))),
?assertEqual(nomatch, ?assertEqual(nomatch,
emqx_authz_rule:match(ClientInfo2, subscribe, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE3))), emqx_authz_rule:match(ClientInfo2, subscribe, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE3))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo1, publish, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE4))), emqx_authz_rule:match(ClientInfo1, publish, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE4))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo2, publish, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE4))), emqx_authz_rule:match(ClientInfo2, publish, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE4))),
?assertEqual(nomatch, ?assertEqual(nomatch,
emqx_authz_rule:match(ClientInfo3, publish, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE4))), emqx_authz_rule:match(ClientInfo3, publish, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE4))),
?assertEqual(nomatch, ?assertEqual(nomatch,
emqx_authz_rule:match(ClientInfo4, publish, <<"topic/test">>, emqx_authz_rule:compile(?SOURCE4))), emqx_authz_rule:match(ClientInfo4, publish, <<"topic/test">>,
emqx_authz_rule:compile(?SOURCE4))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo1, publish, <<"test">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo1, publish, <<"test">>,
emqx_authz_rule:compile(?SOURCE5))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo2, publish, <<"test">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo2, publish, <<"test">>,
emqx_authz_rule:compile(?SOURCE5))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo3, publish, <<"test">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo3, publish, <<"test">>,
emqx_authz_rule:compile(?SOURCE5))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo3, publish, <<"fake">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo3, publish, <<"fake">>,
emqx_authz_rule:compile(?SOURCE5))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo4, publish, <<"test">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo4, publish, <<"test">>,
emqx_authz_rule:compile(?SOURCE5))),
?assertEqual({matched, allow}, ?assertEqual({matched, allow},
emqx_authz_rule:match(ClientInfo4, publish, <<"fake">>, emqx_authz_rule:compile(?SOURCE5))), emqx_authz_rule:match(ClientInfo4, publish, <<"fake">>,
emqx_authz_rule:compile(?SOURCE5))),
ok. ok.

View File

@ -81,8 +81,7 @@ get_node_and_config(KeyPath) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) -> update(KeyPath, UpdateReq, Opts0) ->
Args = [KeyPath, UpdateReq, Opts0], Args = [KeyPath, UpdateReq, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, update_config, Args), multicall(emqx, update_config, Args).
Res.
%% @doc Update the specified node's key path in local-override.conf. %% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(), -spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),
@ -98,8 +97,7 @@ update(Node, KeyPath, UpdateReq, Opts0) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts0) -> remove(KeyPath, Opts0) ->
Args = [KeyPath, Opts0], Args = [KeyPath, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, remove_config, Args), multicall(emqx, remove_config, Args).
Res.
%% @doc remove the specified node's key path in local-override.conf. %% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> -spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -114,8 +112,7 @@ remove(Node, KeyPath, Opts) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts0) -> reset(KeyPath, Opts0) ->
Args = [KeyPath, Opts0], Args = [KeyPath, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, reset_config, Args), multicall(emqx, reset_config, Args).
Res.
%% @doc reset the specified node's key path in local-override.conf. %% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> -spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
@ -124,3 +121,15 @@ reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local}); emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) -> reset(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
multicall(M, F, Args) ->
case emqx_cluster_rpc:multicall(M, F, Args) of
{ok, _TnxId, Res} ->
Res;
{error, Reason} ->
{error, Reason}
end.

View File

@ -341,8 +341,9 @@ fields("cluster_call") ->
})} })}
, {"cleanup_interval", , {"cleanup_interval",
sc(emqx_schema:duration(), sc(emqx_schema:duration(),
#{ desc => "Time interval to clear completed but stale transactions. #{ desc =>
Ensure that the number of completed transactions is less than the max_history." "Time interval to clear completed but stale transactions.
Ensure that the number of completed transactions is less than the max_history."
, default => "5m" , default => "5m"
})} })}
]; ];
@ -505,7 +506,7 @@ fields("authorization") ->
translations() -> ["ekka", "kernel", "emqx"]. translations() -> ["ekka", "kernel", "emqx"].
translation("ekka") -> translation("ekka") ->
[ {"cluster_discovery", fun tr_cluster__discovery/1}]; [ {"cluster_discovery", fun tr_cluster_discovery/1}];
translation("kernel") -> translation("kernel") ->
[ {"logger_level", fun tr_logger_level/1} [ {"logger_level", fun tr_logger_level/1}
, {"logger", fun tr_logger/1}]; , {"logger", fun tr_logger/1}];
@ -540,7 +541,7 @@ tr_override_conf_file(Conf, Filename) ->
[_ | _] = DataDir, [_ | _] = DataDir,
filename:join([DataDir, "configs", Filename]). filename:join([DataDir, "configs", Filename]).
tr_cluster__discovery(Conf) -> tr_cluster_discovery(Conf) ->
Strategy = conf_get("cluster.discovery_strategy", Conf), Strategy = conf_get("cluster.discovery_strategy", Conf),
{Strategy, filter(options(Strategy, Conf))}. {Strategy, filter(options(Strategy, Conf))}.

View File

@ -56,6 +56,7 @@ handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx, CInfo) -
#{clientid := ClientId} = CInfo, #{clientid := ClientId} = CInfo,
MountTopic = mount(CInfo, Topic), MountTopic = mount(CInfo, Topic),
QOS = get_publish_qos(Msg), QOS = get_publish_qos(Msg),
%% TODO: Append message metadata into headers
MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload), MQTTMsg = emqx_message:make(ClientId, QOS, MountTopic, Payload),
MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg), MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2), _ = emqx_broker:publish(MQTTMsg2),

View File

@ -19,12 +19,8 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/1]). -export([start_link/1]).
%% XXX: needless
%-export([is_enabled/0]).
-export([ register_channel/2 -export([ register_channel/2
, unregister_channel/2 , unregister_channel/2
]). ]).
@ -40,8 +36,7 @@
, code_change/3 , code_change/3
]). ]).
-include_lib("emqx/include/emqx.hrl"). -define(CM_SHARD, emqx_gateway_cm_shard).
-define(LOCK, {?MODULE, cleanup_down}). -define(LOCK, {?MODULE, cleanup_down}).
-record(channel, {chid, pid}). -record(channel, {chid, pid}).
@ -113,8 +108,7 @@ handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) ->
Tab = tabname(Type), Tab = tabname(Type),
global:trans({?LOCK, self()}, global:trans({?LOCK, self()},
fun() -> fun() ->
%% FIXME: The shard name should be fixed later mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab])
mria:transaction(?MODULE, fun cleanup_channels/2, [Node, Tab])
end), end),
{noreply, State}; {noreply, State};

View File

@ -17,6 +17,8 @@
%% @doc The gateway configuration management module %% @doc The gateway configuration management module
-module(emqx_gateway_conf). -module(emqx_gateway_conf).
-include_lib("emqx/include/logger.hrl").
%% Load/Unload %% Load/Unload
-export([ load/0 -export([ load/0
, unload/0 , unload/0
@ -87,18 +89,19 @@ load_gateway(GwName, Conf) ->
unconvert_listeners(Ls) when is_list(Ls) -> unconvert_listeners(Ls) when is_list(Ls) ->
lists:foldl(fun(Lis, Acc) -> lists:foldl(fun(Lis, Acc) ->
{[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis),
emqx_map_lib:deep_merge(Acc, #{Type => #{Name => Lis1}}) NLis1 = maps:without([<<"id">>], Lis1),
emqx_map_lib:deep_merge(Acc, #{Type => #{Name => NLis1}})
end, #{}, Ls). end, #{}, Ls).
maps_key_take(Ks, M) -> maps_key_take(Ks, M) ->
maps_key_take(Ks, M, []). maps_key_take(Ks, M, []).
maps_key_take([], M, Acc) -> maps_key_take([], M, Acc) ->
{lists:reverse(Acc), M}; {lists:reverse(Acc), M};
maps_key_take([K|Ks], M, Acc) -> maps_key_take([K | Ks], M, Acc) ->
case maps:take(K, M) of case maps:take(K, M) of
error -> throw(bad_key); error -> throw(bad_key);
{V, M1} -> {V, M1} ->
maps_key_take(Ks, M1, [V|Acc]) maps_key_take(Ks, M1, [V | Acc])
end. end.
-spec update_gateway(atom_or_bin(), map()) -> ok_or_err(). -spec update_gateway(atom_or_bin(), map()) -> ok_or_err().
@ -107,6 +110,8 @@ update_gateway(GwName, Conf0) ->
<<"listeners">>, <<"authentication">>], Conf0), <<"listeners">>, <<"authentication">>], Conf0),
update({?FUNCTION_NAME, bin(GwName), Conf}). update({?FUNCTION_NAME, bin(GwName), Conf}).
%% FIXME: delete cert files ??
-spec unload_gateway(atom_or_bin()) -> ok_or_err(). -spec unload_gateway(atom_or_bin()) -> ok_or_err().
unload_gateway(GwName) -> unload_gateway(GwName) ->
update({?FUNCTION_NAME, bin(GwName)}). update({?FUNCTION_NAME, bin(GwName)}).
@ -224,10 +229,10 @@ remove_authn(GwName, ListenerRef) ->
%% @private %% @private
update(Req) -> update(Req) ->
res(emqx:update_config([gateway], Req)). res(emqx_conf:update([gateway], Req, #{override_to => cluster})).
res({ok, _Result}) -> ok; res({ok, _Result}) -> ok;
res({error, {pre_config_update,emqx_gateway_conf,Reason}}) -> {error, Reason}; res({error, {error, {pre_config_update,emqx_gateway_conf,Reason}}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}. res({error, Reason}) -> {error, Reason}.
bin({LType, LName}) -> bin({LType, LName}) ->
@ -247,7 +252,8 @@ bin(B) when is_binary(B) ->
pre_config_update({load_gateway, GwName, Conf}, RawConf) -> pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of case maps:get(GwName, RawConf, undefined) of
undefined -> undefined ->
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => Conf})}; NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})};
_ -> _ ->
{error, already_exist} {error, already_exist}
end; end;
@ -261,13 +267,18 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})} {ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
end; end;
pre_config_update({unload_gateway, GwName}, RawConf) -> pre_config_update({unload_gateway, GwName}, RawConf) ->
_ = tune_gw_certs(fun clear_certs/2,
GwName,
maps:get(GwName, RawConf, #{})
),
{ok, maps:remove(GwName, RawConf)}; {ok, maps:remove(GwName, RawConf)};
pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) -> pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get( case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined -> undefined ->
NListener = #{LType => #{LName => Conf}}, NConf = convert_certs(certs_dir(GwName), Conf),
NListener = #{LType => #{LName => NConf}},
{ok, emqx_map_lib:deep_merge( {ok, emqx_map_lib:deep_merge(
RawConf, RawConf,
#{GwName => #{<<"listeners">> => NListener}})}; #{GwName => #{<<"listeners">> => NListener}})};
@ -279,16 +290,23 @@ pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of [GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined -> undefined ->
{error, not_found}; {error, not_found};
_OldConf -> OldConf ->
NListener = #{LType => #{LName => Conf}}, NConf = convert_certs(certs_dir(GwName), Conf, OldConf),
NListener = #{LType => #{LName => NConf}},
{ok, emqx_map_lib:deep_merge( {ok, emqx_map_lib:deep_merge(
RawConf, RawConf,
#{GwName => #{<<"listeners">> => NListener}})} #{GwName => #{<<"listeners">> => NListener}})}
end; end;
pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) -> pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
{ok, emqx_map_lib:deep_remove( Path = [GwName, <<"listeners">>, LType, LName],
[GwName, <<"listeners">>, LType, LName], RawConf)}; case emqx_map_lib:deep_get(Path, RawConf, undefined) of
undefined ->
{ok, RawConf};
OldConf ->
clear_certs(certs_dir(GwName), OldConf),
{ok, emqx_map_lib:deep_remove(Path, RawConf)}
end;
pre_config_update({add_authn, GwName, Conf}, RawConf) -> pre_config_update({add_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get( case emqx_map_lib:deep_get(
@ -366,7 +384,7 @@ pre_config_update(UnknownReq, _RawConf) ->
-> ok | {ok, Result::any()} | {error, Reason::term()}. -> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) -> post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
[_Tag, GwName0|_] = tuple_to_list(Req), [_Tag, GwName0 | _] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0), GwName = binary_to_existing_atom(GwName0),
case {maps:get(GwName, NewConfig, undefined), case {maps:get(GwName, NewConfig, undefined),
@ -382,3 +400,56 @@ post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
end; end;
post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) -> post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok. ok.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
tune_gw_certs(Fun, GwName, Conf) ->
SubDir = certs_dir(GwName),
case maps:get(<<"listeners">>, Conf, undefined) of
undefined -> Conf;
Liss ->
maps:put(<<"listeners">>,
maps:map(fun(_, Lis) ->
maps:map(fun(_, LisConf) ->
erlang:apply(Fun, [SubDir, LisConf])
end, Lis)
end, Liss),
Conf)
end.
certs_dir(GwName) when is_binary(GwName) ->
GwName.
convert_certs(SubDir, Conf) ->
case emqx_tls_lib:ensure_ssl_files(
SubDir,
maps:get(<<"ssl">>, Conf, undefined)
) of
{ok, SSL} ->
new_ssl_config(Conf, SSL);
{error, Reason} ->
?SLOG(error, Reason#{msg => bad_ssl_config}),
throw({bad_ssl_config, Reason})
end.
convert_certs(SubDir, NConf, OConf) ->
OSSL = maps:get(<<"ssl">>, OConf, undefined),
NSSL = maps:get(<<"ssl">>, NConf, undefined),
case emqx_tls_lib:ensure_ssl_files(SubDir, NSSL) of
{ok, NSSL1} ->
ok = emqx_tls_lib:delete_ssl_files(SubDir, NSSL1, OSSL),
new_ssl_config(NConf, NSSL1);
{error, Reason} ->
?SLOG(error, Reason#{msg => bad_ssl_config}),
throw({bad_ssl_config, Reason})
end.
new_ssl_config(Conf, undefined) -> Conf;
new_ssl_config(Conf, SSL) -> Conf#{<<"ssl">> => SSL}.
clear_certs(SubDir, Conf) ->
SSL = maps:get(<<"ssl">>, Conf, undefined),
ok = emqx_tls_lib:delete_ssl_files(SubDir, undefined, SSL).

View File

@ -45,6 +45,7 @@
, comma_separated_list/0 , comma_separated_list/0
, ip_port/0 , ip_port/0
]). ]).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-export([namespace/0, roots/0 , fields/1]). -export([namespace/0, roots/0 , fields/1]).
@ -53,11 +54,40 @@ namespace() -> gateway.
roots() -> [gateway]. roots() -> [gateway].
fields(gateway) -> fields(gateway) ->
[{stomp, sc_meta(ref(stomp) , #{nullable => {true, recursively}})}, [{stomp,
{mqttsn, sc_meta(ref(mqttsn) , #{nullable => {true, recursively}})}, sc(ref(stomp),
{coap, sc_meta(ref(coap) , #{nullable => {true, recursively}})}, #{ nullable => {true, recursively}
{lwm2m, sc_meta(ref(lwm2m) , #{nullable => {true, recursively}})}, , desc =>
{exproto, sc_meta(ref(exproto), #{nullable => {true, recursively}})} "The Stomp Gateway configuration.<br>
This gateway supports v1.2/1.1/1.0"
})},
{mqttsn,
sc(ref(mqttsn),
#{ nullable => {true, recursively}
, desc =>
"The MQTT-SN Gateway configuration.<br>
This gateway only supports the v1.2 protocol"
})},
{coap,
sc(ref(coap),
#{ nullable => {true, recursively}
, desc =>
"The CoAP Gateway configuration.<br>
This gateway is implemented based on RFC-7252 and
https://core-wg.github.io/coap-pubsub/draft-ietf-core-pubsub.html"
})},
{lwm2m,
sc(ref(lwm2m),
#{ nullable => {true, recursively}
, desc =>
"The LwM2M Gateway configuration.<br>
This gateway only supports the v1.0.1 protocol"
})},
{exproto,
sc(ref(exproto),
#{ nullable => {true, recursively}
, desc => "The Extension Protocol configuration"
})}
]; ];
fields(stomp) -> fields(stomp) ->
@ -66,61 +96,198 @@ fields(stomp) ->
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(stomp_frame) -> fields(stomp_frame) ->
[ {max_headers, sc(integer(), 10)} [ {max_headers,
, {max_headers_length, sc(integer(), 1024)} sc(integer(),
, {max_body_length, sc(integer(), 8192)} #{ default => 10
, desc => "The maximum number of Header"
})}
, {max_headers_length,
sc(integer(),
#{ default => 1024
, desc => "The maximum string length of the Header Value"
})}
, {max_body_length,
sc(integer(),
#{ default => 65536
, desc => "Maximum number of bytes of Body allowed per Stomp packet"
})}
]; ];
fields(mqttsn) -> fields(mqttsn) ->
[ {gateway_id, sc(integer())} [ {gateway_id,
, {broadcast, sc(boolean(), false)} sc(integer(),
, {enable_qos3, sc(boolean(), true)} #{ default => 1
, {predefined, hoconsc:array(ref(mqttsn_predefined))} , desc =>
"MQTT-SN Gateway Id.<br>
When the <code>broadcast</code> option is enabled,
the gateway will broadcast ADVERTISE message with this value"
})}
, {broadcast,
sc(boolean(),
#{ default => false
, desc => "Whether to periodically broadcast ADVERTISE messages"
})}
%% TODO: rename
, {enable_qos3,
sc(boolean(),
#{ default => true
, desc =>
"Allows connectionless clients to publish messages with a Qos of -1.<br>
This feature is defined for very simple client implementations
which do not support any other features except this one.<br>
There is no connection setup nor tear down, no registration nor subscription.<br>
The client just sends its PUBLISH messages to a GW"
})}
, {predefined,
sc(hoconsc:array(ref(mqttsn_predefined)),
#{ default => []
, desc =>
"The Pre-defined topic ids and topic names.<br>
A 'pre-defined' topic id is a topic id whose mapping to a topic name
is known in advance by both the clients application and the gateway"
})}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(mqttsn_predefined) -> fields(mqttsn_predefined) ->
[ {id, sc(integer())} [ {id, sc(integer(), #{desc => "Topic Id.<br>Range: 1-65535"})}
, {topic, sc(binary())} , {topic, sc(binary(), #{desc => "Topic Name"})}
]; ];
fields(coap) -> fields(coap) ->
[ {heartbeat, sc(duration(), <<"30s">>)} [ {heartbeat,
, {connection_required, sc(boolean(), false)} sc(duration(),
, {notify_type, sc(hoconsc:union([non, con, qos]), qos)} #{ default => <<"30s">>
, {subscribe_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)} , desc =>
, {publish_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)} "The gateway server required minimum hearbeat interval.<br>
When connection mode is enabled, this parameter is used to set the minimum
heartbeat interval for the connection to be alive."
})}
, {connection_required,
sc(boolean(),
#{ default => false
, desc =>
"Enable or disable connection mode.<br>
Connection mode is a feature of non-standard protocols. When connection mode
is enabled, it is necessary to maintain the creation, authentication and alive
of connection resources"
})}
, {notify_type,
sc(hoconsc:union([non, con, qos]),
#{ default => qos
, desc =>
"The Notification Message will be delivered to the CoAP client if a new message
received on an observed topic.
The type of delivered coap message can be set to:<br>
1. non: Non-confirmable;<br>
2. con: Confirmable;<br>
3. qos: Mapping from QoS type of recevied message, QoS0 -> non, QoS1,2 -> con"
})}
, {subscribe_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for subscribe request.<br>
This option specifies the QoS level for the CoAP Client when establishing a
subscription membership, if the subscribe request is not carried `qos` option.
The indicator can be set to:
- qos0, qos1, qos2: Fixed default QoS level
- coap: Dynamic QoS level by the message type of subscribe request
* qos0: If the subscribe request is non-confirmable
* qos1: If the subscribe request is confirmable"
})}
, {publish_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for publish request.<br>
This option specifies the QoS level for the CoAP Client when publishing a
message to EMQ X PUB/SUB system, if the publish request is not carried `qos`
option. The indicator can be set to:
- qos0, qos1, qos2: Fixed default QoS level
- coap: Dynamic QoS level by the message type of publish request
* qos0: If the publish request is non-confirmable
* qos1: If the publish request is confirmable"
})}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(lwm2m) -> fields(lwm2m) ->
[ {xml_dir, sc(binary(), "etc/lwm2m_xml")} [ {xml_dir,
, {lifetime_min, sc(duration(), "1s")} sc(binary(),
, {lifetime_max, sc(duration(), "86400s")} #{ default =>"etc/lwm2m_xml"
, {qmode_time_window, sc(duration_s(), "22s")} , desc => "The Directory for LwM2M Resource defination"
})}
, {lifetime_min,
sc(duration(),
#{ default => "1s"
, desc => "Minimum value of lifetime allowed to be set by the LwM2M client"
})}
, {lifetime_max,
sc(duration(),
#{ default => "86400s"
, desc => "Maximum value of lifetime allowed to be set by the LwM2M client"
})}
, {qmode_time_window,
sc(duration_s(),
#{ default => "22s"
, desc =>
"The value of the time window during which the network link is considered
valid by the LwM2M Gateway in QMode mode.<br>
For example, after receiving an update message from a client, any messages
within this time window are sent directly to the LwM2M client, and all messages
beyond this time window are temporarily stored in memory."
})}
%% TODO: Support config resource path %% TODO: Support config resource path
, {auto_observe, sc(boolean(), false)} , {auto_observe,
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} sc(boolean(),
, {translators, sc_meta(ref(translators), #{nullable => false})} #{ default => false
, desc => "Automatically observe the object list of REGISTER packet"
})}
%% FIXME: not working now
, {update_msg_publish_condition,
sc(hoconsc:union([always, contains_object_list]),
#{ default => "contains_object_list"
, desc =>
"Policy for publishing UPDATE event message to EMQ X.<br>
- always: send update events as long as the UPDATE request is received.
- contains_object_list: send update events only if the UPDATE request carries any Object List."
})}
, {translators,
sc(ref(lwm2m_translators),
#{ nullable => false
, desc => "Topic configuration for LwM2M's gateway publishing and subscription"
})}
, {listeners, sc(ref(udp_listeners))} , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(exproto) -> fields(exproto) ->
[ {server, sc(ref(exproto_grpc_server))} [ {server,
, {handler, sc(ref(exproto_grpc_handler))} sc(ref(exproto_grpc_server),
#{ desc => "Configurations for starting the <code>ConnectionAdapter</code> service"
})}
, {handler,
sc(ref(exproto_grpc_handler),
#{ desc => "Configurations for request to <code>ConnectionHandler</code> service"
})}
, {listeners, sc(ref(udp_tcp_listeners))} , {listeners, sc(ref(udp_tcp_listeners))}
] ++ gateway_common_options(); ] ++ gateway_common_options();
fields(exproto_grpc_server) -> fields(exproto_grpc_server) ->
[ {bind, sc(hoconsc:union([ip_port(), integer()]))} [ {bind,
, {ssl, sc_meta(ref(ssl_server_opts), sc(hoconsc:union([ip_port(), integer()]))}
#{nullable => {true, recursively}})} , {ssl,
sc(ref(ssl_server_opts),
#{ nullable => {true, recursively}
})}
]; ];
fields(exproto_grpc_handler) -> fields(exproto_grpc_handler) ->
[ {address, sc(binary())} [ {address, sc(binary())}
, {ssl, sc_meta(ref(ssl_client_opts), , {ssl,
#{nullable => {true, recursively}})} sc(ref(ssl_client_opts),
#{ nullable => {true, recursively}
})}
]; ];
fields(ssl_server_opts) -> fields(ssl_server_opts) ->
@ -140,17 +307,42 @@ fields(clientinfo_override) ->
, {clientid, sc(binary())} , {clientid, sc(binary())}
]; ];
fields(translators) -> fields(lwm2m_translators) ->
[ {command, sc(ref(translator))} [ {command,
, {response, sc(ref(translator))} sc(ref(translator),
, {notify, sc(ref(translator))} #{ desc =>
, {register, sc(ref(translator))} "The topic for receiving downstream commands.<br>
, {update, sc(ref(translator))} For each new LwM2M client that succeeds in going online, the gateway creates
a the subscription relationship to receive downstream commands and send it to
the LwM2M client"
})}
, {response,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the acknowledge events from LwM2M client"
})}
, {notify,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the notify events from LwM2M client.<br>
After succeed observe a resource of LwM2M client, Gateway will send the
notifyevents via this topic, if the client reports any resource changes"
})}
, {register,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the register events from LwM2M client.<br>"
})}
, {update,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the update events from LwM2M client.<br>"
})}
]; ];
fields(translator) -> fields(translator) ->
[ {topic, sc(binary())} [ {topic, sc(binary())}
, {qos, sc(range(0, 2), 0)} , {qos, sc(range(0, 2), #{default => 0})}
]; ];
fields(udp_listeners) -> fields(udp_listeners) ->
@ -172,7 +364,7 @@ fields(udp_tcp_listeners) ->
fields(tcp_listener) -> fields(tcp_listener) ->
[ %% some special confs for tcp listener [ %% some special confs for tcp listener
{acceptors, sc(integer(), 16)} {acceptors, sc(integer(), #{default => 16})}
] ++ ] ++
tcp_opts() ++ tcp_opts() ++
proxy_protocol_opts() ++ proxy_protocol_opts() ++
@ -180,9 +372,11 @@ fields(tcp_listener) ->
fields(ssl_listener) -> fields(ssl_listener) ->
fields(tcp_listener) ++ fields(tcp_listener) ++
[{ssl, sc_meta(hoconsc:ref(emqx_schema, "listener_ssl_opts"), [{ssl,
#{desc => "SSL listener options"})}]; sc(hoconsc:ref(emqx_schema, "listener_ssl_opts"),
#{ desc => "SSL listener options"
})}
];
fields(udp_listener) -> fields(udp_listener) ->
[ [
@ -192,18 +386,17 @@ fields(udp_listener) ->
common_listener_opts(); common_listener_opts();
fields(dtls_listener) -> fields(dtls_listener) ->
[ {acceptors, sc(integer(), 16)} [ {acceptors, sc(integer(), #{default => 16})}
] ++ ] ++
fields(udp_listener) ++ fields(udp_listener) ++
[{dtls, sc_meta(ref(dtls_opts), [{dtls, sc(ref(dtls_opts), #{desc => "DTLS listener options"})}];
#{desc => "DTLS listener options"})}];
fields(udp_opts) -> fields(udp_opts) ->
[ {active_n, sc(integer(), 100)} [ {active_n, sc(integer(), #{default => 100})}
, {recbuf, sc(bytesize())} , {recbuf, sc(bytesize())}
, {sndbuf, sc(bytesize())} , {sndbuf, sc(bytesize())}
, {buffer, sc(bytesize())} , {buffer, sc(bytesize())}
, {reuseaddr, sc(boolean(), true)} , {reuseaddr, sc(boolean(), #{default => true})}
]; ];
fields(dtls_opts) -> fields(dtls_opts) ->
@ -215,66 +408,113 @@ fields(dtls_opts) ->
}, false). }, false).
authentication() -> authentication() ->
sc_meta(hoconsc:union( sc(hoconsc:union(
[ hoconsc:ref(emqx_authn_mnesia, config) [ hoconsc:ref(emqx_authn_mnesia, config)
, hoconsc:ref(emqx_authn_mysql, config) , hoconsc:ref(emqx_authn_mysql, config)
, hoconsc:ref(emqx_authn_pgsql, config) , hoconsc:ref(emqx_authn_pgsql, config)
, hoconsc:ref(emqx_authn_mongodb, standalone) , hoconsc:ref(emqx_authn_mongodb, standalone)
, hoconsc:ref(emqx_authn_mongodb, 'replica-set') , hoconsc:ref(emqx_authn_mongodb, 'replica-set')
, hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster')
, hoconsc:ref(emqx_authn_redis, standalone) , hoconsc:ref(emqx_authn_redis, standalone)
, hoconsc:ref(emqx_authn_redis, cluster) , hoconsc:ref(emqx_authn_redis, cluster)
, hoconsc:ref(emqx_authn_redis, sentinel) , hoconsc:ref(emqx_authn_redis, sentinel)
, hoconsc:ref(emqx_authn_http, get) , hoconsc:ref(emqx_authn_http, get)
, hoconsc:ref(emqx_authn_http, post) , hoconsc:ref(emqx_authn_http, post)
, hoconsc:ref(emqx_authn_jwt, 'hmac-based') , hoconsc:ref(emqx_authn_jwt, 'hmac-based')
, hoconsc:ref(emqx_authn_jwt, 'public-key') , hoconsc:ref(emqx_authn_jwt, 'public-key')
, hoconsc:ref(emqx_authn_jwt, 'jwks') , hoconsc:ref(emqx_authn_jwt, 'jwks')
, hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
]), ]),
#{nullable => {true, recursively}, #{ nullable => {true, recursively}
desc => , desc =>
"""Default authentication configs for all of the gateway listeners.<br> """Default authentication configs for all of the gateway listeners.<br>
For per-listener overrides see <code>authentication</code> For per-listener overrides see <code>authentication</code>
in listener configs"""}). in listener configs"""
}).
gateway_common_options() -> gateway_common_options() ->
[ {enable, sc(boolean(), true)} [ {enable,
, {enable_stats, sc(boolean(), true)} sc(boolean(),
, {idle_timeout, sc(duration(), <<"30s">>)} #{ default => true
, {mountpoint, sc(binary(), <<>>)} , desc => "Whether to enable this gateway"
, {clientinfo_override, sc(ref(clientinfo_override))} })}
, {enable_stats,
sc(boolean(),
#{ default => true
, desc => "Whether to enable client process statistic"
})}
, {idle_timeout,
sc(duration(),
#{ default => <<"30s">>
, desc =>
"The idle time of the client connection process.<br>
it has two purposes:
1. A newly created client process that does not receive any client requests
after that time will be closed directly.
2. A running client process that does not receive any client requests after
this time will go into hibernation to save resources."
})}
, {mountpoint,
sc(binary(),
#{ default => <<>>
%% TODO: variable support?
, desc => ""
})}
, {clientinfo_override,
sc(ref(clientinfo_override),
#{ desc => ""
})}
, {authentication, authentication()} , {authentication, authentication()}
]. ].
common_listener_opts() -> common_listener_opts() ->
[ {enable, sc(boolean(), true)} [ {enable,
, {bind, sc(hoconsc:union([ip_port(), integer()]))} sc(boolean(),
, {max_connections, sc(integer(), 1024)} #{ default => true
, {max_conn_rate, sc(integer())} })}
, {bind,
sc(hoconsc:union([ip_port(), integer()]),
#{})}
, {max_connections,
sc(integer(),
#{ default => 1024
})}
, {max_conn_rate,
sc(integer(),
#{ default => 1000
})}
, {authentication, authentication()} , {authentication, authentication()}
, {mountpoint, sc(binary(), undefined)} , {mountpoint,
, {access_rules, sc(hoconsc:array(string()), [])} sc(binary(),
#{ default => undefined
})}
, {access_rules,
sc(hoconsc:array(string()),
#{ default => []
})}
]. ].
tcp_opts() -> tcp_opts() ->
[{tcp, sc_meta(ref(emqx_schema, "tcp_opts"), #{})}]. [{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}].
udp_opts() -> udp_opts() ->
[{udp, sc_meta(ref(udp_opts), #{})}]. [{udp, sc(ref(udp_opts), #{})}].
proxy_protocol_opts() -> proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean(), false)} [ {proxy_protocol,
, {proxy_protocol_timeout, sc(duration(), "15s")} sc(boolean(),
#{ default => false
})}
, {proxy_protocol_timeout,
sc(duration(),
#{ default => "15s"
})}
]. ].
sc(Type) -> sc(Type) ->
sc_meta(Type, #{}). sc(Type, #{}).
sc(Type, Default) -> sc(Type, Meta) ->
sc_meta(Type, #{default => Default}).
sc_meta(Type, Meta) ->
hoconsc:mk(Type, Meta). hoconsc:mk(Type, Meta).
map(Name, Type) -> map(Name, Type) ->

View File

@ -603,6 +603,7 @@ send_to_mqtt(Ctx, EventType, Payload, {Topic, Qos},
proto_publish(Topic, Payload, Qos, Headers, WithContext, proto_publish(Topic, Payload, Qos, Headers, WithContext,
#session{endpoint_name = Epn} = Session) -> #session{endpoint_name = Epn} = Session) ->
MountedTopic = mount(Topic, Session), MountedTopic = mount(Topic, Session),
%% TODO: Append message metadata into headers
Msg = emqx_message:make(Epn, Qos, MountedTopic, Msg = emqx_message:make(Epn, Qos, MountedTopic,
emqx_json:encode(Payload), #{}, Headers), emqx_json:encode(Payload), #{}, Headers),
WithContext(publish, [MountedTopic, Msg]), WithContext(publish, [MountedTopic, Msg]),

View File

@ -51,6 +51,7 @@ stop() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([GwId, Port]) -> init([GwId, Port]) ->
%% FIXME:
Duration = application:get_env(emqx_sn, advertise_duration, ?DEFAULT_DURATION), Duration = application:get_env(emqx_sn, advertise_duration, ?DEFAULT_DURATION),
{ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]), {ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]),
{ok, ensure_advertise(#state{gwid = GwId, addrs = boradcast_addrs(), {ok, ensure_advertise(#state{gwid = GwId, addrs = boradcast_addrs(),

View File

@ -790,16 +790,26 @@ check_pub_authz({TopicName, _Flags, _Data},
end. end.
convert_pub_to_msg({TopicName, Flags, Data}, convert_pub_to_msg({TopicName, Flags, Data},
Channel = #channel{ Channel = #channel{clientinfo = #{clientid := ClientId}}) ->
clientinfo = #{clientid := ClientId}}) ->
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags, #mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
NewQoS = get_corrected_qos(QoS), NewQoS = get_corrected_qos(QoS),
Message = emqx_message:make(ClientId, NewQoS, TopicName, Data), Message = put_message_headers(
NMessage = emqx_message:set_flags( emqx_message:make(
#{dup => Dup, retain => Retain}, ClientId, NewQoS, TopicName, Data,
Message #{dup => Dup, retain => Retain}, #{}), Channel),
), {ok, Message, Channel}.
{ok, NMessage, Channel}.
put_message_headers(Msg, #channel{
conninfo = #{proto_ver := ProtoVer},
clientinfo = #{
protocol := Protocol,
username := Username,
peerhost := PeerHost}}) ->
emqx_message:set_headers(
#{proto_ver => ProtoVer,
protocol => Protocol,
username => Username,
peerhost => PeerHost}, Msg).
get_corrected_qos(?QOS_NEG1) -> ?QOS_0; get_corrected_qos(?QOS_NEG1) -> ?QOS_0;
get_corrected_qos(QoS) -> QoS. get_corrected_qos(QoS) -> QoS.
@ -1307,7 +1317,7 @@ ensure_disconnected(Reason, Channel = #channel{
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel; Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
ok = publish_will_msg(WillMsg), ok = publish_will_msg(put_message_headers(WillMsg, Channel)),
Channel#channel{will_msg = undefined}. Channel#channel{will_msg = undefined}.
publish_will_msg(Msg) -> publish_will_msg(Msg) ->

View File

@ -52,21 +52,6 @@
-record(emqx_sn_registry, {key, value}). -record(emqx_sn_registry, {key, value}).
%% Mnesia bootstrap
%-export([mnesia/1]).
%-boot_mnesia({mnesia, [boot]}).
%%% @doc Create or replicate tables.
%-spec(mnesia(boot | copy) -> ok).
%mnesia(boot) ->
% %% Optimize storage
% StoreProps = [{ets, [{read_concurrency, true}]}],
% ok = mria:create_table(?MODULE, [
% {attributes, record_info(fields, emqx_sn_registry)},
% {ram_copies, [node()]},
% {storage_properties, StoreProps}]).
-type registry() :: {Tab :: atom(), -type registry() :: {Tab :: atom(),
RegistryPid :: pid()}. RegistryPid :: pid()}.
@ -145,8 +130,6 @@ init([InstaId, PredefTopics]) ->
{rlog_shard, ?SN_SHARD} {rlog_shard, ?SN_SHARD}
]), ]),
ok = mria:wait_for_tables([Tab]), ok = mria:wait_for_tables([Tab]),
% FIXME:
%ok = mria_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl( MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName0}, AccId) -> fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0), TopicName = iolist_to_binary(TopicName0),

View File

@ -123,7 +123,7 @@ initial_parse_state(Opts) ->
limit(Opts) -> limit(Opts) ->
#frame_limit{ #frame_limit{
max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM), max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM),
max_header_length = g(max_header_length, Opts, ?MAX_BODY_LENGTH), max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH),
max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH) max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH)
}. }.

View File

@ -28,6 +28,10 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(CONF_DEFAULT, <<"
gateway {}
">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setup %% Setup
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -35,18 +39,12 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) -> init_per_suite(Conf) ->
%% FIXME: Magic line. for saving gateway schema name for emqx_config emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>), emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
%% Start emqx-authn separately, due to emqx_authn_schema
%% not implementing the roots/0 method, it cannot be started with
%% emqx-ct-helpers at the moment.
{ok, _} = application:ensure_all_started(emqx_authn),
Conf. Conf.
end_per_suite(Conf) -> end_per_suite(Conf) ->
application:stop(emqx_authn), emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]),
Conf. Conf.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -34,15 +34,12 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) -> init_per_suite(Conf) ->
%% FIXME: Magic line. for saving gateway schema name for emqx_config
emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>), emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>),
emqx_common_test_helpers:start_apps([emqx_gateway]), emqx_common_test_helpers:start_apps([emqx_conf, emqx_authn, emqx_gateway]),
{ok, _} = application:ensure_all_started(emqx_authn),
Conf. Conf.
end_per_suite(_Conf) -> end_per_suite(_Conf) ->
application:stop(emqx_authn), emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn, emqx_conf]).
emqx_common_test_helpers:stop_apps([emqx_gateway]).
init_per_testcase(_CaseName, Conf) -> init_per_testcase(_CaseName, Conf) ->
_ = emqx_gateway_conf:unload_gateway(stomp), _ = emqx_gateway_conf:unload_gateway(stomp),
@ -52,6 +49,133 @@ init_per_testcase(_CaseName, Conf) ->
%% Cases %% Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(SVR_CA,
<<"-----BEGIN CERTIFICATE-----
MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV
BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD
DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD
VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE
AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1
EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2
juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur
MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ
uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D
tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ
KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj
EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB
/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa
ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5
CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y
E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo
88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30
IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==
-----END CERTIFICATE-----
">>).
-define(SVR_CERT,
<<"-----BEGIN CERTIFICATE-----
MIIDEzCCAfugAwIBAgIBAjANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDcwNVoXDTMwMDUwNjA4MDcwNVowPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBlNlcnZl
cjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALNeWT3pE+QFfiRJzKmn
AMUrWo3K2j/Tm3+Xnl6WLz67/0rcYrJbbKvS3uyRP/stXyXEKw9CepyQ1ViBVFkW
Aoy8qQEOWFDsZc/5UzhXUnb6LXr3qTkFEjNmhj+7uzv/lbBxlUG1NlYzSeOB6/RT
8zH/lhOeKhLnWYPXdXKsa1FL6ij4X8DeDO1kY7fvAGmBn/THh1uTpDizM4YmeI+7
4dmayA5xXvARte5h4Vu5SIze7iC057N+vymToMk2Jgk+ZZFpyXrnq+yo6RaD3ANc
lrc4FbeUQZ5a5s5Sxgs9a0Y3WMG+7c5VnVXcbjBRz/aq2NtOnQQjikKKQA8GF080
BQkCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBAJefnMZpaRDHQSNUIEL3iwGXE9c6PmIsQVE2ustr+CakBp3TZ4l0enLt
iGMfEVFju69cO4oyokWv+hl5eCMkHBf14Kv51vj448jowYnF1zmzn7SEzm5Uzlsa
sqjtAprnLyof69WtLU1j5rYWBuFX86yOTwRAFNjm9fvhAcrEONBsQtqipBWkMROp
iUYMkRqbKcQMdwxov+lHBYKq9zbWRoqLROAn54SRqgQk6c15JdEfgOOjShbsOkIH
UhqcwRkQic7n1zwHVGVDgNIZVgmJ2IdIWBlPEC7oLrRrBD/X1iEEXtKab6p5o22n
KB5mN+iQaE+Oe2cpGKZJiJRdM+IqDDQ=
-----END CERTIFICATE-----
">>).
-define(SVR_KEY,
<<"-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAs15ZPekT5AV+JEnMqacAxStajcraP9Obf5eeXpYvPrv/Stxi
sltsq9Le7JE/+y1fJcQrD0J6nJDVWIFUWRYCjLypAQ5YUOxlz/lTOFdSdvotevep
OQUSM2aGP7u7O/+VsHGVQbU2VjNJ44Hr9FPzMf+WE54qEudZg9d1cqxrUUvqKPhf
wN4M7WRjt+8AaYGf9MeHW5OkOLMzhiZ4j7vh2ZrIDnFe8BG17mHhW7lIjN7uILTn
s36/KZOgyTYmCT5lkWnJeuer7KjpFoPcA1yWtzgVt5RBnlrmzlLGCz1rRjdYwb7t
zlWdVdxuMFHP9qrY206dBCOKQopADwYXTzQFCQIDAQABAoIBAQCuvCbr7Pd3lvI/
n7VFQG+7pHRe1VKwAxDkx2t8cYos7y/QWcm8Ptwqtw58HzPZGWYrgGMCRpzzkRSF
V9g3wP1S5Scu5C6dBu5YIGc157tqNGXB+SpdZddJQ4Nc6yGHXYERllT04ffBGc3N
WG/oYS/1cSteiSIrsDy/91FvGRCi7FPxH3wIgHssY/tw69s1Cfvaq5lr2NTFzxIG
xCvpJKEdSfVfS9I7LYiymVjst3IOR/w76/ZFY9cRa8ZtmQSWWsm0TUpRC1jdcbkm
ZoJptYWlP+gSwx/fpMYftrkJFGOJhHJHQhwxT5X/ajAISeqjjwkWSEJLwnHQd11C
Zy2+29lBAoGBANlEAIK4VxCqyPXNKfoOOi5dS64NfvyH4A1v2+KaHWc7lqaqPN49
ezfN2n3X+KWx4cviDD914Yc2JQ1vVJjSaHci7yivocDo2OfZDmjBqzaMp/y+rX1R
/f3MmiTqMa468rjaxI9RRZu7vDgpTR+za1+OBCgMzjvAng8dJuN/5gjlAoGBANNY
uYPKtearBmkqdrSV7eTUe49Nhr0XotLaVBH37TCW0Xv9wjO2xmbm5Ga/DCtPIsBb
yPeYwX9FjoasuadUD7hRvbFu6dBa0HGLmkXRJZTcD7MEX2Lhu4BuC72yDLLFd0r+
Ep9WP7F5iJyagYqIZtz+4uf7gBvUDdmvXz3sGr1VAoGAdXTD6eeKeiI6PlhKBztF
zOb3EQOO0SsLv3fnodu7ZaHbUgLaoTMPuB17r2jgrYM7FKQCBxTNdfGZmmfDjlLB
0xZ5wL8ibU30ZXL8zTlWPElST9sto4B+FYVVF/vcG9sWeUUb2ncPcJ/Po3UAktDG
jYQTTyuNGtSJHpad/YOZctkCgYBtWRaC7bq3of0rJGFOhdQT9SwItN/lrfj8hyHA
OjpqTV4NfPmhsAtu6j96OZaeQc+FHvgXwt06cE6Rt4RG4uNPRluTFgO7XYFDfitP
vCppnoIw6S5BBvHwPP+uIhUX2bsi/dm8vu8tb+gSvo4PkwtFhEr6I9HglBKmcmog
q6waEQKBgHyecFBeM6Ls11Cd64vborwJPAuxIW7HBAFj/BS99oeG4TjBx4Sz2dFd
rzUibJt4ndnHIvCN8JQkjNG14i9hJln+H3mRss8fbZ9vQdqG+2vOWADYSzzsNI55
RFY7JjluKcVkp/zCDeUxTU3O6sS+v6/3VE11Cob6OYQx3lN5wrZ3
-----END RSA PRIVATE KEY-----
">>).
-define(SVR_CERT2,
<<"-----BEGIN CERTIFICATE-----
MIIDEzCCAfugAwIBAgIBATANBgkqhkiG9w0BAQsFADA/MQswCQYDVQQGEwJDTjER
MA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UEAwwGUm9vdENB
MB4XDTIwMDUwODA4MDY1N1oXDTMwMDUwNjA4MDY1N1owPzELMAkGA1UEBhMCQ04x
ETAPBgNVBAgMCGhhbmd6aG91MQwwCgYDVQQKDANFTVExDzANBgNVBAMMBkNsaWVu
dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMy4hoksKcZBDbY680u6
TS25U51nuB1FBcGMlF9B/t057wPOlxF/OcmbxY5MwepS41JDGPgulE1V7fpsXkiW
1LUimYV/tsqBfymIe0mlY7oORahKji7zKQ2UBIVFhdlvQxunlIDnw6F9popUgyHt
dMhtlgZK8oqRwHxO5dbfoukYd6J/r+etS5q26sgVkf3C6dt0Td7B25H9qW+f7oLV
PbcHYCa+i73u9670nrpXsC+Qc7Mygwa2Kq/jwU+ftyLQnOeW07DuzOwsziC/fQZa
nbxR+8U9FNftgRcC3uP/JMKYUqsiRAuaDokARZxVTV5hUElfpO6z6/NItSDvvh3i
eikCAwEAAaMaMBgwCQYDVR0TBAIwADALBgNVHQ8EBAMCBeAwDQYJKoZIhvcNAQEL
BQADggEBABchYxKo0YMma7g1qDswJXsR5s56Czx/I+B41YcpMBMTrRqpUC0nHtLk
M7/tZp592u/tT8gzEnQjZLKBAhFeZaR3aaKyknLqwiPqJIgg0pgsBGITrAK3Pv4z
5/YvAJJKgTe5UdeTz6U4lvNEux/4juZ4pmqH4qSFJTOzQS7LmgSmNIdd072rwXBd
UzcSHzsJgEMb88u/LDLjj1pQ7AtZ4Tta8JZTvcgBFmjB0QUi6fgkHY6oGat/W4kR
jSRUBlMUbM/drr2PVzRc2dwbFIl3X+ZE6n5Sl3ZwRAC/s92JU6CPMRW02muVu6xl
goraNgPISnrbpR6KjxLZkVembXzjNNc=
-----END CERTIFICATE-----
">>).
-define(SVR_KEY2,
<<"-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAzLiGiSwpxkENtjrzS7pNLblTnWe4HUUFwYyUX0H+3TnvA86X
EX85yZvFjkzB6lLjUkMY+C6UTVXt+mxeSJbUtSKZhX+2yoF/KYh7SaVjug5FqEqO
LvMpDZQEhUWF2W9DG6eUgOfDoX2milSDIe10yG2WBkryipHAfE7l1t+i6Rh3on+v
561LmrbqyBWR/cLp23RN3sHbkf2pb5/ugtU9twdgJr6Lve73rvSeulewL5BzszKD
BrYqr+PBT5+3ItCc55bTsO7M7CzOIL99BlqdvFH7xT0U1+2BFwLe4/8kwphSqyJE
C5oOiQBFnFVNXmFQSV+k7rPr80i1IO++HeJ6KQIDAQABAoIBAGWgvPjfuaU3qizq
uti/FY07USz0zkuJdkANH6LiSjlchzDmn8wJ0pApCjuIE0PV/g9aS8z4opp5q/gD
UBLM/a8mC/xf2EhTXOMrY7i9p/I3H5FZ4ZehEqIw9sWKK9YzC6dw26HabB2BGOnW
5nozPSQ6cp2RGzJ7BIkxSZwPzPnVTgy3OAuPOiJytvK+hGLhsNaT+Y9bNDvplVT2
ZwYTV8GlHZC+4b2wNROILm0O86v96O+Qd8nn3fXjGHbMsAnONBq10bZS16L4fvkH
5G+W/1PeSXmtZFppdRRDxIW+DWcXK0D48WRliuxcV4eOOxI+a9N2ZJZZiNLQZGwg
w3A8+mECgYEA8HuJFrlRvdoBe2U/EwUtG74dcyy30L4yEBnN5QscXmEEikhaQCfX
Wm6EieMcIB/5I5TQmSw0cmBMeZjSXYoFdoI16/X6yMMuATdxpvhOZGdUGXxhAH+x
xoTUavWZnEqW3fkUU71kT5E2f2i+0zoatFESXHeslJyz85aAYpP92H0CgYEA2e5A
Yozt5eaA1Gyhd8SeptkEU4xPirNUnVQHStpMWUb1kzTNXrPmNWccQ7JpfpG6DcYl
zUF6p6mlzY+zkMiyPQjwEJlhiHM2NlL1QS7td0R8ewgsFoyn8WsBI4RejWrEG9td
EDniuIw+pBFkcWthnTLHwECHdzgquToyTMjrBB0CgYEA28tdGbrZXhcyAZEhHAZA
Gzog+pKlkpEzeonLKIuGKzCrEKRecIK5jrqyQsCjhS0T7ZRnL4g6i0s+umiV5M5w
fcc292pEA1h45L3DD6OlKplSQVTv55/OYS4oY3YEJtf5mfm8vWi9lQeY8sxOlQpn
O+VZTdBHmTC8PGeTAgZXHZUCgYA6Tyv88lYowB7SN2qQgBQu8jvdGtqhcs/99GCr
H3N0I69LPsKAR0QeH8OJPXBKhDUywESXAaEOwS5yrLNP1tMRz5Vj65YUCzeDG3kx
gpvY4IMp7ArX0bSRvJ6mYSFnVxy3k174G3TVCfksrtagHioVBGQ7xUg5ltafjrms
n8l55QKBgQDVzU8tQvBVqY8/1lnw11Vj4fkE/drZHJ5UkdC1eenOfSWhlSLfUJ8j
ds7vEWpRPPoVuPZYeR1y78cyxKe1GBx6Wa2lF5c7xjmiu0xbRnrxYeLolce9/ntp
asClqpnHT8/VJYTD7Kqj0fouTTZf0zkig/y+2XERppd8k+pSKjUCPQ==
-----END RSA PRIVATE KEY-----
">>).
-define(CONF_STOMP_BAISC_1, -define(CONF_STOMP_BAISC_1,
#{ <<"idle_timeout">> => <<"10s">>, #{ <<"idle_timeout">> => <<"10s">>,
<<"mountpoint">> => <<"t/">>, <<"mountpoint">> => <<"t/">>,
@ -76,6 +200,31 @@ init_per_testcase(_CaseName, Conf) ->
-define(CONF_STOMP_LISTENER_2, -define(CONF_STOMP_LISTENER_2,
#{ <<"bind">> => <<"61614">> #{ <<"bind">> => <<"61614">>
}). }).
-define(CONF_STOMP_LISTENER_SSL,
#{ <<"bind">> => <<"61614">>,
<<"ssl">> =>
#{ <<"cacertfile">> => ?SVR_CA,
<<"certfile">> => ?SVR_CERT,
<<"keyfile">> => ?SVR_KEY
}
}).
-define(CONF_STOMP_LISTENER_SSL_2,
#{ <<"bind">> => <<"61614">>,
<<"ssl">> =>
#{ <<"cacertfile">> => ?SVR_CA,
<<"certfile">> => ?SVR_CERT2,
<<"keyfile">> => ?SVR_KEY2
}
}).
-define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])).
-define(CONF_STOMP_LISTENER_SSL_PATH,
#{ <<"bind">> => <<"61614">>,
<<"ssl">> =>
#{ <<"cacertfile">> => ?CERTS_PATH("cacert.pem"),
<<"certfile">> => ?CERTS_PATH("cert.pem"),
<<"keyfile">> => ?CERTS_PATH("key.pem")
}
}).
-define(CONF_STOMP_AUTHN_1, -define(CONF_STOMP_AUTHN_1,
#{ <<"mechanism">> => <<"password-based">>, #{ <<"mechanism">> => <<"password-based">>,
<<"backend">> => <<"built-in-database">>, <<"backend">> => <<"built-in-database">>,
@ -95,7 +244,6 @@ t_load_unload_gateway(_) ->
StompConf2 = compose(?CONF_STOMP_BAISC_2, StompConf2 = compose(?CONF_STOMP_BAISC_2,
?CONF_STOMP_AUTHN_1, ?CONF_STOMP_AUTHN_1,
?CONF_STOMP_LISTENER_1), ?CONF_STOMP_LISTENER_1),
ok = emqx_gateway_conf:load_gateway(stomp, StompConf1), ok = emqx_gateway_conf:load_gateway(stomp, StompConf1),
{error, already_exist} = {error, already_exist} =
emqx_gateway_conf:load_gateway(stomp, StompConf1), emqx_gateway_conf:load_gateway(stomp, StompConf1),
@ -213,6 +361,83 @@ t_load_remove_listener_authn(_) ->
), ),
ok. ok.
t_load_gateway_with_certs_content(_) ->
StompConf = compose_ssl_listener(
?CONF_STOMP_BAISC_1,
?CONF_STOMP_LISTENER_SSL
),
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
SslConf = emqx_map_lib:deep_get(
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl">>],
emqx:get_raw_config([gateway, stomp])
),
ok = emqx_gateway_conf:unload_gateway(<<"stomp">>),
assert_ssl_confs_files_deleted(SslConf),
?assertException(error, {config_not_found, [gateway, stomp]},
emqx:get_raw_config([gateway, stomp])),
ok.
%% TODO: Comment out this test case for now, because emqx_tls_lib
%% will delete the configured certificate file.
%t_load_gateway_with_certs_path(_) ->
% StompConf = compose_ssl_listener(
% ?CONF_STOMP_BAISC_1,
% ?CONF_STOMP_LISTENER_SSL_PATH
% ),
% ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
% assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
% SslConf = emqx_map_lib:deep_get(
% [<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl">>],
% emqx:get_raw_config([gateway, stomp])
% ),
% ok = emqx_gateway_conf:unload_gateway(<<"stomp">>),
% assert_ssl_confs_files_deleted(SslConf),
% ?assertException(error, {config_not_found, [gateway, stomp]},
% emqx:get_raw_config([gateway, stomp])),
% ok.
t_add_listener_with_certs_content(_) ->
StompConf = ?CONF_STOMP_BAISC_1,
ok = emqx_gateway_conf:load_gateway(<<"stomp">>, StompConf),
assert_confs(StompConf, emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:add_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL),
assert_confs(
maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL)),
emqx:get_raw_config([gateway, stomp])),
ok = emqx_gateway_conf:update_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2),
assert_confs(
maps:merge(StompConf, ssl_listener(?CONF_STOMP_LISTENER_SSL_2)),
emqx:get_raw_config([gateway, stomp])),
SslConf = emqx_map_lib:deep_get(
[<<"listeners">>, <<"ssl">>, <<"default">>, <<"ssl">>],
emqx:get_raw_config([gateway, stomp])
),
ok = emqx_gateway_conf:remove_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}),
assert_ssl_confs_files_deleted(SslConf),
{error, not_found} =
emqx_gateway_conf:update_listener(
<<"stomp">>, {<<"ssl">>, <<"default">>}, ?CONF_STOMP_LISTENER_SSL_2),
?assertException(
error, {config_not_found, [gateway, stomp, listeners, ssl, default]},
emqx:get_raw_config([gateway, stomp, listeners, ssl, default])
),
ok.
assert_ssl_confs_files_deleted(SslConf) when is_map(SslConf) ->
Ks = [<<"cacertfile">>, <<"certfile">>, <<"keyfile">>],
lists:foreach(fun(K) ->
Path = maps:get(K, SslConf),
{error, enoent} = file:read_file(Path)
end, Ks).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
@ -224,6 +449,9 @@ compose(Basic, Authn, Listener) ->
compose_listener(Basic, Listener) -> compose_listener(Basic, Listener) ->
maps:merge(Basic, listener(Listener)). maps:merge(Basic, listener(Listener)).
compose_ssl_listener(Basic, Listener) ->
maps:merge(Basic, ssl_listener(Listener)).
compose_authn(Basic, Authn) -> compose_authn(Basic, Authn) ->
maps:merge(Basic, #{<<"authentication">> => Authn}). maps:merge(Basic, #{<<"authentication">> => Authn}).
@ -235,3 +463,7 @@ compose_listener_authn(Basic, Listener, Authn) ->
listener(L) -> listener(L) ->
#{<<"listeners">> => [L#{<<"type">> => <<"tcp">>, #{<<"listeners">> => [L#{<<"type">> => <<"tcp">>,
<<"name">> => <<"default">>}]}. <<"name">> => <<"default">>}]}.
ssl_listener(L) ->
#{<<"listeners">> => [L#{<<"type">> => <<"ssl">>,
<<"name">> => <<"default">>}]}.

View File

@ -21,7 +21,7 @@
assert_confs(Expected0, Effected) -> assert_confs(Expected0, Effected) ->
Expected = maybe_unconvert_listeners(Expected0), Expected = maybe_unconvert_listeners(Expected0),
case do_assert_confs(Expected, Effected) of case do_assert_confs(root, Expected, Effected) of
false -> false ->
io:format(standard_error, "Expected config: ~p,\n" io:format(standard_error, "Expected config: ~p,\n"
"Effected config: ~p", "Effected config: ~p",
@ -31,23 +31,36 @@ assert_confs(Expected0, Effected) ->
ok ok
end. end.
do_assert_confs(Expected, Effected) when is_map(Expected), do_assert_confs(_Key, Expected, Effected) when is_map(Expected),
is_map(Effected) -> is_map(Effected) ->
Ks1 = maps:keys(Expected), Ks1 = maps:keys(Expected),
lists:all(fun(K) -> lists:all(fun(K) ->
do_assert_confs(maps:get(K, Expected), do_assert_confs(K,
maps:get(K, Expected),
maps:get(K, Effected, undefined)) maps:get(K, Effected, undefined))
end, Ks1); end, Ks1);
do_assert_confs([Expected|More1], [Effected|More2]) -> do_assert_confs(Key, Expected, Effected) when Key == <<"cacertfile">>;
do_assert_confs(Expected, Effected) andalso do_assert_confs(More1, More2); Key == <<"certfile">>;
do_assert_confs([], []) -> Key == <<"keyfile">> ->
case Expected == Effected of
true -> true;
false ->
case file:read_file(Effected) of
{ok, Content} -> Expected == Content;
_ -> false
end
end;
do_assert_confs(Key, [Expected|More1], [Effected|More2]) ->
do_assert_confs(Key, Expected, Effected)
andalso do_assert_confs(Key, More1, More2);
do_assert_confs(_Key, [], []) ->
true; true;
do_assert_confs(Expected, Effected) -> do_assert_confs(Key, Expected, Effected) ->
Res = Expected =:= Effected, Res = Expected =:= Effected,
Res == false andalso Res == false andalso
ct:pal("Errors: conf not match, " ct:pal("Errors: ~p value not match, "
"expected: ~p, got: ~p~n", [Expected, Effected]), "expected: ~p, got: ~p~n", [Key, Expected, Effected]),
Res. Res.
maybe_unconvert_listeners(Conf) when is_map(Conf) -> maybe_unconvert_listeners(Conf) when is_map(Conf) ->

File diff suppressed because it is too large Load Diff

View File

@ -70,12 +70,13 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_gateway]), emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_gateway]),
Config. Config.
end_per_suite(Config) -> end_per_suite(Config) ->
timer:sleep(300), timer:sleep(300),
emqx_mgmt_api_test_util:end_suite([emqx_gateway]), {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_conf]),
Config. Config.
init_per_testcase(_AllTestCase, Config) -> init_per_testcase(_AllTestCase, Config) ->
@ -106,13 +107,16 @@ t_lookup_cmd_read(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200), timer:sleep(200),
%% step 1, device register ... %% step 1, device register ...
test_send_coap_request( UdpSock, test_send_coap_request(
post, UdpSock,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]), post,
#coap_content{content_format = <<"text/plain">>, sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>}, #coap_content{
[], content_format = <<"text/plain">>,
MsgId1), payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
"</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock), #coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1), ?assertEqual({ok,created}, Method1),
@ -140,7 +144,14 @@ t_lookup_cmd_read(Config) ->
?LOGT("LwM2M client got ~p", [Request2]), ?LOGT("LwM2M client got ~p", [Request2]),
timer:sleep(50), timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true), test_send_coap_response(
UdpSock,
"127.0.0.1",
?PORT,
{ok, content},
#coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>},
Request2,
true),
timer:sleep(200), timer:sleep(200),
normal_received_request(Epn, <<"/3/0/0">>, <<"read">>). normal_received_request(Epn, <<"/3/0/0">>, <<"read">>).
@ -176,13 +187,15 @@ t_lookup_cmd_discover(Config) ->
timer:sleep(50), timer:sleep(50),
PayloadDiscover = <<"</3/0/7>;dim=8;pmin=10;pmax=60;gt=50;lt=42.2,</3/0/8>">>, PayloadDiscover = <<"</3/0/7>;dim=8;pmin=10;pmax=60;gt=50;lt=42.2,</3/0/8>">>,
test_send_coap_response(UdpSock, test_send_coap_response(
"127.0.0.1", UdpSock,
?PORT, "127.0.0.1",
{ok, content}, ?PORT,
#coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover}, {ok, content},
Request2, #coap_content{content_format = <<"application/link-format">>,
true), payload = PayloadDiscover},
Request2,
true),
timer:sleep(200), timer:sleep(200),
discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>). discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>).
@ -194,13 +207,15 @@ t_read(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200), timer:sleep(200),
%% step 1, device register ... %% step 1, device register ...
test_send_coap_request( UdpSock, test_send_coap_request(
post, UdpSock,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]), post,
#coap_content{content_format = <<"text/plain">>, sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>}, #coap_content{content_format = <<"text/plain">>,
[], payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
MsgId1), "</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock), #coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1), ?assertEqual({ok,created}, Method1),
@ -224,13 +239,15 @@ t_write(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200), timer:sleep(200),
%% step 1, device register ... %% step 1, device register ...
test_send_coap_request( UdpSock, test_send_coap_request(
post, UdpSock,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]), post,
#coap_content{content_format = <<"text/plain">>, sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>}, #coap_content{content_format = <<"text/plain">>,
[], payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
MsgId1), "</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock), #coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1), ?assertEqual({ok,created}, Method1),
@ -256,13 +273,15 @@ t_observe(Config) ->
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0), emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200), timer:sleep(200),
%% step 1, device register ... %% step 1, device register ...
test_send_coap_request( UdpSock, test_send_coap_request(
post, UdpSock,
sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]), post,
#coap_content{content_format = <<"text/plain">>, sprintf("coap://127.0.0.1:~b/rd?ep=~ts&lt=600&lwm2m=1", [?PORT, Epn]),
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>}, #coap_content{content_format = <<"text/plain">>,
[], payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,"
MsgId1), "</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock), #coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1), ?assertEqual({ok,created}, Method1),

View File

@ -87,35 +87,36 @@ t_connect(_) ->
%% Connect will be failed, because of bad login or passcode %% Connect will be failed, because of bad login or passcode
%% FIXME: Waiting for authentication works %% FIXME: Waiting for authentication works
%with_connection(fun(Sock) -> %with_connection(
% gen_tcp:send(Sock, serialize(<<"CONNECT">>, % fun(Sock) ->
% [{<<"accept-version">>, ?STOMP_VER}, % gen_tcp:send(Sock, serialize(<<"CONNECT">>,
% {<<"host">>, <<"127.0.0.1:61613">>}, % [{<<"accept-version">>, ?STOMP_VER},
% {<<"login">>, <<"admin">>}, % {<<"host">>, <<"127.0.0.1:61613">>},
% {<<"passcode">>, <<"admin">>}, % {<<"login">>, <<"admin">>},
% {<<"heart-beat">>, <<"1000,2000">>}])), % {<<"passcode">>, <<"admin">>},
% {ok, Data} = gen_tcp:recv(Sock, 0), % {<<"heart-beat">>, <<"1000,2000">>}])),
% {ok, #stomp_frame{command = <<"ERROR">>, % {ok, Data} = gen_tcp:recv(Sock, 0),
% headers = _, % {ok, Frame, _, _} = parse(Data),
% body = <<"Login or passcode error!">>}, _, _} = % #stomp_frame{command = <<"ERROR">>,
% parse(Data) % headers = _,
% end), % body = <<"Login or passcode error!">>} = Frame
% end),
%% Connect will be failed, because of bad version %% Connect will be failed, because of bad version
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock,
[{<<"accept-version">>, <<"2.0,2.1">>}, serialize(<<"CONNECT">>,
{<<"host">>, <<"127.0.0.1:61613">>}, [{<<"accept-version">>, <<"2.0,2.1">>},
{<<"login">>, <<"guest">>}, {<<"host">>, <<"127.0.0.1:61613">>},
{<<"passcode">>, <<"guest">>}, {<<"login">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,2000">>}])), {<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,2000">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, {ok, Frame, _, _} = parse(Data),
#stomp_frame{command = <<"ERROR">>, #stomp_frame{
headers = _, command = <<"ERROR">>,
body = <<"Login Failed: Supported protocol versions < 1.2">>}, headers = _,
_, body = <<"Login Failed: Supported protocol versions < 1.2">>} = Frame
_ } = parse(Data)
end). end).
t_heartbeat(_) -> t_heartbeat(_) ->
@ -407,6 +408,7 @@ t_rest_clienit_info(_) ->
%% kickout %% kickout
{204, _} = request(delete, ClientPath), {204, _} = request(delete, ClientPath),
ignored = gen_server:call(emqx_cm, ignore, infinity), % sync
ok = emqx_pool:flush_async_tasks(), ok = emqx_pool:flush_async_tasks(),
{200, Clients2} = request(get, "/gateway/stomp/clients"), {200, Clients2} = request(get, "/gateway/stomp/clients"),
?assertEqual(0, length(maps:get(data, Clients2))) ?assertEqual(0, length(maps:get(data, Clients2)))

View File

@ -26,6 +26,23 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
%% CASE-SIDE-EFFICT:
%%
%% Running-Seq:
%% emqx_authz_api_mnesia_SUITE.erl
%% emqx_gateway_api_SUITE.erl
%% emqx_machine_SUITE.erl
%%
%% Reason:
%% the `emqx_machine_boot:ensure_apps_started()` will crashed
%% on starting `emqx_authz` with dirty confs, which caused the file
%% `.._build/test/lib/emqx_conf/etc/acl.conf` could not be found
%%
%% Workaround:
%% Unload emqx_authz to avoid reboot this application
%%
application:unload(emqx_authz),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([]),
Config. Config.

View File

@ -17,7 +17,8 @@
{elvis_style, operator_spaces, #{rules => [{right, "|"}, {elvis_style, operator_spaces, #{rules => [{right, "|"},
{left, "|"}, {left, "|"},
{right, "||"}, {right, "||"},
{left, "||"}]}} {left, "||"}]}},
{elvis_style, dont_repeat_yourself, #{ min_complexity => 20 }}
] ]
}, },
#{dirs => ["test", "apps/**/test"], #{dirs => ["test", "apps/**/test"],