diff --git a/.github/workflows/run_api_tests.yaml b/.github/workflows/run_api_tests.yaml
index 618b9383a..15dd92f3b 100644
--- a/.github/workflows/run_api_tests.yaml
+++ b/.github/workflows/run_api_tests.yaml
@@ -2,10 +2,10 @@ name: API Test Suite
on:
push:
- tags:
- - e*
- - v*
- pull_request:
+ tags:
+ - e*
+ - v*
+ pull_request:
jobs:
build:
@@ -36,6 +36,9 @@ jobs:
script_name:
- api_metrics
- api_subscriptions
+ - api_clients
+ - api_routes
+ - api_publish
steps:
- uses: actions/checkout@v2
with:
diff --git a/Makefile b/Makefile
index e646f5b7a..2ef0e1806 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@ BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
-export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.11
+export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.13
ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none
endif
diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl
index 572d23155..7391b765a 100644
--- a/apps/emqx/include/emqx_release.hrl
+++ b/apps/emqx/include/emqx_release.hrl
@@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
--define(EMQX_RELEASE, {opensource, "5.0-alpha.5"}).
+-define(EMQX_RELEASE, {opensource, "5.0-alpha.6"}).
-else.
diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config
index bb3a588a9..a8462ad82 100644
--- a/apps/emqx/rebar.config
+++ b/apps/emqx/rebar.config
@@ -15,7 +15,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
- , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}}
+ , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl
index 8cc8cf2df..2e53d85eb 100644
--- a/apps/emqx/src/emqx_authentication.erl
+++ b/apps/emqx/src/emqx_authentication.erl
@@ -78,6 +78,25 @@
-define(VER_1, <<"1">>).
-define(VER_2, <<"2">>).
+-type chain_name() :: atom().
+-type authenticator_id() :: binary().
+-type position() :: top | bottom | {before, authenticator_id()}.
+-type update_request() :: {create_authenticator, chain_name(), map()}
+ | {delete_authenticator, chain_name(), authenticator_id()}
+ | {update_authenticator, chain_name(), authenticator_id(), map()}
+ | {move_authenticator, chain_name(), authenticator_id(), position()}.
+-type authn_type() :: atom() | {atom(), atom()}.
+-type provider() :: module().
+
+-type chain() :: #{name := chain_name(),
+ authenticators := [authenticator()]}.
+
+-type authenticator() :: #{id := authenticator_id(),
+ provider := provider(),
+ enable := boolean(),
+ state := map()}.
+
+
-type config() :: #{atom() => term()}.
-type state() :: #{atom() => term()}.
-type extra() :: #{is_superuser := boolean(),
@@ -128,7 +147,12 @@
-callback update_user(UserID, UserInfo, State)
-> {ok, User}
| {error, term()}
- when UserID::binary, UserInfo::map(), State::state(), User::user_info().
+ when UserID::binary(), UserInfo::map(), State::state(), User::user_info().
+
+-callback lookup_user(UserID, UserInfo, State)
+ -> {ok, User}
+ | {error, term()}
+ when UserID::binary(), UserInfo::map(), State::state(), User::user_info().
-callback list_users(State)
-> {ok, Users}
@@ -138,6 +162,7 @@
, add_user/2
, delete_user/2
, update_user/3
+ , lookup_user/3
, list_users/1
]).
@@ -159,6 +184,8 @@ authentication(_) -> undefined.
%% Callbacks of config handler
%%------------------------------------------------------------------------------
+-spec pre_config_update(update_request(), emqx_config:raw_config())
+ -> {ok, map() | list()} | {error, term()}.
pre_config_update(UpdateReq, OldConfig) ->
case do_pre_config_update(UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
@@ -185,22 +212,22 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
{error, Reason} -> {error, Reason};
{ok, Part1, [Found | Part2]} ->
case Position of
- <<"top">> ->
+ top ->
{ok, [Found | Part1] ++ Part2};
- <<"bottom">> ->
+ bottom ->
{ok, Part1 ++ Part2 ++ [Found]};
- <<"before:", Before/binary>> ->
+ {before, Before} ->
case split_by_id(Before, Part1 ++ Part2) of
{error, Reason} ->
{error, Reason};
{ok, NPart1, [NFound | NPart2]} ->
{ok, NPart1 ++ [Found, NFound | NPart2]}
- end;
- _ ->
- {error, {invalid_parameter, position}}
+ end
end
end.
+-spec post_config_update(update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
+ -> ok | {ok, map()} | {error, term()}.
post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, check_config(to_list(NewConfig)), OldConfig, AppEnvs).
@@ -220,13 +247,7 @@ do_post_config_update({update_authenticator, ChainName, AuthenticatorID, _Config
update_authenticator(ChainName, AuthenticatorID, NConfig);
do_post_config_update({move_authenticator, ChainName, AuthenticatorID, Position}, _NewConfig, _OldConfig, _AppEnvs) ->
- NPosition = case Position of
- <<"top">> -> top;
- <<"bottom">> -> bottom;
- <<"before:", Before/binary>> ->
- {before, Before}
- end,
- move_authenticator(ChainName, AuthenticatorID, NPosition).
+ move_authenticator(ChainName, AuthenticatorID, Position).
check_config(Config) ->
#{authentication := CheckedConfig} = hocon_schema:check_plain(emqx_authentication,
@@ -269,6 +290,7 @@ do_authenticate([#authenticator{provider = Provider, state = State} | More], Cre
%% APIs
%%------------------------------------------------------------------------------
+-spec initialize_authentication(chain_name(), [#{binary() => term()}]) -> ok.
initialize_authentication(_, []) ->
ok;
initialize_authentication(ChainName, AuthenticatorsConfig) ->
@@ -283,43 +305,56 @@ initialize_authentication(ChainName, AuthenticatorsConfig) ->
end
end, CheckedConfig).
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+-spec stop() -> ok.
stop() ->
gen_server:stop(?MODULE).
+-spec get_refs() -> {ok, Refs} when Refs :: [{authn_type(), module()}].
get_refs() ->
gen_server:call(?MODULE, get_refs).
+-spec add_provider(authn_type(), module()) -> ok.
add_provider(AuthNType, Provider) ->
gen_server:call(?MODULE, {add_provider, AuthNType, Provider}).
+-spec remove_provider(authn_type()) -> ok.
remove_provider(AuthNType) ->
gen_server:call(?MODULE, {remove_provider, AuthNType}).
+-spec create_chain(chain_name()) -> {ok, chain()} | {error, term()}.
create_chain(Name) ->
gen_server:call(?MODULE, {create_chain, Name}).
+-spec delete_chain(chain_name()) -> ok | {error, term()}.
delete_chain(Name) ->
gen_server:call(?MODULE, {delete_chain, Name}).
+-spec lookup_chain(chain_name()) -> {ok, chain()} | {error, term()}.
lookup_chain(Name) ->
gen_server:call(?MODULE, {lookup_chain, Name}).
+-spec list_chains() -> {ok, [chain()]}.
list_chains() ->
Chains = ets:tab2list(?CHAINS_TAB),
{ok, [serialize_chain(Chain) || Chain <- Chains]}.
+-spec create_authenticator(chain_name(), config()) -> {ok, authenticator()} | {error, term()}.
create_authenticator(ChainName, Config) ->
gen_server:call(?MODULE, {create_authenticator, ChainName, Config}).
+-spec delete_authenticator(chain_name(), authenticator_id()) -> ok | {error, term()}.
delete_authenticator(ChainName, AuthenticatorID) ->
gen_server:call(?MODULE, {delete_authenticator, ChainName, AuthenticatorID}).
+-spec update_authenticator(chain_name(), authenticator_id(), config()) -> {ok, authenticator()} | {error, term()}.
update_authenticator(ChainName, AuthenticatorID, Config) ->
gen_server:call(?MODULE, {update_authenticator, ChainName, AuthenticatorID, Config}).
+-spec lookup_authenticator(chain_name(), authenticator_id()) -> {ok, authenticator()} | {error, term()}.
lookup_authenticator(ChainName, AuthenticatorID) ->
case ets:lookup(?CHAINS_TAB, ChainName) of
[] ->
@@ -333,6 +368,7 @@ lookup_authenticator(ChainName, AuthenticatorID) ->
end
end.
+-spec list_authenticators(chain_name()) -> {ok, [authenticator()]} | {error, term()}.
list_authenticators(ChainName) ->
case ets:lookup(?CHAINS_TAB, ChainName) of
[] ->
@@ -341,28 +377,36 @@ list_authenticators(ChainName) ->
{ok, serialize_authenticators(Authenticators)}
end.
+-spec move_authenticator(chain_name(), authenticator_id(), position()) -> ok | {error, term()}.
move_authenticator(ChainName, AuthenticatorID, Position) ->
gen_server:call(?MODULE, {move_authenticator, ChainName, AuthenticatorID, Position}).
+-spec import_users(chain_name(), authenticator_id(), binary()) -> ok | {error, term()}.
import_users(ChainName, AuthenticatorID, Filename) ->
gen_server:call(?MODULE, {import_users, ChainName, AuthenticatorID, Filename}).
+-spec add_user(chain_name(), authenticator_id(), user_info()) -> {ok, user_info()} | {error, term()}.
add_user(ChainName, AuthenticatorID, UserInfo) ->
gen_server:call(?MODULE, {add_user, ChainName, AuthenticatorID, UserInfo}).
+-spec delete_user(chain_name(), authenticator_id(), binary()) -> ok | {error, term()}.
delete_user(ChainName, AuthenticatorID, UserID) ->
gen_server:call(?MODULE, {delete_user, ChainName, AuthenticatorID, UserID}).
+-spec update_user(chain_name(), authenticator_id(), binary(), map()) -> {ok, user_info()} | {error, term()}.
update_user(ChainName, AuthenticatorID, UserID, NewUserInfo) ->
gen_server:call(?MODULE, {update_user, ChainName, AuthenticatorID, UserID, NewUserInfo}).
+-spec lookup_user(chain_name(), authenticator_id(), binary()) -> {ok, user_info()} | {error, term()}.
lookup_user(ChainName, AuthenticatorID, UserID) ->
gen_server:call(?MODULE, {lookup_user, ChainName, AuthenticatorID, UserID}).
%% TODO: Support pagination
+-spec list_users(chain_name(), authenticator_id()) -> {ok, [user_info()]} | {error, term()}.
list_users(ChainName, AuthenticatorID) ->
gen_server:call(?MODULE, {list_users, ChainName, AuthenticatorID}).
+-spec generate_id(config()) -> authenticator_id().
generate_id(#{mechanism := Mechanism0, backend := Backend0}) ->
Mechanism = atom_to_binary(Mechanism0),
Backend = atom_to_binary(Backend0),
@@ -484,7 +528,7 @@ handle_call({update_authenticator, ChainName, AuthenticatorID, Config}, _From, S
{error, Reason}
end;
false ->
- {error, mechanism_or_backend_change_is_not_alloed}
+ {error, change_of_authentication_type_is_not_allowed}
end
end
end,
@@ -679,7 +723,7 @@ call_authenticator(ChainName, AuthenticatorID, Func, Args) ->
true ->
erlang:apply(Provider, Func, Args ++ [State]);
false ->
- {error, unsupported_feature}
+ {error, unsupported_operation}
end
end
end,
diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl
index 06d900ed5..06d89c86d 100644
--- a/apps/emqx/src/emqx_listeners.erl
+++ b/apps/emqx/src/emqx_listeners.erl
@@ -253,7 +253,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
end.
delete_authentication(Type, ListenerName, _Conf) ->
- emqx_authentication:delete_chain(atom_to_binary(listener_id(Type, ListenerName))).
+ emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
%% Update the listeners at runtime
post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 01989c5a1..ed3f64d0e 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -69,33 +69,82 @@
cipher/0,
comma_separated_atoms/0]).
--export([namespace/0, roots/0, fields/1]).
+-export([namespace/0, roots/0, roots/1, fields/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ssl/1]).
namespace() -> undefined.
roots() ->
- ["zones",
- "mqtt",
- "flapping_detect",
- "force_shutdown",
- "force_gc",
- "conn_congestion",
- "rate_limit",
- "quota",
- {"listeners",
+ %% TODO change config importance to a field metadata
+ roots(high) ++ roots(medium) ++ roots(low).
+
+roots(high) ->
+ [ {"listeners",
sc(ref("listeners"),
- #{ desc => "MQTT listeners identified by their protocol type and assigned names. "
- "The listeners enabled by default are named with 'default'"})
- },
- "broker",
- "plugins",
- "stats",
- "sysmon",
- "alarm",
- "authorization",
- {"authentication", sc(hoconsc:lazy(hoconsc:array(map())), #{})}
+ #{ desc => "MQTT listeners identified by their protocol type and assigned names"
+ })
+ }
+ , {"zones",
+ sc(map("name", ref("zone")),
+ #{ desc => "A zone is a set of configs grouped by the zone name
.
"
+ "For flexible configuration mapping, the name
"
+ "can be set to a listener's zone
config.
"
+ "NOTE: A builtin zone named default
is auto created "
+ "and can not be deleted."
+ })}
+ , {"mqtt",
+ sc(ref("mqtt"),
+ #{ desc => "Global MQTT configuration.
"
+ "The configs here work as default values which can be overriden "
+ "in zone
configs"
+ })}
+ , {"authentication",
+ sc(hoconsc:lazy(hoconsc:array(map())),
+ #{ desc => "Default authentication configs for all MQTT listeners.
"
+ "For per-listener overrides see authentication
"
+ "in listener configs"
+ })}
+ , {"authorization",
+ sc(ref("authorization"),
+ #{})}
+ ];
+roots(medium) ->
+ [ {"broker",
+ sc(ref("broker"),
+ #{})}
+ , {"rate_limit",
+ sc(ref("rate_limit"),
+ #{})}
+ , {"force_shutdown",
+ sc(ref("force_shutdown"),
+ #{})}
+ ];
+roots(low) ->
+ [ {"force_gc",
+ sc(ref("force_gc"),
+ #{})}
+ , {"conn_congestion",
+ sc(ref("conn_congestion"),
+ #{})}
+ , {"quota",
+ sc(ref("quota"),
+ #{})}
+ , {"plugins", %% TODO: move to emqx_machine_schema
+ sc(ref("plugins"),
+ #{})}
+ , {"stats",
+ sc(ref("stats"),
+ #{})}
+ , {"sysmon",
+ sc(ref("sysmon"),
+ #{})}
+ , {"alarm",
+ sc(ref("alarm"),
+ #{})}
+ , {"flapping_detect",
+ sc(ref("flapping_detect"),
+ #{})}
].
fields("stats") ->
@@ -117,8 +166,7 @@ fields("authorization") ->
, {"cache",
sc(ref(?MODULE, "cache"),
#{
- })
- }
+ })}
];
fields("cache") ->
@@ -270,14 +318,7 @@ fields("mqtt") ->
})}
];
-fields("zones") ->
- [ {"$name",
- sc(ref("zone_settings"),
- #{
- }
- )}];
-
-fields("zone_settings") ->
+fields("zone") ->
Fields = ["mqtt", "stats", "flapping_detect", "force_shutdown",
"conn_congestion", "rate_limit", "quota", "force_gc"],
[{F, ref(emqx_zone_schema, F)} || F <- Fields];
@@ -375,48 +416,37 @@ fields("force_gc") ->
fields("listeners") ->
[ {"tcp",
- sc(ref("tcp_listeners"),
+ sc(map(name, ref("mqtt_tcp_listener")),
#{ desc => "TCP listeners"
+ , nullable => {true, recursive}
})
}
, {"ssl",
- sc(ref("ssl_listeners"),
+ sc(map(name, ref("mqtt_ssl_listener")),
#{ desc => "SSL listeners"
+ , nullable => {true, recursive}
})
}
, {"ws",
- sc(ref("ws_listeners"),
+ sc(map(name, ref("mqtt_ws_listener")),
#{ desc => "HTTP websocket listeners"
+ , nullable => {true, recursive}
})
}
, {"wss",
- sc(ref("wss_listeners"),
+ sc(map(name, ref("mqtt_wss_listener")),
#{ desc => "HTTPS websocket listeners"
+ , nullable => {true, recursive}
})
}
, {"quic",
- sc(ref("quic_listeners"),
+ sc(map(name, ref("mqtt_quic_listener")),
#{ desc => "QUIC listeners"
+ , nullable => {true, recursive}
})
}
];
-fields("tcp_listeners") ->
- [ {"$name", ref("mqtt_tcp_listener")}
- ];
-fields("ssl_listeners") ->
- [ {"$name", ref("mqtt_ssl_listener")}
- ];
-fields("ws_listeners") ->
- [ {"$name", ref("mqtt_ws_listener")}
- ];
-fields("wss_listeners") ->
- [ {"$name", ref("mqtt_wss_listener")}
- ];
-fields("quic_listeners") ->
- [ {"$name", ref("mqtt_quic_listener")}
- ];
-
fields("mqtt_tcp_listener") ->
[ {"tcp",
sc(ref("tcp_opts"),
@@ -1011,6 +1041,8 @@ ceiling(X) ->
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+map(Name, Type) -> hoconsc:map(Name, Type).
+
ref(Field) -> hoconsc:ref(?MODULE, Field).
ref(Module, Field) -> hoconsc:ref(Module, Field).
diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl
index aa4d55fee..001a4b40e 100644
--- a/apps/emqx/test/emqx_authentication_SUITE.erl
+++ b/apps/emqx/test/emqx_authentication_SUITE.erl
@@ -206,7 +206,7 @@ t_update_config(_) ->
?assertMatch({ok, _}, update_config([authentication], {update_authenticator, Global, ID1, #{}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(Global, ID1)),
- ?assertMatch({ok, _}, update_config([authentication], {move_authenticator, Global, ID2, <<"top">>})),
+ ?assertMatch({ok, _}, update_config([authentication], {move_authenticator, Global, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(Global)),
?assertMatch({ok, _}, update_config([authentication], {delete_authenticator, Global, ID1})),
@@ -223,7 +223,7 @@ t_update_config(_) ->
?assertMatch({ok, _}, update_config(ConfKeyPath, {update_authenticator, ListenerID, ID1, #{}})),
?assertMatch({ok, #{id := ID1, state := #{mark := 2}}}, ?AUTHN:lookup_authenticator(ListenerID, ID1)),
- ?assertMatch({ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, <<"top">>})),
+ ?assertMatch({ok, _}, update_config(ConfKeyPath, {move_authenticator, ListenerID, ID2, top})),
?assertMatch({ok, [#{id := ID2}, #{id := ID1}]}, ?AUTHN:list_authenticators(ListenerID)),
?assertMatch({ok, _}, update_config(ConfKeyPath, {delete_authenticator, ListenerID, ID1})),
diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl
index 5ba1419f0..e492100ee 100644
--- a/apps/emqx_authn/src/emqx_authn_api.erl
+++ b/apps/emqx_authn/src/emqx_authn_api.erl
@@ -37,7 +37,7 @@
-define(EXAMPLE_1, #{mechanism => <<"password-based">>,
backend => <<"built-in-database">>,
- query => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>,
+ user_id_type => <<"username">>,
password_hash_algorithm => #{
name => <<"sha256">>
}}).
@@ -1193,10 +1193,10 @@ definitions() ->
enum => [<<"built-in-database">>],
example => <<"built-in-database">>
},
- query => #{
+ user_id_type => #{
type => string,
- default => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>,
- example => <<"SELECT password_hash from built-in-database WHERE username = ${username}">>
+ enum => [<<"username">>, <<"clientid">>],
+ example => <<"username">>
},
password_hash_algorithm => minirest:ref(<<"PasswordHashAlgorithm">>)
}
@@ -1550,7 +1550,8 @@ definitions() ->
enable_pipelining => #{
type => boolean,
default => true
- }
+ },
+ ssl => minirest:ref(<<"SSL">>)
}
},
@@ -1584,6 +1585,10 @@ definitions() ->
certificate => #{
type => string
},
+ endpoint => #{
+ type => string,
+ example => <<"http://localhost:80">>
+ },
verify_claims => #{
type => object,
additionalProperties => #{
@@ -1631,6 +1636,7 @@ definitions() ->
},
salt_rounds => #{
type => integer,
+ description => <<"Only valid when the name field is set to bcrypt">>,
default => 10
}
}
@@ -1857,8 +1863,7 @@ list_authenticator(ConfKeyPath, AuthenticatorID) ->
update_authenticator(ConfKeyPath, ChainName0, AuthenticatorID, Config) ->
ChainName = to_atom(ChainName0),
- case update_config(ConfKeyPath,
- {update_authenticator, ChainName, AuthenticatorID, Config}) of
+ case update_config(ConfKeyPath, {update_authenticator, ChainName, AuthenticatorID, Config}) of
{ok, #{post_config_update := #{?AUTHN := #{id := ID}},
raw_config := AuthenticatorsConfig}} ->
{ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
@@ -1878,10 +1883,15 @@ delete_authenticator(ConfKeyPath, ChainName0, AuthenticatorID) ->
move_authenitcator(ConfKeyPath, ChainName0, AuthenticatorID, Position) ->
ChainName = to_atom(ChainName0),
- case update_config(ConfKeyPath, {move_authenticator, ChainName, AuthenticatorID, Position}) of
- {ok, _} ->
- {204};
- {error, {_, _, Reason}} ->
+ case parse_position(Position) of
+ {ok, NPosition} ->
+ case update_config(ConfKeyPath, {move_authenticator, ChainName, AuthenticatorID, NPosition}) of
+ {ok, _} ->
+ {204};
+ {error, {_, _, Reason}} ->
+ serialize_error(Reason)
+ end;
+ {error, Reason} ->
serialize_error(Reason)
end.
@@ -1963,29 +1973,69 @@ fill_defaults(Config) ->
serialize_error({not_found, {authenticator, ID}}) ->
{404, #{code => <<"NOT_FOUND">>,
- message => list_to_binary(io_lib:format("Authenticator '~s' does not exist", [ID]))}};
+ message => list_to_binary(
+ io_lib:format("Authenticator '~s' does not exist", [ID])
+ )}};
+
serialize_error({not_found, {listener, ID}}) ->
{404, #{code => <<"NOT_FOUND">>,
- message => list_to_binary(io_lib:format("Listener '~s' does not exist", [ID]))}};
+ message => list_to_binary(
+ io_lib:format("Listener '~s' does not exist", [ID])
+ )}};
+
+serialize_error({not_found, {chain, ?GLOBAL}}) ->
+ {500, #{code => <<"INTERNAL_SERVER_ERROR">>,
+ message => <<"Authentication status is abnormal">>}};
+
+serialize_error({not_found, {chain, Name}}) ->
+ {400, #{code => <<"BAD_REQUEST">>,
+ message => list_to_binary(
+ io_lib:format("No authentication has been create for listener '~s'", [Name])
+ )}};
+
serialize_error({already_exists, {authenticator, ID}}) ->
{409, #{code => <<"ALREADY_EXISTS">>,
message => list_to_binary(
io_lib:format("Authenticator '~s' already exist", [ID])
)}};
+
+serialize_error(no_available_provider) ->
+ {400, #{code => <<"BAD_REQUEST">>,
+ message => <<"Unsupported authentication type">>}};
+
+serialize_error(change_of_authentication_type_is_not_allowed) ->
+ {400, #{code => <<"BAD_REQUEST">>,
+ message => <<"Change of authentication type is not allowed">>}};
+
+serialize_error(unsupported_operation) ->
+ {400, #{code => <<"BAD_REQUEST">>,
+ message => <<"Operation not supported in this authentication type">>}};
+
serialize_error({missing_parameter, Name}) ->
{400, #{code => <<"MISSING_PARAMETER">>,
message => list_to_binary(
io_lib:format("The input parameter '~p' that is mandatory for processing this request is not supplied", [Name])
)}};
+
serialize_error({invalid_parameter, Name}) ->
{400, #{code => <<"INVALID_PARAMETER">>,
message => list_to_binary(
io_lib:format("The value of input parameter '~p' is invalid", [Name])
)}};
+
serialize_error(Reason) ->
{400, #{code => <<"BAD_REQUEST">>,
message => list_to_binary(io_lib:format("~p", [Reason]))}}.
+parse_position(<<"top">>) ->
+ {ok, top};
+parse_position(<<"bottom">>) ->
+ {ok, bottom};
+parse_position(<<"before:", Before/binary>>) ->
+ {ok, {before, Before}};
+parse_position(_) ->
+ {error, {invalid_parameter, position}}.
+
to_list(M) when is_map(M) ->
[M];
to_list(L) when is_list(L) ->
diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
index 5495b139a..daa7f8073 100644
--- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
+++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
@@ -214,7 +214,7 @@ default_headers_no_content_type() ->
transform_header_name(Headers) ->
maps:fold(fun(K0, V, Acc) ->
- K = list_to_binary(string:to_lower(binary_to_list(K0))),
+ K = list_to_binary(string:to_lower(to_list(K0))),
maps:put(K, V, Acc)
end, #{}, Headers).
@@ -301,3 +301,8 @@ parse_body(<<"application/x-www-form-urlencoded">>, Body) ->
{ok, maps:from_list(cow_qs:parse_qs(Body))};
parse_body(ContentType, _) ->
{error, {unsupported_content_type, ContentType}}.
+
+to_list(A) when is_atom(A) ->
+ atom_to_list(A);
+to_list(B) when is_binary(B) ->
+ binary_to_list(B).
diff --git a/apps/emqx_authz/etc/emqx_authz.conf b/apps/emqx_authz/etc/emqx_authz.conf
index ed4ad573c..c2856f0b5 100644
--- a/apps/emqx_authz/etc/emqx_authz.conf
+++ b/apps/emqx_authz/etc/emqx_authz.conf
@@ -22,7 +22,7 @@ authorization {
# certfile: "{{ platform_etc_dir }}/certs/client-cert.pem"
# keyfile: "{{ platform_etc_dir }}/certs/client-key.pem"
# }
- # sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
+ # query: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or clientid = '%c'"
# },
# {
# type: pgsql
@@ -33,7 +33,7 @@ authorization {
# password: public
# auto_reconnect: true
# ssl: {enable: false}
- # sql: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
+ # query: "select ipaddress, username, clientid, action, permission, topic from mqtt_authz where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"
# },
# {
# type: redis
@@ -53,7 +53,7 @@ authorization {
# database: mqtt
# ssl: {enable: false}
# collection: mqtt_authz
- # find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
+ # selector: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
# },
{
type: file
diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl
index e0e584806..6fe2d7565 100644
--- a/apps/emqx_authz/src/emqx_authz.erl
+++ b/apps/emqx_authz/src/emqx_authz.erl
@@ -290,7 +290,7 @@ init_source(#{enable := true,
end;
init_source(#{enable := true,
type := DB,
- sql := SQL
+ query := SQL
} = Source) when DB =:= mysql;
DB =:= pgsql ->
Mod = authz_module(DB),
@@ -298,7 +298,7 @@ init_source(#{enable := true,
{error, Reason} -> error({load_config_error, Reason});
Id -> Source#{annotations =>
#{id => Id,
- sql => Mod:parse_query(SQL)
+ query => Mod:parse_query(SQL)
}
}
end;
diff --git a/apps/emqx_authz/src/emqx_authz_api_schema.erl b/apps/emqx_authz/src/emqx_authz_api_schema.erl
index 09f145075..158e26eab 100644
--- a/apps/emqx_authz/src/emqx_authz_api_schema.erl
+++ b/apps/emqx_authz/src/emqx_authz_api_schema.erl
@@ -95,7 +95,7 @@ definitions() ->
},
method => #{
type => string,
- enum => [<<"get">>, <<"post">>, <<"put">>],
+ enum => [<<"get">>, <<"post">>],
example => <<"get">>
},
headers => #{type => object},
@@ -118,7 +118,7 @@ definitions() ->
required => [ type
, enable
, collection
- , find
+ , selector
, mongo_type
, server
, pool_size
@@ -140,7 +140,7 @@ definitions() ->
example => true
},
collection => #{type => string},
- find => #{type => object},
+ selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"single">>],
example => <<"single">>},
@@ -173,7 +173,7 @@ definitions() ->
required => [ type
, enable
, collection
- , find
+ , selector
, mongo_type
, servers
, replica_set_name
@@ -196,7 +196,7 @@ definitions() ->
example => true
},
collection => #{type => string},
- find => #{type => object},
+ selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"rs">>],
example => <<"rs">>},
@@ -231,7 +231,7 @@ definitions() ->
required => [ type
, enable
, collection
- , find
+ , selector
, mongo_type
, servers
, pool_size
@@ -253,7 +253,7 @@ definitions() ->
example => true
},
collection => #{type => string},
- find => #{type => object},
+ selector => #{type => object},
mongo_type => #{type => string,
enum => [<<"sharded">>],
example => <<"sharded">>},
@@ -286,7 +286,7 @@ definitions() ->
type => object,
required => [ type
, enable
- , sql
+ , query
, server
, database
, pool_size
@@ -305,7 +305,7 @@ definitions() ->
type => boolean,
example => true
},
- sql => #{type => string},
+ query => #{type => string},
server => #{type => string,
example => <<"127.0.0.1:3306">>
},
@@ -323,7 +323,7 @@ definitions() ->
type => object,
required => [ type
, enable
- , sql
+ , query
, server
, database
, pool_size
@@ -342,7 +342,7 @@ definitions() ->
type => boolean,
example => true
},
- sql => #{type => string},
+ query => #{type => string},
server => #{type => string,
example => <<"127.0.0.1:5432">>
},
@@ -484,7 +484,7 @@ definitions() ->
type => array,
items => #{
type => string,
- example => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>
+ example => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}
},
path => #{
diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl
index 209bbc01f..a3f198a30 100644
--- a/apps/emqx_authz/src/emqx_authz_api_sources.erl
+++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl
@@ -32,9 +32,8 @@
-define(EXAMPLE_FILE,
#{type=> file,
enable => true,
- rules => [<<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>,
- <<"{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
- ]}).
+ rules => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
+ }).
-define(EXAMPLE_RETURNED_REDIS,
maps:put(annotations, #{status => healthy}, ?EXAMPLE_REDIS)
@@ -350,9 +349,7 @@ sources(put, #{body := Body}) when is_list(Body) ->
NBody = [ begin
case Source of
#{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable} ->
- {ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]),
- erlang:list_to_bitstring([<> || Rule <- Rules])
- ),
+ {ok, Filename} = write_file(filename:join([emqx:get_config([node, data_dir]), "acl.conf"]), Rules),
#{type => file, enable => Enable, path => Filename};
_ -> write_cert(Source)
end
@@ -396,9 +393,7 @@ source(get, #{bindings := #{type := Type}}) ->
{200, read_cert(NSource2)}
end;
source(put, #{bindings := #{type := <<"file">>}, body := #{<<"type">> := <<"file">>, <<"rules">> := Rules, <<"enable">> := Enable}}) ->
- {ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""),
- erlang:list_to_bitstring([<> || Rule <- Rules])
- ),
+ {ok, Filename} = write_file(maps:get(path, emqx_authz:lookup(file), ""), Rules),
case emqx_authz:update({replace_once, file}, #{type => file, enable => Enable, path => Filename}) of
{ok, _} -> {204};
{error, Reason} ->
@@ -457,21 +452,21 @@ write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
true ->
- {ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_rule_id:gen() ++".pem"]),
+ {ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"cacertfile">>, SSL)),
CaCertFile;
false -> ""
end,
Cert = case maps:is_key(<<"certfile">>, SSL) of
true ->
- {ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_rule_id:gen() ++".pem"]),
+ {ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"certfile">>, SSL)),
CertFile;
false -> ""
end,
Key = case maps:is_key(<<"keyfile">>, SSL) of
true ->
- {ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_rule_id:gen() ++".pem"]),
+ {ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"keyfile">>, SSL)),
KeyFile;
false -> ""
diff --git a/apps/emqx_authz/src/emqx_authz_mongo.erl b/apps/emqx_authz/src/emqx_authz_mongo.erl
index 25a787b8f..68808c20b 100644
--- a/apps/emqx_authz/src/emqx_authz_mongo.erl
+++ b/apps/emqx_authz/src/emqx_authz_mongo.erl
@@ -35,10 +35,10 @@ description() ->
authorize(Client, PubSub, Topic,
#{collection := Collection,
- find := Find,
+ selector := Selector,
annotations := #{id := ResourceID}
}) ->
- case emqx_resource:query(ResourceID, {find, Collection, replvar(Find, Client), #{}}) of
+ case emqx_resource:query(ResourceID, {find, Collection, replvar(Selector, Client), #{}}) of
{error, Reason} ->
?LOG(error, "[AuthZ] Query mongo error: ~p", [Reason]),
nomatch;
@@ -57,7 +57,7 @@ do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
end.
-replvar(Find, #{clientid := Clientid,
+replvar(Selector, #{clientid := Clientid,
username := Username,
peerhost := IpAddress
}) ->
@@ -76,7 +76,7 @@ replvar(Find, #{clientid := Clientid,
maps:put(K, V3, AccIn);
_Fun(K, V, AccIn) -> maps:put(K, V, AccIn)
end,
- maps:fold(Fun, #{}, Find).
+ maps:fold(Fun, #{}, Selector).
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
bin(B) when is_binary(B) -> B;
diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl
index d5550b2fb..ac8f04f32 100644
--- a/apps/emqx_authz/src/emqx_authz_mysql.erl
+++ b/apps/emqx_authz/src/emqx_authz_mysql.erl
@@ -47,10 +47,10 @@ parse_query(Sql) ->
authorize(Client, PubSub, Topic,
#{annotations := #{id := ResourceID,
- sql := {SQL, Params}
+ query := {Query, Params}
}
}) ->
- case emqx_resource:query(ResourceID, {sql, SQL, replvar(Params, Client)}) of
+ case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
{ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);
diff --git a/apps/emqx_authz/src/emqx_authz_pgsql.erl b/apps/emqx_authz/src/emqx_authz_pgsql.erl
index d9555b85d..3e1f40fb2 100644
--- a/apps/emqx_authz/src/emqx_authz_pgsql.erl
+++ b/apps/emqx_authz/src/emqx_authz_pgsql.erl
@@ -51,10 +51,10 @@ parse_query(Sql) ->
authorize(Client, PubSub, Topic,
#{annotations := #{id := ResourceID,
- sql := {SQL, Params}
+ query := {Query, Params}
}
}) ->
- case emqx_resource:query(ResourceID, {sql, SQL, replvar(Params, Client)}) of
+ case emqx_resource:query(ResourceID, {sql, Query, replvar(Params, Client)}) of
{ok, _Columns, []} -> nomatch;
{ok, Columns, Rows} ->
do_authorize(Client, PubSub, Topic, Columns, Rows);
diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl
index b90d522e8..e17a55d0a 100644
--- a/apps/emqx_authz/src/emqx_authz_schema.erl
+++ b/apps/emqx_authz/src/emqx_authz_schema.erl
@@ -66,7 +66,7 @@ fields(http_get) ->
},
converter => fun (Headers0) ->
Headers1 = maps:fold(fun(K0, V, AccIn) ->
- K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
+ K1 = iolist_to_binary(string:to_lower(to_list(K0))),
maps:put(K1, V, AccIn)
end, #{}, Headers0),
maps:merge(#{ <<"accept">> => <<"application/json">>
@@ -84,7 +84,7 @@ fields(http_post) ->
, {enable, #{type => boolean(),
default => true}}
, {url, #{type => url()}}
- , {method, #{type => hoconsc:enum([post, put]),
+ , {method, #{type => post,
default => get}}
, {headers, #{type => map(),
default => #{ <<"accept">> => <<"application/json">>
@@ -116,24 +116,24 @@ fields(http_post) ->
fields(mongo_single) ->
connector_fields(mongo, single) ++
[ {collection, #{type => atom()}}
- , {find, #{type => map()}}
+ , {selector, #{type => map()}}
];
fields(mongo_rs) ->
connector_fields(mongo, rs) ++
[ {collection, #{type => atom()}}
- , {find, #{type => map()}}
+ , {selector, #{type => map()}}
];
fields(mongo_sharded) ->
connector_fields(mongo, sharded) ++
[ {collection, #{type => atom()}}
- , {find, #{type => map()}}
+ , {selector, #{type => map()}}
];
fields(mysql) ->
connector_fields(mysql) ++
- [ {sql, query()} ];
+ [ {query, query()} ];
fields(pgsql) ->
connector_fields(pgsql) ++
- [ {sql, query()} ];
+ [ {query, query()} ];
fields(redis_single) ->
connector_fields(redis, single) ++
[ {cmd, query()} ];
@@ -177,3 +177,8 @@ connector_fields(DB, Fields) ->
, {enable, #{type => boolean(),
default => true}}
] ++ Mod:fields(Fields).
+
+to_list(A) when is_atom(A) ->
+ atom_to_list(A);
+to_list(B) when is_binary(B) ->
+ binary_to_list(B).
\ No newline at end of file
diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl
index f2cb01d05..fe1f04bd2 100644
--- a/apps/emqx_authz/test/emqx_authz_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl
@@ -75,7 +75,7 @@ init_per_testcase(_, Config) ->
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
- <<"find">> => #{<<"a">> => <<"b">>}
+ <<"selector">> => #{<<"a">> => <<"b">>}
}).
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
<<"enable">> => true,
@@ -86,7 +86,7 @@ init_per_testcase(_, Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}).
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
<<"enable">> => true,
@@ -97,7 +97,7 @@ init_per_testcase(_, Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,
diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
index 8c37189c9..b4b7b87c9 100644
--- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl
@@ -54,7 +54,7 @@
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
- <<"find">> => #{<<"a">> => <<"b">>}
+ <<"selector">> => #{<<"a">> => <<"b">>}
}).
-define(SOURCE3, #{<<"type">> => <<"mysql">>,
<<"enable">> => true,
@@ -65,7 +65,7 @@
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}).
-define(SOURCE4, #{<<"type">> => <<"pgsql">>,
<<"enable">> => true,
@@ -76,7 +76,7 @@
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}).
-define(SOURCE5, #{<<"type">> => <<"redis">>,
<<"enable">> => true,
@@ -92,10 +92,7 @@
}).
-define(SOURCE6, #{<<"type">> => <<"file">>,
<<"enable">> => true,
- <<"rules">> =>
- [<<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.">>,
- <<"{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
- ]
+ <<"rules">> => <<"{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}.\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}.">>
}).
all() ->
@@ -151,8 +148,8 @@ set_special_configs(_App) ->
ok.
init_per_testcase(t_api, Config) ->
- meck:new(emqx_rule_id, [non_strict, passthrough, no_history, no_link]),
- meck:expect(emqx_rule_id, gen, fun() -> "fake" end),
+ meck:new(emqx_plugin_libs_id, [non_strict, passthrough, no_history, no_link]),
+ meck:expect(emqx_plugin_libs_id, gen, fun() -> "fake" end),
meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx, get_config, fun([node, data_dir]) ->
@@ -165,7 +162,7 @@ init_per_testcase(t_api, Config) ->
init_per_testcase(_, Config) -> Config.
end_per_testcase(t_api, _Config) ->
- meck:unload(emqx_rule_id),
+ meck:unload(emqx_plugin_libs_id),
meck:unload(emqx),
ok;
end_per_testcase(_, _Config) -> ok.
diff --git a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl
index 8f4a6f29f..ec4c4f384 100644
--- a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl
@@ -53,7 +53,7 @@ init_per_suite(Config) ->
<<"database">> => <<"mqtt">>,
<<"ssl">> => #{<<"enable">> => false},
<<"collection">> => <<"fake">>,
- <<"find">> => #{<<"a">> => <<"b">>}
+ <<"selector">> => #{<<"a">> => <<"b">>}
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.
diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
index 1173b0e3e..32e52e7c0 100644
--- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl
@@ -55,7 +55,7 @@ init_per_suite(Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.
diff --git a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl
index 24c2e7b35..570ea0e77 100644
--- a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl
+++ b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl
@@ -55,7 +55,7 @@ init_per_suite(Config) ->
<<"password">> => <<"ee">>,
<<"auto_reconnect">> => true,
<<"ssl">> => #{<<"enable">> => false},
- <<"sql">> => <<"abcb">>
+ <<"query">> => <<"abcb">>
}],
{ok, _} = emqx_authz:update(replace, Rules),
Config.
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index 08873228d..bfba34e7c 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -2,11 +2,9 @@
## EMQ X Bridge
##--------------------------------------------------------------------
-#bridges.mqtt.my_mqtt_bridge {
+#bridges.mqtt.my_mqtt_bridge_to_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
-# ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`.
-# clientid_prefix = "bridge_client:"
# username = "username1"
# password = ""
# clean_start = true
@@ -27,8 +25,9 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
-# ## we will create one MQTT connection for each element of the `in`
-# in: [{
+# ## we will create one MQTT connection for each element of the `message_in`
+# message_in: [{
+# ## the `id` will be used as part of the clientid
# id = "pull_msgs_from_aws"
# subscribe_remote_topic = "aws/#"
# subscribe_qos = 1
@@ -37,8 +36,9 @@
# qos = "${qos}"
# retain = "${retain}"
# }]
-# ## we will create one MQTT connection for each element of the `out`
-# out: [{
+# ## we will create one MQTT connection for each element of the `message_out`
+# message_out: [{
+# ## the `id` will be used as part of the clientid
# id = "push_msgs_to_aws"
# subscribe_local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 159562f33..73412f388 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -58,14 +58,7 @@ fields(config) ->
, {pool_type, fun pool_type/1}
, {pool_size, fun pool_size/1}
, {enable_pipelining, fun enable_pipelining/1}
- ] ++ emqx_connector_schema_lib:ssl_fields();
-
-fields(ssl_opts) ->
- [ {cacertfile, fun cacertfile/1}
- , {keyfile, fun keyfile/1}
- , {certfile, fun certfile/1}
- , {verify, fun verify/1}
- ].
+ ] ++ emqx_connector_schema_lib:ssl_fields().
validations() ->
[ {check_ssl_opts, fun check_ssl_opts/1} ].
@@ -102,23 +95,6 @@ enable_pipelining(type) -> boolean();
enable_pipelining(default) -> true;
enable_pipelining(_) -> undefined.
-cacertfile(type) -> string();
-cacertfile(nullable) -> true;
-cacertfile(_) -> undefined.
-
-keyfile(type) -> string();
-keyfile(nullable) -> true;
-keyfile(_) -> undefined.
-
-%% TODO: certfile is required
-certfile(type) -> string();
-certfile(nullable) -> true;
-certfile(_) -> undefined.
-
-verify(type) -> boolean();
-verify(default) -> false;
-verify(_) -> undefined.
-
%% ===================================================================
on_start(InstId, #{base_url := #{scheme := Scheme,
host := Host,
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 6631fd23a..708bcdeb9 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -89,7 +89,8 @@ on_start(InstId, Conf) ->
NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
- InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])),
+ InOutConfigs = check_channel_id_dup(maps:get(message_in, Conf, [])
+ ++ maps:get(message_out, Conf, [])),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
@@ -110,7 +111,7 @@ on_stop(InstId, #{}) ->
end.
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
-%% `in` and `out` config
+%% `message_in` and `message_out` config
on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]),
@@ -136,19 +137,19 @@ check_channel_id_dup(Confs) ->
end, Confs),
Confs.
-%% this is an `in` bridge
-create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix,
- #{clientid_prefix := ClientPrefix} = BasicConf) ->
- logger:info("creating 'in' channel for: ~p", [BridgeId]),
- create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
- clientid => clientid(ClientPrefix, BridgeId),
+%% this is an `message_in` bridge
+create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) ->
+ logger:info("creating 'message_in' channel for: ~p", [Id]),
+ create_sub_bridge(BasicConf#{
+ name => bridge_name(NamePrefix, Id),
+ clientid => clientid(Id),
subscriptions => InConf, forwards => undefined});
-%% this is an `out` bridge
-create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix,
- #{clientid_prefix := ClientPrefix} = BasicConf) ->
- logger:info("creating 'out' channel for: ~p", [BridgeId]),
- create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
- clientid => clientid(ClientPrefix, BridgeId),
+%% this is an `message_out` bridge
+create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) ->
+ logger:info("creating 'message_out' channel for: ~p", [Id]),
+ create_sub_bridge(BasicConf#{
+ name => bridge_name(NamePrefix, Id),
+ clientid => clientid(Id),
subscriptions => undefined, forwards => OutConf}).
create_sub_bridge(#{name := Name} = Conf) ->
@@ -172,7 +173,6 @@ basic_config(#{
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMod,
- clientid_prefix := ClientIdPrefix,
username := User,
password := Password,
clean_start := CleanStart,
@@ -188,7 +188,6 @@ basic_config(#{
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
bridge_mode => BridgeMod,
- clientid_prefix => ClientIdPrefix,
username => User,
password => Password,
clean_start => CleanStart,
@@ -203,8 +202,8 @@ basic_config(#{
bridge_name(Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
-clientid(Prefix, Id) ->
- list_to_binary(str(Prefix) ++ str(Id)).
+clientid(Id) ->
+ list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)).
str(A) when is_atom(A) ->
atom_to_list(A);
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index ed7fd4408..184a8610c 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -31,7 +31,6 @@ fields("config") ->
, {reconnect_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, hoconsc:mk(boolean(), #{default => true})}
- , {clientid_prefix, hoconsc:mk(string(), #{default => ""})}
, {username, hoconsc:mk(string())}
, {password, hoconsc:mk(string())}
, {clean_start, hoconsc:mk(boolean(), #{default => true})}
@@ -39,17 +38,17 @@ fields("config") ->
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
- , {in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "in")), #{default => []})}
- , {out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "out")), #{default => []})}
+ , {message_in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_in")), #{default => []})}
+ , {message_out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_out")), #{default => []})}
] ++ emqx_connector_schema_lib:ssl_fields();
-fields("in") ->
+fields("message_in") ->
[ {subscribe_remote_topic, #{type => binary(), nullable => false}}
, {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
] ++ common_inout_confs();
-fields("out") ->
+fields("message_out") ->
[ {subscribe_local_topic, #{type => binary(), nullable => false}}
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
] ++ common_inout_confs();
diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl
index e3c6d8ee9..fe2d3947d 100644
--- a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl
+++ b/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl
@@ -14,13 +14,12 @@ fields("emqx_data_bridge") ->
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
default => []}}];
-fields(mysql) -> connector_fields(mysql);
-fields(pgsql) -> connector_fields(pgsql);
-fields(mongo) -> connector_fields(mongo);
-fields(redis) -> connector_fields(redis);
-fields(ldap) -> connector_fields(ldap).
+fields(mysql) -> connector_fields(emqx_connector_mysql, mysql);
+fields(pgsql) -> connector_fields(emqx_connector_pgsql, pgsql);
+fields(mongo) -> connector_fields(emqx_connector_mongo, mongo);
+fields(redis) -> connector_fields(emqx_connector_redis, redis);
+fields(ldap) -> connector_fields(emqx_connector_ldap, ldap).
-connector_fields(DB) ->
- Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
+connector_fields(ConnectModule, DB) ->
[{name, hoconsc:mk(typerefl:binary())},
- {type, #{type => DB}}] ++ Mod:roots().
+ {type, #{type => DB}}] ++ ConnectModule:roots().
diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl
index f264339a4..9037518c5 100644
--- a/apps/emqx_gateway/src/emqx_gateway_api.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_api.erl
@@ -20,6 +20,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
+ , with_gateway/2
+ , schema_bad_request/0
+ , schema_not_found/0
+ , schema_internal_error/0
+ , schema_no_content/0
]).
%% minirest behaviour callbacks
@@ -55,44 +60,34 @@ gateway(get, Request) ->
{200, emqx_gateway_http:gateways(Status)}.
gateway_insta(delete, #{bindings := #{name := Name0}}) ->
- Name = binary_to_existing_atom(Name0),
- case emqx_gateway:unload(Name) of
- ok ->
- {204};
- {error, not_found} ->
- return_http_error(404, <<"Gateway not found">>)
- end;
+ with_gateway(Name0, fun(GwName, _) ->
+ _ = emqx_gateway:unload(GwName),
+ {204}
+ end);
gateway_insta(get, #{bindings := #{name := Name0}}) ->
- Name = binary_to_existing_atom(Name0),
- case emqx_gateway:lookup(Name) of
- #{config := _Config} ->
- GwCfs = filled_raw_confs([<<"gateway">>, Name0]),
- NGwCfs = GwCfs#{<<"listeners">> =>
- emqx_gateway_http:mapping_listener_m2l(
- Name0, maps:get(<<"listeners">>, GwCfs, #{})
- )
- },
- {200, NGwCfs};
- undefined ->
- return_http_error(404, <<"Gateway not found">>)
- end;
-gateway_insta(put, #{body := RawConfsIn0,
- bindings := #{name := Name}
+ with_gateway(Name0, fun(_, _) ->
+ GwConf = filled_raw_confs([<<"gateway">>, Name0]),
+ LisConf = maps:get(<<"listeners">>, GwConf, #{}),
+ NLisConf = emqx_gateway_http:mapping_listener_m2l(Name0, LisConf),
+ {200, GwConf#{<<"listeners">> => NLisConf}}
+ end);
+gateway_insta(put, #{body := GwConf0,
+ bindings := #{name := Name0}
}) ->
- RawConfsIn = maps:without([<<"authentication">>,
- <<"listeners">>], RawConfsIn0),
- %% FIXME: Cluster Consistence ??
- case emqx_gateway:update_rawconf(Name, RawConfsIn) of
- ok ->
- {200};
- {error, not_found} ->
- return_http_error(404, <<"Gateway not found">>);
- {error, Reason} ->
- return_http_error(500, Reason)
- end.
+ with_gateway(Name0, fun(_, _) ->
+ GwConf = maps:without([<<"authentication">>, <<"listeners">>], GwConf0),
+ case emqx_gateway:update_rawconf(Name0, GwConf) of
+ ok ->
+ {200};
+ {error, not_found} ->
+ return_http_error(404, "Gateway not found");
+ {error, Reason} ->
+ return_http_error(500, Reason)
+ end
+ end).
gateway_insta_stats(get, _Req) ->
- return_http_error(401, <<"Implement it later (maybe 5.1)">>).
+ return_http_error(401, "Implement it later (maybe 5.1)").
filled_raw_confs(Path) ->
RawConf = emqx_config:fill_defaults(
@@ -131,7 +126,9 @@ swagger("/gateway/:name", get) ->
#{ description => <<"Get the gateway configurations">>
, parameters => params_gateway_name_in_path()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_conf()
}
};
@@ -139,7 +136,9 @@ swagger("/gateway/:name", delete) ->
#{ description => <<"Delete/Unload the gateway">>
, parameters => params_gateway_name_in_path()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@@ -148,7 +147,9 @@ swagger("/gateway/:name", put) ->
, parameters => params_gateway_name_in_path()
, requestBody => schema_gateway_conf()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@@ -156,7 +157,9 @@ swagger("/gateway/:name/stats", get) ->
#{ description => <<"Get gateway Statistic">>
, parameters => params_gateway_name_in_path()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_gateway_stats()
}
}.
@@ -181,12 +184,6 @@ params_gateway_status_in_qs() ->
%%--------------------------------------------------------------------
%% schemas
-schema_not_found() ->
- emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
-
-schema_no_content() ->
- #{description => <<"No Content">>}.
-
schema_gateway_overview_list() ->
emqx_mgmt_util:array_schema(
#{ type => object
diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
index fcfea7343..386d6e1ea 100644
--- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl
@@ -36,6 +36,11 @@
-import(emqx_gateway_http,
[ return_http_error/2
+ , with_gateway/2
+ , schema_bad_request/0
+ , schema_not_found/0
+ , schema_internal_error/0
+ , schema_no_content/0
]).
%%--------------------------------------------------------------------
@@ -71,102 +76,103 @@ apis() ->
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
-clients(get, #{ bindings := #{name := GwName0}
+clients(get, #{ bindings := #{name := Name0}
, query_string := Qs
}) ->
- GwName = binary_to_existing_atom(GwName0),
- TabName = emqx_gateway_cm:tabname(info, GwName),
- case maps:get(<<"node">>, Qs, undefined) of
- undefined ->
- Response = emqx_mgmt_api:cluster_query(
- Qs, TabName,
- ?CLIENT_QS_SCHEMA, ?query_fun
- ),
- {200, Response};
- Node1 ->
- Node = binary_to_atom(Node1, utf8),
- ParamsWithoutNode = maps:without([<<"node">>], Qs),
- Response = emqx_mgmt_api:node_query(
- Node, ParamsWithoutNode,
- TabName, ?CLIENT_QS_SCHEMA, ?query_fun
- ),
- {200, Response}
- end.
+ with_gateway(Name0, fun(GwName, _) ->
+ TabName = emqx_gateway_cm:tabname(info, GwName),
+ case maps:get(<<"node">>, Qs, undefined) of
+ undefined ->
+ Response = emqx_mgmt_api:cluster_query(
+ Qs, TabName,
+ ?CLIENT_QS_SCHEMA, ?query_fun
+ ),
+ {200, Response};
+ Node1 ->
+ Node = binary_to_atom(Node1, utf8),
+ ParamsWithoutNode = maps:without([<<"node">>], Qs),
+ Response = emqx_mgmt_api:node_query(
+ Node, ParamsWithoutNode,
+ TabName, ?CLIENT_QS_SCHEMA, ?query_fun
+ ),
+ {200, Response}
+ end
+ end).
-clients_insta(get, #{ bindings := #{name := GwName0,
+clients_insta(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
- GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
-
- case emqx_gateway_http:lookup_client(GwName, ClientId,
- {?MODULE, format_channel_info}) of
- [ClientInfo] ->
- {200, ClientInfo};
- [ClientInfo|_More] ->
- ?LOG(warning, "More than one client info was returned on ~s",
- [ClientId]),
- {200, ClientInfo};
- [] ->
- return_http_error(404, <<"Gateway or ClientId not found">>)
-
- end;
-
-clients_insta(delete, #{ bindings := #{name := GwName0,
+ with_gateway(Name0, fun(GwName, _) ->
+ case emqx_gateway_http:lookup_client(GwName, ClientId,
+ {?MODULE, format_channel_info}) of
+ [ClientInfo] ->
+ {200, ClientInfo};
+ [ClientInfo|_More] ->
+ ?LOG(warning, "More than one client info was returned on ~s",
+ [ClientId]),
+ {200, ClientInfo};
+ [] ->
+ return_http_error(404, "Client not found")
+ end
+ end);
+clients_insta(delete, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
- GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
- _ = emqx_gateway_http:kickout_client(GwName, ClientId),
- {200}.
+ with_gateway(Name0, fun(GwName, _) ->
+ _ = emqx_gateway_http:kickout_client(GwName, ClientId),
+ {200}
+ end).
%% FIXME:
%% List the subscription without mountpoint, but has SubOpts,
%% for example, share group ...
-subscriptions(get, #{ bindings := #{name := GwName0,
+subscriptions(get, #{ bindings := #{name := Name0,
clientid := ClientId0}
}) ->
- GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
- case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
- {error, Reason} ->
- return_http_error(404, Reason);
- {ok, Subs} ->
- {200, Subs}
- end;
+ with_gateway(Name0, fun(GwName, _) ->
+ case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
+ {error, Reason} ->
+ return_http_error(500, Reason);
+ {ok, Subs} ->
+ {200, Subs}
+ end
+ end);
%% Create the subscription without mountpoint
-subscriptions(post, #{ bindings := #{name := GwName0,
+subscriptions(post, #{ bindings := #{name := Name0,
clientid := ClientId0},
body := Body
}) ->
- GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
-
- case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
- {undefined, _} ->
- %% FIXME: more reasonable error code??
- return_http_error(404, <<"Request paramter missed: topic">>);
- {Topic, QoS} ->
- case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
- {error, Reason} ->
- return_http_error(404, Reason);
- ok ->
- {200}
- end
- end;
+ with_gateway(Name0, fun(GwName, _) ->
+ case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
+ {undefined, _} ->
+ return_http_error(400, "Miss topic property");
+ {Topic, QoS} ->
+ case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of
+ {error, Reason} ->
+ return_http_error(404, Reason);
+ ok ->
+ {200}
+ end
+ end
+ end);
%% Remove the subscription without mountpoint
-subscriptions(delete, #{ bindings := #{name := GwName0,
+subscriptions(delete, #{ bindings := #{name := Name0,
clientid := ClientId0,
topic := Topic0
}
}) ->
- GwName = binary_to_existing_atom(GwName0),
ClientId = emqx_mgmt_util:urldecode(ClientId0),
Topic = emqx_mgmt_util:urldecode(Topic0),
- _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
- {200}.
+ with_gateway(Name0, fun(GwName, _) ->
+ _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
+ {200}
+ end).
%%--------------------------------------------------------------------
%% Utils
@@ -379,7 +385,9 @@ swagger("/gateway/:name/clients", get) ->
#{ description => <<"Get the gateway clients">>
, parameters => params_client_query()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_clients_list()
}
};
@@ -387,7 +395,9 @@ swagger("/gateway/:name/clients/:clientid", get) ->
#{ description => <<"Get the gateway client infomation">>
, parameters => params_client_insta()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_client()
}
};
@@ -395,7 +405,9 @@ swagger("/gateway/:name/clients/:clientid", delete) ->
#{ description => <<"Kick out the gateway client">>
, parameters => params_client_insta()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
};
@@ -403,7 +415,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", get) ->
#{ description => <<"Get the gateway client subscriptions">>
, parameters => params_client_insta()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_subscription_list()
}
};
@@ -412,7 +426,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) ->
, parameters => params_client_insta()
, requestBody => schema_subscription()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"200">> => schema_no_content()
}
};
@@ -420,7 +436,9 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) ->
#{ description => <<"Unsubscribe the topic for client">>
, parameters => params_topic_name_in_path() ++ params_client_insta()
, responses =>
- #{ <<"404">> => schema_not_found()
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
, <<"204">> => schema_no_content()
}
}.
@@ -483,12 +501,6 @@ queries(Ls) ->
%%--------------------------------------------------------------------
%% schemas
-schema_not_found() ->
- emqx_mgmt_util:error_schema(<<"Gateway not found or unloaded">>).
-
-schema_no_content() ->
- #{description => <<"No Content">>}.
-
schema_clients_list() ->
emqx_mgmt_util:page_schema(
#{ type => object
diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl
new file mode 100644
index 000000000..374f2841d
--- /dev/null
+++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl
@@ -0,0 +1,316 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_api_listeners).
+
+-behaviour(minirest_api).
+
+-import(emqx_gateway_http,
+ [ return_http_error/2
+ , with_gateway/2
+ , checks/2
+ , schema_bad_request/0
+ , schema_not_found/0
+ , schema_internal_error/0
+ , schema_no_content/0
+ ]).
+
+%% minirest behaviour callbacks
+-export([api_spec/0]).
+
+%% http handlers
+-export([ listeners/2
+ , listeners_insta/2
+ ]).
+
+%%--------------------------------------------------------------------
+%% minirest behaviour callbacks
+%%--------------------------------------------------------------------
+
+api_spec() ->
+ {metadata(apis()), []}.
+
+apis() ->
+ [ {"/gateway/:name/listeners", listeners}
+ , {"/gateway/:name/listeners/:id", listeners_insta}
+ ].
+%%--------------------------------------------------------------------
+%% http handlers
+
+listeners(get, #{bindings := #{name := Name0}}) ->
+ with_gateway(Name0, fun(GwName, _) ->
+ {200, emqx_gateway_http:listeners(GwName)}
+ end);
+
+listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
+ with_gateway(Name0, fun(GwName, Gateway) ->
+ RunningConf = maps:get(config, Gateway),
+ %% XXX: check params miss? check badly data tpye??
+ _ = checks([<<"type">>, <<"name">>, <<"bind">>], LConf),
+
+ Type = binary_to_existing_atom(maps:get(<<"type">>, LConf)),
+ LName = binary_to_atom(maps:get(<<"name">>, LConf)),
+
+ Path = [listeners, Type, LName],
+ case emqx_map_lib:deep_get(Path, RunningConf, undefined) of
+ undefined ->
+ ListenerId = emqx_gateway_utils:listener_id(
+ GwName, Type, LName),
+ case emqx_gateway_http:update_listener(
+ ListenerId, LConf) of
+ ok ->
+ {204};
+ {error, Reason} ->
+ return_http_error(500, Reason)
+ end;
+ _ ->
+ return_http_error(400, "Listener name has occupied")
+ end
+ end).
+
+listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) ->
+ ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
+ with_gateway(Name0, fun(_GwName, _) ->
+ case emqx_gateway_http:remove_listener(ListenerId) of
+ ok -> {204};
+ {error, not_found} -> {204};
+ {error, Reason} ->
+ return_http_error(500, Reason)
+ end
+ end);
+listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
+ ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
+ with_gateway(Name0, fun(_GwName, _) ->
+ case emqx_gateway_http:listener(ListenerId) of
+ {ok, Listener} ->
+ {200, Listener};
+ {error, not_found} ->
+ return_http_error(404, "Listener not found");
+ {error, Reason} ->
+ return_http_error(500, Reason)
+ end
+ end);
+listeners_insta(put, #{body := LConf,
+ bindings := #{name := Name0, id := ListenerId0}
+ }) ->
+ ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
+ with_gateway(Name0, fun(_GwName, _) ->
+ case emqx_gateway_http:update_listener(ListenerId, LConf) of
+ ok ->
+ {204};
+ {error, Reason} ->
+ return_http_error(500, Reason)
+ end
+ end).
+
+%%--------------------------------------------------------------------
+%% Swagger defines
+%%--------------------------------------------------------------------
+
+metadata(APIs) ->
+ metadata(APIs, []).
+metadata([], APIAcc) ->
+ lists:reverse(APIAcc);
+metadata([{Path, Fun}|More], APIAcc) ->
+ Methods = [get, post, put, delete, patch],
+ Mds = lists:foldl(fun(M, Acc) ->
+ try
+ Acc#{M => swagger(Path, M)}
+ catch
+ error : function_clause ->
+ Acc
+ end
+ end, #{}, Methods),
+ metadata(More, [{Path, Mds, Fun} | APIAcc]).
+
+swagger("/gateway/:name/listeners", get) ->
+ #{ description => <<"Get the gateway listeners">>
+ , parameters => params_gateway_name_in_path()
+ , responses =>
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
+ , <<"200">> => schema_listener_list()
+ }
+ };
+swagger("/gateway/:name/listeners", post) ->
+ #{ description => <<"Create the gateway listener">>
+ , parameters => params_gateway_name_in_path()
+ , requestBody => schema_listener()
+ , responses =>
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
+ , <<"200">> => schema_listener_list()
+ }
+ };
+swagger("/gateway/:name/listeners/:id", get) ->
+ #{ description => <<"Get the gateway listener configurations">>
+ , parameters => params_gateway_name_in_path()
+ ++ params_listener_id_in_path()
+ , responses =>
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
+ , <<"200">> => schema_listener()
+ }
+ };
+swagger("/gateway/:name/listeners/:id", delete) ->
+ #{ description => <<"Delete the gateway listener">>
+ , parameters => params_gateway_name_in_path()
+ ++ params_listener_id_in_path()
+ , responses =>
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
+ , <<"204">> => schema_no_content()
+ }
+ };
+swagger("/gateway/:name/listeners/:id", put) ->
+ #{ description => <<"Update the gateway listener">>
+ , parameters => params_gateway_name_in_path()
+ ++ params_listener_id_in_path()
+ , requestBody => schema_listener()
+ , responses =>
+ #{ <<"400">> => schema_bad_request()
+ , <<"404">> => schema_not_found()
+ , <<"500">> => schema_internal_error()
+ , <<"200">> => schema_no_content()
+ }
+ }.
+
+%%--------------------------------------------------------------------
+%% params defines
+
+params_gateway_name_in_path() ->
+ [#{ name => name
+ , in => path
+ , schema => #{type => string}
+ , required => true
+ }].
+
+params_listener_id_in_path() ->
+ [#{ name => id
+ , in => path
+ , schema => #{type => string}
+ , required => true
+ }].
+
+%%--------------------------------------------------------------------
+%% schemas
+
+schema_listener_list() ->
+ emqx_mgmt_util:array_schema(
+ #{ type => object
+ , properties => properties_listener()
+ },
+ <<"Listener list">>
+ ).
+
+schema_listener() ->
+ emqx_mgmt_util:schema(
+ #{ type => object
+ , properties => properties_listener()
+ }
+ ).
+
+%%--------------------------------------------------------------------
+%% properties
+
+properties_listener() ->
+ emqx_mgmt_util:properties(
+ raw_properties_common_listener() ++
+ [ {tcp, object, raw_properties_tcp_opts()}
+ , {ssl, object, raw_properties_ssl_opts()}
+ , {udp, object, raw_properties_udp_opts()}
+ , {dtls, object, raw_properties_dtls_opts()}
+ ]).
+
+raw_properties_tcp_opts() ->
+ [ {active_n, integer, <<>>}
+ , {backlog, integer, <<>>}
+ , {buffer, string, <<>>}
+ , {recbuf, string, <<>>}
+ , {sndbuf, string, <<>>}
+ , {high_watermark, string, <<>>}
+ , {nodelay, boolean, <<>>}
+ , {reuseaddr, boolean, <<>>}
+ , {send_timeout, string, <<>>}
+ , {send_timeout_close, boolean, <<>>}
+ ].
+
+raw_properties_ssl_opts() ->
+ [ {cacertfile, string, <<>>}
+ , {certfile, string, <<>>}
+ , {keyfile, string, <<>>}
+ , {verify, string, <<>>}
+ , {fail_if_no_peer_cert, boolean, <<>>}
+ , {server_name_indication, boolean, <<>>}
+ , {depth, integer, <<>>}
+ , {password, string, <<>>}
+ , {handshake_timeout, string, <<>>}
+ , {versions, {array, string}, <<>>}
+ , {ciphers, {array, string}, <<>>}
+ , {user_lookup_fun, string, <<>>}
+ , {reuse_sessions, boolean, <<>>}
+ , {secure_renegotiate, boolean, <<>>}
+ , {honor_cipher_order, boolean, <<>>}
+ , {dhfile, string, <<>>}
+ ].
+
+raw_properties_udp_opts() ->
+ [ {active_n, integer, <<>>}
+ , {buffer, string, <<>>}
+ , {recbuf, string, <<>>}
+ , {sndbuf, string, <<>>}
+ , {reuseaddr, boolean, <<>>}
+ ].
+
+raw_properties_dtls_opts() ->
+ Ls = lists_key_without(
+ [versions,ciphers,handshake_timeout], 1,
+ raw_properties_ssl_opts()
+ ),
+ [ {versions, {array, string}, <<>>}
+ , {ciphers, {array, string}, <<>>}
+ | Ls].
+
+lists_key_without([], _N, L) ->
+ L;
+lists_key_without([K|Ks], N, L) ->
+ lists_key_without(Ks, N, lists:keydelete(K, N, L)).
+
+raw_properties_common_listener() ->
+ [ {enable, boolean, <<"Whether to enable this listener">>}
+ , {id, string, <<"Listener Id">>}
+ , {name, string, <<"Listener name">>}
+ , {type, string,
+ <<"Listener type. Enum: tcp, udp, ssl, dtls">>,
+ [<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]}
+ , {running, boolean, <<"Listener running status">>}
+ %% FIXME:
+ , {bind, string, <<"Listener bind address or port">>}
+ , {acceptors, integer, <<"Listener acceptors number">>}
+ , {access_rules, {array, string}, <<"Listener Access rules for client">>}
+ , {max_conn_rate, integer, <<"Max connection rate for the listener">>}
+ , {max_connections, integer, <<"Max connections for the listener">>}
+ , {mountpoint, string,
+ <<"The Mounpoint for clients of the listener. "
+ "The gateway-level mountpoint configuration can be overloaded "
+ "when it is not null or empty string">>}
+ %% FIXME:
+ , {authentication, string, <<"NOT-SUPPORTED-NOW">>}
+ ].
diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl
index f233a6151..ed8e511c7 100644
--- a/apps/emqx_gateway/src/emqx_gateway_http.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_http.erl
@@ -26,7 +26,9 @@
%% Mgmt APIs - listeners
-export([ listeners/1
- , listener/2
+ , listener/1
+ , remove_listener/1
+ , update_listener/2
, mapping_listener_m2l/2
]).
@@ -42,6 +44,12 @@
%% Utils for http, swagger, etc.
-export([ return_http_error/2
+ , with_gateway/2
+ , checks/2
+ , schema_bad_request/0
+ , schema_not_found/0
+ , schema_internal_error/0
+ , schema_no_content/0
]).
-type gateway_summary() ::
@@ -108,7 +116,7 @@ get_listeners_status(GwName, Config) ->
lists:map(fun({Type, LisName, ListenOn, _, _}) ->
Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName),
Name = {Name0, ListenOn},
- LisO = #{id => Name0, type => Type},
+ LisO = #{id => Name0, type => Type, name => LisName},
case catch esockd:listener(Name) of
_Pid when is_pid(_Pid) ->
LisO#{running => true};
@@ -121,7 +129,8 @@ get_listeners_status(GwName, Config) ->
%% Mgmt APIs - listeners
%%--------------------------------------------------------------------
-listeners(GwName) when is_atom (GwName) ->
+-spec listeners(atom() | binary()) -> list().
+listeners(GwName) when is_atom(GwName) ->
listeners(atom_to_binary(GwName));
listeners(GwName) ->
RawConf = emqx_config:fill_defaults(
@@ -131,8 +140,27 @@ listeners(GwName) ->
[<<"gateway">>, GwName, <<"listeners">>], RawConf)),
mapping_listener_m2l(GwName, Listeners).
-listener(_GwName, _ListenerId) ->
- ok.
+-spec listener(binary()) -> {ok, map()} | {error, not_found} | {error, any()}.
+listener(ListenerId) ->
+ {GwName, Type, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
+ RootConf = emqx_config:fill_defaults(
+ emqx_config:get_root_raw([<<"gateway">>])),
+ try
+ Path = [<<"gateway">>, GwName, <<"listeners">>, Type, LName],
+ LConf = emqx_map_lib:deep_get(Path, RootConf),
+ Running = is_running(binary_to_existing_atom(ListenerId), LConf),
+ {ok, emqx_map_lib:jsonable_map(
+ LConf#{
+ id => ListenerId,
+ type => Type,
+ name => LName,
+ running => Running})}
+ catch
+ error : {config_not_found, _} ->
+ {error, not_found};
+ _Class : Reason ->
+ {error, Reason}
+ end.
mapping_listener_m2l(GwName, Listeners0) ->
Listeners = maps:to_list(Listeners0),
@@ -146,6 +174,7 @@ listener(GwName, Type, Conf) ->
LConf#{
id => ListenerId,
type => Type,
+ name => LName,
running => Running
}
end || {LName, LConf} <- Conf, is_map(LConf)].
@@ -159,6 +188,28 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) ->
false
end.
+-spec remove_listener(binary()) -> ok | {error, not_found} | {error, any()}.
+remove_listener(ListenerId) ->
+ {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
+ LConf = emqx:get_raw_config(
+ [<<"gateway">>, GwName, <<"listeners">>, Type]
+ ),
+ NLConf = maps:remove(Name, LConf),
+ emqx_gateway:update_rawconf(
+ GwName,
+ #{<<"listeners">> => #{Type => NLConf}}
+ ).
+
+-spec update_listener(atom() | binary(), map()) -> ok | {error, any()}.
+update_listener(ListenerId, NewConf0) ->
+ {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
+ NewConf = maps:without([<<"id">>, <<"name">>,
+ <<"type">>, <<"running">>], NewConf0),
+ emqx_gateway:update_rawconf(
+ GwName,
+ #{<<"listeners">> => #{Type => #{Name => NewConf}}
+ }).
+
%%--------------------------------------------------------------------
%% Mgmt APIs - clients
%%--------------------------------------------------------------------
@@ -248,7 +299,7 @@ with_channel(GwName, ClientId, Fun) ->
%% Utils
%%--------------------------------------------------------------------
--spec return_http_error(integer(), binary()) -> {integer(), binary()}.
+-spec return_http_error(integer(), any()) -> {integer(), binary()}.
return_http_error(Code, Msg) ->
{Code, emqx_json:encode(
#{code => codestr(Code),
@@ -256,10 +307,61 @@ return_http_error(Code, Msg) ->
})
}.
-codestr(404) -> 'RESOURCE_NOT_FOUND';
+codestr(400) -> 'BAD_REQUEST';
codestr(401) -> 'NOT_SUPPORTED_NOW';
+codestr(404) -> 'RESOURCE_NOT_FOUND';
codestr(500) -> 'UNKNOW_ERROR'.
+-spec with_gateway(binary(), function()) -> any().
+with_gateway(GwName0, Fun) ->
+ try
+ GwName = try
+ binary_to_existing_atom(GwName0)
+ catch _ : _ -> error(badname)
+ end,
+ case emqx_gateway:lookup(GwName) of
+ undefined ->
+ return_http_error(404, "Gateway not load");
+ Gateway ->
+ Fun(GwName, Gateway)
+ end
+ catch
+ error : badname ->
+ return_http_error(404, "Bad gateway name");
+ error : {miss_param, K} ->
+ return_http_error(400, [K, " is required"]);
+ error : {invalid_listener_id, Id} ->
+ return_http_error(400, ["invalid listener id: ", Id]);
+ Class : Reason : Stk ->
+ ?LOG(error, "Uncatched error: {~p, ~p}, stacktrace: ~0p",
+ [Class, Reason, Stk]),
+ return_http_error(500, {Class, Reason, Stk})
+ end.
+
+-spec checks(list(), map()) -> ok.
+checks([], _) ->
+ ok;
+checks([K|Ks], Map) ->
+ case maps:is_key(K, Map) of
+ true -> checks(Ks, Map);
+ false ->
+ error({miss_param, K})
+ end.
+
+%%--------------------------------------------------------------------
+%% common schemas
+
+schema_bad_request() ->
+ emqx_mgmt_util:error_schema(
+ <<"Some Params missed">>, ['PARAMETER_MISSED']).
+schema_internal_error() ->
+ emqx_mgmt_util:error_schema(
+ <<"Ineternal Server Error">>, ['INTERNAL_SERVER_ERROR']).
+schema_not_found() ->
+ emqx_mgmt_util:error_schema(<<"Resource Not Found">>).
+schema_no_content() ->
+ #{description => <<"No Content">>}.
+
%%--------------------------------------------------------------------
%% Internal funcs
diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl
index 7fb945ba0..3811d56c6 100644
--- a/apps/emqx_gateway/src/emqx_gateway_schema.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl
@@ -50,16 +50,16 @@ namespace() -> gateway.
roots() -> [gateway].
fields(gateway) ->
- [{stomp, sc(ref(stomp_structs))},
- {mqttsn, sc(ref(mqttsn_structs))},
- {coap, sc(ref(coap_structs))},
- {lwm2m, sc(ref(lwm2m_structs))},
- {exproto, sc(ref(exproto_structs))}
+ [{stomp, sc(ref(stomp))},
+ {mqttsn, sc(ref(mqttsn))},
+ {coap, sc(ref(coap))},
+ {lwm2m, sc(ref(lwm2m))},
+ {exproto, sc(ref(exproto))}
];
-fields(stomp_structs) ->
+fields(stomp) ->
[ {frame, sc(ref(stomp_frame))}
- , {listeners, sc(ref(tcp_listener_group))}
+ , {listeners, sc(ref(tcp_listeners))}
] ++ gateway_common_options();
fields(stomp_frame) ->
@@ -68,12 +68,12 @@ fields(stomp_frame) ->
, {max_body_length, sc(integer(), 8192)}
];
-fields(mqttsn_structs) ->
+fields(mqttsn) ->
[ {gateway_id, sc(integer())}
, {broadcast, sc(boolean())}
, {enable_qos3, sc(boolean())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
- , {listeners, sc(ref(udp_listener_group))}
+ , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(mqttsn_predefined) ->
@@ -81,34 +81,34 @@ fields(mqttsn_predefined) ->
, {topic, sc(binary())}
];
-fields(coap_structs) ->
+fields(coap) ->
[ {heartbeat, sc(duration(), <<"30s">>)}
, {connection_required, sc(boolean(), false)}
- , {notify_type, sc(union([non, con, qos]), qos)}
- , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
- , {publish_qos, sc(union([qos0, qos1, qos2, coap]), coap)}
- , {listeners, sc(ref(udp_listener_group))}
+ , {notify_type, sc(hoconsc:union([non, con, qos]), qos)}
+ , {subscribe_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
+ , {publish_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
+ , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
-fields(lwm2m_structs) ->
+fields(lwm2m) ->
[ {xml_dir, sc(binary())}
, {lifetime_min, sc(duration())}
, {lifetime_max, sc(duration())}
, {qmode_time_windonw, sc(integer())}
, {auto_observe, sc(boolean())}
- , {update_msg_publish_condition, sc(union([always, contains_object_list]))}
+ , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc(ref(translators))}
- , {listeners, sc(ref(udp_listener_group))}
+ , {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
-fields(exproto_structs) ->
+fields(exproto) ->
[ {server, sc(ref(exproto_grpc_server))}
, {handler, sc(ref(exproto_grpc_handler))}
- , {listeners, sc(ref(udp_tcp_listener_group))}
+ , {listeners, sc(ref(udp_tcp_listeners))}
] ++ gateway_common_options();
fields(exproto_grpc_server) ->
- [ {bind, sc(union(ip_port(), integer()))}
+ [ {bind, sc(hoconsc:union([ip_port(), integer()]))}
%% TODO: ssl options
];
@@ -136,62 +136,45 @@ fields(translator) ->
, {qos, sc(range(0, 2))}
];
-fields(udp_listener_group) ->
- [ {udp, sc(ref(udp_listener))}
- , {dtls, sc(ref(dtls_listener))}
+fields(udp_listeners) ->
+ [ {udp, sc(map(name, ref(udp_listener)))}
+ , {dtls, sc(map(name, ref(dtls_listener)))}
];
-fields(tcp_listener_group) ->
- [ {tcp, sc(ref(tcp_listener))}
- , {ssl, sc(ref(ssl_listener))}
+fields(tcp_listeners) ->
+ [ {tcp, sc(map(name, ref(tcp_listener)))}
+ , {ssl, sc(map(name, ref(ssl_listener)))}
];
-fields(udp_tcp_listener_group) ->
- [ {udp, sc(ref(udp_listener))}
- , {dtls, sc(ref(dtls_listener))}
- , {tcp, sc(ref(tcp_listener))}
- , {ssl, sc(ref(ssl_listener))}
+fields(udp_tcp_listeners) ->
+ [ {udp, sc(map(name, ref(udp_listener)))}
+ , {dtls, sc(map(name, ref(dtls_listener)))}
+ , {tcp, sc(map(name, ref(tcp_listener)))}
+ , {ssl, sc(map(name, ref(ssl_listener)))}
];
fields(tcp_listener) ->
- [ {"$name", sc(ref(tcp_listener_settings))}];
-
-fields(ssl_listener) ->
- [ {"$name", sc(ref(ssl_listener_settings))}];
-
-fields(udp_listener) ->
- [ {"$name", sc(ref(udp_listener_settings))}];
-
-fields(dtls_listener) ->
- [ {"$name", sc(ref(dtls_listener_settings))}];
-
-fields(tcp_listener_settings) ->
[
%% some special confs for tcp listener
- ] ++ tcp_opts()
- ++ proxy_protocol_opts()
- ++ common_listener_opts();
+ ] ++
+ tcp_opts() ++
+ proxy_protocol_opts() ++
+ common_listener_opts();
-fields(ssl_listener_settings) ->
- [
- %% some special confs for ssl listener
- ] ++ tcp_opts()
- ++ ssl_opts()
- ++ proxy_protocol_opts()
- ++ common_listener_opts();
+fields(ssl_listener) ->
+ fields(tcp_listener) ++
+ ssl_opts();
-fields(udp_listener_settings) ->
+fields(udp_listener) ->
[
%% some special confs for udp listener
- ] ++ udp_opts()
- ++ common_listener_opts();
+ ] ++
+ udp_opts() ++
+ common_listener_opts();
-fields(dtls_listener_settings) ->
- [
- %% some special confs for dtls listener
- ] ++ udp_opts()
- ++ dtls_opts()
- ++ common_listener_opts();
+fields(dtls_listener) ->
+ fields(udp_listener) ++
+ dtls_opts();
fields(udp_opts) ->
[ {active_n, sc(integer(), 100)}
@@ -218,11 +201,7 @@ fields(dtls_listener_ssl_opts) ->
lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}),
{"ciphers", Ciphers}
)
- );
-
-fields(ExtraField) ->
- Mod = list_to_atom(ExtraField++"_schema"),
- Mod:fields(ExtraField).
+ ).
default_ciphers() ->
["ECDHE-ECDSA-AES256-GCM-SHA384",
@@ -286,16 +265,16 @@ common_listener_opts() ->
].
tcp_opts() ->
- [{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}].
+ [{tcp, sc_meta(ref(emqx_schema, "tcp_opts"), #{})}].
udp_opts() ->
- [{udp, sc(ref(udp_opts), #{})}].
+ [{udp, sc_meta(ref(udp_opts), #{})}].
ssl_opts() ->
- [{ssl, sc(ref(emqx_schema, "listener_ssl_opts"), #{})}].
+ [{ssl, sc_meta(ref(emqx_schema, "listener_ssl_opts"), #{})}].
dtls_opts() ->
- [{dtls, sc(ref(dtls_listener_ssl_opts), #{})}].
+ [{dtls, sc_meta(ref(dtls_listener_ssl_opts), #{})}].
proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean())}
@@ -308,18 +287,20 @@ default_dtls_vsns() ->
dtls_vsn(<<"dtlsv1.2">>) -> 'dtlsv1.2';
dtls_vsn(<<"dtlsv1">>) -> 'dtlsv1'.
-%%--------------------------------------------------------------------
-%% Helpers
-
-%% types
-
-sc(Type) -> #{type => Type}.
+sc(Type) ->
+ sc_meta(Type, #{}).
sc(Type, Default) ->
- hoconsc:mk(Type, #{default => Default}).
+ sc_meta(Type, #{default => Default}).
-ref(Field) ->
- hoconsc:ref(?MODULE, Field).
+sc_meta(Type, Meta) ->
+ hoconsc:mk(Type, Meta).
+
+map(Name, Type) ->
+ hoconsc:map(Name, Type).
+
+ref(StructName) ->
+ ref(?MODULE, StructName).
ref(Mod, Field) ->
hoconsc:ref(Mod, Field).
diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl
index 4f19db23b..6d19cbbcf 100644
--- a/apps/emqx_gateway/src/emqx_gateway_utils.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl
@@ -133,11 +133,12 @@ listener_id(GwName, Type, LisName) ->
(bin(LisName))/binary
>>).
+parse_listener_id(Id) when is_atom(Id) ->
+ parse_listener_id(atom_to_binary(Id));
parse_listener_id(Id) ->
try
[GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]),
- {binary_to_existing_atom(GwName), binary_to_existing_atom(Type),
- binary_to_atom(Name)}
+ {GwName, Type, Name}
catch
_ : _ -> error({invalid_listener_id, Id})
end.
@@ -161,6 +162,8 @@ unix_ts_to_rfc3339(Ts) ->
emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>).
-spec stringfy(term()) -> binary().
+stringfy(T) when is_list(T); is_binary(T) ->
+ iolist_to_binary(T);
stringfy(T) ->
iolist_to_binary(io_lib:format("~0p", [T])).
diff --git a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl
index cb2ccf3f8..c9d91748a 100644
--- a/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_lwm2m_api_SUITE.erl
@@ -100,13 +100,15 @@ t_lookup_cmd_read(Config) ->
%% step 1, device register ...
test_send_coap_request( UdpSock,
post,
- sprintf("coap://127.0.0.1:~b/rd?ep=~s<=345&lwm2m=1", [?PORT, Epn]),
+ sprintf("coap://127.0.0.1:~b/rd?ep=~s<=600&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<";rt=\"oma.lwm2m\";ct=11543,,,">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
+
+ timer:sleep(100),
test_recv_mqtt_response(RespTopic),
%% step2, send a READ command to device
@@ -122,8 +124,8 @@ t_lookup_cmd_read(Config) ->
CommandJson = emqx_json:encode(Command),
?LOGT("CommandJson=~p", [CommandJson]),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
- timer:sleep(50),
+ timer:sleep(200),
no_received_request(Epn, <<"/3/0/0">>, <<"read">>),
Request2 = test_recv_coap_request(UdpSock),
@@ -131,8 +133,8 @@ t_lookup_cmd_read(Config) ->
timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true),
- timer:sleep(100),
+ timer:sleep(200),
normal_received_request(Epn, <<"/3/0/0">>, <<"read">>).
t_lookup_cmd_discover(Config) ->
@@ -158,6 +160,7 @@ t_lookup_cmd_discover(Config) ->
CommandJson = emqx_json:encode(Command),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
+ timer:sleep(200),
no_received_request(Epn, <<"/3/0/7">>, <<"discover">>),
timer:sleep(50),
@@ -172,7 +175,7 @@ t_lookup_cmd_discover(Config) ->
#coap_content{content_format = <<"application/link-format">>, payload = PayloadDiscover},
Request2,
true),
- timer:sleep(100),
+ timer:sleep(200),
discover_received_request(Epn, <<"/3/0/7">>, <<"discover">>).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf
index 3ec09f2d4..eeab91154 100644
--- a/apps/emqx_machine/etc/emqx_machine.conf
+++ b/apps/emqx_machine/etc/emqx_machine.conf
@@ -41,6 +41,9 @@ node {
## Default: 15m
global_gc_interval = 15m
+ ## Sets the etc directory
+ etc_dir = "{{ platform_etc_dir }}"
+
## Sets the net_kernel tick time in seconds.
## Notice that all communicating nodes are to have the same
## TickTime value specified.
diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl
index ae2bec50a..aca518b0d 100644
--- a/apps/emqx_machine/src/emqx_machine_schema.erl
+++ b/apps/emqx_machine/src/emqx_machine_schema.erl
@@ -42,8 +42,7 @@
%% The list can not be made a dynamic read at run-time as it is used
%% by nodetool to generate app.